
# Connect to your cluster using the Kafka Python client

<Note title="Note">
  This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account `produce` and `consume` permissions to a namespace for the target topic.
</Note>

This document describes how to connect to your StreamNative cluster with the Kafka Python client, authenticating with [API keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#kafka-clients).

## Before you begin

<Note title="Note">
  * Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  * For utilities such as `kcat`, the password is `token:<API KEY>`.
</Note>

Follow the instructions to [create an API key](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#using-api-keys-to-connect-to-your-cluster) for the service account you want to use.
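
The credential mapping described in the note above can be sketched as a small helper. This is a minimal sketch, not part of the StreamNative or `confluent-kafka` API: the function name `build_sasl_config` and its parameters are hypothetical, and using the namespace as `sasl.username` is an assumption taken from the sample application in this guide.

```python
def build_sasl_config(server_url, api_key, namespace="public/default"):
    """Return client settings for SASL/PLAIN over TLS using an API key.

    Hypothetical helper for illustration only. The dictionary keys are
    standard Kafka client configuration properties.
    """
    if not api_key:
        raise ValueError("api_key must not be empty")
    return {
        "bootstrap.servers": server_url,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        # Assumption: the namespace is used as the username, as in this guide.
        "sasl.username": namespace,
        # The password is the API key prefixed with "token:".
        "sasl.password": "token:" + api_key,
    }


config = build_sasl_config("SERVER-URL", "YOUR-API-KEY")
print(config["sasl.password"])
```

Building the settings in one place keeps the producer and consumer configurations consistent, since both need the same five connection properties.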

## Steps

1. Install the Kafka Python client.

   ```bash theme={null}
   pip install confluent-kafka
   ```

2. Build a Python application to produce and consume messages.

   ```python theme={null}
   from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

   # Step 1: replace with your configurations
   serverUrl = "SERVER-URL"
   jwtToken = "YOUR-API-KEY"
   topicName = "test-py-topic"
   namespace = "public/default"
   password = "token:" + jwtToken

   def error_cb(err):
       print("Client error: {}".format(err))
       if err.code() == KafkaError._ALL_BROKERS_DOWN or \
               err.code() == KafkaError._AUTHENTICATION:
           raise KafkaException(err)


   # Step 2: create a producer to send messages
   p = Producer({
       'bootstrap.servers': serverUrl,
       'sasl.mechanism': 'PLAIN',
       'security.protocol': 'SASL_SSL',
       'sasl.username': namespace,
       'sasl.password': password,
   })


   def acked(err, msg):
       if err is not None:
           print('Failed to deliver message: {}'.format(err.str()))
       else:
           print('Produced to: {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset()))


   # send messages
   p.produce(topicName, value='hello python', callback=acked)
   p.flush(10)

   # Step 3: create a consumer to consume messages
   c = Consumer({
       'bootstrap.servers': serverUrl,
       'sasl.mechanism': 'PLAIN',
       'security.protocol': 'SASL_SSL',
       'sasl.username': namespace,
       'sasl.password': password,
       'group.id': 'test_group_id',  # fixed group ID: the same consumer group is reused on every run
       'auto.offset.reset': 'earliest',
       'error_cb': error_cb,
       'isolation.level': 'read_uncommitted', # Note: Ursa does not support read_committed for now
   })

   c.subscribe([topicName])

   try:
       while True:
           print('polling...')
           # Wait for message or event/error
           msg = c.poll(1)
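           if msg is None:
               continue
           # A returned message can carry an error instead of a payload
           if msg.error():
               raise KafkaException(msg.error())
           print('Consumed: {}'.format(msg.value()))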
           break

   except KeyboardInterrupt:
       pass

   finally:
       c.close()
   ```

   * `SERVER-URL`: the Kafka service URL of your StreamNative cluster.
   * `YOUR-API-KEY`: an API key of your service account.

3. Run the Python application. The output should look similar to the following (the offset and the number of `polling...` lines vary):

   ```bash theme={null}
   Produced to: test-py-topic [0] @ 30
   polling...
   polling...
   polling...
   polling...
   polling...
   polling...
   Consumed: b'hello python'
   ```
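
The sample application polls until a message arrives, so it never exits if nothing is delivered. A bounded variant of that loop might look like the sketch below. The helper name `poll_first_message` and its parameters are hypothetical. It assumes only that the consumer has a `poll(timeout)` method returning either `None` or a message with `error()` and `value()` methods, which is how `confluent_kafka.Consumer` behaves.

```python
import time


def poll_first_message(consumer, timeout_s=30.0, poll_interval_s=1.0):
    """Return the value of the first message, or None if the deadline passes.

    Hypothetical helper for illustration only.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        msg = consumer.poll(poll_interval_s)
        if msg is None:
            continue  # nothing arrived within this poll interval
        if msg.error() is not None:
            # Surface consumer-side errors instead of treating them as data
            raise RuntimeError("Consumer error: {}".format(msg.error()))
        return msg.value()
    return None
```

With the consumer `c` from the application above, `poll_first_message(c, timeout_s=30)` would return the payload bytes, or `None` if no message arrives within 30 seconds.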
