
Connect to your cluster using the Pulsar WebSocket API

This document describes how to connect to a cluster through the WebSocket API and how to use a WebSocket producer and consumer to publish messages to a topic and consume messages from it. The WebSocket API supports connecting to a StreamNative cluster using API key authentication.

To use the WebSocket API to connect to a StreamNative cluster, you need to enable the WebSocket service in advance. For details, see enable WebSocket service.

Note

This document assumes that you have created a StreamNative cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Prerequisites

The examples in this document use Python and the websocket-client library. Run the pip install websocket-client command to install it. For details, see the Pulsar WebSocket documentation.

Connect to your cluster using API keys

To connect to a StreamNative cluster using API keys, follow these steps.

Step 1: Get the broker service URL of your cluster

To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.

  1. On the left navigation pane, in the Admin area, click Pulsar Clusters.

  2. Select the Details tab, and in the Access Points area, click Copy at the end of the service URL row.

Step 2: Create an API key for your service account

Note

Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.

Follow the instructions for creating an API key to create one for the service account you want to use.

Step 3: Connect to your cluster

Create a consumer to consume messages

import base64
import json

import websocket

# Non-TLS alternative:
# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/consumer/persistent/public/default/test/sub'
token = "${apikey}"
header = ["Authorization: Bearer " + token]

ws = websocket.create_connection(TOPIC, header=header)
try:
    while True:
        msg = json.loads(ws.recv())
        if not msg:
            break
        # The payload arrives Base64-encoded
        print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
        # Acknowledge successful processing
        ws.send(json.dumps({'messageId': msg['messageId']}))
finally:
    # Close the connection even if receiving or decoding fails
    ws.close()

  • ${apikey}: an API key of your service account.
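
The receive-and-acknowledge step in the consumer above can be isolated into two small helpers, which makes it easier to test without a live connection. This is a minimal sketch: the names decode_frame and ack_frame are hypothetical, and it assumes only the two fields the consumer example already reads, messageId and a Base64-encoded payload.

```python
import base64
import json
from typing import Tuple


def decode_frame(raw: str) -> Tuple[str, bytes]:
    """Parse one consumer frame and return (messageId, decoded payload).

    Hypothetical helper: assumes the frame is a JSON object with the
    'messageId' and Base64 'payload' fields used in the example above.
    """
    msg = json.loads(raw)
    return msg["messageId"], base64.b64decode(msg["payload"])


def ack_frame(message_id: str) -> str:
    """Build the JSON acknowledgement sent back for a message ID."""
    return json.dumps({"messageId": message_id})
```

In the consumer loop, these would be used as message_id, payload = decode_frame(ws.recv()) followed by ws.send(ack_frame(message_id)).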

Create a producer to produce messages

import base64
import json

import websocket

# Non-TLS alternative:
# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test'
TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/producer/persistent/public/default/test'

token = "${apikey}"

header = ["Authorization: Bearer " + token]

ws = websocket.create_connection(TOPIC, header=header)

# Send one message as JSON.
# b64encode takes bytes and returns bytes, so decode the result to a str for JSON.
ws.send(json.dumps({
    'payload': base64.b64encode(b'Hello World').decode('utf-8'),
    'properties': {
        'key1': 'VALUE1',
        'key2': 'VALUE2'
    },
    'context': 5
}))

response = json.loads(ws.recv())
if response['result'] == 'ok':
    print('Message published successfully')
else:
    print('Failed to publish message:', response)
ws.close()
  • ${apikey}: an API key of your service account.
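
Because the payload must be Base64 text inside a JSON document, it can help to build the publish frame in one place. The following is a sketch only: build_publish_frame is a hypothetical helper, and it uses just the three fields shown in the producer example (payload, properties, and context).

```python
import base64
import json
from typing import Any, Dict, Optional


def build_publish_frame(
    payload: bytes,
    properties: Optional[Dict[str, str]] = None,
    context: Any = None,
) -> str:
    """Return the JSON text for one publish request.

    Hypothetical helper: the payload is Base64-encoded and converted to
    str, since json.dumps cannot serialize bytes.
    """
    frame: Dict[str, Any] = {
        "payload": base64.b64encode(payload).decode("ascii"),
    }
    if properties:
        frame["properties"] = properties
    if context is not None:
        frame["context"] = context
    return json.dumps(frame)
```

With an open connection, the send step becomes ws.send(build_publish_frame(b'Hello World', {'key1': 'VALUE1'}, 5)).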

Note

  • Replace CLUSTER_HOST with the domain name of the cluster. To get the domain name of the target cluster, click Manage > Cluster on the StreamNative Cloud Console.
  • In StreamNative Cloud, use port 443.
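
The endpoint URLs in both examples follow the same pattern, so they can be assembled from their parts. The sketch below is an illustration rather than part of any client library: ws_endpoint is a hypothetical function, and the path layout is inferred from the two URLs used in this document (persistent topics only).

```python
from typing import Optional
from urllib.parse import quote


def ws_endpoint(
    host: str,
    role: str,
    tenant: str,
    namespace: str,
    topic: str,
    subscription: Optional[str] = None,
    port: int = 443,
    secure: bool = True,
) -> str:
    """Build a producer or consumer WebSocket URL for a persistent topic.

    Hypothetical helper: mirrors the URL shapes shown in this document.
    """
    if role not in ("producer", "consumer"):
        raise ValueError("role must be 'producer' or 'consumer'")
    parts = [tenant, namespace, topic]
    if role == "consumer":
        if not subscription:
            raise ValueError("a consumer URL needs a subscription name")
        parts.append(subscription)
    scheme = "wss" if secure else "ws"
    # Percent-encode each path segment so unusual names stay valid in a URL
    path = "/".join(quote(p, safe="") for p in parts)
    return "{}://{}:{}/ws/v2/{}/persistent/{}".format(scheme, host, port, role, path)
```

For example, ws_endpoint('CLUSTER_HOST', 'producer', 'public', 'default', 'test') yields the producer URL used above.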