Consume Data with Python
This section provides an example of how to consume a data stream from Kafka using Python. Please note the following:
- We assume a UNIX environment for this guide.
- We provide examples for Python 3.7 (the latest stable Python version at the time of writing).
- We recommend upgrading if you are still using Python 2.7, since it will reach EOL in 2020.
- We will use Confluent's Kafka Python Client to consume from Kafka.
Set up Python Virtualenv
# Creates the Virtual Environment
python3 -m venv datastreams
# Change Directory to the Virtual Environment
cd datastreams
# Activate the Virtual Environment
source bin/activate
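Before installing anything, it can help to confirm from Python that the interpreter is recent enough and that the virtual environment is really active. The following is a minimal sketch, not part of the official setup: the helper names `python_is_supported` and `in_virtualenv` are hypothetical, and the second check assumes the environment was created with `venv` as shown above.

```python
import sys

# Minimum Python version this guide targets.
MIN_VERSION = (3, 7)


def python_is_supported(version_info=sys.version_info, minimum=MIN_VERSION):
    """Return True if the (major, minor) version is at least `minimum`."""
    return tuple(version_info[:2]) >= tuple(minimum)


def in_virtualenv():
    """Return True if the interpreter runs inside a venv-style environment.

    Inside a venv, sys.prefix points at the environment directory while
    sys.base_prefix still points at the base Python installation.
    """
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)


if __name__ == "__main__":
    print("Python version supported:", python_is_supported())
    print("Virtual environment active:", in_virtualenv())
```

If the second line prints `False`, the `source bin/activate` step was most likely skipped in the current shell.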
Because Confluent's Kafka Python Client uses the librdkafka C/C++ library under the hood, you need to install the library on your system. Confluent's Kafka Python Client includes the necessary bindings to let the client communicate with librdkafka.
# macOS (using the Homebrew package manager)
brew install librdkafka
# Debian-like GNU/Linux distributions (Debian, Ubuntu, Mint...)
apt install librdkafka-dev
# Red Hat-like GNU/Linux distributions (Red Hat Linux, CentOS, Fedora...)
yum install librdkafka-devel python-devel
Now upgrade pip (optional) and install Confluent's Kafka Python Client together with the packages it needs:
# Upgrade the Python package manager (optional)
pip install --upgrade pip
# Install Confluent's Kafka Python Client
pip install confluent-kafka
# Install the Requests package (needed by the Avro client)
pip install requests
# Install the Avro package
pip install avro-python3
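To check that the installation steps above succeeded, one option is to ask Python whether each package can be found for import. This is a small sketch under stated assumptions: the names `REQUIRED_MODULES` and `missing_modules` are hypothetical, and the tuple lists the import names (`confluent_kafka`, `requests`, `avro`) that correspond to the pip packages installed above.

```python
import importlib.util

# Import names of the packages installed with pip in this guide.
REQUIRED_MODULES = ("confluent_kafka", "requests", "avro")


def missing_modules(names=REQUIRED_MODULES):
    """Return the top-level module names that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules()
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("All required packages are installed.")
```

The check only looks the modules up and does not import them, so it does not verify that librdkafka itself loads correctly.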
The following example consumes from a given JSON topic using Confluent's Kafka Python Client.
from confluent_kafka import Consumer


def error_cb(error):
    # Called by the client for global errors (e.g. broker connection failures)
    print(error)


c = Consumer({
    'bootstrap.servers': 'host:port',
    'auto.offset.reset': 'earliest',
    'group.id': 'myGroupId',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'myuser',
    'sasl.password': 'mypassword',
    'error_cb': error_cb
})

c.subscribe(['mytopic'])

try:
    while True:
        # Wait up to 10 seconds for the next message
        msg = c.poll(10.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Stop consuming on Ctrl+C
    pass
finally:
    # Always close the consumer so it leaves the consumer group cleanly
    c.close()
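Since the topic carries JSON, the decoded string is usually parsed into a Python object before further processing. The helper below is one possible sketch of that step; the name `decode_json_value` is hypothetical, and returning `None` for empty or malformed values is an assumption about how you want to treat such messages, not something the client prescribes.

```python
import json


def decode_json_value(raw):
    """Decode a Kafka message value (bytes) into a Python object.

    Returns None when the value is empty (for example a tombstone record)
    or when it is not valid UTF-8 encoded JSON.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError
        return None
```

Inside the poll loop this could be used as `record = decode_json_value(msg.value())`, skipping the message when `record` is `None`.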
At the moment, consuming Avro with Confluent's Kafka Python Client is not possible, because the client does not support simple keys (as opposed to Avro keys): https://github.com/confluentinc/confluent-kafka-python/issues/608
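For context on what such Avro values look like on the wire: producers that serialize through Confluent Schema Registry prefix each value with a 5-byte header (a zero magic byte followed by a 4-byte big-endian schema ID), and the Avro-encoded payload follows. The sketch below only splits that header from a raw value read with the plain `Consumer`. It assumes your topic uses this framing, the function name `split_confluent_header` is hypothetical, and it neither fetches the schema nor decodes the Avro payload, so it does not lift the limitation described above.

```python
import struct

MAGIC_BYTE = 0   # first byte of the Confluent Schema Registry framing
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def split_confluent_header(raw):
    """Split a raw message value into (schema_id, avro_payload).

    Raises ValueError if the value is too short or does not start
    with the expected magic byte.
    """
    if raw is None or len(raw) < HEADER_SIZE:
        raise ValueError("value is too short to contain the 5-byte header")
    # ">bI": big-endian, one signed byte, then one unsigned 32-bit integer
    magic, schema_id = struct.unpack(">bI", raw[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: {}".format(magic))
    return schema_id, raw[HEADER_SIZE:]
```

The returned schema ID identifies the writer schema in the registry; decoding the remaining bytes would still require that schema and an Avro library.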