1. StreamNative Cloud
  2. Connect

Connect to cluster through Go client

This example shows how to connect to a cluster through a Go client and use the Go producer and consumer to produce and consume messages to and from a topic. The Go client supports connecting to a Pulsar cluster either through the OAuth2 authentication plugin or the Token authentication plugin.

This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Prerequisites

  • Go 1.11 or higher version
  • Go client 0.1.1+ (without 0.1.1)

If you have not installed Go, install it according to the installation instructions.

Connect to a cluster through the OAuth2 authentication plugin

To connect to a cluster through the OAuth2 authentication plugin, follow these steps.

  1. Get the service URL of your Pulsar cluster. For details, see get a service URL.

  2. Get the OAuth2 authentication parameters. For details, see get an OAuth2 credential file.

  3. Connect to a Pulsar cluster through the OAuth2 authentication plugin.

    func main() {
      oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "https://auth.streamnative.cloud",
        "audience":   "your_audience",
        "privateKey": "file:///absolute path/to/key/file.json",
        "clientId":   "your_client_id",
      })
    
      client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            "broker-service-url",
        Authentication: oauth,
      })
      if err != nil {
        log.Fatal(err)
      }
      defer client.Close()
    }
    
    • issuerUrl: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.
    • audience: the audience parameter is the Uniform Resource Name (URN), which is a combination of the urn:sn:pulsar, the organization name, and the Pulsar instance name, in this format urn:sn:pulsar:<org_name>:<instance_name>.
    • privateKey: the path to your downloaded OAuth2 credential file. The privateKey parameter supports the following three pattern formats:
      • file:///path/to/file
      • file:/path/to/file
      • data:application/json;base64,<base64-encoded value>
    • clientID: the client ID of your application. You can get the value from your downloaded OAuth2 credential file.
    • URL: the broker service URL of your Pulsar cluster.
  4. Create a Go consumer and use the Go consumer to consume messages.

    func main() {
      oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "https://auth.streamnative.cloud",
        "audience":   "your_audience",
        "privateKey": "file:///absolute path/to/key/file.json",
        "clientId":   "your_client_id",
      })
      client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            "broker-service-url",
        Authentication: oauth,
      })
      if err != nil {
        log.Fatal(err)
      }
    
      consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "topic-1",
        SubscriptionName: "my-sub",
        Type:             pulsar.Shared,
      })
      if err != nil {
        log.Fatal(err)
      }
      defer consumer.Close()
    
      for i := 0; i < 10; i++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
          log.Fatal(err)
        }
    
        fmt.Printf("Received message msgId: %v -- content: '%s'\n",
          msg.ID(), string(msg.Payload()))
    
        consumer.Ack(msg)
      }
    
      if err := consumer.Unsubscribe(); err != nil {
        log.Fatal(err)
      }
    }
    
  5. Create a Go producer and use the Go producer to produce messages.

    func main() {
      oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "https://auth.streamnative.cloud",
        "audience":   "your_audience",
        "privateKey": "file:///absolute path/to/key/file.json",
        "clientId":   "your_client_id",
      })
      client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            "broker-service-url",
        Authentication: oauth,
      })
      if err != nil {
        log.Fatal(err)
      }
    
      producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "topic-1",
      })
      if err != nil {
        log.Fatal(err)
      }
      defer producer.Close()
    
      for i := 0; i < 10; i++ {
        if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
          Payload: []byte(fmt.Sprintf("hello-%d", i)),
        }); err != nil {
          log.Fatal(err)
        } else {
          fmt.Printf("Published message: %v \n", msgId)
        }
      }
    }
    

Connect to a cluster through the Token authentication plugin

To connect to a cluster through the Token authentication plugin, follow these steps.

  1. Get the Pulsar service URLs. For details, see get a service URL.

  2. Get the Token authentication parameters. For details, see get a token.

  3. Connect to a Pulsar cluster through the Token authentication plugin.

    func main() {
      client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            SERVICE_URL,
        Authentication: pulsar.NewAuthenticationToken(AUTH_PARAMS),
      })
      if err != nil {
        log.Fatal(err)
      }
      defer client.Close()
    }
    
    • SERVICE_URL: the broker service URL of your Pulsar cluster.
    • AUTH_PARAMS: the token of your service account.

For a complete example about how to connect to a cluster through the Pulsar Go client, see Go client examples.

Previous
Clients - C++