Working With Cloud Kafka In Python

Mahaboob Basha
Jul 22, 2022

In this article we will see how to set up a Kafka producer and consumer in the cloud using Python.

We will use the website below to create a Kafka instance in the cloud.

https://customer.cloudkarafka.com/

We need to register on the above-mentioned website and create a cloud Kafka instance.

Click on Create New Instance to create one.

Select the Developer plan to create a free Kafka instance.

Click on Select Region to choose a data center on AWS or GCP.

Click on Review to verify the details provided.

Click on Create Instance to create the free Kafka instance in the cloud.

Now our instance is created. Click on test1 to view the details of the Kafka instance; we will use those details to create the consumer and producer.

Above we can see the hostname, username, and password.

These are the connection details that need to be used in the consumer and producer.

In the above screenshot we can see sample examples for reference.

You can choose any of the languages provided above, depending on your preference.

Now we will see examples of a Kafka Producer and Kafka Consumer in Python.

We need to use the "confluent-kafka" library. Below is the command for installing it.

pip install confluent-kafka

We will see the Kafka Producer example first.

# Importing required packages
import sys
import os
from confluent_kafka import Producer

We will set up the required details to connect to Kafka.

We will use the above-mentioned connection details.

We need to create a topic in the Kafka instance.

A default topic is created automatically, and we will create a test1 topic (a programmatic alternative is sketched after the variable setup below).

Below is the code for setting up variables.

# Setting up required variables
CLOUDKARAFKA_BROKERS = "moped-01.srvs.cloudkafka.com:9094,moped-02.srvs.cloudkafka.com:9094,moped-03.srvs.cloudkafka.com:9094"
CLOUDKARAFKA_USERNAME = 'v47metur'
CLOUDKARAFKA_PASSWORD = 'xxxxxxx'
CLOUDKARAFKA_PRODUCER_TOPIC = ['v47metur-default', 'v47metur-test1']
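Earlier we mentioned creating the test1 topic; this is normally done from the CloudKarafka console. As an alternative, below is a minimal sketch, not from the original article, that creates the topic programmatically with confluent-kafka's AdminClient, reusing the variables above. The partition and replication settings are assumptions and may need to match your plan's limits.

# Sketch: creating the test1 topic with confluent-kafka's AdminClient (assumed settings)
from confluent_kafka.admin import AdminClient, NewTopic

admin_conf = {
    'bootstrap.servers': CLOUDKARAFKA_BROKERS,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': CLOUDKARAFKA_USERNAME,
    'sasl.password': CLOUDKARAFKA_PASSWORD
}

admin = AdminClient(admin_conf)
# CloudKarafka prefixes topic names with the username
futures = admin.create_topics([NewTopic('v47metur-test1', num_partitions=1, replication_factor=1)])
for topic_name, future in futures.items():
    try:
        future.result()  # Raises an exception if topic creation failed
        print('Created topic', topic_name)
    except Exception as e:
        print('Failed to create topic %s: %s' % (topic_name, e))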

Below is the code for setting up the configuration dictionary.

# Setting up the config dictionary
conf = {
    'bootstrap.servers': CLOUDKARAFKA_BROKERS,
    'group.id': "%s-consumer" % CLOUDKARAFKA_USERNAME,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': CLOUDKARAFKA_USERNAME,
    'sasl.password': CLOUDKARAFKA_PASSWORD
}
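The 'default.topic.config' block above is deprecated in newer librdkafka/confluent-kafka releases. Below is a sketch of an equivalent config, assuming a recent library version, which sets 'auto.offset.reset' at the top level ('earliest' is the newer spelling of 'smallest').

# Equivalent config for newer confluent-kafka versions (assumption: librdkafka 1.x or later)
conf = {
    'bootstrap.servers': CLOUDKARAFKA_BROKERS,
    'group.id': "%s-consumer" % CLOUDKARAFKA_USERNAME,
    'session.timeout.ms': 6000,
    'auto.offset.reset': 'earliest',  # Replaces 'default.topic.config': {'auto.offset.reset': 'smallest'}
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': CLOUDKARAFKA_USERNAME,
    'sasl.password': CLOUDKARAFKA_PASSWORD
}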

Below is the code for initializing Kafka Producer

# Initializing Kafka producer
p = Producer(**conf)

Below is the code for creating a function which will report whether a message delivery succeeded or failed.

# Function for getting error or success message
def delivery_callback(err, msg):
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
        print('Failed')
    else:
        sys.stderr.write('%% Message delivered to %s [%d]\n' %
                         (msg.topic(), msg.partition()))
        print('Success')

Below is the code for initializing the Kafka topics.

# Initializing topics
topic = CLOUDKARAFKA_PRODUCER_TOPIC

Now we will produce a message for each topic. Below is the code.

# Writing to topic default
p.produce(topic[0], 'Sample Message In Default Topic!', callback=delivery_callback)

# Writing to topic test1
p.produce(topic[1], 'Sample Message in test1 Topic!', callback=delivery_callback)

sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
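If you produce many messages in a loop, calling poll(0) between produce() calls lets the producer serve delivery callbacks as it goes instead of only at flush(). Below is a small sketch; the loop and message text are illustrative, not from the original article.

# Sketch: producing several messages and serving delivery callbacks along the way
for i in range(5):
    p.produce(topic[1], 'Illustrative message number %d' % i, callback=delivery_callback)
    p.poll(0)  # Serve delivery callbacks for already-produced messages

p.flush()  # Block until all outstanding messages are delivered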

Below are the details and example for the Kafka Consumer.
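The consumer reuses the conf dictionary from the producer section, but it also needs the Consumer, KafkaError, and KafkaException classes and a CLOUDKARAFKA_CONSUMER_TOPIC variable, which the original snippets do not show. Below is a sketch that assumes the consumer reads the same two topics the producer wrote to.

# Importing the consumer-side classes from confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException

# Assumption: a comma-separated string of topics, mirroring the producer's topics
CLOUDKARAFKA_CONSUMER_TOPIC = 'v47metur-default,v47metur-test1'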

Below is the code for initializing Kafka Consumer

# Initializing Kafka consumer
c = Consumer(**conf)

Below is the code for initializing Kafka topics

# Initializing topics
topics = CLOUDKARAFKA_CONSUMER_TOPIC.split(",")

Below is the code for subscribing to the topics.

# Subscribing to topics
c.subscribe(topics)
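Optionally, subscribe() also accepts an on_assign callback that fires when partitions are assigned after a group rebalance. The print_assignment helper below is illustrative, not part of the original example.

# Illustrative: log which partitions this consumer was assigned
def print_assignment(consumer, partitions):
    print('Assigned partitions:', partitions)

c.subscribe(topics, on_assign=print_assignment)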

Below is the code for reading all messages which are published to the topics.

# Running a while loop to read all messages published to the topics
while True:
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        # Error or event
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # End of partition event
            sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))
        elif msg.error():
            # Error
            raise KafkaException(msg.error())
    else:
        # Proper message
        sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                         (msg.topic(), msg.partition(), msg.offset(),
                          str(msg.key())))
        print(msg.value())
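When you stop the consumer (for example with Ctrl+C), it is good practice to close it so that final offsets are committed and the group rebalances promptly. Below is a sketch that wraps the same poll loop in a try/finally.

# Sketch: the poll loop wrapped so the consumer is closed cleanly on Ctrl+C
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        # ... handle msg as shown above ...
except KeyboardInterrupt:
    sys.stderr.write('%% Aborted by user\n')
finally:
    c.close()  # Commit final offsets and leave the consumer group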

In this article we have learnt how to create a Kafka instance in the cloud and use its connection details to create a Kafka Producer and Kafka Consumer in Python.

Below is the GitHub link for the examples provided above.

https://github.com/bashamsc/kafka_producer_consumer
