- Build Applications
- Kafka Client Guides
Connect to your cluster using the Kafka Python client
Note
This QuickStart assumes that you have created a Pulsar cluster with the Kafka protocol enabled, created a service account, and granted the service account produce
and consume
permissions to a namespace for the target topic.
This document describes how to connect to your Pulsar cluster using the Kafka Python client through Token authentication.
Before you begin
Note
- Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
- A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
Get the JWT token.
On the left navigation pane, click Service Accounts.
In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.
Get the service URL of your Pulsar cluster.
- On the left navigation pane, in the Admin area, click Pulsar Clusters.
- Select the Overview tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
Steps
Install the Kafka Python client.
pip install confluent-kafka
Build a Python application to produce and consume messages.
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException # Step 1: replace with your configurations serverUrl = "SERVER-URL" jwtToken = "YOUR-TOKEN" topicName = "persistent://public/default/connect-test" namespace = "public/default" password = "token:" + jwtToken def error_cb(err): print("Client error: {}".format(err)) if err.code() == KafkaError._ALL_BROKERS_DOWN or \ err.code() == KafkaError._AUTHENTICATION: raise KafkaException(err) # Step 2: create a producer to send messages p = Producer({ 'bootstrap.servers': serverUrl, 'sasl.mechanism': 'PLAIN', 'security.protocol': 'SASL_SSL', 'sasl.username': namespace, 'sasl.password': password, }) def acked(err, msg): if err is not None: print('Failed to deliver message: {}'.format(err.str())) else: print('Produced to: {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset())) # send messages p.produce(topicName, value='hello python', callback=acked) p.flush(10) # Step 3: create a consumer to consume messages c = Consumer({ 'bootstrap.servers': serverUrl, 'sasl.mechanism': 'PLAIN', 'security.protocol': 'SASL_SSL', 'sasl.username': namespace, 'sasl.password': password, 'group.id': 'test_group_id', # this will create a new consumer group on each invocation. 'auto.offset.reset': 'earliest', 'error_cb': error_cb, }) c.subscribe([topicName]) try: while True: print('polling...') # Wait for message or event/error msg = c.poll(1) if msg is None: continue print('Consumed: {}'.format(msg.value())) break except KeyboardInterrupt: pass finally: c.close()
serverUrl
: the Kafka service URL of your Pulsar cluster.jwtToken
: the token of your service account.
Run the Python application and you should see the following output:
Produced to: persistent://public/default/connect-test [0] @ 30 polling... polling... polling... polling... polling... polling... Consumed: b'hello world'