Connect to your cluster using the Kafka Python client
Note
This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
This document describes how to connect to your StreamNative cluster with the Kafka Python client, using API key authentication.
Before you begin
Note
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- The password for utilities such as kcat is token:<API KEY>.
You can follow the instructions to create an API key for the service account you choose to use.
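The password rule from the note above can be captured in a small helper. The following is a minimal sketch, not an official API: the function name build_sasl_config is hypothetical, and the configuration keys and the use of the namespace as the SASL username mirror the application shown in the Steps section.

```python
def build_sasl_config(server_url, api_key, namespace="public/default"):
    """Return client settings for SASL/PLAIN over TLS.

    build_sasl_config is a hypothetical helper, not part of any library.
    """
    return {
        "bootstrap.servers": server_url,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        # The application in this guide uses the namespace as the username.
        "sasl.username": namespace,
        # The password is the API key prefixed with "token:".
        "sasl.password": "token:" + api_key,
    }


config = build_sasl_config("SERVER-URL", "YOUR-API-KEY")
print(config["sasl.password"])  # prints token:YOUR-API-KEY
```

The returned dictionary can be passed to a producer directly, or extended with consumer-only settings such as group.id before creating a consumer.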
Steps
Install the Kafka Python client.
pip install confluent-kafka
Build a Python application to produce and consume messages.
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

# Step 1: replace with your configurations
serverUrl = "SERVER-URL"
jwtToken = "YOUR-API-KEY"
topicName = "test-py-topic"
namespace = "public/default"
password = "token:" + jwtToken


def error_cb(err):
    print("Client error: {}".format(err))
    # Treat unreachable brokers and authentication failures as fatal
    if err.code() == KafkaError._ALL_BROKERS_DOWN or \
            err.code() == KafkaError._AUTHENTICATION:
        raise KafkaException(err)


# Step 2: create a producer to send messages
p = Producer({
    'bootstrap.servers': serverUrl,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': namespace,
    'sasl.password': password,
})


def acked(err, msg):
    if err is not None:
        print('Failed to deliver message: {}'.format(err.str()))
    else:
        print('Produced to: {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset()))


# send messages
p.produce(topicName, value='hello python', callback=acked)
p.flush(10)

# Step 3: create a consumer to consume messages
c = Consumer({
    'bootstrap.servers': serverUrl,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': namespace,
    'sasl.password': password,
    'group.id': 'test_group_id',  # fixed consumer group ID, reused across runs
    'auto.offset.reset': 'earliest',
    'error_cb': error_cb,
    'isolation.level': 'read_uncommitted',  # Note: Ursa does not support read_committed for now
})

c.subscribe([topicName])

try:
    while True:
        print('polling...')
        # Wait for message or event/error
        msg = c.poll(1)
        if msg is None:
            continue
        print('Consumed: {}'.format(msg.value()))
        break
except KeyboardInterrupt:
    pass
finally:
    c.close()
- SERVER-URL: the Kafka service URL of your StreamNative cluster.
- YOUR-API-KEY: an API key of your service account.
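Instead of editing the placeholders in the source file, the two values can be supplied from outside the script. The following is a minimal sketch under stated assumptions: the environment variable names SN_SERVER_URL and SN_API_KEY and the function load_settings are hypothetical and are not defined by StreamNative or by the client library.

```python
import os


def load_settings(env=os.environ):
    """Read the server URL and API key from a mapping (the process environment by default).

    SN_SERVER_URL and SN_API_KEY are hypothetical names chosen for this sketch.
    """
    missing = [name for name in ("SN_SERVER_URL", "SN_API_KEY") if not env.get(name)]
    if missing:
        raise ValueError("missing settings: " + ", ".join(missing))
    return env["SN_SERVER_URL"], env["SN_API_KEY"]


# Example with an explicit mapping instead of the real environment.
server_url, api_key = load_settings(
    {"SN_SERVER_URL": "SERVER-URL", "SN_API_KEY": "YOUR-API-KEY"}
)
```

The returned values can then be assigned to serverUrl and jwtToken in the application.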
Run the Python application. You should see output similar to the following (the offset and the number of polling lines vary between runs):
Produced to: test-py-topic [0] @ 30
polling...
polling...
polling...
polling...
polling...
polling...
Consumed: b'hello python'
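The repeated polling... lines appear because poll(1) returns None whenever no message arrives within one second, and the loop simply tries again. In the confluent-kafka client, a returned message can also carry an error rather than a payload, which the application above does not check. The following is a minimal sketch of a bounded variant of that loop: the helper name consume_one and the max_polls limit are assumptions for illustration, and the function accepts any object that exposes a confluent-kafka-style poll(timeout) method.

```python
def consume_one(consumer, max_polls=30, timeout=1.0):
    """Poll until one message arrives and return its value.

    Returns None if no message arrives within max_polls attempts.
    consume_one is a hypothetical helper, not part of confluent-kafka.
    """
    for _ in range(max_polls):
        msg = consumer.poll(timeout)
        if msg is None:
            # Nothing arrived within the timeout; this is the "polling..." case.
            continue
        if msg.error():
            # The message object carries an error instead of a payload.
            raise RuntimeError("consume failed: {}".format(msg.error()))
        return msg.value()
    return None
```

With the consumer c from the application, value = consume_one(c) would replace the while True loop and give up after roughly max_polls * timeout seconds instead of polling indefinitely.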