Connect to your cluster using the Kafka Go client
Note
This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
This document describes how to connect to your StreamNative cluster with the Kafka Go client, using API key authentication.
Before you begin
Note
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- For utilities such as kcat, the password is token:<API KEY>.
You can follow the instructions to create an API key for the service account you choose to use.
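The password format from the note above (the literal prefix token: followed by the API key) can be sketched as a small helper. This is a minimal illustration using only the standard library; the name saslPassword, the whitespace trimming, and the handling of an already-prefixed value are assumptions added for convenience, not part of the StreamNative or Kafka client APIs.

```go
package main

import (
	"fmt"
	"strings"
)

// tokenPrefix is the literal prefix the note above describes.
const tokenPrefix = "token:"

// saslPassword (hypothetical helper) turns an API key into the SASL password.
// It trims surrounding whitespace and leaves an already-prefixed value alone,
// so a key pasted with a trailing newline or with the prefix still works.
func saslPassword(apiKey string) string {
	key := strings.TrimSpace(apiKey)
	if strings.HasPrefix(key, tokenPrefix) {
		return key
	}
	return tokenPrefix + key
}

func main() {
	fmt.Println(saslPassword("API-KEY"))       // prints "token:API-KEY"
	fmt.Println(saslPassword("token:API-KEY")) // prints "token:API-KEY"
}
```

The returned string is what would be passed as sasl.password to the client or to a utility such as kcat.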
Steps
Install the Kafka Go client.
go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka
Build a Go application to produce and consume messages.
package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Step 1: replace with your configurations
	serverUrl := "SERVER-URL"
	jwtToken := "API-KEY"
	topicName := "test-go-topic"
	namespace := "public/default"
	password := "token:" + jwtToken

	// Step 2: create a producer to send messages
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": serverUrl,
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "PLAIN",
		"sasl.username":     namespace,
		"sasl.password":     password,
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	err = producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
		Value:          []byte("hello world"),
	}, nil)
	if err != nil {
		panic(err)
	}
	producer.Flush(1000)

	// wait for delivery report; any other event type is treated as a failure
	e := <-producer.Events()
	message, ok := e.(*kafka.Message)
	if !ok {
		panic(fmt.Sprintf("unexpected producer event: %v", e))
	}
	if message.TopicPartition.Error != nil {
		fmt.Printf("failed to deliver message: %v\n", message.TopicPartition)
	} else {
		fmt.Printf("delivered to topic %s [%d] at offset %v\n",
			*message.TopicPartition.Topic,
			message.TopicPartition.Partition,
			message.TopicPartition.Offset)
	}

	// Step 3: create a consumer to read messages
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  serverUrl,
		"security.protocol":  "SASL_SSL",
		"sasl.mechanisms":    "PLAIN",
		"sasl.username":      namespace,
		"sasl.password":      password,
		"session.timeout.ms": 6000,
		"group.id":           "my-group",
		"auto.offset.reset":  "earliest",
		"isolation.level":    "read_uncommitted", // Note: Ursa does not support read_committed for now
	})
	if err != nil {
		panic(fmt.Sprintf("Failed to create consumer: %s", err))
	}
	defer consumer.Close()

	topics := []string{topicName}
	err = consumer.SubscribeTopics(topics, nil)
	if err != nil {
		panic(fmt.Sprintf("Failed to subscribe topics: %s", err))
	}

	// read one message then exit; errors (including poll timeouts) are retried
	for {
		fmt.Println("polling...")
		message, err = consumer.ReadMessage(1 * time.Second)
		if err == nil {
			fmt.Printf("consumed from topic %s [%d] at offset %v: %+v\n",
				*message.TopicPartition.Topic,
				message.TopicPartition.Partition,
				message.TopicPartition.Offset,
				string(message.Value))
			break
		}
	}
}
- SERVER-URL: the Kafka service URL of your StreamNative cluster.
- API-KEY: an API key of your service account.
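Rather than editing the placeholders in the source file, the two values could be read from the environment so the API key stays out of version control. The following is a minimal sketch under that assumption; the variable names SN_SERVER_URL and SN_API_KEY, the settings type, and the loadSettings function are all hypothetical and chosen only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// settings holds the two values the application needs (hypothetical type).
type settings struct {
	ServerURL string // replaces the SERVER-URL placeholder
	APIKey    string // replaces the API-KEY placeholder
}

// loadSettings reads both values through getenv and fails if either is empty.
// Taking getenv as a parameter keeps the function easy to exercise without
// touching the real environment.
func loadSettings(getenv func(string) string) (settings, error) {
	s := settings{
		ServerURL: getenv("SN_SERVER_URL"), // assumed variable name
		APIKey:    getenv("SN_API_KEY"),    // assumed variable name
	}
	if s.ServerURL == "" || s.APIKey == "" {
		return settings{}, errors.New("SN_SERVER_URL and SN_API_KEY must both be set")
	}
	return s, nil
}

func main() {
	s, err := loadSettings(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println("connecting to", s.ServerURL)
}
```

In the application, s.ServerURL would take the place of serverUrl and s.APIKey the place of jwtToken.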
Run the Go application and you should see the following output:
delivered to topic test-go-topic [0] at offset 29
polling...
polling...
polling...
polling...
polling...
polling...
polling...
consumed from topic test-go-topic [0] at offset 15: hello world