Connect to cluster through Go client
This example shows how to connect to a cluster through a Go client and use the Go producer and consumer to produce and consume messages to and from a topic. The Go client supports connecting to a Pulsar cluster either through the OAuth2 authentication plugin or the Token authentication plugin.
This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.
Prerequisites
- Go 1.11 or later
- Go client later than 0.1.1 (0.1.1 itself is not supported)
If you have not installed Go, install it according to the installation instructions.
Connect to a cluster through the OAuth2 authentication plugin
To connect to a cluster through the OAuth2 authentication plugin, follow these steps.
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the OAuth2 authentication parameters. For details, see get an OAuth2 credential file.
Connect to a Pulsar cluster through the OAuth2 authentication plugin.
    package main

    import (
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        // Configure OAuth2 client-credentials authentication.
        oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
            "type":       "client_credentials",
            "issuerUrl":  "https://auth.streamnative.cloud",
            "audience":   "your_audience",
            "privateKey": "file:///absolute/path/to/key/file.json",
            "clientId":   "your_client_id",
        })

        // Create the client with the broker service URL of your cluster.
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:            "broker-service-url",
            Authentication: oauth,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()
    }
- issuerUrl: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.
- audience: the Uniform Resource Name (URN), which combines urn:sn:pulsar, the organization name, and the Pulsar instance name, in the format urn:sn:pulsar:<org_name>:<instance_name>.
- privateKey: the path to your downloaded OAuth2 credential file. The privateKey parameter supports the following three formats:
  - file:///path/to/file
  - file:/path/to/file
  - data:application/json;base64,<base64-encoded value>
- clientId: the client ID of your application. You can get the value from your downloaded OAuth2 credential file.
- URL: the broker service URL of your Pulsar cluster.
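The parameters described above can be assembled programmatically rather than typed by hand. The following is a minimal sketch using only the Go standard library. It assumes that the downloaded credential file is JSON with client_id and issuer_url fields (an assumption about the file layout, so check your own file), and the helper names audienceURN, privateKeyDataURL, and oauth2Params are hypothetical, not part of the Go client.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// credentials holds the fields this sketch assumes exist in the
// downloaded OAuth2 credential file (assumed JSON field names).
type credentials struct {
	ClientID  string `json:"client_id"`
	IssuerURL string `json:"issuer_url"`
}

// audienceURN builds urn:sn:pulsar:<org_name>:<instance_name>.
func audienceURN(org, instance string) string {
	return fmt.Sprintf("urn:sn:pulsar:%s:%s", org, instance)
}

// privateKeyDataURL embeds the credential JSON in the
// data:application/json;base64,<value> form of privateKey.
func privateKeyDataURL(credJSON []byte) string {
	return "data:application/json;base64," + base64.StdEncoding.EncodeToString(credJSON)
}

// oauth2Params returns a map with the same keys as the connection example.
func oauth2Params(credJSON []byte, org, instance string) (map[string]string, error) {
	var c credentials
	if err := json.Unmarshal(credJSON, &c); err != nil {
		return nil, fmt.Errorf("parse credential file: %w", err)
	}
	if c.ClientID == "" || c.IssuerURL == "" {
		return nil, fmt.Errorf("credential file is missing client_id or issuer_url")
	}
	return map[string]string{
		"type":       "client_credentials",
		"issuerUrl":  c.IssuerURL,
		"audience":   audienceURN(org, instance),
		"privateKey": privateKeyDataURL(credJSON),
		"clientId":   c.ClientID,
	}, nil
}

func main() {
	// Placeholder content; a real program would load the file with os.ReadFile.
	cred := []byte(`{"client_id":"your_client_id","issuer_url":"https://auth.streamnative.cloud"}`)
	params, err := oauth2Params(cred, "my-org", "my-instance")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(params["audience"])
}
```

The resulting map can be passed to pulsar.NewAuthenticationOAuth2 in place of the hard-coded values. Using the data: form avoids depending on an absolute file path at run time, at the cost of keeping the credential content in memory.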
Create a Go consumer and use the Go consumer to consume messages.
    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
            "type":       "client_credentials",
            "issuerUrl":  "https://auth.streamnative.cloud",
            "audience":   "your_audience",
            "privateKey": "file:///absolute/path/to/key/file.json",
            "clientId":   "your_client_id",
        })

        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:            "broker-service-url",
            Authentication: oauth,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Subscribe to the topic with a shared subscription.
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "topic-1",
            SubscriptionName: "my-sub",
            Type:             pulsar.Shared,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        // Receive and acknowledge ten messages.
        for i := 0; i < 10; i++ {
            msg, err := consumer.Receive(context.Background())
            if err != nil {
                log.Fatal(err)
            }
            fmt.Printf("Received message msgId: %v -- content: '%s'\n",
                msg.ID(), string(msg.Payload()))
            consumer.Ack(msg)
        }

        // Remove the subscription once the messages are consumed.
        if err := consumer.Unsubscribe(); err != nil {
            log.Fatal(err)
        }
    }
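The consumer example calls Receive with context.Background(), so it waits indefinitely if fewer than ten messages arrive. One possible way to bound the wait is to pass a context with a deadline. The sketch below uses only the standard library; withTimeout and blockUntilDone are hypothetical helpers, and blockUntilDone merely stands in for a receive call on an empty topic. It assumes that the receive call returns once its context is cancelled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// demoTimeout is an arbitrary value chosen for this sketch.
const demoTimeout = 50 * time.Millisecond

// withTimeout runs op with a context that is cancelled after d.
// With a Pulsar consumer, op would wrap consumer.Receive(ctx).
func withTimeout(d time.Duration, op func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return op(ctx)
}

// blockUntilDone stands in for a receive call on a topic with no
// messages: it returns only when the context is cancelled.
func blockUntilDone(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	err := withTimeout(demoTimeout, blockUntilDone)
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("no message within the timeout")
	}
}
```

In the consumer loop, the closure passed to withTimeout would call consumer.Receive(ctx), store the message, and return the error, so the loop can stop or retry when the deadline expires instead of blocking.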
Create a Go producer and use the Go producer to produce messages.
    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
            "type":       "client_credentials",
            "issuerUrl":  "https://auth.streamnative.cloud",
            "audience":   "your_audience",
            "privateKey": "file:///absolute/path/to/key/file.json",
            "clientId":   "your_client_id",
        })

        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:            "broker-service-url",
            Authentication: oauth,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Create a producer for the topic.
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
            Topic: "topic-1",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        // Publish ten messages and print the ID of each one.
        for i := 0; i < 10; i++ {
            msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
                Payload: []byte(fmt.Sprintf("hello-%d", i)),
            })
            if err != nil {
                log.Fatal(err)
            }
            fmt.Printf("Published message: %v \n", msgID)
        }
    }
Connect to a cluster through the Token authentication plugin
To connect to a cluster through the Token authentication plugin, follow these steps.
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the Token authentication parameters. For details, see get a token.
Connect to a Pulsar cluster through the Token authentication plugin.
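    package main

    import (
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    // Placeholder values: replace them with your service URL and token.
    const SERVICE_URL, AUTH_PARAMS = "broker-service-url", "your_token"

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:            SERVICE_URL,
            Authentication: pulsar.NewAuthenticationToken(AUTH_PARAMS),
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()
    }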
- SERVICE_URL: the broker service URL of your Pulsar cluster.
- AUTH_PARAMS: the token of your service account.
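Because the token is a secret, it is often preferable to keep it out of source code. The following is a minimal sketch, using only the standard library, that reads both values from the environment. The variable names PULSAR_SERVICE_URL and PULSAR_AUTH_TOKEN and the helper tokenConfig are assumptions made for this illustration; they are not defined by the Go client or by StreamNative Cloud.

```go
package main

import (
	"fmt"
	"os"
)

// Hypothetical environment variable names used by this sketch.
const (
	serviceURLEnv = "PULSAR_SERVICE_URL"
	tokenEnv      = "PULSAR_AUTH_TOKEN"
)

// tokenConfig returns the values to use for SERVICE_URL and AUTH_PARAMS.
// The lookup function is injected so the logic can be tested without
// touching the real environment; pass os.Getenv in normal use.
func tokenConfig(getenv func(string) string) (serviceURL, token string, err error) {
	serviceURL, token = getenv(serviceURLEnv), getenv(tokenEnv)
	if serviceURL == "" {
		return "", "", fmt.Errorf("%s is not set", serviceURLEnv)
	}
	if token == "" {
		return "", "", fmt.Errorf("%s is not set", tokenEnv)
	}
	return serviceURL, token, nil
}

func main() {
	serviceURL, _, err := tokenConfig(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	// The token is deliberately not printed.
	fmt.Println("connecting to", serviceURL)
}
```

The two returned values would then be passed as URL and to pulsar.NewAuthenticationToken when creating the client.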
For a complete example of how to connect to a cluster through the Pulsar Go client, see Go client examples.