- StreamNative Cloud
- Connect
Connect to cluster through WebSocket API
This document describes how to connect to a cluster through a WebSocket API, and use the WebSocket producer and consumer to produce and consume messages to and from a topic. The WebSocket API supports connecting to a Pulsar cluster either through the OAuth2 authentication plugin or Token authentication plugin. Before connecting a cluster through the WebSocket API,use the pip install websocket-client
command to install all dependencies. For details, see the Pulsar Websocket documentation.
This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce
and consume
permissions to the namespace for the target topic.
Connect to cluster through OAuth2 authentication plugin
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the OAuth2 credential file of your service account. For details, see get an OAuth2 credential file.
Expose OAuth2 authentication parameters.
export AUTH_PARAMS='{ "issuer_url": "https://auth.streamnative.cloud", "private_key": "file:///absolute path/to/key/file.json", "audience": "you_audience" }'
issuer_url
: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.private_key
: the path to your downloaded OAuth2 credential file. TheprivateKey
parameter supports the following three pattern formats:file:///path/to/file
file:/path/to/file
data:application/json;base64,<base64-encoded value>
audience
: theaudience
parameter is the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar
, the organization name, and the Pulsar instance name, in this formaturn:sn:pulsar:<org_name>:<instance_name>
.
Get the Token through OAuth2 key file.
curl --request POST [issuer-url]/oauth/token \ --header 'content-type: application/json' \ß --data '{"type":"SN_SERVICE_ACCOUNT","client_id":"CLIENT_ID","client_secret":"CLIENT_SECRET","client_email":"CLIENT_EMAIL","issuer_url":"ISSUER_URL", "grant_type":"client_credentials", "audience":"AUDIENCE"}' ß
Replace the OAuth2 authentication parameters in this command with the values from your downloaded OAuth2 credential file.
Create a consumer and use the consumer to consume messages.
import websocket, base64, json # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub' TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/consumer/persistent/public/default/test/sub' token = "YOUR_TOKEN" header = ["Authorization:Bearer " + token] ws = websocket.create_connection(TOPIC, header=header) while True: msg = json.loads(ws.recv()) if not msg: break print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))) # Acknowledge successful processing ws.send(json.dumps({'messageId' : msg['messageId']})) ws.close()
Create a producer and use the producer to produce messages.
import websocket, base64, json # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test' TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/producer/persistent/public/default/test' token = "YOUR_TOKEN" header = ["Authorization:Bearer " + token] ws = websocket.create_connection(TOPIC, header=header) # Send one message as JSON ws.send(json.dumps({ 'payload' : base64.b64encode('Hello World'), 'properties': { 'key1' : 'VALUE1', 'key2' : 'VALUE2' }, 'context' : 5 })) response = json.loads(ws.recv()) if response['result'] == 'ok': print('Message published successfully') else: print('Failed to publish message:', response) ws.close()
- Replace the
CLUSTER_HOST
with the domain name of the cluster. To get the domain name of the target cluster, click Manage > Cluster on the StreamNative Cloud Console. - If Web Socket Secure (WSS) protocol is used, you should use the port
9443
.
Connect to cluster through Token authentication plugin
To connect a cluster through the WebSocket API, follow these steps.
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the token of your service account. For details, see get a token.
Create a consumer and use the consumer to consume messages.
import websocket, base64, json # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub' TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/consumer/persistent/public/default/test/sub' token = "YOUR_TOKEN" header = ["Authorization:Bearer " + token] ws = websocket.create_connection(TOPIC, header=header) while True: msg = json.loads(ws.recv()) if not msg: break print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))) # Acknowledge successful processing ws.send(json.dumps({'messageId' : msg['messageId']})) ws.close()
Create a producer and use the producer to produce messages.
import websocket, base64, json # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test' TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/producer/persistent/public/default/test' token = "YOUR_TOKEN" header = ["Authorization:Bearer " + token] ws = websocket.create_connection(TOPIC, header=header) # Send one message as JSON ws.send(json.dumps({ 'payload' : base64.b64encode('Hello World'), 'properties': { 'key1' : 'VALUE1', 'key2' : 'VALUE2' }, 'context' : 5 })) response = json.loads(ws.recv()) if response['result'] == 'ok': print('Message published successfully') else: print('Failed to publish message:', response) ws.close()
- Replace the
CLUSTER_HOST
with the domain name of the cluster. To get the domain name of the target cluster, click Manage > Cluster on the StreamNative Cloud Console. - If Web Socket Secure (WSS) protocol is used, you should use the port
9443
.