The first option for consuming data programmatically is the Plain Consumer API, i.e. setting up a Kafka Consumer. Use the Kafka client library in your application with the appropriate configuration to consume the data from the stream, as described below.

Dependencies information

  • The Kafka client dependency (org.apache.kafka:kafka-clients) is required to consume from Kafka. Please check the documentation of your build tool for how to add Maven dependencies to your build; a sample Maven snippet is shown after this list.
    • The Kafka AVRO deserializer/serializer from Confluent is additionally required when consuming an AVRO topic: io.confluent:kafka-avro-serializer:5.2.2 (available from Confluent's Maven repository: https://packages.confluent.io/maven).
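
For reference, a minimal Maven snippet covering both dependencies might look as follows. The kafka-clients version is an assumption (Confluent 5.2.x is based on Apache Kafka 2.2.x); align it with the versions you actually use.

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.2</version> <!-- assumed version; pick one matching your setup -->
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.2.2</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven</url>
  </repository>
</repositories>

XML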

Mapp will also soon provide a Java library containing POJO classes representing the AVRO schema. If you are interested in this, please contact us.

Examples

See the section Configuration Properties for the Kafka Clients for how to set up the Kafka client configuration properly.

Java Code Example using the Kafka Plain Consumer

package mypackage;
 
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyClass {
 
    public static void main(String[] args) {
        MyKafkaConsumerFactory kafkaConsumerFactory = new MyKafkaConsumerFactory();
        KafkaConsumer<byte[], String> consumer = kafkaConsumerFactory.getConsumer();
 
        while (true) {
            ConsumerRecords<byte[], String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<byte[], String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
 
        // Close the consumer when necessary
        // consumer.close();
 
    }
 
    private static class MyKafkaConsumerFactory {
 
        private final String clientId = "myApplicationName";
        private final String groupId = "mygroupId";
        private final String endpoints = "host:port";
        private final String topic = "mytopic";
        private final String autoOffsetResetPolicy = "earliest";
        private final String securityProtocol = "SASL_SSL";
        private final String securitySaslMechanism = "SCRAM-SHA-256";
        private final String keyDeserializer = ByteArrayDeserializer.class.getCanonicalName();
        private final String valueDeserializer = StringDeserializer.class.getCanonicalName();
 
        public KafkaConsumer<byte[], String> getConsumer() {
            KafkaConsumer<byte[], String> consumer = new KafkaConsumer<>(getProperties());
            consumer.subscribe(Collections.singletonList(topic));
            return consumer;
        }
 
        private Properties getProperties() {
            Properties props = new Properties();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism);
            return props;
        }
 
    }
 
}

JAVA
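
Note that the poll loop above runs indefinitely, so the commented-out consumer.close() is never reached. A common pattern for a clean shutdown is to call consumer.wakeup() from a JVM shutdown hook, which makes the blocked poll() throw a WakeupException. Below is a minimal sketch of this pattern, reusing the factory from the example above.

KafkaConsumer<byte[], String> consumer = kafkaConsumerFactory.getConsumer();
final Thread mainThread = Thread.currentThread();

// On shutdown, interrupt the blocked poll() and wait until the consumer is closed.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        mainThread.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<byte[], String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected on shutdown; fall through to close the consumer.
} finally {
    consumer.close();
}

JAVA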

The examples provided for reading AVRO topics produce IndexedRecord / GenericRecord values, a generic way of reading AVRO records in a map-like fashion, i.e. accessing fields by name or index instead of through generated classes.

See the section Configuration Properties for the Kafka Clients for how to set up the Kafka client configuration properly.

Java Code Example using the Kafka Plain Consumer when consuming an AVRO topic

package mypackage;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.subject.*;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyClass {

    public static void main(String[] args) {
        MyKafkaConsumerFactory kafkaConsumerFactory = new MyKafkaConsumerFactory();
        KafkaConsumer<byte[], IndexedRecord> consumer = kafkaConsumerFactory.getConsumer();

        while (true) {
            ConsumerRecords<byte[], IndexedRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<byte[], IndexedRecord> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

        // Close the consumer when necessary
        // consumer.close();

    }

    private static class MyKafkaConsumerFactory {

        private final String clientId = "myApplicationName";
        private final String groupId = "mygroupId";
        private final String endpoints = "host:port";
        private final String topic = "myAvroTopic";
        private final String autoOffsetResetPolicy = "latest";
        private final String securityProtocol = "SASL_SSL";
        private final String securitySaslMechanism = "SCRAM-SHA-256";
        private final String schemaRegistryUrl = "host:port";
        private final String keyDeserializer = ByteArrayDeserializer.class.getCanonicalName();
        private final String valueDeserializer = KafkaAvroDeserializer.class.getCanonicalName();

        // If TOPIC is a Root Stream
        private final String valueSubjectNameStrategy = RecordNameStrategy.class.getCanonicalName();
        // If TOPIC is a Custom Stream
        // private final String valueSubjectNameStrategy = TopicNameStrategy.class.getCanonicalName();

        KafkaConsumer<byte[], IndexedRecord> getConsumer() {
            KafkaConsumer<byte[], IndexedRecord> consumer = new KafkaConsumer<>(getProperties());
            consumer.subscribe(Collections.singletonList(topic));
            return consumer;
        }

        private Properties getProperties() {
            Properties props = new Properties();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism);
            props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
            props.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNameStrategy);
            return props;
        }

    }

}

JAVA
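
Once a record has been received, its fields can be read generically through the AVRO schema. The following sketch would go inside the poll loop of the example above; the field name "userId" is a hypothetical placeholder for illustration, not a field of the actual schema.

// Required at the top of the file:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// record.value() is an IndexedRecord; GenericRecord additionally allows access by field name.
GenericRecord value = (GenericRecord) record.value();
Schema schema = value.getSchema();
System.out.println("schema = " + schema.getFullName());

// "userId" is a hypothetical field name used for illustration only.
if (schema.getField("userId") != null) {
    Object userId = value.get("userId");
    System.out.println("userId = " + userId);
}

JAVA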

See the section Configuration Properties for the Kafka Clients for how to set up the Kafka client configuration properly.

Scala Code Example using the Kafka Plain Consumer

package mypackage

import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.jdk.CollectionConverters._

object MyClass {

  private val clientId = "myApplicationName"
  private val groupId = "mygroupId"
  private val endpoints = "host:port"
  private val topic = "mytopic"
  private val autoOffsetResetPolicy = "earliest"
  private val securityProtocol = "SASL_SSL"
  private val securitySaslMechanism = "SCRAM-SHA-256"
  private val keyDeserializer: String = classOf[ByteArrayDeserializer].getCanonicalName
  private val valueDeserializer: String = classOf[StringDeserializer].getCanonicalName

  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[Array[Byte], String](getProperties)
    consumer.subscribe(List(topic).asJava)

    while (true) {
      val records = consumer.poll(Duration.ofMillis(100))
      records.iterator().forEachRemaining { record =>
        println(s"offset = ${record.offset()}, key = ${record.key()}, value = ${record.value()}")
      }
    }
    // Close the Consumer when necessary
    // consumer.close()
  }

  private val getProperties: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol)
    props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism)
    props
  }
}

SCALA

The examples provided for reading AVRO topics produce IndexedRecord / GenericRecord values, a generic way of reading AVRO records in a map-like fashion, i.e. accessing fields by name or index instead of through generated classes.

See the section Configuration Properties for the Kafka Clients for how to set up the Kafka client configuration properly.

Scala Code Example using the Kafka Plain Consumer when consuming an AVRO topic

package mypackage

import java.time.Duration
import java.util.Properties

import io.confluent.kafka.serializers.subject._
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.IndexedRecord
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.ByteArrayDeserializer

import scala.jdk.CollectionConverters._

object MyClass {

  private val clientId = "myApplicationName"
  private val groupId = "mygroupId"
  private val endpoints = "host:port"
  private val topic = "myAvroTopic"
  private val autoOffsetResetPolicy = "latest"
  private val securityProtocol = "SASL_SSL"
  private val securitySaslMechanism = "SCRAM-SHA-256"
  private val schemaRegistryUrl = "host:port"
  private val keyDeserializer: String = classOf[ByteArrayDeserializer].getCanonicalName
  private val valueDeserializer: String = classOf[KafkaAvroDeserializer].getCanonicalName

  // If TOPIC is a Root Stream
  private val valueSubjectNameStrategy: String = classOf[RecordNameStrategy].getCanonicalName

  // If TOPIC is a Custom Stream
  //private val valueSubjectNameStrategy: String = classOf[TopicNameStrategy].getCanonicalName

  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[Array[Byte], IndexedRecord](getProperties)
    consumer.subscribe(List(topic).asJava)

    while (true) {
      val records = consumer.poll(Duration.ofMillis(100))
      records.iterator().forEachRemaining { record =>
        println(s"offset = ${record.offset()}, key = ${record.key()}, value = ${record.value()}")
      }
    }
    // Close the Consumer when necessary
    // consumer.close()
  }

  private val getProperties: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol)
    props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism)
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    props.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNameStrategy)
    props
  }

}

SCALA

These examples print the messages consumed from the topic for demonstration purposes only. To store or process the data further, set up your application accordingly.
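
For example, to get at-least-once delivery into your own storage, you can disable auto-commit and commit offsets only after a batch has been processed successfully. The following is a minimal Java sketch reusing the consumer and properties setup from the examples above; processRecord(...) is a hypothetical placeholder for your own logic (e.g. writing to a database or forwarding to another system).

// In getProperties(): turn off automatic offset commits.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// In the poll loop: commit only after the whole batch has been handled.
while (true) {
    ConsumerRecords<byte[], String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<byte[], String> record : records) {
        processRecord(record); // hypothetical application logic
    }
    if (!records.isEmpty()) {
        consumer.commitSync();
    }
}

JAVA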