Connect to Pulsar cluster using Go client
This document provides examples of how to use the Pulsar Go client to connect to a Pulsar cluster through a token or an OAuth2 credential file.
Note
This document assumes that you have created a service account and have granted the service account `produce` and `consume` permissions to the namespace for the target topic.
Connect to a Pulsar cluster using a token
This section describes how to connect to your Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
Connect to the Pulsar cluster.
package main import ( "log" "github.com/apache/pulsar-client-go/pulsar" ) func main() { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "SERVICE_URL", Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"), }) if err != nil { log.Fatal(err) } defer client.Close() }
Set the `SERVICE_URL` and `AUTH_PARAMS` parameters based on the descriptions in the "Prepare to connect to a Pulsar cluster" user guide.
Create a Go consumer and use the Go consumer to consume messages.
func main() { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "SERVICE_URL", Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"), }) if err != nil { log.Fatal(err) } defer client.Close() consumer, err := client.Subscribe(pulsar.ConsumerOptions{ Topic: "your-topic", SubscriptionName: "your-sub-name", Type: pulsar.Shared, }) if err != nil { log.Fatal(err) } defer consumer.Close() for i := 0; i < 10; i++ { msg, err := consumer.Receive(context.Background()) if err != nil { log.Fatal(err) } fmt.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload())) consumer.Ack(msg) } if err := consumer.Unsubscribe(); err != nil { log.Fatal(err) } }
Create a Go producer and use the Go producer to produce messages.
func main() { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "SERVICE_URL", Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"), }) if err != nil { log.Fatal(err) } defer client.Close() producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: "your-topic", }) if err != nil { log.Fatal(err) } defer producer.Close() for i := 0; i < 10; i++ { if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }); err != nil { log.Fatal(err) } else { fmt.Printf("Published message: %v \n", msgId) } } }
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
Generate the App credentials by following similar instructions in configure OAuth2 authentication.
Save the App credentials into an OAuth2 credential file.
Connect to your Pulsar cluster through the OAuth2 credential file.
package main import ( "github.com/apache/pulsar-client-go/pulsar" "log" ) func main() { oauth := pulsar.NewAuthenticationOAuth2(map[string]string{ "type": "client_credentials", "issuerUrl": "your-issuer-url", "audience": "your-audience", "privateKey": "file:///path/to/private-key-file.json", "clientId": "your-client-id", }) client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "your-service-url", Authentication: oauth, }) if err != nil { log.Fatal(err) } defer client.Close() }
- `issuerUrl`: the URL of your OAuth2 identity provider.
- `audience`: the `audience` of your Pulsar cluster.
- `privateKey`: the path to your OAuth2 credential file.
- `clientId`: the Pulsar application client ID.
- `URL`: the URL of your Pulsar cluster.
Create a Go consumer and use the Go consumer to consume messages.
func main() { oauth := pulsar.NewAuthenticationOAuth2(map[string]string{ "type": "client_credentials", "issuerUrl": "your-issuer-url", "audience": "your-audience", "privateKey": "file:///path/to/private-key-file.json", "clientId": "your-client-id", }) client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "your-service-url", Authentication: oauth, }) if err != nil { log.Fatal(err) } consumer, err := client.Subscribe(pulsar.ConsumerOptions{ Topic: "your-topic", SubscriptionName: "your-sub-name", Type: pulsar.Shared, }) if err != nil { log.Fatal(err) } defer consumer.Close() for i := 0; i < 10; i++ { msg, err := consumer.Receive(context.Background()) if err != nil { log.Fatal(err) } fmt.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload())) consumer.Ack(msg) } if err := consumer.Unsubscribe(); err != nil { log.Fatal(err) } }
Create a Go producer and use the Go producer to produce messages.
func main() { oauth := pulsar.NewAuthenticationOAuth2(map[string]string{ "type": "client_credentials", "issuerUrl": "your-issuer-url", "audience": "your-audience", "privateKey": "file:///path/to/private-key-file.json", "clientId": "your-client-id", }) client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "your-service-url", Authentication: oauth, }) if err != nil { log.Fatal(err) } producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: "your-topic", }) if err != nil { log.Fatal(err) } defer producer.Close() for i := 0; i < 10; i++ { if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }); err != nil { log.Fatal(err) } else { fmt.Printf("Published message: %v \n", msgId) } } } }