
Connect to a cluster through the Python client

This example shows how to connect to a cluster with the Pulsar Python client, and how to use a Python producer and consumer to publish messages to a topic and consume messages from it. The Python client can connect to a Pulsar cluster through either the OAuth2 authentication plugin or the Token authentication plugin.

This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Prerequisites

You need to install Python 3.0 or higher and the Pulsar Python client. Use the following command to install the Python client:

python -m pip install pulsar-client
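
To confirm that the installation succeeded, you can ask Python which version of the package it sees. The following is a minimal sketch that relies only on the standard library; the helper name installed_version is hypothetical, and importlib.metadata requires Python 3.8 or higher.

```python
from importlib import metadata


def installed_version(dist_name="pulsar-client"):
    """Return the installed version of a distribution, or None if it is missing.

    Hypothetical helper for illustration; it is not part of the Pulsar client.
    """
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


version = installed_version()
print("pulsar-client:", version or "not installed")
```

If the script prints "not installed", rerun the pip command with the same Python interpreter that you use to run the examples.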

Connect to a cluster through the OAuth2 authentication plugin

  1. Get the broker service URL of your Pulsar cluster. For details, see get a service URL.

  2. Get the OAuth2 credential file of your service account. For details, see get an OAuth2 credential file.

  3. Set the environment variables.

    Create the following env.sh file:

    export SERVICE_URL='your_broker_service_url'
    export AUTH_PARAMS='{
      "issuer_url": "https://auth.streamnative.cloud",
      "private_key": "/absolute/path/to/key/file.json",
      "audience": "your_audience"
    }'
    
    • SERVICE_URL: your broker service URL
    • issuer_url: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.
    • private_key: the path to your downloaded OAuth2 credential file.
    • audience: the Uniform Resource Name (URN) of your Pulsar instance. It combines the prefix urn:sn:pulsar with the organization name and the Pulsar instance name, in the format urn:sn:pulsar:<org_name>:<instance_name>.
  4. Open a terminal and expose the environment variables:

    source env.sh
    
  5. Create a Python consumer and use the Python consumer to consume messages.

    Create and run this Python file:

    import os
    from pulsar import Client, AuthenticationOauth2
    
    client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationOauth2(os.environ.get('AUTH_PARAMS')))
    
    consumer = client.subscribe('my-topic', 'my-subscription')
    
    while True:
      msg = consumer.receive()
      try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
      except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
    
  6. Create a Python producer and use the Python producer to produce messages.

    Open a new terminal and run source env.sh again. Then create and run the following Python file:

    import os
    from pulsar import Client, AuthenticationOauth2
    
    client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationOauth2(os.environ.get('AUTH_PARAMS')))
    
    producer = client.create_producer('my-topic')
    
    for i in range(10):
      producer.send(('Hello-%d' % i).encode('utf-8'))
      print('Sent message "Hello-%d"' % i)
    
    client.close()
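
The AUTH_PARAMS value set in env.sh is a JSON string, and its audience field follows the urn:sn:pulsar:<org_name>:<instance_name> format described in step 3. As an illustration, the sketch below assembles that string in Python instead of writing it by hand. The helper name build_auth_params and the sample values my-org and my-instance are assumptions made for this example; replace them with your own organization name, instance name, and key file path.

```python
import json


def build_auth_params(org_name, instance_name, key_file,
                      issuer_url="https://auth.streamnative.cloud"):
    """Return a JSON string in the shape used for AUTH_PARAMS.

    Hypothetical helper for illustration; it only formats the values.
    """
    # The audience is the URN built from the organization and instance names.
    audience = "urn:sn:pulsar:{}:{}".format(org_name, instance_name)
    return json.dumps({
        "issuer_url": issuer_url,
        "private_key": key_file,
        "audience": audience,
    })


# Sample values only; use your own organization, instance, and key file.
params = build_auth_params("my-org", "my-instance",
                           "/absolute/path/to/key/file.json")
print(params)
```

The resulting string can be passed to AuthenticationOauth2 in place of os.environ.get('AUTH_PARAMS'), which avoids quoting mistakes in the shell.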
    

Connect to a cluster through the Token authentication plugin

  1. Get the service URL of your Pulsar cluster. For details, see get a service URL.

  2. Get the token of your service account. For details, see get a token.

  3. Connect to a Pulsar cluster through the Token authentication plugin.

    import os
    from pulsar import Client, AuthenticationToken
    
    client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationToken(os.environ.get('AUTH_PARAMS')))
    
    client.close()
    
    • SERVICE_URL: the broker service URL of your Pulsar cluster.
    • AUTH_PARAMS: the token of your service account.
  4. Create a Python consumer and use the Python consumer to consume messages.

    import os
    from pulsar import Client, AuthenticationToken
    
    client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationToken(os.environ.get('AUTH_PARAMS')))
    
    consumer = client.subscribe('my-topic', 'my-subscription')
    
    while True:
      msg = consumer.receive()
      try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
      except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
    
  5. Create a Python producer and use the Python producer to produce messages.

    import os
    from pulsar import Client, AuthenticationToken
    
    client = Client(os.environ.get('SERVICE_URL'), authentication=AuthenticationToken(os.environ.get('AUTH_PARAMS')))
    
    producer = client.create_producer('my-topic')
    
    for i in range(10):
      producer.send(('Hello-%d' % i).encode('utf-8'))
      print('Sent message "Hello-%d"' % i)
    
    client.close()
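
The examples above read SERVICE_URL and AUTH_PARAMS with os.environ.get, which returns None when a variable is not set, so a missing variable only shows up later as a less obvious error. One possible safeguard is to check the variables before creating the client. The sketch below is an assumption-based illustration: the helper name require_env and the sample service URL are hypothetical and are not part of the Pulsar client.

```python
import os


def require_env(name, environ=os.environ):
    """Return the value of an environment variable or raise a clear error.

    Hypothetical helper for illustration; environ can be any mapping,
    which makes the function easy to try out without touching the shell.
    """
    value = environ.get(name)
    if not value:
        raise RuntimeError(
            "Environment variable {} is not set; export it first".format(name)
        )
    return value


# Demonstration with a sample mapping (placeholder URL, not a real cluster).
sample = {"SERVICE_URL": "pulsar+ssl://example.invalid:6651"}
print(require_env("SERVICE_URL", sample))
```

In a real script, require_env('SERVICE_URL') and require_env('AUTH_PARAMS') would replace the os.environ.get calls passed to Client and AuthenticationToken.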
    

For a complete example of how to connect to a cluster through the Pulsar Python client, see Python client examples.
