Kafka customizers
The SDK provides a base interface, KafkaCustomizer, that solution engineers can implement to customize the properties used by the Kafka producer and the Kafka consumer.
kafkaConsumerCustomizer & kafkaProducerCustomizer
Implement the KafkaCustomizer interface from de.knowis.cards.sdk.domain.config.KafkaCustomizer, where:
de.knowis.cards is the service project base path and solution acronym.
sdk.domain.config is the configuration package in the SDK that contains the KafkaCustomizer interface.
Override the Kafka consumer or producer configuration by implementing that interface.
For a Kafka consumer customizer, create a bean with the qualifier "kafkaConsumerCustomizer".
For a Kafka producer customizer, create a bean with the qualifier "kafkaProducerCustomizer".
The KafkaCustomizer interface has a single method, getConfig, which is called for each Kafka topic, identified by the topicAlias parameter. It also receives the KafkaBinding configuration for that topic. Your implementation should return a Map<String, Object> containing the Kafka configuration to use when creating the Kafka producer or Kafka consumer.
public interface KafkaCustomizer {
    /**
     * @param topicAlias alias name of the event topic
     * @param kafkaBindingConfig Kafka binding configuration for that topic alias
     * @return customized configuration to use for that topic alias
     */
    public Map<String, Object> getConfig(String topicAlias, KafkaBinding kafkaBindingConfig);
}
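As an illustration of the contract above, the following is a minimal sketch of a consumer customizer, not the SDK's own code. KafkaBinding is declared as an empty stand-in and KafkaCustomizer is redeclared locally only so that the example compiles on its own; the class name OrderConsumerCustomizer, the topic alias "orders", and the chosen values are hypothetical. The property keys themselves are standard Kafka consumer settings.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the SDK's KafkaBinding type so the sketch compiles on its own.
// The real class and its accessors are not shown in this section.
class KafkaBinding {
}

// Same shape as the SDK interface; in a service project you would import it
// from the sdk.domain.config package instead of redeclaring it.
interface KafkaCustomizer {
    Map<String, Object> getConfig(String topicAlias, KafkaBinding kafkaBindingConfig);
}

// Hypothetical consumer customizer: shared settings for every topic,
// plus one override for a single (hypothetical) topic alias.
class OrderConsumerCustomizer implements KafkaCustomizer {

    @Override
    public Map<String, Object> getConfig(String topicAlias, KafkaBinding kafkaBindingConfig) {
        Map<String, Object> config = new HashMap<>();

        // Applied to every topic this customizer is called for.
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", false);

        // Applied only to the topic registered under the alias "orders".
        if ("orders".equals(topicAlias)) {
            config.put("max.poll.records", 50);
        }
        return config;
    }
}
```

In a service project, an instance of such a class would be exposed as a bean with the qualifier "kafkaConsumerCustomizer". This section does not state whether the returned map is merged with the properties derived from the binding secret or replaces them, so that behavior is worth verifying before relying on it.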
Used Kafka properties from Kafka binding secret
These properties apply to both the Kafka producer and the Kafka consumer. Their values come from the messagehub binding secret that is linked to each topic through the topic binding configuration.
security.protocol
Value comes from securityProtocol in your Kafka binding secret properties; the default value is "SASL_SSL".
sasl.mechanism
Value comes from saslMechanism in your Kafka binding secret properties; the default value is "SCRAM-SHA-512".
ssl.protocol
Value comes from sslProtocol in your Kafka binding secret properties; the default value is "TLSv1.2".
ssl.enabled.protocols
Value comes from sslEnabledProtocols in your Kafka binding secret properties; the default value is "TLSv1.2".
ssl.endpoint.identification.algorithm
Value comes from sslIdentificationAlgorithm in your Kafka binding secret properties; the default value is "HTTPS".
sasl.jaas.config
Value is built using the template {} required username="{}" password="{}", where:
First {} is replaced with the saslJaasConfigLoginModuleQualifiedName value from the Kafka binding secret; the default value is "org.apache.kafka.common.security.scram.ScramLoginModule".
Second {} is replaced with the "user" value from the Kafka binding secret.
Third {} is replaced with the "password" value from the Kafka binding secret.
bootstrap.servers
Value comes from kafka_brokers_sasl in your Kafka binding secret properties.
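The mapping described in this list can be sketched as follows. This is an illustration of the documented keys, defaults, and template, not the SDK's implementation. It assumes the binding secret is available as a flat Map<String, String> and that kafka_brokers_sasl already holds a comma-separated broker list; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that derives Kafka client properties from a binding secret.
class BindingSecretMapping {

    // Returns the secret value for the key, or the documented default when it is absent.
    static String valueOrDefault(Map<String, String> secret, String key, String fallback) {
        String value = secret.get(key);
        return value != null ? value : fallback;
    }

    // Assumption: the binding secret is modelled as a flat map of strings.
    static Map<String, Object> toKafkaProperties(Map<String, String> secret) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", valueOrDefault(secret, "securityProtocol", "SASL_SSL"));
        props.put("sasl.mechanism", valueOrDefault(secret, "saslMechanism", "SCRAM-SHA-512"));
        props.put("ssl.protocol", valueOrDefault(secret, "sslProtocol", "TLSv1.2"));
        props.put("ssl.enabled.protocols", valueOrDefault(secret, "sslEnabledProtocols", "TLSv1.2"));
        props.put("ssl.endpoint.identification.algorithm",
                valueOrDefault(secret, "sslIdentificationAlgorithm", "HTTPS"));

        String loginModule = valueOrDefault(secret, "saslJaasConfigLoginModuleQualifiedName",
                "org.apache.kafka.common.security.scram.ScramLoginModule");
        // Follows the documented template literally. Kafka's JAAS syntax normally ends
        // the entry with ";", which the template as shown in this section does not include.
        props.put("sasl.jaas.config", String.format(
                "%s required username=\"%s\" password=\"%s\"",
                loginModule, secret.get("user"), secret.get("password")));

        // Assumption: the value is already a comma-separated host:port list.
        String brokers = secret.get("kafka_brokers_sasl");
        if (brokers != null) {
            props.put("bootstrap.servers", brokers);
        }
        return props;
    }
}
```

A customizer that only needs to change one of these values, for example sasl.mechanism for a single topic, would return just that key from getConfig, provided the returned map is merged with these defaults; that merge behavior is an assumption to confirm against the SDK.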