This document provides examples of how to use the Pulsar Go client to connect to a Pulsar cluster through a token or an OAuth2 credential file.
This document assumes that you have created a service account, and have granted the service account produce
and consume
permissions to the namespace for the target topic.
Connect to a Pulsar cluster using a token
This section describes how to connect to your Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
-
Connect to the Pulsar cluster.
package main
import (
"log"
"github.com/apache/pulsar-client-go/pulsar"
)
// main opens a client connection to the Pulsar cluster at SERVICE_URL using
// the token passed as AUTH_PARAMS, and closes the client when main returns.
func main() {
	opts := pulsar.ClientOptions{
		URL:            "SERVICE_URL",
		Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"),
	}

	c, err := pulsar.NewClient(opts)
	if err != nil {
		// Nothing else can be done without a client.
		log.Fatal(err)
	}
	defer c.Close()
}
Set the SERVICE_URL
and AUTH_PARAMS
parameters as described in the "Prepare to connect to a Pulsar cluster" user guide.
-
Create a Go consumer and use the Go consumer to consume messages.
// main connects to the cluster with token authentication, subscribes to
// "your-topic" with a shared subscription named "your-sub-name", receives and
// acknowledges ten messages, and finally unsubscribes.
func main() {
	clientOpts := pulsar.ClientOptions{
		URL:            "SERVICE_URL",
		Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"),
	}
	client, err := pulsar.NewClient(clientOpts)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	subOpts := pulsar.ConsumerOptions{
		Topic:            "your-topic",
		SubscriptionName: "your-sub-name",
		Type:             pulsar.Shared,
	}
	consumer, err := client.Subscribe(subOpts)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	ctx := context.Background()
	// Receive exactly ten messages, acknowledging each one after printing it.
	for remaining := 10; remaining > 0; remaining-- {
		m, recvErr := consumer.Receive(ctx)
		if recvErr != nil {
			log.Fatal(recvErr)
		}
		id, body := m.ID(), string(m.Payload())
		fmt.Printf("Received message msgId: %v -- content: '%s'\n", id, body)
		consumer.Ack(m)
	}

	if err = consumer.Unsubscribe(); err != nil {
		log.Fatal(err)
	}
}
-
Create a Go producer and use the Go producer to produce messages.
// main connects to the cluster with token authentication, creates a producer
// on "your-topic", and publishes ten messages with payloads "hello-0" through
// "hello-9", printing the ID returned for each one.
func main() {
	clientOpts := pulsar.ClientOptions{
		URL:            "SERVICE_URL",
		Authentication: pulsar.NewAuthenticationToken("AUTH_PARAMS"),
	}
	client, err := pulsar.NewClient(clientOpts)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "your-topic"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	for n := 0; n < 10; n++ {
		payload := []byte(fmt.Sprintf("hello-%d", n))
		id, sendErr := producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: payload})
		if sendErr != nil {
			// The first failed send aborts the program.
			log.Fatal(sendErr)
		}
		fmt.Printf("Published message: %v \n", id)
	}
}
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
-
Generate the App credentials by following instructions similar to those in the "Configure OAuth2 authentication" guide.
-
Save the App credentials into an OAuth2 credential file.
-
Connect to your Pulsar cluster through the OAuth2 credential file.
package main
import (
"github.com/apache/pulsar-client-go/pulsar"
"log"
)
// main builds an OAuth2 client-credentials authentication provider from the
// placeholder parameters below, opens a client connection to the cluster at
// "your-service-url" with it, and closes the client when main returns.
func main() {
	params := map[string]string{
		"type":       "client_credentials",
		"issuerUrl":  "your-issuer-url",
		"audience":   "your-audience",
		"privateKey": "file:///path/to/private-key-file.json",
		"clientId":   "your-client-id",
	}
	auth := pulsar.NewAuthenticationOAuth2(params)

	opts := pulsar.ClientOptions{
		URL:            "your-service-url",
		Authentication: auth,
	}
	c, err := pulsar.NewClient(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
}
issuerUrl
: the URL of your OAuth2 identity provider.
audience
: the audience
of your Pulsar cluster.
privateKey
: the path to your OAuth2 credential file.
clientId
: the Pulsar application client ID.
URL
: the URL of your Pulsar cluster.
-
Create a Go consumer and use the Go consumer to consume messages.
// main connects to the cluster with OAuth2 client-credentials authentication,
// subscribes to "your-topic" with a shared subscription named "your-sub-name",
// receives and acknowledges ten messages, and finally unsubscribes.
func main() {
	oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
		"type":       "client_credentials",
		"issuerUrl":  "your-issuer-url",
		"audience":   "your-audience",
		"privateKey": "file:///path/to/private-key-file.json",
		"clientId":   "your-client-id",
	})
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            "your-service-url",
		Authentication: oauth,
	})
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when main returns so its resources are not leaked.
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "your-topic",
		SubscriptionName: "your-sub-name",
		Type:             pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Receive exactly ten messages, acknowledging each one after printing it.
	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Received message msgId: %v -- content: '%s'\n",
			msg.ID(), string(msg.Payload()))
		consumer.Ack(msg)
	}

	if err := consumer.Unsubscribe(); err != nil {
		log.Fatal(err)
	}
}
-
Create a Go producer and use the Go producer to produce messages.
func main() {
oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
"type": "client_credentials",
"issuerUrl": "your-issuer-url",
"audience": "your-audience",
"privateKey": "file:///path/to/private-key-file.json",
"clientId": "your-client-id",
})
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "your-service-url",
Authentication: oauth,
})
if err != nil {
log.Fatal(err)
}
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "your-topic",
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
for i := 0; i < 10; i++ {
if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
} else {
fmt.Printf("Published message: %v \n", msgId)
}
}
}
}