> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Connect to Pulsar cluster using Python client

This document provides examples about how to use the Pulsar Python client to connect to a Pulsar cluster through a token or an OAuth2 credential file.

<Note title="Note">
  * This document assumes that you have created a service account, and have granted the service account `produce` and `consume` permissions to the namespace for the target topic.
  * You need to install Python 3.0 or higher and the Pulsar Python client. Use the following command to install the Python client:

    ```bash theme={null}
    python -m pip install pulsar-client
    ```
</Note>

## Connect to a Pulsar cluster using a token

This section describes how to connect to you Pulsar cluster using a token.

### Prerequisites

* [Get the service URL](/private-cloud/v1/build/connect-prepare#get-the-service-url).
* [Get a service account token](/private-cloud/v1/build/connect-prepare#get-a-service-account-token).

### Steps

To connect to your Pulsar cluster using a token, follow these steps.

1. Connect to the Pulsar cluster.

   ```python theme={null}
   import os
   from pulsar import Client, AuthenticationToken
   client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))

   client.close()
   ```

   * `SERVICE_URL`: the broker service URL of your Pulsar cluster.
   * `AUTH_PARAMS`: the token of your service account.

2. Create a Python consumer and use the Python consumer to consume messages.

   ```python theme={null}
   import os
   from pulsar import Client, AuthenticationToken

   client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))

   consumer = client.subscribe("my-topic", "my-subscription")

   while True:
     msg = consumer.receive()
     try:
       print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
       # Acknowledge successful processing of the message
       consumer.acknowledge(msg)
     except:
       # Message failed to be processed
       consumer.negative_acknowledge(msg)
   ```

3. Create a Python producer and use the Python producer to produce messages.

   ```python theme={null}
   import os
   from pulsar import Client, AuthenticationToken

   client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))

   producer = client.create_producer("my-topic")

   for i in range(10):
     producer.send(("Hello-%d" % i).encode("utf-8"))
     print("send msg "hello-%d"" % i)

   client.close()
   ```

## Connect to a Pulsar cluster using an OAuth2 credential file

To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.

1. Generate the App credentials by following similar instructions in [configure OAuth2 authentication](/private-cloud/v1/operating-streamnative-platform/security/oauth2-auth).

2. Save the App credentials into an OAuth2 credential file.

3. Expose your authentication credentials.

   ```shell theme={null}
   export AUTH_PARAMS='{
     "issuer_url": "your-issuer-url",
     "private_key": "/absolute/path/to/key/file.json",
     "audience": "your-audience"
   }'
   ```

   * `issuer_url`: the URL of your OAuth2 identity provider.
   * `private_key`: the path to your OAuth2 credential file.
   * `audience`: the `audience` of your Pulsar cluster.

4. Connect to a Pulsar cluster through the OAuth2 credential file.

   ```shell theme={null}
   import os
   from pulsar import Client, AuthenticationOauth2
   client = Client(os.environ.get("your-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))

   client.close()
   ```

5. Create a Python consumer and use the Python consumer to consume messages.

   ```python theme={null}
   import os
   from pulsar import Client, AuthenticationOauth2

   client = Client(os.environ.get("your-broker-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))

   consumer = client.subscribe("your-topic", "your-sub-name")

   while True:
     msg = consumer.receive()
     try:
       print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))
       # Acknowledge successful processing of the message
       consumer.acknowledge(msg)
     except:
       # Message failed to be processed
       consumer.negative_acknowledge(msg)
   ```

6. Create a Python producer and use the Python producer to produce messages.

   ```python theme={null}
   import os
   from pulsar import Client, AuthenticationOauth2

   client = Client(os.environ.get("your-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))

   producer = client.create_producer("your-topic")

   for i in range(10):
     producer.send(("Hello-%d" % i).encode("utf-8"))
     print("send msg \"hello-%d\"" % i)

   client.close()
   ```
