Connect to a Pulsar cluster using the Python client

This document provides examples of how to use the Pulsar Python client to connect to a Pulsar cluster using a token or an OAuth2 credential file.

Note

  • This document assumes that you have created a service account and granted it produce and consume permissions on the namespace that contains the target topic.

  • You need to install Python 3.0 or higher and the Pulsar Python client. Use the following command to install the Python client:

    python -m pip install pulsar-client
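
To confirm that the installation succeeded before running the examples, a quick check such as the following may help. It is only a sketch: `is_module_available` is a hypothetical helper name, and the check assumes that the client is importable as `pulsar`, which is the module name used by the imports in this document.

```python
import importlib.util


def is_module_available(module_name):
    """Return True if the import system can locate the named top-level module."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # "pulsar" is the import name used by the examples in this document.
    if is_module_available("pulsar"):
        print("The Pulsar Python client is installed.")
    else:
        print("The Pulsar Python client was not found; run: python -m pip install pulsar-client")
```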
    

Connect to a Pulsar cluster using a token

This section describes how to connect to your Pulsar cluster using a token.

Prerequisites

Steps

To connect to your Pulsar cluster using a token, follow these steps.

  1. Connect to the Pulsar cluster.

    import os
    from pulsar import Client, AuthenticationToken
    client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))
    
    client.close()
    
    • SERVICE_URL: the broker service URL of your Pulsar cluster.
    • AUTH_PARAMS: the token of your service account.
  2. Create a Python consumer and use the Python consumer to consume messages.

    import os
    from pulsar import Client, AuthenticationToken
    
    client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))
    
    consumer = client.subscribe("my-topic", "my-subscription")
    
    while True:
      msg = consumer.receive()
      try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
      except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
    
  3. Create a Python producer and use the Python producer to produce messages.

    import os
    from pulsar import Client, AuthenticationToken
    
    client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationToken(os.environ.get("AUTH_PARAMS")))
    
    producer = client.create_producer("my-topic")
    
    for i in range(10):
      producer.send(("Hello-%d" % i).encode("utf-8"))
      print("send msg \"hello-%d\"" % i)
    
    client.close()
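
The consumer loop in step 2 runs forever and mixes receiving with processing. One possible way to bound it and keep the acknowledge and negative-acknowledge logic in one place is sketched below. The names `process_messages`, `handler`, and `max_messages` are hypothetical and not part of the Pulsar client; the sketch relies only on the `receive`, `acknowledge`, and `negative_acknowledge` methods already used above.

```python
def process_messages(consumer, handler, max_messages):
    """Receive up to max_messages and acknowledge each one the handler accepts.

    Returns an (acknowledged, rejected) tuple of counts.
    """
    acknowledged = 0
    rejected = 0
    for _ in range(max_messages):
        msg = consumer.receive()
        try:
            handler(msg)
        except Exception:
            # Processing failed: ask the broker to redeliver the message later.
            consumer.negative_acknowledge(msg)
            rejected += 1
        else:
            # Processing succeeded: mark the message as consumed.
            consumer.acknowledge(msg)
            acknowledged += 1
    return acknowledged, rejected
```

With the `consumer` created in step 2, a call such as `process_messages(consumer, lambda msg: print(msg.data()), 10)` would handle ten messages and then return, so that `client.close()` can be called afterwards.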
    

Connect to a Pulsar cluster using an OAuth2 credential file

To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.

  1. Generate the app credentials by following the instructions in Configure OAuth2 authentication.

  2. Save the app credentials to an OAuth2 credential file.

  3. Expose your authentication credentials.

    export AUTH_PARAMS='{
      "issuer_url": "your-issuer-url",
      "private_key": "/absolute/path/to/key/file.json",
      "audience": "your-audience"
    }'
    
    • issuer_url: the URL of your OAuth2 identity provider.
    • private_key: the path to your OAuth2 credential file.
    • audience: the audience of your Pulsar cluster.
  4. Connect to a Pulsar cluster through the OAuth2 credential file.

    import os
    from pulsar import Client, AuthenticationOauth2
    # SERVICE_URL: environment variable holding the broker service URL of your Pulsar cluster.
    client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))
    
    client.close()
    
  5. Create a Python consumer and use the Python consumer to consume messages.

    import os
    from pulsar import Client, AuthenticationOauth2
    
    # SERVICE_URL: environment variable holding the broker service URL of your Pulsar cluster.
    client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))
    
    consumer = client.subscribe("your-topic", "your-sub-name")
    
    while True:
      msg = consumer.receive()
      try:
        print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
      except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
    
  6. Create a Python producer and use the Python producer to produce messages.

    import os
    from pulsar import Client, AuthenticationOauth2
    
    # SERVICE_URL: environment variable holding the broker service URL of your Pulsar cluster.
    client = Client(os.environ.get("SERVICE_URL"), authentication=AuthenticationOauth2(os.environ.get("AUTH_PARAMS")))
    
    producer = client.create_producer("your-topic")
    
    for i in range(10):
      producer.send(("Hello-%d" % i).encode("utf-8"))
      print("send msg \"hello-%d\"" % i)
    
    client.close()
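
Because `AUTH_PARAMS` is a JSON string exported from the shell, a quoting mistake or a missing key only surfaces when the client tries to authenticate. A small pre-flight check along the following lines may make such problems easier to spot. It is a sketch, not part of the Pulsar client: `parse_oauth2_params` and `REQUIRED_KEYS` are hypothetical names, and the check assumes that exactly the three keys described in step 3 are needed.

```python
import json

# Keys described in step 3 of this section (assumed to be the required set).
REQUIRED_KEYS = ("issuer_url", "private_key", "audience")


def parse_oauth2_params(raw):
    """Parse the AUTH_PARAMS JSON string and check that the expected keys are present."""
    if not raw:
        raise ValueError("AUTH_PARAMS is not set or is empty")
    try:
        params = json.loads(raw)
    except json.JSONDecodeError as err:
        raise ValueError("AUTH_PARAMS is not valid JSON: {}".format(err)) from err
    if not isinstance(params, dict):
        raise ValueError("AUTH_PARAMS must be a JSON object")
    # Treat absent and empty values alike as missing.
    missing = [key for key in REQUIRED_KEYS if not params.get(key)]
    if missing:
        raise ValueError("AUTH_PARAMS is missing: {}".format(", ".join(missing)))
    return params
```

Calling `parse_oauth2_params(os.environ.get("AUTH_PARAMS"))` before creating the client raises a `ValueError` with a specific message if the variable is unset, malformed, or incomplete; the original string can then be passed to `AuthenticationOauth2` unchanged.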
    