This section provides an example of how to consume a data stream from Kafka using Python. Before running it, complete the following setup steps:

  1. Set up a Python virtual environment:

    # Creates the Virtual Environment
    python3 -m venv datastreams 
    
    # Change Directory to the Virtual Environment
    cd datastreams 
    
    # Activate the Virtual Environment
    source bin/activate
  2. Confluent's Kafka Python client is a wrapper around librdkafka, a C library, so librdkafka may need to be installed on your system. The Python package contains the C bindings that let it communicate with librdkafka. The prebuilt wheels on PyPI for common platforms also bundle librdkafka itself, so this step is mainly needed when pip has to build the client from source.

    # macOS (using the Homebrew package manager)
    brew install librdkafka

    # Debian-like GNU/Linux distributions (Debian, Ubuntu, Mint...)
    apt install librdkafka-dev

    # Red Hat-like GNU/Linux distributions (Red Hat Enterprise Linux, CentOS, Fedora...)
    yum install librdkafka-devel python-devel
    
    
  3. Optionally upgrade pip, then install Confluent's Kafka Python client and the packages it needs for Avro support:

    # Upgrade Python Package Manager (Optional)
    pip install --upgrade pip 
    
    # Install Confluent's Kafka Python Client
    pip install confluent-kafka 
    
    # Install the Requests package (needed by the Avro client)
    pip install requests 
    
    # Install Avro Package
    pip install avro-python3
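Once these steps are done, a quick sanity check can confirm two things: the interpreter is running inside the virtual environment, and the packages can be imported. Below is a minimal sketch. The function names in_virtualenv and check_package are illustrative only and do not belong to any library. The sketch relies on sys.prefix differing from sys.base_prefix inside a venv. It also uses confluent_kafka.version() and confluent_kafka.libversion(), which return the client version and the librdkafka version as (string, int) tuples. It assumes that avro-python3 is imported under the module name avro.

```python
import importlib
import sys


def in_virtualenv():
    # Inside a venv, sys.prefix points at the venv while sys.base_prefix
    # still points at the base Python installation
    return sys.prefix != sys.base_prefix


def check_package(name):
    # Return True if the module can be imported by the current interpreter
    try:
        importlib.import_module(name)
        return True
    except ImportError:
        return False


if __name__ == '__main__':
    print('Running inside a virtual environment: {}'.format(in_virtualenv()))
    # Module names as imported in Python (avro-python3 installs the "avro" module)
    for pkg in ('confluent_kafka', 'requests', 'avro'):
        print('{}: {}'.format(pkg, 'installed' if check_package(pkg) else 'MISSING'))
    if check_package('confluent_kafka'):
        import confluent_kafka
        # version() is the Python client version, libversion() the librdkafka version
        print('confluent-kafka {}, librdkafka {}'.format(
            confluent_kafka.version()[0], confluent_kafka.libversion()[0]))
```

If confluent_kafka is reported as missing even though pip installed it, the import may be failing because the librdkafka shared library cannot be found.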


     

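The following example consumes messages from a JSON topic using Confluent's Kafka Python client. Replace the placeholder broker address (host:port), group ID, credentials and topic name with your own values.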

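import json

from confluent_kafka import Consumer


def error_cb(error):
    # Called for client-level errors, e.g. authentication or broker connection failures
    print('Kafka error: {}'.format(error))


c = Consumer({
    'bootstrap.servers': 'host:port',
    'auto.offset.reset': 'earliest',  # start from the oldest message if the group has no committed offset
    'group.id': 'myGroupId',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'myuser',
    'sasl.password': 'mypassword',
    'error_cb': error_cb,
})

c.subscribe(['mytopic'])

try:
    while True:
        # Wait up to 10 seconds for a message; None means the timeout expired
        msg = c.poll(10.0)

        if msg is None:
            continue
        if msg.error():
            print('Consumer error: {}'.format(msg.error()))
            continue
        if msg.value() is None:
            # Skip tombstones (messages with a null value)
            continue

        # Message values are raw bytes: decode them and parse the JSON payload
        value = json.loads(msg.value().decode('utf-8'))
        print('Received message: {}'.format(value))
except KeyboardInterrupt:
    # Stop consuming on Ctrl+C
    pass
finally:
    # Commit final offsets (when auto-commit is enabled) and leave the consumer group
    c.close()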
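The example above hard-codes the broker address and credentials. That works for a first test but is risky in shared code. One alternative is to build the same configuration dictionary from environment variables. The following is a sketch that assumes the settings are supplied through the hypothetical variables KAFKA_BOOTSTRAP_SERVERS, KAFKA_GROUP_ID, KAFKA_USERNAME and KAFKA_PASSWORD. Adjust the names and the fixed security settings to match your deployment.

```python
import os


def consumer_config_from_env(prefix='KAFKA_'):
    """Build the consumer configuration from environment variables.

    The variable names (KAFKA_BOOTSTRAP_SERVERS, KAFKA_GROUP_ID, KAFKA_USERNAME,
    KAFKA_PASSWORD) are hypothetical; a KeyError is raised if one is missing.
    """
    env = os.environ
    return {
        'bootstrap.servers': env[prefix + 'BOOTSTRAP_SERVERS'],
        'group.id': env[prefix + 'GROUP_ID'],
        'auto.offset.reset': 'earliest',
        # Fixed security settings, same as in the example above
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-256',
        'sasl.username': env[prefix + 'USERNAME'],
        'sasl.password': env[prefix + 'PASSWORD'],
    }
```

With this helper, you can create the consumer with Consumer(dict(consumer_config_from_env(), error_cb=error_cb)). The secrets then stay out of the source file.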



At the time of writing, Avro topics cannot be consumed with Confluent's Kafka Python client. The reason is that its Avro consumer does not support simple (non-Avro) keys combined with Avro values: https://github.com/confluentinc/confluent-kafka-python/issues/608
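To tell whether a topic's keys are Avro-encoded or plain, you can inspect the raw key bytes. Data serialized with Confluent's Schema Registry serializers starts with a magic byte 0, followed by a 4-byte big-endian schema ID. The helper below is hypothetical: a heuristic sketch, not a deserializer. It returns that schema ID when the header is present and None otherwise. You could call it as parse_confluent_header(msg.key()) inside the poll loop.

```python
import struct


def parse_confluent_header(data):
    """Return the schema ID if `data` starts with Confluent's wire-format header, else None.

    Heuristic only: a plain key could coincidentally start with the same bytes.
    """
    # Header = 1 magic byte (0) + 4-byte big-endian schema ID
    if data is None or len(data) < 5 or data[0] != 0:
        return None
    (schema_id,) = struct.unpack('>I', data[1:5])
    return schema_id


print(parse_confluent_header(b'\x00\x00\x00\x00\x2aavro-payload'))  # → 42
print(parse_confluent_header(b'customer-123'))  # → None
```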