Documentation Index
Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
Use this file to discover all available pages before exploring further.
This example shows how to use the WebSocket API to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.
- This document assumes that you have created a service account, and have granted the service account
produce and consume permissions to the namespace for the target topic.
- To use the WebSocket API to connect to a Pulsar cluster, you need to enable the WebSocket service in advance. For details, see enable WebSocket service.
- If Web Socket Secure (WSS) protocol is used, you should use the port
9443.
Prerequisites
Steps
-
Connect to the Pulsar cluster.
import websocket, base64, json
# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/consumer/persistent/public/default/test/sub'
token = "YOUR_TOKEN"
header = ["Authorization:Bearer" + token]
ws = websocket.create_connection(TOPIC, header=header)
while True:
msg = json.loads(ws.recv())
if not msg: break
print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
Replace the CLUSTER_HOST parameter with the domain name of the cluster.
-
Create a consumer and use the consumer to consume messages.
import websocket, base64, json
# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/consumer/persistent/public/default/test/sub'
token = "YOUR_TOKEN"
header = ["Authorization:Bearer " + token]
ws = websocket.create_connection(TOPIC, header=header)
while True:
msg = json.loads(ws.recv())
if not msg: break
print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
-
Create a producer and use the producer to produce messages.
import websocket, base64, json
# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test'
TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/producer/persistent/public/default/test'
token = "YOUR_TOKEN"
header = ["Authorization:Bearer " + token]
ws = websocket.create_connection(TOPIC, header=header)
# Send one message as JSON
ws.send(json.dumps({
'payload' : base64.b64encode('Hello World'),
'properties': {
'key1' : 'VALUE1',
'key2' : 'VALUE2'
},
'context' : 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
print('Message published successfully')
else:
print('Failed to publish message:', response)
ws.close()