The second alternative for consuming data programmatically is to use the Kafka Streams API to build a stream processing application. Follow the steps below to set it up: add the Kafka library to your application and configure it to consume the data from the stream as described below.

JAVA Dependencies Information

The Kafka Streams dependency (the org.apache.kafka:kafka-streams artifact, which also pulls in the Kafka client) is required to consume from Kafka with the Streams API; the AVRO examples additionally use Confluent's io.confluent:kafka-streams-avro-serde artifact. Please check your build tool's documentation for how to add Maven dependencies to your build.

SCALA Dependencies Information

The Kafka Streams Scala dependency (the org.apache.kafka:kafka-streams-scala artifact) is required to consume from Kafka with the Streams API; the AVRO examples additionally use Confluent's io.confluent:kafka-streams-avro-serde artifact. Please check your build tool's documentation for how to add Maven dependencies to your build.

Examples

The examples provided for reading AVRO topics produce IndexedRecord / GenericRecord values, which offer a generic, map-like way of reading AVRO data.
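
As a minimal sketch of this map-like access: fields of a GenericRecord are read by name. The class and field names below ("userId", "eventType") are hypothetical; use the field names defined in the actual AVRO schema of your topic.

package mypackage;

import org.apache.avro.generic.GenericRecord;

class AvroFieldAccessExample {

    // GenericRecord fields are read by name, much like values in a map.
    static void printRecord(GenericRecord record) {
        Object userId = record.get("userId");        // returns null if the schema has no such field
        Object eventType = record.get("eventType");
        System.out.printf("userId = %s, eventType = %s%n", userId, eventType);
    }
}
JAVA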

Mapp will also soon provide a Java library containing POJO classes that represent the AVRO schema. If you are interested in this, please contact us.

See the section Configuration Properties for the Kafka Clients for how to configure the Kafka client properly.

Java Code Example using the Kafka Streams API when consuming a JSON topic:

package mypackage;
 
import java.util.Properties;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
 
import java.lang.Thread.UncaughtExceptionHandler;
 
class MyClass {
 
    public static void main(String[] args) {
        MyKafkaStreamsFactory myKafkaStreamsFactory = new MyKafkaStreamsFactory();
        KafkaStreams kafkaStreams = myKafkaStreamsFactory.getKafkaStreams();
        kafkaStreams.setUncaughtExceptionHandler(myKafkaStreamsFactory.getUncaughtExceptionHandler());
        kafkaStreams.start();
 
        // Close the Stream when necessary
        // kafkaStreams.close()
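        // A common optional pattern (not required) is to close the Stream on JVM shutdown:
        // Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));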
    }
 
    private static class MyKafkaStreamsFactory {
 
        private String clientId = "myApplicationName";
        private String groupId = "mygroupId";
        private String endpoints = "host:port";
        private String topic = "mytopic";
        private String autoOffsetResetPolicy = "earliest";
        private String streamsNumOfThreads = "3";
        private String securityProtocol = "SASL_SSL";
        private String securitySaslMechanism = "SCRAM-SHA-256";
        private String keySerde = "org.apache.kafka.common.serialization.Serdes$LongSerde";
        private String valueSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde";
        private String deserializationExceptionHandler = LogAndContinueExceptionHandler.class.getCanonicalName();
 
        private KafkaStreams getKafkaStreams() {
 
            StreamsBuilder streamBuilder = new StreamsBuilder();
            KStream<Long, String> stream = streamBuilder.stream(topic, Consumed.with(Serdes.Long(), Serdes.String()));
            stream.foreach((key, value) -> System.out.printf("key = %d, value = %s%n", key, value));
 
            return new KafkaStreams(streamBuilder.build(), getProperties());
        }
 
        private Properties getProperties() {
            Properties props = new Properties();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde);
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints);
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, streamsNumOfThreads);
            props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler);
            props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism);
            return props;
        }
 
        private UncaughtExceptionHandler getUncaughtExceptionHandler() {
            return (thread, exception) -> System.out.println("Exception running the Stream " + exception.getMessage());
        }
 
    }
 
}
JAVA

See the section Configuration Properties for the Kafka Clients for how to configure the Kafka client properly.

Java Code Example using the Kafka Streams API when consuming an AVRO topic:

package mypackage;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.subject.*;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import java.lang.Thread.UncaughtExceptionHandler;

class MyClass {

    public static void main(String[] args) {
        MyKafkaStreamsFactory myKafkaStreamsFactory = new MyKafkaStreamsFactory();
        KafkaStreams kafkaStreams = myKafkaStreamsFactory.getKafkaStreams();
        kafkaStreams.setUncaughtExceptionHandler(myKafkaStreamsFactory.getUncaughtExceptionHandler());
        kafkaStreams.start();

        // Close the Stream when necessary
        // kafkaStreams.close()
    }

    private static class MyKafkaStreamsFactory {

        private String clientId = "myApplicationName";
        private String groupId = "mygroupId";
        private String endpoints = "host:port";
        private String topic = "myAvroTopic";
        private String autoOffsetResetPolicy = "earliest";
        private String streamsNumOfThreads = "3";
        private String securityProtocol = "SASL_SSL";
        private String securitySaslMechanism = "SCRAM-SHA-256";
        private String schemaRegistryUrl = "host:port";
        private String keySerde = "org.apache.kafka.common.serialization.Serdes$LongSerde";
        private String valueSerde = GenericAvroSerde.class.getCanonicalName();
        private String deserializationExceptionHandler = LogAndContinueExceptionHandler.class.getCanonicalName();

        // If TOPIC is a Root Stream
        private String valueSubjectNamingStrategy = RecordNameStrategy.class.getCanonicalName();
        // If TOPIC is a Custom Stream
        //private String valueSubjectNamingStrategy = TopicNameStrategy.class.getCanonicalName();

        private KafkaStreams getKafkaStreams() {
            StreamsBuilder streamBuilder = new StreamsBuilder();

            Serde<GenericRecord> valueAvroSerde = new GenericAvroSerde();
            valueAvroSerde.configure(getSerdeProperties(), false);

            KStream<Long, GenericRecord> stream = streamBuilder.stream(topic, Consumed.with(Serdes.Long(), valueAvroSerde));
            stream.foreach((key, value) -> System.out.printf("key = %d, value = %s%n", key, value));

            return new KafkaStreams(streamBuilder.build(), getProperties());
        }

        private Properties getProperties() {
            Properties props = new Properties();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy);
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints);
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, streamsNumOfThreads);
            props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler);
            props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
            props.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNamingStrategy);
            props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism);
            return props;
        }

        private Map<String, String> getSerdeProperties() {
            return Collections.singletonMap(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        }

        private UncaughtExceptionHandler getUncaughtExceptionHandler() {
            return (thread, exception) -> exception.printStackTrace();
        }

    }

}

JAVA

See the section Configuration Properties for the Kafka Clients for how to configure the Kafka client properly.

Scala Code Example using the Kafka Streams API when consuming a JSON topic:

package mypackage
 
import java.lang.Thread.UncaughtExceptionHandler
import java.util.Properties
 
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
import scala.util.control.NonFatal
 
 
object MyClass {
 
  private val clientId = "myApplicationName"
  private val groupId = "mygroupId"
  private val endpoints = "host:port"
  private val topic = "mytopic"
  private val autoOffsetResetPolicy = "earliest"
  private val streamsNumOfThreads = "3"
  private val securityProtocol = "SASL_SSL"
  private val securitySaslMechanism = "SCRAM-SHA-256"
  private val keySerde = "org.apache.kafka.common.serialization.Serdes$LongSerde"
  private val valueSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde"
  private val deserializationExceptionHandler = classOf[LogAndContinueExceptionHandler]
   
  def main(args: Array[String]): Unit = {
    val kafkaStreams = getKafkaStreams
    kafkaStreams.setUncaughtExceptionHandler(getUncaughtExceptionHandler)
    kafkaStreams.start()
 
    // Close the Stream when necessary
    // kafkaStreams.close()
  }
 
  private def getKafkaStreams: KafkaStreams = {
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._
 
    val streamBuilder = new StreamsBuilder()
 
    streamBuilder
      .stream[Long, String](topic)
      .foreach {
        case (key, value) =>
          println(s"key = $key, value = $value")
      }
 
    new KafkaStreams(streamBuilder.build(), getProperties)
  }
 
  private def getProperties: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy)
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde)
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints)
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, streamsNumOfThreads)
    props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol)
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
    props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism)
    props
  }
 
  private val getUncaughtExceptionHandler: UncaughtExceptionHandler = {
    case (_, NonFatal(ex)) =>
      println(s"Exception running the Stream $ex")
      ()
  }
 
}
SCALA

See the section Configuration Properties for the Kafka Clients for how to configure the Kafka client properly.

Scala Code Example using the Kafka Streams API when consuming an AVRO topic:

package mypackage
 
import java.lang.Thread.UncaughtExceptionHandler
import java.util.{Collections, Properties}
 
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import io.confluent.kafka.serializers.subject._
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
import scala.util.control.NonFatal
 
 
object MyClass {
 
  private val clientId = "myApplicationName"
  private val groupId = "mygroupId"
  private val endpoints = "host:port"
  private val topic = "myAvroTopic"
  private val autoOffsetResetPolicy = "earliest"
  private val streamsNumOfThreads = "3"
  private val securityProtocol = "SASL_SSL"
  private val securitySaslMechanism = "SCRAM-SHA-256"
  private val schemaRegistryUrl = "host:port"
  private val keySerde: String = "org.apache.kafka.common.serialization.Serdes$LongSerde"
  private val valueSerde: String = classOf[GenericAvroSerde].getCanonicalName
  private val deserializationExceptionHandler = classOf[LogAndContinueExceptionHandler]
 
  // If TOPIC is a Root Stream
  private val valueSubjectNamingStrategy = classOf[RecordNameStrategy].getCanonicalName
  // If TOPIC is a Custom Stream
  //private val valueSubjectNamingStrategy = classOf[TopicNameStrategy].getCanonicalName
 
  implicit val valueAvroSerde: Serde[GenericRecord] = {
    val gas = new GenericAvroSerde
    gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl), false)
    gas
  }
 
  def main(args: Array[String]): Unit = {
    val kafkaStreams = getKafkaStreams
    kafkaStreams.setUncaughtExceptionHandler(getUncaughtExceptionHandler)
    kafkaStreams.start()
 
    // Close the Stream when necessary
    // kafkaStreams.close()
  }
 
  private def getKafkaStreams: KafkaStreams = {
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._
 
    val streamBuilder = new StreamsBuilder()
 
    streamBuilder
      .stream[Long, GenericRecord](topic)
      .foreach {
        case (key, value) =>
          println(s"key = $key, value = $value")
      }
 
    new KafkaStreams(streamBuilder.build(), getProperties)
  }
 
  private def getProperties: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy)
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId)
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints)
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, streamsNumOfThreads)
    props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol)
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    props.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNamingStrategy)
    props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism)
    props
  }
 
  private val getUncaughtExceptionHandler: UncaughtExceptionHandler = {
    case (_, NonFatal(ex)) =>
      println(s"Exception running the Stream $ex")
      ()
  }
 
}
SCALA

These examples print the messages consumed from the topic for demonstration purposes. To store or further process the data, set up your application accordingly.
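
For example, the foreach step can hand each consumed record to your own storage or processing component instead of printing it. A minimal sketch, assuming a hypothetical EventStore interface of your own (not part of Kafka); it mirrors the topology of the JSON examples above:

package mypackage;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

class MyProcessingClass {

    // Hypothetical storage abstraction of your application; replace it with
    // your actual database client, HTTP client, etc.
    interface EventStore {
        void save(Long key, String value);
    }

    static void buildTopology(StreamsBuilder streamBuilder, String topic, EventStore store) {
        KStream<Long, String> stream = streamBuilder.stream(topic, Consumed.with(Serdes.Long(), Serdes.String()));
        // Instead of printing, hand each consumed record to the application's store
        stream.foreach((key, value) -> store.save(key, value));
    }
}
JAVA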