Connect to your cluster using the WebSocket API

This document describes how to connect to a cluster through a WebSocket API, and use the WebSocket producer and consumer to produce and consume messages to and from a topic. The WebSocket API supports connecting to a Pulsar cluster through either OAuth2 or Token authentication.

To use the WebSocket API to connect to a Pulsar cluster, you need to enable the WebSocket service in advance. For details, see enable WebSocket service.

Note

This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Prerequisites

Run the pip install websocket-client command to install the Python WebSocket client that the examples in this document use. For details, see the Pulsar WebSocket documentation.

Connect to cluster through OAuth2 authentication

Step 1: Get the broker service URL of your cluster

To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.

  1. On the left navigation pane, in the Admin area, click Pulsar Clusters.

  2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the service URL.

Step 2: Get the OAuth2 credential file of your service account

To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.

  1. On the left navigation pane, click Service Accounts.

  2. In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.

    The OAuth2 credential file should be something like this:

    {
      "type": "SN_SERVICE_ACCOUNT",
      "client_id": "CLIENT_ID",
      "client_secret": "CLIENT_SECRET",
      "client_email": "CLIENT_EMAIL",
      "issuer_url": "https://auth.streamnative.cloud"
    }
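
Before going further, it can help to confirm that the downloaded key file parses as JSON and contains the fields shown in the sample above. The following is a minimal sketch: the helper names parse_credentials and load_credentials are illustrative and not part of any StreamNative tooling, and the list of required fields is simply taken from the sample file.

```python
import json

# Field names taken from the sample credential file; adjust if your file differs.
REQUIRED_FIELDS = ("type", "client_id", "client_secret", "client_email", "issuer_url")


def parse_credentials(text):
    """Parse the key file contents and fail early if a field is missing."""
    creds = json.loads(text)
    missing = [name for name in REQUIRED_FIELDS if name not in creds]
    if missing:
        raise ValueError("credential file is missing fields: " + ", ".join(missing))
    return creds


def load_credentials(path):
    """Read the key file from disk (hypothetical helper) and validate it."""
    with open(path, encoding="utf-8") as key_file:
        return parse_credentials(key_file.read())
```

Calling load_credentials with the path of your downloaded file returns a dictionary whose issuer_url, client_id, and client_secret values are needed in Step 3.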
    

Step 3: Connect to your cluster

  1. Expose OAuth2 authentication parameters.

    export AUTH_PARAMS='{
      "issuer_url": "https://auth.streamnative.cloud",
      "private_key": "file:///absolute/path/to/key/file.json",
      "audience": "your_audience"
    }'
    
    • issuer_url: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.
    • private_key: the path to your downloaded OAuth2 credential file. The private_key parameter supports the following three formats:
      • file:///path/to/file
      • file:/path/to/file
      • data:application/json;base64,<base64-encoded value>
    • audience: the Uniform Resource Name (URN) of the target Pulsar instance. It combines the prefix urn:sn:pulsar, the organization name, and the Pulsar instance name in the format urn:sn:pulsar:<org_name>:<instance_name>.
  2. Get a token using the OAuth2 key file.

    curl --request POST [issuer-url]/oauth/token \
      --header 'content-type: application/json' \
      --data '{"type":"SN_SERVICE_ACCOUNT","client_id":"CLIENT_ID","client_secret":"CLIENT_SECRET","client_email":"CLIENT_EMAIL","issuer_url":"ISSUER_URL", "grant_type":"client_credentials", "audience":"AUDIENCE"}'
    

    Replace the OAuth2 authentication parameters in this command with the values from your downloaded OAuth2 credential file.

  3. Create a consumer and use the consumer to consume messages.

    import websocket, base64, json
    
    # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
    TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/consumer/persistent/public/default/test/sub'
    token = "YOUR_TOKEN"
    header = ["Authorization:Bearer " + token]
    ws = websocket.create_connection(TOPIC, header=header)
    while True:
      msg = json.loads(ws.recv())
      if not msg: break
      print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
      # Acknowledge successful processing
      ws.send(json.dumps({'messageId' : msg['messageId']}))
    
    ws.close()
    
  4. Create a producer and use the producer to produce messages.

    import websocket, base64, json
    
    # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test'
    TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/producer/persistent/public/default/test'
    
    token = "YOUR_TOKEN"
    
    header = ["Authorization:Bearer " + token]
    
    ws = websocket.create_connection(TOPIC, header=header)
    
    # Send one message as JSON
    ws.send(json.dumps({
      # b64encode takes bytes and returns bytes; decode so json.dumps can serialize it
      'payload' : base64.b64encode(b'Hello World').decode('utf-8'),
      'properties': {
        'key1' : 'VALUE1',
        'key2' : 'VALUE2'
      },
      'context' : 5
    }))
    
    response =  json.loads(ws.recv())
    if response['result'] == 'ok':
      print('Message published successfully')
    else:
      print('Failed to publish message:', response)
    ws.close()
    

Note

  • Replace CLUSTER_HOST with the domain name of the cluster. To get the domain name of the target cluster, click Manage > Cluster on the StreamNative Cloud Console.
  • On StreamNative Cloud, use port 443.
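
The token request from step 2 of Step 3 can also be issued from Python instead of curl. The sketch below mirrors that curl command using only the standard library. The helper names are hypothetical, and reading the token from an access_token field in the response is an assumption based on the usual OAuth2 client-credentials response, so check the actual response from your issuer.

```python
import json
import urllib.request


def build_audience(org_name, instance_name):
    """Build the audience URN in the format urn:sn:pulsar:<org_name>:<instance_name>."""
    return "urn:sn:pulsar:{}:{}".format(org_name, instance_name)


def build_token_request(credentials, audience):
    """Prepare the POST to <issuer_url>/oauth/token with the same body as the curl command."""
    body = dict(credentials)  # fields from the downloaded credential file
    body["grant_type"] = "client_credentials"
    body["audience"] = audience
    url = credentials["issuer_url"].rstrip("/") + "/oauth/token"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"content-type": "application/json"},
        method="POST",
    )


def fetch_token(credentials, audience):
    """Send the request. Assumption: the JSON response carries an 'access_token' field."""
    with urllib.request.urlopen(build_token_request(credentials, audience)) as response:
        return json.load(response)["access_token"]
```

The value returned by fetch_token would then take the place of YOUR_TOKEN in the consumer and producer examples.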

Connect to cluster through Token authentication

To connect to a Pulsar cluster through Token authentication, follow these steps.

Step 1: Get the broker service URL of your cluster

To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.

  1. On the left navigation pane, in the Admin area, click Pulsar Clusters.

  2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the service URL.

Step 2: Get the Token of your service account

Note

  • Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
  • A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
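
Because tokens expire, a client may want to check how long a token has left before opening a connection. The sketch below assumes the token is a JSON Web Token (JWT) that carries a standard exp claim; this document does not state the token format, so treat that as an assumption. The function names are illustrative, and the code only reads the claim without verifying the signature.

```python
import base64
import json
import time


def token_expiry(token):
    """Return the 'exp' claim (seconds since the epoch) of a JWT, or None if absent."""
    payload_b64 = token.split(".")[1]
    # JWT segments drop base64 padding; restore it before decoding.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(padded))
    return claims.get("exp")


def seconds_until_expiry(token, now=None):
    """Return the remaining lifetime in seconds, or None if the token has no 'exp' claim."""
    expiry = token_expiry(token)
    if expiry is None:
        return None
    return expiry - (time.time() if now is None else now)
```

A client could generate a new token in the console when seconds_until_expiry falls below a threshold of its choosing.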

To get a token using the StreamNative Console, follow these steps.

  1. On the left navigation pane, click Service Accounts.

  2. In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.

Step 3: Connect to your cluster

Create a consumer to consume messages

import websocket, base64, json

# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/consumer/persistent/public/default/test/sub'
token = "YOUR_TOKEN"
header = ["Authorization:Bearer " + token]
ws = websocket.create_connection(TOPIC, header=header)
while True:
  msg = json.loads(ws.recv())
  if not msg: break
  print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
  # Acknowledge successful processing
  ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()
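
The receive-and-acknowledge step in the loop above can be factored into a small function, which makes it testable without a live connection. This is a sketch only: decode_message is a hypothetical helper, and it relies on the payload and messageId fields used in the loop above.

```python
import base64
import json


def decode_message(raw):
    """Split one received frame into its decoded payload and the matching ack frame."""
    msg = json.loads(raw)
    payload = base64.b64decode(msg["payload"])  # payload arrives base64-encoded
    ack = json.dumps({"messageId": msg["messageId"]})
    return payload, ack
```

Inside the loop, this would be used as payload, ack = decode_message(ws.recv()) followed by ws.send(ack).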

Create a producer to produce messages

import websocket, base64, json

# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test'
TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/producer/persistent/public/default/test'

token = "YOUR_TOKEN"

header = ["Authorization:Bearer " + token]

ws = websocket.create_connection(TOPIC, header=header)

# Send one message as JSON
ws.send(json.dumps({
  # b64encode takes bytes and returns bytes; decode so json.dumps can serialize it
  'payload' : base64.b64encode(b'Hello World').decode('utf-8'),
  'properties': {
    'key1' : 'VALUE1',
    'key2' : 'VALUE2'
  },
  'context' : 5
}))

response =  json.loads(ws.recv())
if response['result'] == 'ok':
  print('Message published successfully')
else:
  print('Failed to publish message:', response)
ws.close()
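
When sending more than one message, it may be convenient to wrap the encoding and the result check in helpers. The following sketch uses hypothetical function names and only the frame fields that appear in the example above (payload, properties, context, and result).

```python
import base64
import json


def build_publish_frame(text, properties=None, context=None):
    """Encode a text message as the JSON frame sent to the producer endpoint."""
    frame = {"payload": base64.b64encode(text.encode("utf-8")).decode("ascii")}
    if properties:
        frame["properties"] = properties
    if context is not None:
        frame["context"] = context
    return json.dumps(frame)


def is_publish_ok(raw_response):
    """Return True when the response frame reports result == 'ok'."""
    return json.loads(raw_response).get("result") == "ok"
```

With an open connection, ws.send(build_publish_frame('Hello World')) followed by is_publish_ok(ws.recv()) would replace the inline dictionary and the result check.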

Note

  • Replace CLUSTER_HOST with the domain name of the cluster. To get the domain name of the target cluster, click Manage > Cluster on the StreamNative Cloud Console.
  • On StreamNative Cloud, use port 443.
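
The endpoint URLs in the examples follow a regular pattern, so they can be assembled from their parts rather than edited by hand. The helper below is a sketch with a hypothetical name; the path layout is inferred from the example URLs in this document, and the default port follows the note above.

```python
from urllib.parse import quote


def websocket_url(host, role, tenant, namespace, topic, subscription=None, port=443):
    """Build a wss:// URL for a 'producer' or 'consumer' on a persistent topic."""
    if role not in ("producer", "consumer"):
        raise ValueError("role must be 'producer' or 'consumer'")
    parts = ["ws", "v2", role, "persistent", tenant, namespace, topic]
    if role == "consumer":
        if subscription is None:
            raise ValueError("a consumer URL needs a subscription name")
        parts.append(subscription)
    # Percent-encode each path segment so unusual names stay within their segment.
    path = "/".join(quote(part, safe="") for part in parts)
    return "wss://{}:{}/{}".format(host, port, path)
```

For example, websocket_url('CLUSTER_HOST', 'consumer', 'public', 'default', 'test', 'sub') yields the consumer URL used in the examples.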