This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
 
This document describes how to connect to your StreamNative cluster with the Kafka Go client, using API key authentication.
Before you begin
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
 
- The password for utilities such as 
kcat is token:<API KEY>. 
 
You can follow the instructions to create an API key for the service account you choose to use.
Steps
- 
Install the Kafka Go client.
go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka
 
 
- 
Build a Go application to produce and consume messages.
package main
import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  "time"
)
// main produces a single "hello world" message to the configured topic,
// waits for its delivery report, and then consumes one message from the same
// topic before exiting. Both clients authenticate with SASL/PLAIN over TLS,
// using the namespace as the username and "token:<API KEY>" as the password.
func main() {
  // Step 1: replace with your configurations
  serverUrl := "SERVER-URL"
  jwtToken := "API-KEY"
  topicName := "test-go-topic"
  namespace := "public/default"
  password := "token:" + jwtToken

  // Step 2: create a producer to send messages
  producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": serverUrl,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms":   "PLAIN",
    "sasl.username":     namespace,
    "sasl.password":     password,
  })
  if err != nil {
    panic(err)
  }
  defer producer.Close()

  err = producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
    Value:          []byte("hello world"),
  }, nil)
  if err != nil {
    panic(err)
  }
  producer.Flush(1000)

  // Wait for the delivery report. The events channel can also carry
  // client-level errors (for example authentication or broker failures), so
  // inspect the event type instead of assuming the first event is a message.
  for delivered := false; !delivered; {
    switch ev := (<-producer.Events()).(type) {
    case *kafka.Message:
      if ev.TopicPartition.Error != nil {
        fmt.Printf("failed to deliver message: %v\n",
          ev.TopicPartition)
      } else {
        fmt.Printf("delivered to topic %s [%d] at offset %v\n",
          *ev.TopicPartition.Topic,
          ev.TopicPartition.Partition,
          ev.TopicPartition.Offset)
      }
      delivered = true
    case kafka.Error:
      // Report the error and keep waiting: the delivery report for the
      // produced message still arrives (as a success or a failure).
      fmt.Printf("producer error: %v\n", ev)
    }
  }

  // Step 3: create a consumer to read messages
  consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":  serverUrl,
    "security.protocol":  "SASL_SSL",
    "sasl.mechanisms":    "PLAIN",
    "sasl.username":      namespace,
    "sasl.password":      password,
    "session.timeout.ms": 6000,
    "group.id":           "my-group",
    "auto.offset.reset":  "earliest",
    "isolation.level":    "read_uncommitted", // Note: Ursa does not support read_committed for now
  })
  if err != nil {
    panic(fmt.Sprintf("Failed to create consumer: %s", err))
  }
  defer consumer.Close()

  topics := []string{topicName}
  err = consumer.SubscribeTopics(topics, nil)
  if err != nil {
    panic(fmt.Sprintf("Failed to subscribe topics: %s", err))
  }

  // read one message then exit
  for {
    fmt.Println("polling...")
    received, err := consumer.ReadMessage(1 * time.Second)
    if err != nil {
      // A timeout only means nothing arrived within the poll interval;
      // anything else is reported so a persistent failure is visible
      // instead of looking like an endless idle poll.
      if kerr, ok := err.(kafka.Error); !ok || !kerr.IsTimeout() {
        fmt.Printf("consumer error: %v\n", err)
      }
      continue
    }
    fmt.Printf("consumed from topic %s [%d] at offset %v: %+v",
      *received.TopicPartition.Topic,
      received.TopicPartition.Partition, received.TopicPartition.Offset, string(received.Value))
    break
  }
}
 
SERVER-URL: the Kafka service URL of your StreamNative cluster. 
API-KEY: an API key of your service account. 
 
- 
Run the Go application and you should see the following output:
delivered to topic test-go-topic [0] at offset 29
polling...
polling...
polling...
polling...
polling...
polling...
polling...
consumed from topic test-go-topic [0] at offset 15: hello world