> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Connect to your cluster using the Kafka Go client

<Note title="Note">
  This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account `produce` and `consume` permissions to a namespace for the target topic.
</Note>

This document describes how to connect to your StreamNative cluster using the Kafka Go client using [API Keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#kafka-clients) authentication.

## Before you begin

<Note title="Note">
  * Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  * The password for different utilities as `kcat` will be equal to `token:<API KEY>`.
</Note>

You can follow the instructions to [create an API key](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#using-api-keys-to-connect-to-your-cluster) for the service account you choose to use.

## Steps

1. Install the Kafka Go client.

   ```bash theme={null}
   go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka
   ```

2. Build a Go application to produce and consume messages.

   ```go theme={null}
   package main

   import (
     "fmt"
     "github.com/confluentinc/confluent-kafka-go/v2/kafka"
     "time"
   )

   func main() {
     // Step 1: replace with your configurations
     serverUrl := "SERVER-URL"
     jwtToken := "API-KEY"
     topicName := "test-go-topic"
     namespace := "public/default"
     password := "token:" + jwtToken

     // Step 2: create a producer to send messages
     producer, err := kafka.NewProducer(&kafka.ConfigMap{
       "bootstrap.servers": serverUrl,
       "security.protocol": "SASL_SSL",
       "sasl.mechanism":    "PLAIN",
       "sasl.username":     namespace,
       "sasl.password":     password,
     })
     if err != nil {
       panic(err)
     }
     defer producer.Close()

     err = producer.Produce(&kafka.Message{
       TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
       Value:          []byte("hello world"),
     }, nil)

     if err != nil {
       panic(err)
     }
     producer.Flush(1000)

     // wait for delivery report
     e := <-producer.Events()

     message := e.(*kafka.Message)
     if message.TopicPartition.Error != nil {
       fmt.Printf("failed to deliver message: %v\n",
         message.TopicPartition)
     } else {
       fmt.Printf("delivered to topic %s [%d] at offset %v\n",
         *message.TopicPartition.Topic,
         message.TopicPartition.Partition,
         message.TopicPartition.Offset)
     }

     // Step 3: create a consumer to read messages
     consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
       "bootstrap.servers":  serverUrl,
       "security.protocol":  "SASL_SSL",
       "sasl.mechanisms":    "PLAIN",
       "sasl.username":      namespace,
       "sasl.password":      password,
       "session.timeout.ms": 6000,
       "group.id":           "my-group",
       "auto.offset.reset":  "earliest",
       "isolation.level":    "read_uncommitted", // Note: Ursa does not support read_committed for now
     })

     if err != nil {
       panic(fmt.Sprintf("Failed to create consumer: %s", err))
     }
     defer consumer.Close()

     topics := []string{topicName}
     err = consumer.SubscribeTopics(topics, nil)
     if err != nil {
       panic(fmt.Sprintf("Failed to subscribe topics: %s", err))
     }

     // read one message then exit
     for {
       fmt.Println("polling...")
       message, err = consumer.ReadMessage(1 * time.Second)
       if err == nil {
         fmt.Printf("consumed from topic %s [%d] at offset %v: %+v",
           *message.TopicPartition.Topic,
           message.TopicPartition.Partition, message.TopicPartition.Offset, string(message.Value))
         break
       }
     }
   }
   ```

   * `SERVER-URL`: the Kafka service URL of your StreamNative cluster.
   * `API-KEY`: an API key of your service account.

3. Run the Go application and you should see the following output:

   ```bash theme={null}
   delivered to topic test-go-topic [0] at offset 29
   polling...
   polling...
   polling...
   polling...
   polling...
   polling...
   polling...
   consumed from topic test-go-topic [0] at offset 15: hello world
   ```
