This document provides examples of how to use the Pulsar Python client to connect to a Pulsar cluster using a token or an OAuth2 credential file.
-
This document assumes that you have created a service account and have granted the service account produce and consume permissions to the namespace for the target topic.
-
You need to install Python 3.0 or higher and the Pulsar Python client. Use the following command to install the Python client:
python -m pip install pulsar-client
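To confirm that the install succeeded, a minimal sketch such as the following can report the installed version. It assumes Python 3.8 or higher (for `importlib.metadata`) and the distribution name `pulsar-client` from the command above; `installed_version` is a helper name introduced here for illustration only.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name="pulsar-client"):
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version()
    print("pulsar-client version:", found if found else "not installed")
```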
Connect to a Pulsar cluster using a token
This section describes how to connect to your Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
-
Connect to the Pulsar cluster.
import os
from pulsar import Client, AuthenticationToken
client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))
client.close()
SERVICE_URL: the broker service URL of your Pulsar cluster.
AUTH_PARAMS: the token of your service account.
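Because `os.environ.get` returns `None` for an unset variable, a missing `SERVICE_URL` or `AUTH_PARAMS` only surfaces when the client is constructed. One possible way to fail earlier with a clearer message is sketched below; `require_env` and `load_connection_settings` are hypothetical helpers, not part of the Pulsar client.

```python
import os


def require_env(name):
    """Return the value of an environment variable, or raise if unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"environment variable {name} is not set")
    return value


def load_connection_settings():
    """Collect the two settings used by the connection example above."""
    return {
        "service_url": require_env("SERVICE_URL"),
        "token": require_env("AUTH_PARAMS"),
    }
```

The returned values could then be passed to `Client(settings["service_url"], authentication=AuthenticationToken(settings["token"]))`.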
-
Create a Python consumer and use the Python consumer to consume messages.
import os
from pulsar import Client, AuthenticationToken
client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))
consumer = client.subscribe("my-topic", "my-subscription")
while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
-
Create a Python producer and use the Python producer to produce messages.
import os
from pulsar import Client, AuthenticationToken
client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))
producer = client.create_producer("my-topic")
for i in range(10):
    # Message payloads are bytes, so encode the string before sending
    producer.send(("Hello-%d" % i).encode("utf-8"))
    print("send msg \"hello-%d\"" % i)
client.close()
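The consumer loop in the steps above runs until the process is interrupted, so `client.close()` is never reached there. As a sketch of a bounded variant, the function below processes a fixed number of messages and reports how many were acknowledged. It assumes only that `consumer` exposes `receive()`, `acknowledge(msg)`, and `negative_acknowledge(msg)`, as used above; `consume_messages` and `handler` are names introduced here for illustration.

```python
def consume_messages(consumer, handler, max_messages):
    """Receive up to max_messages, acknowledging each one the handler accepts.

    Returns a tuple of (acknowledged, negatively_acknowledged) counts.
    """
    acked = nacked = 0
    for _ in range(max_messages):
        msg = consumer.receive()
        try:
            handler(msg)
        except Exception:
            # Processing failed: ask the broker to redeliver the message later
            consumer.negative_acknowledge(msg)
            nacked += 1
        else:
            # Processing succeeded: the message will not be redelivered
            consumer.acknowledge(msg)
            acked += 1
    return acked, nacked
```

With a real consumer, this could be called as `consume_messages(consumer, lambda m: print(m.data()), 10)` and followed by `client.close()`.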
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
-
Generate the App credentials by following the instructions in configure OAuth2 authentication.
-
Save the App credentials into an OAuth2 credential file.
-
Expose your authentication credentials.
export AUTH_PARAMS='{
"issuer_url": "your-issuer-url",
"private_key": "/absolute/path/to/key/file.json",
"audience": "your-audience"
}'
issuer_url: the URL of your OAuth2 identity provider.
private_key: the path to your OAuth2 credential file.
audience: the audience of your Pulsar cluster.
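The same JSON string can also be assembled in Python instead of being typed by hand in the shell, which avoids quoting mistakes. The following is a minimal sketch under that assumption; `build_oauth2_params` is a hypothetical helper, and the three keys are the ones listed above.

```python
import json


def build_oauth2_params(issuer_url, private_key, audience):
    """Serialize the three OAuth2 settings into the JSON string form shown above."""
    params = {
        "issuer_url": issuer_url,
        "private_key": private_key,
        "audience": audience,
    }
    # Reject empty values early so a misconfiguration is reported by name
    missing = [key for key, value in params.items() if not value]
    if missing:
        raise ValueError("missing OAuth2 settings: " + ", ".join(missing))
    return json.dumps(params)
```

The resulting string can be passed to `AuthenticationOauth2` directly or stored in `AUTH_PARAMS` for the examples in this section.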
-
Connect to a Pulsar cluster through the OAuth2 credential file.
import os
from pulsar import Client, AuthenticationOauth2
# SERVICE_URL holds the broker service URL of your Pulsar cluster
client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))
client.close()
-
Create a Python consumer and use the Python consumer to consume messages.
import os
from pulsar import Client, AuthenticationOauth2
# SERVICE_URL holds the broker service URL of your Pulsar cluster
client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))
consumer = client.subscribe("your-topic", "your-sub-name")
while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
-
Create a Python producer and use the Python producer to produce messages.
import os
from pulsar import Client, AuthenticationOauth2
# SERVICE_URL holds the broker service URL of your Pulsar cluster
client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))
producer = client.create_producer("your-topic")
for i in range(10):
    # Message payloads are bytes, so encode the string before sending
    producer.send(("Hello-%d" % i).encode("utf-8"))
    print("send msg \"hello-%d\"" % i)
client.close()