# Connect to your cluster using the Pulsar Go client

This example shows how to connect to a StreamNative cluster with the Pulsar Go client, then use a Go producer and a Go consumer to produce messages to and consume messages from a topic. The Go client can authenticate to a StreamNative cluster with either [OAuth2](#use-oauth2) or [API keys](#use-apikeys).

<Note title="Note">
  This document assumes that you have created a StreamNative cluster and a service account, and have granted the service account `produce` and `consume` permissions to the namespace for the target topic.
</Note>

## Prerequisites

* Go 1.11 or later
* Pulsar Go client later than 0.1.1 (version 0.1.1 itself is not supported)

To install Go, see the [installation instructions](http://golang.org/doc/install).

## Connect to your cluster using API keys

<span id="use-apikeys" />

To connect to a StreamNative cluster using [API keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview), follow these steps.

### Step 1: Get the broker service URL of your cluster

To get the service URL(s) of a StreamNative cluster, follow these steps.

<Tabs>
  <Tab title="StreamNative Console">
    1. Navigate to the **Cluster Dashboard** page by [switching to the cluster workspace](/cloud/get-started/cloud-console#switch-a-cluster).

    2. On the **Cluster Dashboard** page, click the **Details** tab.

    3. Find the available service URLs in the **Access Points** area.

    4. Click **Copy** at the end of the row for the service URL that you want to use.
  </Tab>
</Tabs>
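The URL you copy replaces `${brokerServiceURL}` in the code samples later on this page. The following is a minimal sketch of a sanity check you could run before creating the client, assuming the copied URL uses the Pulsar binary protocol scheme (`pulsar://` or `pulsar+ssl://`). The `validateServiceURL` helper and the host names are hypothetical and are not part of the Pulsar Go client.

```go
package main

import (
	"fmt"
	"net/url"
)

// validateServiceURL checks that raw has a pulsar:// or pulsar+ssl:// scheme
// and a non-empty host. It is an illustrative helper, not a client API.
func validateServiceURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse service URL: %w", err)
	}
	if u.Scheme != "pulsar" && u.Scheme != "pulsar+ssl" {
		return fmt.Errorf("unexpected scheme %q, want pulsar or pulsar+ssl", u.Scheme)
	}
	if u.Hostname() == "" {
		return fmt.Errorf("service URL %q has no host", raw)
	}
	return nil
}

func main() {
	// Made-up example values, not real endpoints.
	candidates := []string{
		"pulsar+ssl://my-cluster.example.com:6651",
		"https://my-cluster.example.com",
		"${brokerServiceURL}", // a placeholder that was never replaced
	}
	for _, raw := range candidates {
		if err := validateServiceURL(raw); err != nil {
			fmt.Println("invalid:", err)
			continue
		}
		fmt.Println("ok:", raw)
	}
}
```

A check like this mainly catches a placeholder that was left unreplaced or an HTTP URL pasted by mistake. It does not verify that the cluster is reachable.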

### Step 2: Create an API key of your service account

<Note title="Note">
  Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
</Note>

Follow the instructions to [create an API key](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#using-api-keys-to-connect-to-your-cluster) for the service account that you want to use.

### Step 3: Connect to your cluster

For a complete example of how to connect to a cluster using the Pulsar Go client, see [Go client examples](https://github.com/streamnative/cloud-manager/tree/master/ui/src/data/code/clients/golang).

#### Create a Go consumer to consume messages

You can create and configure a Go consumer to consume messages using Token authentication as follows. For more information about the placeholders in the code sample, see [parameters for Token authentication](/cloud/build/pulsar-clients/cloud-connect-go#parameters-for-token-authentication).

```go theme={null}
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               "${brokerServiceURL}",
        Authentication:    pulsar.NewAuthenticationToken("${apikey}"),
    })

    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }

    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "persistent://${tenant}/${namespace}/${topic}",
        SubscriptionName: "${subscription}",
        SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
    })

    if err != nil {
        log.Fatal(err)
    }

    defer consumer.Close()

    for i := 0; i < 10; i++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }

        fmt.Printf("Received message msgId: %v -- content: '%s'\n",
            msg.ID(), string(msg.Payload()))

        consumer.Ack(msg)
    }

    if err := consumer.Unsubscribe(); err != nil {
        log.Fatal(err)
    }
}
```

#### Create a Go producer to produce messages

You can create and configure a Go producer to produce messages using Token authentication as follows. For more information about the placeholders in the code sample, see [parameters for Token authentication](/cloud/build/pulsar-clients/cloud-connect-go#parameters-for-token-authentication).

```go theme={null}
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               "${brokerServiceURL}",
        Authentication:    pulsar.NewAuthenticationToken("${apikey}"),
    })

    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }

    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "persistent://${tenant}/${namespace}/${topic}",
    })

    if err != nil {
        log.Fatal(err)
    }

    defer producer.Close()

    for i := 0; i < 10; i++ {
        if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte(fmt.Sprintf("hello-%d", i)),
        }); err != nil {
            log.Fatal(err)
        } else {
            fmt.Printf("Published message: %v \n", msgId)
        }
    }
}
```

#### Parameters for Token authentication

* `${brokerServiceURL}`: the broker service URL of your StreamNative cluster.
* `${apikey}`: an API key of your service account.
* `${tenant}/${namespace}/${topic}`: the full name of the topic to produce messages to and consume messages from. It combines the tenant name, the namespace name, and the topic name.
* `${subscription}`: the name of the subscription that will determine how messages are delivered.
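
As a small illustration of how the topic placeholders fit together, the sketch below builds the full topic name from its three parts. The `topicName` helper and the sample values `public`, `default`, and `my-topic` are assumptions for illustration only. The check for empty or unreplaced parts is a convenience, not a rule enforced by the client.

```go
package main

import (
	"fmt"
	"strings"
)

// topicName joins the parts into the format used in the samples:
// persistent://tenant/namespace/topic.
// It rejects empty parts and parts that still contain a "${" placeholder.
func topicName(tenant, namespace, topic string) (string, error) {
	for _, part := range []string{tenant, namespace, topic} {
		if part == "" || strings.Contains(part, "${") {
			return "", fmt.Errorf("invalid topic name part %q", part)
		}
	}
	return fmt.Sprintf("persistent://%s/%s/%s", tenant, namespace, topic), nil
}

func main() {
	name, err := topicName("public", "default", "my-topic")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(name)
}
```

The returned string can be passed as the `Topic` field of `pulsar.ConsumerOptions` or `pulsar.ProducerOptions` in the samples above.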

## Connect to your cluster using OAuth2 authentication

<span id="use-oauth2" />

To connect to a StreamNative cluster through OAuth2 authentication, follow these steps.

### Step 1: Get the broker service URL of your cluster

To get the service URL(s) of a StreamNative cluster, follow these steps.

<Tabs>
  <Tab title="StreamNative Console">
    1. Navigate to the **Cluster Dashboard** page by [switching to the cluster workspace](/cloud/get-started/cloud-console#switch-a-cluster).

    2. On the **Cluster Dashboard** page, click the **Details** tab.

    3. Find the available service URLs in the **Access Points** area.

    4. Click **Copy** at the end of the row for the service URL that you want to use.
  </Tab>
</Tabs>

### Step 2: Get the OAuth2 credential file of your service account

To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.

1. On the left navigation pane, click **Service Accounts**.

2. In the row of the service account you want to use, in the **Key File** column, click the **Download** icon to download the OAuth2 credential file to your local directory.

   The OAuth2 credential file looks similar to this:

   ```json theme={null}
   {
     "type": "SN_SERVICE_ACCOUNT",
     "client_id": "CLIENT_ID",
     "client_secret": "CLIENT_SECRET",
     "client_email": "test@auth.streamnative.cloud",
     "issuer_url": "https://auth.streamnative.cloud"
   }
   ```
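
Before pointing the client at the downloaded file, it can help to confirm that the file parses and contains the fields shown above. The following is a minimal sketch using only the standard library. The `keyFile` type and `parseKeyFile` function are hypothetical names, and treating `client_id`, `client_secret`, and `issuer_url` as required is an assumption based on the sample file, not a documented rule.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// keyFile mirrors the fields of the sample credential file.
type keyFile struct {
	Type         string `json:"type"`
	ClientID     string `json:"client_id"`
	ClientSecret string `json:"client_secret"`
	ClientEmail  string `json:"client_email"`
	IssuerURL    string `json:"issuer_url"`
}

// parseKeyFile decodes the JSON and reports any assumed-required field
// that is empty or absent.
func parseKeyFile(data []byte) (keyFile, error) {
	var kf keyFile
	if err := json.Unmarshal(data, &kf); err != nil {
		return keyFile{}, fmt.Errorf("decode key file: %w", err)
	}
	var missing []string
	if kf.ClientID == "" {
		missing = append(missing, "client_id")
	}
	if kf.ClientSecret == "" {
		missing = append(missing, "client_secret")
	}
	if kf.IssuerURL == "" {
		missing = append(missing, "issuer_url")
	}
	if len(missing) > 0 {
		return keyFile{}, fmt.Errorf("key file is missing: %s", strings.Join(missing, ", "))
	}
	return kf, nil
}

func main() {
	// Same placeholder content as the sample file above.
	sample := []byte(`{
	  "type": "SN_SERVICE_ACCOUNT",
	  "client_id": "CLIENT_ID",
	  "client_secret": "CLIENT_SECRET",
	  "client_email": "test@auth.streamnative.cloud",
	  "issuer_url": "https://auth.streamnative.cloud"
	}`)
	kf, err := parseKeyFile(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Print only non-secret fields.
	fmt.Println(kf.Type, kf.ClientEmail, kf.IssuerURL)
}
```

In a real program you would read the bytes with `os.ReadFile` from the path where you saved the download, and you would avoid logging `client_secret`.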

### Step 3: Connect to your cluster

For a complete example of how to connect to a cluster using the Pulsar Go client, see [Go client examples](https://github.com/streamnative/cloud-manager/tree/master/ui/src/data/code/clients/golang).

#### Create a Go consumer to consume messages

You can create and configure a Go consumer to consume messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see [parameters for OAuth2 authentication](/cloud/build/pulsar-clients/cloud-connect-go#parameters-for-oauth2-authentication).

```go theme={null}
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "https://auth.streamnative.cloud/",
        "audience":   "urn:sn:pulsar:${orgName}:${instanceName}",
        "privateKey": "file:///YOUR-KEY-FILE-PATH",  // Absolute path of your downloaded key file
    })

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               "${brokerServiceURL}",
        Authentication:    oauth,
    })

    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }

    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "persistent://${tenant}/${namespace}/${topic}",
        SubscriptionName: "${subscription}",
        SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
    })

    if err != nil {
        log.Fatal(err)
    }

    defer consumer.Close()

    for i := 0; i < 10; i++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }

        fmt.Printf("Received message msgId: %v -- content: '%s'\n",
            msg.ID(), string(msg.Payload()))

        consumer.Ack(msg)
    }

    if err := consumer.Unsubscribe(); err != nil {
        log.Fatal(err)
    }
}
```

#### Create a Go producer to produce messages

You can create and configure a Go producer to produce messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see [parameters for OAuth2 authentication](/cloud/build/pulsar-clients/cloud-connect-go#parameters-for-oauth2-authentication).

```go theme={null}
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "https://auth.streamnative.cloud/",
        "audience":   "urn:sn:pulsar:${orgName}:${instanceName}",
        "privateKey": "file:///YOUR-KEY-FILE-PATH",  // Absolute path of your downloaded key file
    })

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               "${brokerServiceURL}",
        Authentication:    oauth,
    })

    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }

    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "persistent://${tenant}/${namespace}/${topic}",
    })

    if err != nil {
        log.Fatal(err)
    }

    defer producer.Close()

    for i := 0; i < 10; i++ {
        if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte(fmt.Sprintf("hello-%d", i)),
        }); err != nil {
            log.Fatal(err)
        } else {
            fmt.Printf("Published message: %v \n", msgId)
        }
    }
}
```

#### Parameters for OAuth2 authentication

* `privateKey`: your downloaded OAuth2 credential. This parameter supports the following two formats:
  * `file:///path/to/file`: the path to your downloaded OAuth2 credential file.
  * `data:application/json;base64,<base64-encoded value>`: the credential file content encoded into Base64 format.
* `audience`: the [Uniform Resource Name (URN)](/cloud/references/glossary#urn), which combines the `urn:sn:pulsar` prefix, your organization name, and your Pulsar instance name.
  * `${orgName}`: the name of your [organization](/cloud/references/glossary#organization).
  * `${instanceName}`: the name of your [instance](/cloud/references/glossary#instance).
* `${brokerServiceURL}`: the broker service URL of your StreamNative cluster.
* `${tenant}/${namespace}/${topic}`: the full name of the topic to produce messages to and consume messages from. It combines the tenant name, the namespace name, and the topic name.
* `${subscription}`: the name of the subscription that will determine how messages are delivered.
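
The `audience` and `privateKey` values described above can be assembled programmatically. The sketch below builds the audience URN and the `data:` form of `privateKey`. The helper names `audience` and `privateKeyDataURI` and the sample values `my-org` and `my-instance` are hypothetical, and the sketch assumes standard (padded) Base64 encoding.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// audience builds the URN in the format urn:sn:pulsar:<orgName>:<instanceName>.
func audience(orgName, instanceName string) string {
	return fmt.Sprintf("urn:sn:pulsar:%s:%s", orgName, instanceName)
}

// privateKeyDataURI wraps the credential file content in the
// data:application/json;base64,<value> format.
func privateKeyDataURI(keyFileJSON []byte) string {
	return "data:application/json;base64," + base64.StdEncoding.EncodeToString(keyFileJSON)
}

func main() {
	fmt.Println(audience("my-org", "my-instance"))

	// Placeholder content standing in for the downloaded credential file.
	fmt.Println(privateKeyDataURI([]byte(`{"type":"SN_SERVICE_ACCOUNT"}`)))
}
```

The `data:` form can be convenient when the credential is injected through an environment variable or a secret store instead of a file on disk. The resulting strings go into the `audience` and `privateKey` entries of the map passed to `pulsar.NewAuthenticationOAuth2` in the samples above.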
