Connect to your cluster using the Pulsar WebSocket API
This document describes how to connect to a cluster through the WebSocket API and how to use a WebSocket producer and consumer to publish messages to a topic and consume messages from it. The WebSocket API supports connecting to a StreamNative cluster using API key authentication.
To use the WebSocket API to connect to a StreamNative cluster, you need to enable the WebSocket service in advance. For details, see enable WebSocket service.
Note
This document assumes that you have created a StreamNative cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.
Prerequisites
Use the pip install websocket-client command to install all dependencies. For details, see the Pulsar WebSocket documentation.
Connect to your cluster using API keys
To connect to a StreamNative cluster using API keys, follow these steps.
Step 1: Get the broker service URL of your cluster
To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.
On the left navigation pane, in the Admin area, click Pulsar Clusters.
Select the Details tab, and in the Access Points area, click Copy at the end of the row of the service URL.
Step 2: Create an API key for your service account
Note
Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
Follow the instructions to create an API key for the service account you want to use.
Step 3: Connect to your cluster
Create a consumer to consume messages
import websocket, base64, json

# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/consumer/persistent/public/default/test/sub'
token = "${apikey}"
header = ["Authorization: Bearer " + token]

ws = websocket.create_connection(TOPIC, header=header)

while True:
    msg = json.loads(ws.recv())
    if not msg: break

    print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))

    # Acknowledge successful processing
    ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()
${apikey}: an API key of your service account.
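The per-message handling in the consumer loop can be tried without a live connection. The following is a minimal sketch, assuming each frame is a JSON object with a messageId field and a base64-encoded payload field, as the consumer example reads them. The helper names decode_frame and build_ack, and the sample frame, are hypothetical and exist only for illustration.

```python
import base64
import json


def decode_frame(raw_frame):
    """Parse one consumer frame and return (message_id, payload_bytes)."""
    msg = json.loads(raw_frame)
    return msg["messageId"], base64.b64decode(msg["payload"])


def build_ack(message_id):
    """Build the JSON acknowledgement to send back for a processed message."""
    return json.dumps({"messageId": message_id})


# Made-up sample frame; in a real consumer the frame comes from ws.recv().
sample_frame = json.dumps({
    "messageId": "CAAQAw==",
    "payload": base64.b64encode(b"Hello World").decode("ascii"),
})

message_id, payload = decode_frame(sample_frame)
print(payload.decode("utf-8"))
print(build_ack(message_id))
```

In a real consumer, the string returned by build_ack would be passed to ws.send() after the message has been processed.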
Create a producer to produce messages
import websocket, base64, json

# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test'
TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/producer/persistent/public/default/test'
token = "${apikey}"
header = ["Authorization: Bearer " + token]

ws = websocket.create_connection(TOPIC, header=header)

# Send one message as JSON
# b64encode takes bytes and returns bytes, so decode the result to a str for JSON
ws.send(json.dumps({
    'payload' : base64.b64encode(b'Hello World').decode('utf-8'),
    'properties': {
        'key1' : 'VALUE1',
        'key2' : 'VALUE2'
    },
    'context' : 5
}))

response = json.loads(ws.recv())
if response['result'] == 'ok':
    print('Message published successfully')
else:
    print('Failed to publish message:', response)

ws.close()
${apikey}: an API key of your service account.
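The JSON frame that the producer sends can be built and inspected separately from the connection. The following is a minimal sketch, assuming Python 3 and the payload, properties, and context fields used in the producer example. The helper name build_publish_frame is hypothetical.

```python
import base64
import json


def build_publish_frame(text, properties=None, context=None):
    """Return the JSON string for one producer message.

    The payload is UTF-8 encoded, then base64 encoded, then decoded to a
    str because json.dumps cannot serialize bytes.
    """
    frame = {"payload": base64.b64encode(text.encode("utf-8")).decode("ascii")}
    if properties:
        frame["properties"] = properties
    if context is not None:
        frame["context"] = context
    return json.dumps(frame)


frame = build_publish_frame(
    "Hello World",
    properties={"key1": "VALUE1", "key2": "VALUE2"},
    context=5,
)
print(frame)
```

The returned string can be passed directly to ws.send(). Keeping the encoding in one place avoids sending raw bytes to json.dumps, which raises a TypeError in Python 3.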
Note
- Replace CLUSTER_HOST with the domain name of the cluster. To get the domain name of the target cluster, click Manage > Cluster on the StreamNative Cloud Console.
- In StreamNative Cloud, use port 443.
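The endpoint URLs in the examples above follow one pattern, so they can be assembled from their parts instead of being edited by hand. The following is a minimal sketch, assuming the wss scheme, port 443, and the /ws/v2/{producer|consumer}/persistent/... path layout shown in the examples. The function name build_ws_url and the host cluster.example.invalid are hypothetical placeholders.

```python
from urllib.parse import quote


def build_ws_url(cluster_host, role, tenant, namespace, topic,
                 subscription=None, port=443):
    """Assemble a WebSocket endpoint URL for a producer or a consumer."""
    if role not in ("producer", "consumer"):
        raise ValueError("role must be 'producer' or 'consumer'")
    # Percent-encode each path segment so special characters stay in place.
    parts = [quote(p, safe="") for p in (tenant, namespace, topic)]
    path = "/ws/v2/{}/persistent/{}".format(role, "/".join(parts))
    if role == "consumer":
        if not subscription:
            raise ValueError("a consumer URL needs a subscription name")
        path += "/" + quote(subscription, safe="")
    return "wss://{}:{}{}".format(cluster_host, port, path)


# Placeholder host; replace it with the domain name of your cluster.
print(build_ws_url("cluster.example.invalid", "producer",
                   "public", "default", "test"))
print(build_ws_url("cluster.example.invalid", "consumer",
                   "public", "default", "test", subscription="sub"))
```

The result can be used as the TOPIC value passed to websocket.create_connection().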