- StreamNative Cloud
- Connect
Connect to cluster through Python client
This example describes how to connect to a cluster through a Python client, and use the Python producer and consumer to produce and consume messages to and from a topic. The Python client supports to connect to a Pulsar cluster either through the OAuth2 authentication plugin or Token authentication plugin.
This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce
and consume
permissions to the namespace for the target topic.
Prerequisites
You need to install Python 3.0 or higher and the Pulsar Python client. Use the following command to install the Python client:
python -m pip install pulsar-client
Connect to a cluster through OAuth2 authentication plugin
Get the broker service URL of your Pulsar cluster. For details, see get a service URL.
Get the OAuth2 credential file of your service account. For details, see get an OAuth2 credential file.
Set the environment variables.
Create the following
env.sh
file:export SERVICE_URL='your_broker_service_url' export AUTH_PARAMS='{ "issuer_url": "https://auth.streamnative.cloud", "private_key": "/absolute/path/to/key/file.json", "audience": "your_audience" }'
SERVICE_URL
: your broker service URLissuer_url
: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.private_key
: the path to your downloaded OAuth2 credential file.audience
: theaudience
parameter is the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar
, the organization name, and the Pulsar instance name, in this formaturn:sn:pulsar:<org_name>:<instance_name>
.
Open a terminal and expose the environment variables:
source env.sh
Create a Python consumer and use the Python consumer to consume messages.
Create and run this Python file:
import os from pulsar import Client, AuthenticationOauth2 client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationOauth2(os.environ.get('AUTH_PARAMS'))) consumer = client.subscribe('my-topic', 'my-subscription') while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledge successful processing of the message consumer.acknowledge(msg) except: # Message failed to be processed consumer.negative_acknowledge(msg)
Create a Python producer and use the Python producer to produce messages.
Open a new terminal. Run
source env.sh
again. Then, create and run the following Python file:import os from pulsar import Client, AuthenticationOauth2 client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationOauth2(os.environ.get('AUTH_PARAMS'))) producer = client.create_producer('my-topic') for i in range(10): producer.send(('Hello-%d' % i).encode('utf-8')) print('send msg "hello-%d"' % i) client.close()
Connect to cluster through Token authentication plugin
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the token of your service account. For details, see get a token.
Connect to a Pulsar cluster through the Token authentication plugin.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationToken(os.environ.get('AUTH_PARAMS'))) client.close()
SERVICE_URL
: the broker service URL of your Pulsar cluster.AUTH_PARAMS
: the token of your service account.
Create a Python consumer and use the Python consumer to consume messages.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationToken(os.environ.get('AUTH_PARAMS'))) consumer = client.subscribe('my-topic', 'my-subscription') while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledge successful processing of the message consumer.acknowledge(msg) except: # Message failed to be processed consumer.negative_acknowledge(msg)
Create a Python producer and use the Python producer to produce messages.
import os from pulsar import Client, AuthenticationToken client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationToken(os.environ.get('AUTH_PARAMS'))) producer = client.create_producer('my-topic') for i in range(10): producer.send(('Hello-%d' % i).encode('utf-8')) print('send msg "hello-%d"' % i) client.close()
For a complete example about how to connect to a cluster through the Pulsar Python client, see Python client examples.