This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
This document describes how to connect to your StreamNative cluster using the Kafka Python client with API key authentication.
Before you begin
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- The password for utilities such as kcat is token:<API KEY>.
You can follow the instructions to create an API key for the service account you choose to use.
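The password rule above can be captured in a small helper. This is a minimal sketch, not part of any StreamNative or confluent-kafka API: the function name `build_sasl_config` is hypothetical, and the settings mirror the ones used by the application in the steps below (the namespace as the SASL username, `token:` plus the API key as the password).

```python
def build_sasl_config(server_url, api_key, namespace="public/default"):
    """Return client settings for SASL/PLAIN over TLS with an API key.

    Hypothetical helper for illustration; the keys are standard
    librdkafka configuration properties.
    """
    if not api_key:
        raise ValueError("api_key must not be empty")
    return {
        "bootstrap.servers": server_url,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": namespace,
        # The password is the literal prefix "token:" followed by the API key.
        "sasl.password": "token:" + api_key,
    }


config = build_sasl_config("SERVER-URL", "YOUR-API-KEY")
print(config["sasl.password"])  # token:YOUR-API-KEY
```

The returned dictionary can be passed to `Producer(...)` directly, or extended with consumer-only settings such as `group.id` before being passed to `Consumer(...)`.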
Steps
- Install the Kafka Python client.
pip install confluent-kafka
- Build a Python application to produce and consume messages.
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
# Step 1: replace with your configurations
serverUrl = "SERVER-URL"
jwtToken = "YOUR-API-KEY"
topicName = "test-py-topic"
namespace = "public/default"
password = "token:" + jwtToken
def error_cb(err):
    print("Client error: {}".format(err))
    if err.code() == KafkaError._ALL_BROKERS_DOWN or \
            err.code() == KafkaError._AUTHENTICATION:
        raise KafkaException(err)
# Step 2: create a producer to send messages
p = Producer({
    'bootstrap.servers': serverUrl,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': namespace,
    'sasl.password': password,
})
def acked(err, msg):
    if err is not None:
        print('Failed to deliver message: {}'.format(err.str()))
    else:
        print('Produced to: {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset()))
# send messages
p.produce(topicName, value='hello python', callback=acked)
p.flush(10)
# Step 3: create a consumer to consume messages
c = Consumer({
    'bootstrap.servers': serverUrl,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': namespace,
    'sasl.password': password,
    'group.id': 'test_group_id',  # fixed consumer group ID, reused on every invocation
    'auto.offset.reset': 'earliest',
    'error_cb': error_cb,
    'isolation.level': 'read_uncommitted',  # Note: Ursa does not support read_committed for now
})
c.subscribe([topicName])
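try:
    while True:
        print('polling...')
        # Wait for message or event/error
        msg = c.poll(1)
        if msg is None:
            continue
        if msg.error():
            # Surface consumer errors instead of printing them as a message
            raise KafkaException(msg.error())
        print('Consumed: {}'.format(msg.value()))
        break
except KeyboardInterrupt:
    pass
finally:
    c.close()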
SERVER-URL: the Kafka service URL of your StreamNative cluster.
YOUR-API-KEY: an API key of your service account.
- Run the Python application. You should see output similar to the following:
Produced to: test-py-topic [0] @ 30
polling...
polling...
polling...
polling...
polling...
polling...
Consumed: b'hello python'
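The consumed value is printed with a b'...' prefix because message values are delivered as bytes, and the loop in the application polls until a message arrives. The sketch below shows one possible variation that gives up after a fixed number of polls and decodes the value to text. The function name `consume_one` and its parameters are hypothetical, and it assumes the message values are UTF-8 text; it relies only on the `poll()`, `error()`, and `value()` calls already used above.

```python
def consume_one(consumer, timeout_s=1.0, max_polls=30):
    """Poll until one message arrives; return its value as text, or None.

    Hypothetical helper: `consumer` is any object with a poll(timeout)
    method that behaves like confluent_kafka.Consumer.poll.
    """
    for _ in range(max_polls):
        msg = consumer.poll(timeout_s)
        if msg is None:
            continue  # nothing arrived within timeout_s, poll again
        if msg.error():
            raise RuntimeError("Consumer error: {}".format(msg.error()))
        # Values arrive as bytes; decode assuming UTF-8 text payloads.
        return msg.value().decode("utf-8")
    return None  # no message after max_polls attempts
```

With the consumer `c` from the application, `consume_one(c)` would return the first available message as a string, or `None` if nothing arrives within roughly `timeout_s * max_polls` seconds. The caller is still responsible for calling `c.close()`.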