> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Connect to your cluster using the Pulsar WebSocket API

This document describes how to connect to a cluster through a WebSocket API, and use the WebSocket producer and consumer to produce and consume messages to and from a topic. The WebSocket API supports connecting to a StreamNative cluster using [API Keys](#use-apikeys) authentication.

To use the WebSocket API to connect to a StreamNative cluster, you need to enable the WebSocket service in advance. For details, see [enable WebSocket service](/cloud/clusters/manage-clusters/cluster#create-a-cluster).

<Note title="Note">
  This document assumes that you have created a StreamNative cluster and a service account, and have granted the service account `produce` and `consume` permissions to the namespace for the target topic.
</Note>

## Prerequisites

Use the `pip install websocket-client` command to install all dependencies. For details, see the [Pulsar WebSocket documentation](http://pulsar.apache.org/docs/client-libraries-websocket/#python).

## Connect to your cluster using API keys

<span id="use-apikeys" />

To connect a StreamNative cluster using [API keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview), follow these steps.

### Step 1: Get the broker service URL of your cluster

To get the service URL(s) of a StreamNative cluster, follow these steps.

<Tabs>
  <Tab title="StreamNative Console">
    1. Navigate to the **Cluster Dashboard** page by [switching to the cluster workspace](/cloud/get-started/cloud-console#switch-a-cluster).

    2. On the **Cluster Dashboard** page, click **Details** tab.

    3. You will see the available service URLs in the **Access Points** area.

    4. You can click **Copy** at the end of the row of the service URL that you want to use.
  </Tab>
</Tabs>

### Step 2: Create an API key of your service account

<Note title="Note">
  Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
</Note>

You can follow the instructions to [create an API key](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#using-api-keys-to-connect-to-your-cluster) for the service account you choose to use.

### Step 3: Connect to your cluster

#### Create a consumer to consume messages

```python theme={null}
import websocket, base64, json

# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/consumer/persistent/public/default/test/sub'
TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/consumer/persistent/public/default/test/sub'
token = "${apikey}"
header = ["Authorization:Bearer " + token]
ws = websocket.create_connection(TOPIC, header=header)
while True:
  msg = json.loads(ws.recv())
  if not msg: break
  print("Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
  # Acknowledge successful processing
  ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()
```

* `${apikey}`: an API key of your service account.

#### Create a producer to produce messages

```python theme={null}
import websocket, base64, json

# TOPIC = 'ws://CLUSTER_HOST:9090/ws/v2/producer/persistent/public/default/test'
TOPIC = 'wss://CLUSTER_HOST:443/ws/v2/producer/persistent/public/default/test'

token = "${apikey}"

header = ["Authorization:Bearer " + token]

ws = websocket.create_connection(TOPIC, header=header)

# Send one message as JSON
ws.send(json.dumps({
'payload' : base64.b64encode('Hello World'),
'properties': {
  'key1' : 'VALUE1',
  'key2' : 'VALUE2'
},
'context' : 5
}))

response =  json.loads(ws.recv())
if response['result'] == 'ok':
  print('Message published successfully')
else:
  print('Failed to publish message:', response)
ws.close()
```

* `${apikey}`: an API key of your service account.

<Note title="Note">
  - Replace the `CLUSTER_HOST` with the domain name of the cluster. To get the domain name of the target cluster, click **Manage** > **Cluster** on the StreamNative Cloud Console.
  - In SN cloud you should use the port `443`.
</Note>
