
Connect to your cluster using the Kafka Go client

Note

This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions on the namespace of the target topic.

This document describes how to connect to your StreamNative cluster with the Kafka Go client, using API key authentication.

Before you begin

Note

  • Before using an API key, verify that the service account is authorized to access the required resources, such as tenants, namespaces, and topics.
  • For command-line utilities such as kcat, the password is token:<API KEY>.

Follow the instructions to create an API key for the service account that you plan to use.

Steps

  1. Install the Kafka Go client.

    go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka
    
  2. Build a Go application to produce and consume messages.

    package main
    
    import (
      "fmt"
      "github.com/confluentinc/confluent-kafka-go/v2/kafka"
      "time"
    )
    
    func main() {
      // Step 1: replace with your configurations
      serverUrl := "SERVER-URL"
      jwtToken := "API-KEY"
      topicName := "test-go-topic"
      namespace := "public/default"
      password := "token:" + jwtToken
    
      // Step 2: create a producer to send messages
      producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": serverUrl,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism":    "PLAIN",
        "sasl.username":     namespace,
        "sasl.password":     password,
      })
      if err != nil {
        panic(err)
      }
      defer producer.Close()
    
      err = producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
        Value:          []byte("hello world"),
      }, nil)
    
      if err != nil {
        panic(err)
      }
      producer.Flush(1000)
    
      // wait for the delivery report on the shared Events() channel (a per-message alternative follows Step 3)
      e := <-producer.Events()
    
      message := e.(*kafka.Message)
      if message.TopicPartition.Error != nil {
        fmt.Printf("failed to deliver message: %v\n",
          message.TopicPartition)
      } else {
        fmt.Printf("delivered to topic %s [%d] at offset %v\n",
          *message.TopicPartition.Topic,
          message.TopicPartition.Partition,
          message.TopicPartition.Offset)
      }
    
      // Step 3: create a consumer to read messages
      consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  serverUrl,
        "security.protocol":  "SASL_SSL",
        "sasl.mechanism":     "PLAIN",
        "sasl.username":      namespace,
        "sasl.password":      password,
        "session.timeout.ms": 6000,
        "group.id":           "my-group",
        "auto.offset.reset":  "earliest",
        "isolation.level":    "read_uncommitted", // Note: Ursa does not support read_committed for now
      })
    
      if err != nil {
        panic(fmt.Sprintf("Failed to create consumer: %s", err))
      }
      defer consumer.Close()
    
      topics := []string{topicName}
      err = consumer.SubscribeTopics(topics, nil)
      if err != nil {
        panic(fmt.Sprintf("Failed to subscribe topics: %s", err))
      }
    
      // read one message then exit
      for {
        fmt.Println("polling...")
        message, err = consumer.ReadMessage(1 * time.Second)
        if err == nil {
          fmt.Printf("consumed from topic %s [%d] at offset %v: %s\n",
            *message.TopicPartition.Topic,
            message.TopicPartition.Partition, message.TopicPartition.Offset, string(message.Value))
          break
        }
      }
    }
    
    • SERVER-URL: the Kafka service URL of your StreamNative cluster.
    • API-KEY: an API key of your service account (a sketch for reading these values from environment variables follows Step 3).
  3. Run the Go application. You should see output similar to the following:

    delivered to topic test-go-topic [0] at offset 29
    polling...
    polling...
    polling...
    polling...
    polling...
    polling...
    polling...
    consumed from topic test-go-topic [0] at offset 15: hello world
    
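Hard-coding SERVER-URL and API-KEY is convenient for a quick test, but you may prefer to read them from the environment. The following minimal sketch replaces the serverUrl, jwtToken, and password assignments in Step 1 of the example and requires importing the os package; the variable names KAFKA_SERVICE_URL and KAFKA_API_KEY are illustrative, not names defined by StreamNative.

    // Read the connection settings from environment variables instead of
    // hard-coding them. The variable names below are illustrative.
    serverUrl := os.Getenv("KAFKA_SERVICE_URL") // Kafka service URL of the cluster
    jwtToken := os.Getenv("KAFKA_API_KEY")      // API key of the service account
    if serverUrl == "" || jwtToken == "" {
      panic("KAFKA_SERVICE_URL and KAFKA_API_KEY must be set")
    }
    password := "token:" + jwtToken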
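The example in Step 2 waits for the delivery report on the producer's shared Events() channel. The Kafka Go client also accepts a dedicated delivery channel as the second argument to Produce, in which case the report for that message is sent to the dedicated channel instead. The sketch below assumes the producer, topic, and imports from Step 2; the function name produceWithDeliveryChan and the payload are illustrative.

    // produceWithDeliveryChan sends one message and waits for its delivery
    // report on a dedicated channel instead of the shared Events() channel.
    func produceWithDeliveryChan(producer *kafka.Producer, topicName string) error {
      deliveryChan := make(chan kafka.Event, 1)

      err := producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
        Value:          []byte("hello again"),
      }, deliveryChan)
      if err != nil {
        return err
      }

      // The delivery report for this message arrives only on deliveryChan.
      report := (<-deliveryChan).(*kafka.Message)
      if report.TopicPartition.Error != nil {
        return report.TopicPartition.Error
      }
      fmt.Printf("delivered to topic %s [%d] at offset %v\n",
        *report.TopicPartition.Topic,
        report.TopicPartition.Partition,
        report.TopicPartition.Offset)
      return nil
    }

Keeping the report on a per-message channel makes acknowledgement handling local to the call site instead of multiplexing every report through Events().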