1. Build Applications
  2. Kafka Clients

Connect to your cluster using the Kafka Python client

Note

This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

This document describes how to connect to your StreamNative cluster using the Kafka Python client using API Keys authentication.

Before you begin

Note

  • Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  • The password for different utilities as kcat will be equal to token:<API KEY>.

You can follow the instructions to create an API key for the service account you choose to use.

Steps

  1. Install the Kafka Python client.

    pip install confluent-kafka
    
  2. Build a Python application to produce and consume messages.

    from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
    
    # Step 1: replace with your configurations
    serverUrl = "SERVER-URL"
    jwtToken = "YOUR-API-KEY"
    topicName = "test-py-topic"
    namespace = "public/default"
    password = "token:" + jwtToken
    
    def error_cb(err):
        print("Client error: {}".format(err))
        if err.code() == KafkaError._ALL_BROKERS_DOWN or \
                err.code() == KafkaError._AUTHENTICATION:
            raise KafkaException(err)
    
    
    # Step 2: create a producer to send messages
    p = Producer({
        'bootstrap.servers': serverUrl,
        'sasl.mechanism': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.username': namespace,
        'sasl.password': password,
    })
    
    
    def acked(err, msg):
        if err is not None:
            print('Failed to deliver message: {}'.format(err.str()))
        else:
            print('Produced to: {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset()))
    
    
    # send messages
    p.produce(topicName, value='hello python', callback=acked)
    p.flush(10)
    
    # Step 3: create a consumer to consume messages
    c = Consumer({
        'bootstrap.servers': serverUrl,
        'sasl.mechanism': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.username': namespace,
        'sasl.password': password,
        'group.id': 'test_group_id',  # this will create a new consumer group on each invocation.
        'auto.offset.reset': 'earliest',
        'error_cb': error_cb,
        'isolation.level': 'read_uncommitted', # Note: Ursa does not support read_committed for now
    })
    
    c.subscribe([topicName])
    
    try:
        while True:
            print('polling...')
            # Wait for message or event/error
            msg = c.poll(1)
            if msg is None:
                continue
            print('Consumed: {}'.format(msg.value()))
            break
    
    except KeyboardInterrupt:
        pass
    
    finally:
        c.close()
    
    • SERVER-URL: the Kafka service URL of your StreamNative cluster.
    • YOUR-API-KEY: an API key of your service account.
  3. Run the Python application and you should see the following output:

    Produced to: test-py-topic [0] @ 30
    polling...
    polling...
    polling...
    polling...
    polling...
    polling...
    Consumed: b'hello world'
    
Previous
Kafka C