Connect to Pulsar cluster using WebSocket API
This example shows how to use the WebSocket API to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.
Note
- This document assumes that you have created a service account, and have granted the service account
produce
andconsume
permissions to the namespace for the target topic. - To use the WebSocket API to connect to a Pulsar cluster, you need to enable the WebSocket service in advance. For details, see enable WebSocket service.
- If Web Socket Secure (WSS) protocol is used, you should use the port
9443
.
Prerequisites
Steps
Connect to the Pulsar cluster.
import websocket, base64, json # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub' TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/consumer/persistent/public/default/test/sub' token = "YOUR_TOKEN" header = ["Authorization:Bearer" + token] ws = websocket.create_connection(TOPIC, header=header) while True: msg = json.loads(ws.recv()) if not msg: break print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))) # Acknowledge successful processing ws.send(json.dumps({'messageId' : msg['messageId']})) ws.close()
Replace the
CLUSTER_HOST
parameter with the domain name of the cluster.Create a consumer and use the consumer to consume messages.
import websocket, base64, json # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub' TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/consumer/persistent/public/default/test/sub' token = "YOUR_TOKEN" header = ["Authorization:Bearer " + token] ws = websocket.create_connection(TOPIC, header=header) while True: msg = json.loads(ws.recv()) if not msg: break print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))) # Acknowledge successful processing ws.send(json.dumps({'messageId' : msg['messageId']})) ws.close()
Create a producer and use the producer to produce messages.
import websocket, base64, json # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test' TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/producer/persistent/public/default/test' token = "YOUR_TOKEN" header = ["Authorization:Bearer " + token] ws = websocket.create_connection(TOPIC, header=header) # Send one message as JSON ws.send(json.dumps({ 'payload' : base64.b64encode('Hello World'), 'properties': { 'key1' : 'VALUE1', 'key2' : 'VALUE2' }, 'context' : 5 })) response = json.loads(ws.recv()) if response['result'] == 'ok': print('Message published successfully') else: print('Failed to publish message:', response) ws.close()