# Connect to Pulsar cluster using Go client

This document provides examples of how to use the Pulsar Go client to connect to a Pulsar cluster using either a token or an OAuth2 credential file.

<Note title="Note">
  This document assumes that you have created a service account and granted it `produce` and `consume` permissions on the namespace that contains the target topic.
</Note>

## Connect to a Pulsar cluster using a token

This section describes how to connect to your Pulsar cluster using a token.

### Prerequisites

* [Get the service URL](/private-cloud/v1/build/connect-prepare#get-the-service-url).
* [Get a service account token](/private-cloud/v1/build/connect-prepare#get-a-service-account-token).

### Steps

To connect to your Pulsar cluster using a token, follow these steps.

1. Connect to the Pulsar cluster.

   ```go theme={null}
   package main

   import (
     "log"

     "github.com/apache/pulsar-client-go/pulsar"
   )

   func main() {
     client, err := pulsar.NewClient(pulsar.ClientOptions{
       URL:            "SERVICE_URL",
       Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"),
     })
     if err != nil {
       log.Fatal(err)
     }
     defer client.Close()
   }
   ```

   Replace `SERVICE_URL` with the service URL of your cluster and `AUTH_PARAMS` with the service account token, as described in the [prepare to connect to a Pulsar cluster](/private-cloud/v1/build/connect-prepare) user guide.

2. Create a Go consumer and use the Go consumer to consume messages.

   ```go theme={null}
   package main

   import (
     "context"
     "fmt"
     "log"

     "github.com/apache/pulsar-client-go/pulsar"
   )

   func main() {
     client, err := pulsar.NewClient(pulsar.ClientOptions{
       URL:            "SERVICE_URL",
       Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"),
     })
     if err != nil {
       log.Fatal(err)
     }
     defer client.Close()

     consumer, err := client.Subscribe(pulsar.ConsumerOptions{
       Topic:            "your-topic",
       SubscriptionName: "your-sub-name",
       Type:             pulsar.Shared,
     })
     if err != nil {
       log.Fatal(err)
     }
     defer consumer.Close()

     for i := 0; i < 10; i++ {
       msg, err := consumer.Receive(context.Background())
       if err != nil {
         log.Fatal(err)
       }

       fmt.Printf("Received message msgId: %v -- content: '%s'\n",
         msg.ID(), string(msg.Payload()))

       consumer.Ack(msg)
     }

     if err := consumer.Unsubscribe(); err != nil {
       log.Fatal(err)
     }
   }
   ```

3. Create a Go producer and use the Go producer to produce messages.

   ```go theme={null}
   package main

   import (
     "context"
     "fmt"
     "log"

     "github.com/apache/pulsar-client-go/pulsar"
   )

   func main() {
     client, err := pulsar.NewClient(pulsar.ClientOptions{
       URL:            "SERVICE_URL",
       Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"),
     })
     if err != nil {
       log.Fatal(err)
     }
     defer client.Close()

     producer, err := client.CreateProducer(pulsar.ProducerOptions{
       Topic: "your-topic",
     })
     if err != nil {
       log.Fatal(err)
     }
     defer producer.Close()

     // Send ten messages synchronously and print each returned message ID.
     for i := 0; i < 10; i++ {
       msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
         Payload: []byte(fmt.Sprintf("hello-%d", i)),
       })
       if err != nil {
         log.Fatal(err)
       }
       fmt.Printf("Published message: %v\n", msgID)
     }
   }
   ```
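
The examples above hard-code `SERVICE_URL` and `AUTH_PARAMS`. As a minimal sketch of one alternative, the values could be read from environment variables and checked before the client is created. The variable names `PULSAR_SERVICE_URL` and `PULSAR_AUTH_TOKEN` and the helper `loadConnConfig` are hypothetical and not part of the Pulsar Go client; the scheme check assumes a `pulsar://` or `pulsar+ssl://` service URL.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// connConfig holds the two values that the examples above hard-code.
type connConfig struct {
	ServiceURL string
	Token      string
}

// loadConnConfig reads the settings through getenv (normally os.Getenv).
// The variable names PULSAR_SERVICE_URL and PULSAR_AUTH_TOKEN are
// hypothetical; use whatever names fit your deployment.
func loadConnConfig(getenv func(string) string) (connConfig, error) {
	cfg := connConfig{
		ServiceURL: strings.TrimSpace(getenv("PULSAR_SERVICE_URL")),
		Token:      strings.TrimSpace(getenv("PULSAR_AUTH_TOKEN")),
	}
	// Assumption: this sketch accepts only the binary-protocol schemes.
	// Relax the check if your deployment uses a different scheme.
	if !strings.HasPrefix(cfg.ServiceURL, "pulsar://") &&
		!strings.HasPrefix(cfg.ServiceURL, "pulsar+ssl://") {
		return connConfig{}, fmt.Errorf("unexpected service URL %q", cfg.ServiceURL)
	}
	if cfg.Token == "" {
		return connConfig{}, errors.New("PULSAR_AUTH_TOKEN is empty")
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConnConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println("connecting to", cfg.ServiceURL)
}
```

The resulting `cfg.ServiceURL` and `cfg.Token` would then take the place of the `"SERVICE_URL"` and `"AUTH_PARAMS"` literals passed to `pulsar.ClientOptions` and `pulsar.NewAuthenticationToken`.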

## Connect to a Pulsar cluster using an OAuth2 credential file

To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.

1. Generate the App credentials by following similar instructions in [configure OAuth2 authentication](/private-cloud/v1/operating-streamnative-platform/security/oauth2-auth).

2. Save the App credentials into an OAuth2 credential file.

3. Connect to your Pulsar cluster through the OAuth2 credential file.

   ```go theme={null}
   package main

   import (
     "github.com/apache/pulsar-client-go/pulsar"
     "log"
   )

   func main() {
     oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
       "type":       "client_credentials",
       "issuerUrl":  "your-issuer-url",
       "audience":   "your-audience",
       "privateKey": "file:///path/to/private-key-file.json",
       "clientId":   "your-client-id",
     })
     client, err := pulsar.NewClient(pulsar.ClientOptions{
       URL:            "your-service-url",
       Authentication: oauth,
     })
     if err != nil {
       log.Fatal(err)
     }
     defer client.Close()
   }
   ```

   * `issuerUrl`: the URL of your OAuth2 identity provider.
   * `audience`: the `audience` of your Pulsar cluster.
   * `privateKey`: the path to your OAuth2 credential file.
   * `clientId`: the Pulsar application client ID.
   * `URL`: the URL of your Pulsar cluster.

4. Create a Go consumer and use the Go consumer to consume messages.

   ```go theme={null}
   package main

   import (
     "context"
     "fmt"
     "log"

     "github.com/apache/pulsar-client-go/pulsar"
   )

   func main() {
     oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
       "type":       "client_credentials",
       "issuerUrl":  "your-issuer-url",
       "audience":   "your-audience",
       "privateKey": "file:///path/to/private-key-file.json",
       "clientId":   "your-client-id",
     })

     client, err := pulsar.NewClient(pulsar.ClientOptions{
       URL:            "your-service-url",
       Authentication: oauth,
     })
     if err != nil {
       log.Fatal(err)
     }
     defer client.Close()

     consumer, err := client.Subscribe(pulsar.ConsumerOptions{
       Topic:            "your-topic",
       SubscriptionName: "your-sub-name",
       Type:             pulsar.Shared,
     })
     if err != nil {
       log.Fatal(err)
     }
     defer consumer.Close()

     for i := 0; i < 10; i++ {
       msg, err := consumer.Receive(context.Background())
       if err != nil {
         log.Fatal(err)
       }

       fmt.Printf("Received message msgId: %v -- content: '%s'\n",
         msg.ID(), string(msg.Payload()))

       consumer.Ack(msg)
     }

     if err := consumer.Unsubscribe(); err != nil {
       log.Fatal(err)
     }
   }
   ```

5. Create a Go producer and use the Go producer to produce messages.

   ```go theme={null}
   package main

   import (
     "context"
     "fmt"
     "log"

     "github.com/apache/pulsar-client-go/pulsar"
   )

   func main() {
     oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
       "type":       "client_credentials",
       "issuerUrl":  "your-issuer-url",
       "audience":   "your-audience",
       "privateKey": "file:///path/to/private-key-file.json",
       "clientId":   "your-client-id",
     })

     client, err := pulsar.NewClient(pulsar.ClientOptions{
       URL:            "your-service-url",
       Authentication: oauth,
     })
     if err != nil {
       log.Fatal(err)
     }
     defer client.Close()

     producer, err := client.CreateProducer(pulsar.ProducerOptions{
       Topic: "your-topic",
     })
     if err != nil {
       log.Fatal(err)
     }
     defer producer.Close()

     // Send ten messages synchronously and print each returned message ID.
     for i := 0; i < 10; i++ {
       msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
         Payload: []byte(fmt.Sprintf("hello-%d", i)),
       })
       if err != nil {
         log.Fatal(err)
       }
       fmt.Printf("Published message: %v\n", msgID)
     }
   }
   ```
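
The OAuth2 steps above repeat the same parameter map three times, and `privateKey` is given as a `file://` URL rather than a plain path. A minimal sketch of a helper that builds the map once and converts a filesystem path into that form might look like the following. The function name `oauth2Params` is hypothetical and not part of the Pulsar Go client, and the path handling assumes a Unix-like filesystem.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// oauth2Params builds the parameter map used in the steps above.
// keyFilePath is a plain filesystem path; it is converted to the
// file:// form shown for the privateKey parameter.
func oauth2Params(issuerURL, audience, keyFilePath, clientID string) (map[string]string, error) {
	abs, err := filepath.Abs(keyFilePath)
	if err != nil {
		return nil, fmt.Errorf("resolve key file path: %w", err)
	}
	return map[string]string{
		"type":       "client_credentials",
		"issuerUrl":  issuerURL,
		"audience":   audience,
		"privateKey": "file://" + filepath.ToSlash(abs),
		"clientId":   clientID,
	}, nil
}

func main() {
	params, err := oauth2Params("your-issuer-url", "your-audience",
		"/path/to/private-key-file.json", "your-client-id")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(params["privateKey"])
}
```

The returned map could then be passed to `pulsar.NewAuthenticationOAuth2` in place of the inline map literal used in the connection, consumer, and producer examples.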
