# Connect to a Pulsar cluster using the WebSocket API

This example shows how to use the WebSocket API to connect to a Pulsar cluster, and then produce messages to and consume messages from that cluster.

<Note title="Note">
  * This document assumes that you have created a service account and granted it `produce` and `consume` permissions on the namespace that contains the target topic.
  * To use the WebSocket API to connect to a Pulsar cluster, you need to enable the WebSocket service in advance. For details, see [enable WebSocket service](/private-cloud/v1/operating-streamnative-platform/protocols/configure-websocket).
  * If you use the WebSocket Secure (WSS) protocol, connect through port `9443`.
</Note>
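
The endpoint URLs used in the steps below follow a regular pattern, so they can be assembled from a few parts. The following is a minimal sketch, assuming the `ws://...:9090` and `wss://...:9443` forms shown in this document. `build_ws_url` and its parameters are hypothetical names introduced for illustration and are not part of any Pulsar client library.

```python
def build_ws_url(host, role, topic, subscription=None, secure=True):
    """Assemble a Pulsar WebSocket endpoint URL (hypothetical helper).

    role is 'producer' or 'consumer'; topic is e.g. 'persistent/public/default/test'.
    """
    # Ports are the ones used in this document: 9443 for WSS, 9090 for plain WS.
    scheme, port = ("wss", 9443) if secure else ("ws", 9090)
    url = "{}://{}:{}/ws/v2/{}/{}".format(scheme, host, port, role, topic)
    if role == "consumer":
        # Consumer endpoints end with the subscription name.
        if not subscription:
            raise ValueError("a consumer URL needs a subscription name")
        url += "/" + subscription
    return url


print(build_ws_url("CLUSTER_HOST", "consumer", "persistent/public/default/test", "sub"))
print(build_ws_url("CLUSTER_HOST", "producer", "persistent/public/default/test", secure=False))
```

The first call prints the WSS consumer URL and the second prints the plain WS producer URL, matching the `TOPIC` values used in the steps.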

### Prerequisites

* [Get the service URL](/private-cloud/v1/build/connect-prepare#get-the-service-url).
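* [Get a service account token](/private-cloud/v1/build/connect-prepare#get-a-service-account-token).
* Install the Python `websocket-client` package, which provides the `websocket` module used in the examples (`pip install websocket-client`).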
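
Rather than hardcoding the host and token in each script, you may prefer to read them from the environment. This is only a sketch under stated assumptions: the variable names `PULSAR_WS_HOST` and `PULSAR_TOKEN` and the helper `auth_header` are hypothetical, and the header string follows the form used in the examples in this document.

```python
import os


def auth_header(token):
    """Return the header list passed to websocket.create_connection (hypothetical helper)."""
    return ["Authorization:Bearer " + token]


# PULSAR_WS_HOST and PULSAR_TOKEN are hypothetical variable names.
# The fallbacks are the placeholders used throughout this document.
host = os.environ.get("PULSAR_WS_HOST", "CLUSTER_HOST")
token = os.environ.get("PULSAR_TOKEN", "YOUR_TOKEN")

header = auth_header(token)
```

Keeping the token out of the source file makes it less likely to be committed by accident.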

### Steps

1. Connect to the Pulsar cluster.

   ```python theme={null}
   import websocket, base64, json

   # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
   TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/consumer/persistent/public/default/test/sub'
   token = "YOUR_TOKEN"
   header = ["Authorization:Bearer " + token]
   ws = websocket.create_connection(TOPIC, header=header)
   while True:
       raw = ws.recv()
       if not raw:
           # An empty frame cannot be parsed as JSON, so stop here
           break
       msg = json.loads(raw)
       print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
       # Acknowledge successful processing
       ws.send(json.dumps({'messageId': msg['messageId']}))

   ws.close()
   ```

   Replace `CLUSTER_HOST` with the domain name of the cluster and `YOUR_TOKEN` with the service account token.

2. Create a consumer and use the consumer to consume messages.

   ```python theme={null}
   import websocket, base64, json

   # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
   TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/consumer/persistent/public/default/test/sub'
   token = "YOUR_TOKEN"
   header = ["Authorization:Bearer " + token]
   ws = websocket.create_connection(TOPIC, header=header)
   while True:
       raw = ws.recv()
       if not raw:
           # An empty frame cannot be parsed as JSON, so stop here
           break
       msg = json.loads(raw)
       print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
       # Acknowledge successful processing
       ws.send(json.dumps({'messageId': msg['messageId']}))

   ws.close()
   ```

3. Create a producer and use the producer to produce messages.

   ```python theme={null}
   import websocket, base64, json

   # TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test'
   TOPIC = 'wss://CLUSTER_HOST:9443/ws/v2/producer/persistent/public/default/test'

   token = "YOUR_TOKEN"

   header = ["Authorization:Bearer " + token]

   ws = websocket.create_connection(TOPIC, header=header)

   # Send one message as JSON
   ws.send(json.dumps({
       # b64encode takes bytes and returns bytes; decode so the value is JSON-serializable
       'payload': base64.b64encode(b'Hello World').decode('utf-8'),
       'properties': {
           'key1': 'VALUE1',
           'key2': 'VALUE2'
       },
       'context': 5
   }))

   response = json.loads(ws.recv())
   if response['result'] == 'ok':
       print('Message published successfully')
   else:
       print('Failed to publish message:', response)
   ws.close()
   ```
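
The frames exchanged in the steps above are plain JSON with a Base64-encoded payload, so the encoding and decoding logic can be separated from the connection and checked without a cluster. The following is a minimal sketch, assuming only the fields that appear in the examples above (`payload`, `properties`, `context`, `messageId`). The function names are hypothetical, and the `messageId` value is made up for illustration.

```python
import base64
import json


def build_publish_frame(payload, properties=None, context=None):
    """Encode raw bytes as the JSON text frame sent to the producer endpoint."""
    frame = {"payload": base64.b64encode(payload).decode("ascii")}
    if properties:
        frame["properties"] = properties
    if context is not None:
        frame["context"] = context
    return json.dumps(frame)


def decode_message(raw_frame):
    """Parse a consumer frame and return (message_id, payload_bytes)."""
    msg = json.loads(raw_frame)
    return msg["messageId"], base64.b64decode(msg["payload"])


def build_ack_frame(message_id):
    """Build the acknowledgement frame for a consumed message."""
    return json.dumps({"messageId": message_id})


# Round trip without a network connection.
sent = json.loads(build_publish_frame(b"Hello World", {"key1": "VALUE1"}, context=5))
incoming = json.dumps({"messageId": "example-id", "payload": sent["payload"]})
message_id, body = decode_message(incoming)
print(body)                         # b'Hello World'
print(build_ack_frame(message_id))  # {"messageId": "example-id"}
```

With a live connection, the strings returned by `build_publish_frame` and `build_ack_frame` would be passed to `ws.send`, and `decode_message` would take the value returned by `ws.recv`.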
