Connect to Pulsar cluster using Python client
This document provides examples about how to use the Pulsar Python client to connect to a Pulsar cluster through a token or an OAuth2 credential file.
Note
This document assumes that you have created a service account, and have granted the service account
produce
andconsume
permissions to the namespace for the target topic.You need to install Python 3.0 or higher and the Pulsar Python client. Use the following command to install the Python client:
python -m pip install pulsar-client
Connect to a Pulsar cluster using a token
This section describes how to connect to you Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
Connect to the Pulsar cluster.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS"))) client.close()
SERVICE_URL
: the broker service URL of your Pulsar cluster.AUTH_PARAMS
: the token of your service account.
Create a Python consumer and use the Python consumer to consume messages.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS"))) consumer = client.subscribe("my-topic", "my-subscription") while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledge successful processing of the message consumer.acknowledge(msg) except: # Message failed to be processed consumer.negative_acknowledge(msg)
Create a Python producer and use the Python producer to produce messages.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS"))) producer = client.create_producer("my-topic") for i in range(10): producer.send(("Hello-%d" % i).encode("utf-8")) print("send msg "hello-%d"" % i) client.close()
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
Generate the App credentials by following similar instructions in configure OAuth2 authentication.
Save the App credentials into an OAuth2 credential file.
Expose your authentication credentials.
export AUTH_PARAMS='{ "issuer_url": "your-issuer-url", "private_key": "/absolute/path/to/key/file.json", "audience": "your-audience" }'
issuer_url
: the URL of your OAuth2 identity provider.private_key
: the path to your OAuth2 credential file.audience
: theaudience
of your Pulsar cluster.
Connect to a Pulsar cluster through the OAuth2 credential file.
import os from pulsar import Client, AuthenticationOauth2 client = Client(os.environ.get("your-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS"))) client.close()
Create a Python consumer and use the Python consumer to consume messages.
import os from pulsar import Client, AuthenticationOauth2 client = Client(os.environ.get("your-broker-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS"))) consumer = client.subscribe("your-topic", "your-sub-name") while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id())) # Acknowledge successful processing of the message consumer.acknowledge(msg) except: # Message failed to be processed consumer.negative_acknowledge(msg)
Create a Python producer and use the Python producer to produce messages.
import os from pulsar import Client, AuthenticationOauth2 client = Client(os.environ.get("your-service-url"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS"))) producer = client.create_producer("your-topic") for i in range(10): producer.send(("Hello-%d" % i).encode("utf-8")) print("send msg \"hello-%d\"" % i) client.close()