Connect to Pulsar cluster using Go client

This document provides examples of how to use the Pulsar Go client to connect to a Pulsar cluster using a token or an OAuth2 credential file.

Note

This document assumes that you have created a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Connect to a Pulsar cluster using a token

This section describes how to connect to your Pulsar cluster using a token.

Prerequisites

Steps

To connect to your Pulsar cluster using a token, follow these steps.

  1. Connect to the Pulsar cluster.

    package main
    
    import (
      "log"
      "github.com/apache/pulsar-client-go/pulsar"
    )
    
    // main opens a token-authenticated connection to the Pulsar cluster and
    // closes the client when the function returns.
    func main() {
      // Replace the placeholders with your cluster's service URL and token.
      tokenAuth := pulsar.NewAuthenticationToken("AUTH_PARAMS")
      opts := pulsar.ClientOptions{
        URL:            "SERVICE_URL",
        Authentication: tokenAuth,
      }

      pulsarClient, connErr := pulsar.NewClient(opts)
      if connErr != nil {
        log.Fatal(connErr)
      }
      defer pulsarClient.Close()
    }
    

    Set the SERVICE_URL and AUTH_PARAMS parameters as described in the "Prepare to connect to a Pulsar cluster" user guide.

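  2. Create a Go consumer and use the Go consumer to consume messages. Replace the main function from step 1 with the following code, and add "context" and "fmt" to the import block.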

    // main connects with token authentication, subscribes to a topic with a
    // shared subscription, and receives and acknowledges ten messages.
    func main() {
      clientOpts := pulsar.ClientOptions{
        URL:            "SERVICE_URL",
        Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"),
      }
      client, err := pulsar.NewClient(clientOpts)
      if err != nil {
        log.Fatal(err)
      }
      defer client.Close()

      subscription := pulsar.ConsumerOptions{
        Topic:            "your-topic",
        SubscriptionName: "your-sub-name",
        Type:             pulsar.Shared,
      }
      consumer, err := client.Subscribe(subscription)
      if err != nil {
        log.Fatal(err)
      }
      defer consumer.Close()

      // Read, print, and acknowledge ten messages, then stop.
      for received := 0; received < 10; received++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
          log.Fatal(err)
        }

        id, body := msg.ID(), string(msg.Payload())
        fmt.Printf("Received message msgId: %v -- content: '%s'\n", id, body)

        consumer.Ack(msg)
      }

      // Unsubscribe before exiting.
      if err := consumer.Unsubscribe(); err != nil {
        log.Fatal(err)
      }
    }
    
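  3. Create a Go producer and use the Go producer to produce messages. Replace the main function from step 1 with the following code, and add "context" and "fmt" to the import block.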

    // main connects with token authentication, creates a producer for a topic,
    // and publishes ten messages, printing the ID of each one.
    func main() {
      clientOpts := pulsar.ClientOptions{
        URL:            "SERVICE_URL",
        Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"),
      }
      client, err := pulsar.NewClient(clientOpts)
      if err != nil {
        log.Fatal(err)
      }
      defer client.Close()

      producerOpts := pulsar.ProducerOptions{
        Topic: "your-topic",
      }
      producer, err := client.CreateProducer(producerOpts)
      if err != nil {
        log.Fatal(err)
      }
      defer producer.Close()

      // Publish "hello-0" through "hello-9"; stop at the first failure.
      for seq := 0; seq < 10; seq++ {
        payload := []byte(fmt.Sprintf("hello-%d", seq))
        msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
          Payload: payload,
        })
        if err != nil {
          log.Fatal(err)
        }
        fmt.Printf("Published message: %v \n", msgID)
      }
    }
    

Connect to a Pulsar cluster using an OAuth2 credential file

To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.

  1. Generate the app credentials by following the instructions in "Configure OAuth2 authentication".

  2. Save the App credentials into an OAuth2 credential file.

  3. Connect to your Pulsar cluster through the OAuth2 credential file.

    package main
    
    import (
      "github.com/apache/pulsar-client-go/pulsar"
      "log"
    )
    
    // main connects to the Pulsar cluster using OAuth2 client credentials and
    // closes the client when the function returns.
    func main() {
      // Replace each placeholder value with the settings for your cluster.
      credentials := map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "your-issuer-url",
        "audience":   "your-audience",
        "privateKey": "file:///path/to/private-key-file.json",
        "clientId":   "your-client-id",
      }
      opts := pulsar.ClientOptions{
        URL:            "your-service-url",
        Authentication: pulsar.NewAuthenticationOAuth2(credentials),
      }

      client, err := pulsar.NewClient(opts)
      if err != nil {
        log.Fatal(err)
      }
      defer client.Close()
    }
    
    • issuerUrl: the URL of your OAuth2 identity provider.
    • audience: the audience of your Pulsar cluster.
    • privateKey: the path to your OAuth2 credential file.
    • clientId: the Pulsar application client ID.
    • URL: the URL of your Pulsar cluster.
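  4. Create a Go consumer and use the Go consumer to consume messages. Replace the main function from step 3 with the following code, and add "context" and "fmt" to the import block.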

    // main connects with OAuth2 client credentials, subscribes to a topic with
    // a shared subscription, and receives and acknowledges ten messages.
    func main() {
      oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "your-issuer-url",
        "audience":   "your-audience",
        "privateKey": "file:///path/to/private-key-file.json",
        "clientId":   "your-client-id",
      })

      client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            "your-service-url",
        Authentication: oauth,
      })
      if err != nil {
        log.Fatal(err)
      }
      // Close the client on return so its resources are released.
      defer client.Close()

      consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "your-topic",
        SubscriptionName: "your-sub-name",
        Type:             pulsar.Shared,
      })
      if err != nil {
        log.Fatal(err)
      }
      defer consumer.Close()

      // Read, print, and acknowledge ten messages, then stop.
      for i := 0; i < 10; i++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
          log.Fatal(err)
        }

        fmt.Printf("Received message msgId: %v -- content: '%s'\n",
          msg.ID(), string(msg.Payload()))

        consumer.Ack(msg)
      }

      // Unsubscribe before exiting.
      if err := consumer.Unsubscribe(); err != nil {
        log.Fatal(err)
      }
    }
    
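  5. Create a Go producer and use the Go producer to produce messages. Replace the main function from step 3 with the following code, and add "context" and "fmt" to the import block.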

    func main() {
      oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "your-issuer-url",
        "audience":   "your-audience",
        "privateKey": "file:///path/to/private-key-file.json",
        "clientId":   "your-client-id",
      })
    
      client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            "your-service-url",
        Authentication: oauth,
      })
      if err != nil {
        log.Fatal(err)
      }
    
      producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "your-topic",
      })
      if err != nil {
        log.Fatal(err)
      }
      defer producer.Close()
    
      for i := 0; i < 10; i++ {
        if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
          Payload: []byte(fmt.Sprintf("hello-%d", i)),
        }); err != nil {
          log.Fatal(err)
        } else {
          fmt.Printf("Published message: %v \n", msgId)
        }
      }
      }
    }
    
Previous
Pulsar - Python