Kafka customizers

The SDK provides a base interface, KafkaCustomizer, that a solution engineer can implement to customize the properties used by the Kafka Producer and the Kafka Consumer.

kafkaConsumerCustomizer & kafkaProducerCustomizer

  • Implement the KafkaCustomizer interface from de.knowis.cards.sdk.domain.config.KafkaCustomizer, where:

    1. de.knowis.cards is the service project base path and solution acronym.

    2. sdk.domain.config is the configuration package in the SDK that contains the KafkaCustomizer interface.

  • Override the Kafka consumer/producer configuration by implementing that interface.

    1. For a Kafka consumer customizer, create a bean with the qualifier "kafkaConsumerCustomizer".

    2. For a Kafka producer customizer, create a bean with the qualifier "kafkaProducerCustomizer".

  • The KafkaCustomizer interface has a single method, getConfig, which is called for each Kafka topic, identified by the topicAlias parameter. The method also receives the KafkaBinding configuration for that topic.

  • Your implementation should return a Map<String, Object> that contains the Kafka configuration you want to be used for creating the Kafka Producer / Kafka Consumer.

    public interface KafkaCustomizer {

      /**
      *
      * @param topicAlias Alias name of Event Topic
      * @param kafkaBindingConfig for that topic alias
      * @return Customized Configurations to use for that topic alias.
      */
      public Map<String, Object> getConfig(String topicAlias, KafkaBinding kafkaBindingConfig);

    }
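As an illustration of the steps above, here is a minimal sketch of a consumer customizer. KafkaBinding and KafkaCustomizer are redeclared as stand-ins only so that the example compiles on its own; in a real project they come from the SDK. The class name SampleConsumerCustomizer, the topic alias "orders", and the chosen property values are assumptions made for the example, and the bean registration with the "kafkaConsumerCustomizer" qualifier is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the SDK's KafkaBinding type so the sketch compiles on its own.
// The real class ships with the SDK; its fields are not shown in this document.
final class KafkaBinding {
}

// Copy of the interface shown above, repeated only to keep the sketch self-contained.
interface KafkaCustomizer {
    Map<String, Object> getConfig(String topicAlias, KafkaBinding kafkaBindingConfig);
}

// Hypothetical consumer customizer. In a service project an instance would be
// registered as a bean with the qualifier "kafkaConsumerCustomizer" (wiring omitted).
final class SampleConsumerCustomizer implements KafkaCustomizer {

    @Override
    public Map<String, Object> getConfig(String topicAlias, KafkaBinding kafkaBindingConfig) {
        Map<String, Object> config = new HashMap<>();

        // Standard Kafka consumer property, applied to every topic alias.
        config.put("auto.offset.reset", "earliest");

        // "orders" is a hypothetical topic alias, used to show per-topic tuning.
        if ("orders".equals(topicAlias)) {
            config.put("max.poll.records", 100);
        }
        return config;
    }
}
```

Because getConfig receives the topic alias on every call, a single customizer can return different settings per topic. A producer customizer would follow the same pattern with producer properties and the "kafkaProducerCustomizer" qualifier.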

Kafka properties used from the Kafka binding secret

These properties apply to both Kafka producer and consumer. Their values come from the messagehub binding secret that is linked with each topic via topic binding configuration.

security.protocol

Value comes from securityProtocol in your Kafka binding secret properties; the default value is "SASL_SSL".

sasl.mechanism

Value comes from saslMechanism in your Kafka binding secret properties; the default value is "SCRAM-SHA-512".

ssl.protocol

Value comes from sslProtocol in your Kafka binding secret properties; the default value is "TLSv1.2".

ssl.enabled.protocols

Value comes from sslEnabledProtocols in your Kafka binding secret properties; the default value is "TLSv1.2".

ssl.endpoint.identification.algorithm

Value comes from sslIdentificationAlgorithm in your Kafka binding secret properties; the default value is "HTTPS".

sasl.jaas.config

Value is built using the template {} required username="{}" password="{}", where:

  1. The first {} is replaced with the saslJaasConfigLoginModuleQualifiedName value from the Kafka binding secret; the default value is "org.apache.kafka.common.security.scram.ScramLoginModule".

  2. The second {} is replaced with the "user" value from the Kafka binding secret.

  3. The third {} is replaced with the "password" value from the Kafka binding secret.
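The placeholder substitution can be sketched as follows. This is an illustration only, not the SDK's actual code: the class and method names are hypothetical, and the sketch reproduces the template exactly as quoted, which does not show the terminating semicolon that Kafka's JAAS syntax normally expects.

```java
// Hypothetical helper illustrating how the three placeholders are filled.
final class JaasConfigSketch {

    // Default login module named in the text above.
    static final String DEFAULT_LOGIN_MODULE =
            "org.apache.kafka.common.security.scram.ScramLoginModule";

    // Fills the template placeholders in order: login module, user, password.
    // Falls back to the default login module when none is supplied.
    static String buildJaasConfig(String loginModule, String user, String password) {
        String module = (loginModule == null || loginModule.isEmpty())
                ? DEFAULT_LOGIN_MODULE
                : loginModule;
        return String.format("%s required username=\"%s\" password=\"%s\"",
                module, user, password);
    }
}
```

For example, buildJaasConfig(null, "alice", "s3cret") yields org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="s3cret", where "alice" and "s3cret" are made-up credentials.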

bootstrap.servers

Value comes from kafka_brokers_sasl in your Kafka binding secret properties.
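To summarize the mapping described in this section, the sketch below derives the Kafka properties from a binding secret and applies the documented defaults. It assumes the secret is available as a plain Map<String, String>, which is a simplification for the example. The class and method names are hypothetical, and sasl.jaas.config is left out to keep the sketch short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the secret is modelled as a simple key/value map.
final class BindingSecretMappingSketch {

    static Map<String, Object> toKafkaProperties(Map<String, String> secret) {
        Map<String, Object> props = new HashMap<>();

        // Secret key on the right, documented default as the fallback.
        props.put("security.protocol",
                secret.getOrDefault("securityProtocol", "SASL_SSL"));
        props.put("sasl.mechanism",
                secret.getOrDefault("saslMechanism", "SCRAM-SHA-512"));
        props.put("ssl.protocol",
                secret.getOrDefault("sslProtocol", "TLSv1.2"));
        props.put("ssl.enabled.protocols",
                secret.getOrDefault("sslEnabledProtocols", "TLSv1.2"));
        props.put("ssl.endpoint.identification.algorithm",
                secret.getOrDefault("sslIdentificationAlgorithm", "HTTPS"));

        // No default is documented for the broker list, so it is only set when present.
        String brokers = secret.get("kafka_brokers_sasl");
        if (brokers != null) {
            props.put("bootstrap.servers", brokers);
        }
        return props;
    }
}
```

A secret that only contains kafka_brokers_sasl would therefore produce the five default values plus bootstrap.servers, while any key present in the secret overrides its default.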