- StreamNative Platform
- Connect
Connect to Pulsar cluster using Python client
This document provides examples about how to use the Pulsar Python client to connect to a Pulsar cluster through a token or an OAuth2 credential file.
This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce
and consume
permissions to the namespace for the target topic.
Prerequisites
You need to install Python 3.0 or higher and the Pulsar Python client. Use the following command to install the Python client:
python -m pip install pulsar-client
Connect to a Pulsar cluster using a token
To connect to your Pulsar cluster using a token, follow these steps.
Connect to the Pulsar cluster.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS"))) client.close()
SERVICE_URL
: the broker service URL of your Pulsar cluster.AUTH_PARAMS
: the token of your service account.
Create a Python consumer and use the Python consumer to consume messages.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS"))) consumer = client.subscribe("my-topic", "my-subscription") while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledge successful processing of the message consumer.acknowledge(msg) except: # Message failed to be processed consumer.negative_acknowledge(msg)
Create a Python producer and use the Python producer to produce messages.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS"))) producer = client.create_producer("my-topic") for i in range(10): producer.send(("Hello-%d" % i).encode("utf-8")) print("send msg "hello-%d"" % i) client.close()
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
Generate the App credentials by following the similar instructions in the prepare to connect to a Pulsar cluster user guide.
Save the App credentials into an OAuth2 credential file.
Expose your authentication credentials.
export AUTH_PARAMS='{ "issuer_url": "your-issuer-url", "private_key": "/absolute/path/to/key/file.json", "audience": "your-audience" }'
issuer_url
: the URL of your OAuth2 identity provider.private_key
: the path to your OAuth2 credential file.audience
: theaudience
of your Pulsar cluster.
Connect to a Pulsar cluster through the OAuth2 credential file.
import os from pulsar import Client, AuthenticationOauth2 client = Client(os.environ.get("your-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS"))) client.close()
Create a Python consumer and use the Python consumer to consume messages.
import os from pulsar import Client, AuthenticationOauth2 client = Client(os.environ.get("your-broker-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS"))) consumer = client.subscribe("your-topic", "your-sub-name") while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledge successful processing of the message consumer.acknowledge(msg) except: # Message failed to be processed consumer.negative_acknowledge(msg)
Create a Python producer and use the Python producer to produce messages.
import os from pulsar import Client, AuthenticationOauth2 client = Client(os.environ.get("your-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS"))) producer = client.create_producer("your-topic") for i in range(10): producer.send(("Hello-%d" % i).encode("utf-8")) print("send msg "hello-%d"" % i) client.close()