package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"time"
)
func main() {
// Step 1: replace with your configurations
serverUrl := "SERVER-URL"
jwtToken := "API-KEY"
topicName := "test-go-topic"
namespace := "public/default"
password := "token:" + jwtToken
// Step 2: create a producer to send messages
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": serverUrl,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": namespace,
"sasl.password": password,
})
if err != nil {
panic(err)
}
defer producer.Close()
err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: []byte("hello world"),
}, nil)
if err != nil {
panic(err)
}
producer.Flush(1000)
// wait for delivery report
e := <-producer.Events()
message := e.(*kafka.Message)
if message.TopicPartition.Error != nil {
fmt.Printf("failed to deliver message: %v\n",
message.TopicPartition)
} else {
fmt.Printf("delivered to topic %s [%d] at offset %v\n",
*message.TopicPartition.Topic,
message.TopicPartition.Partition,
message.TopicPartition.Offset)
}
// Step 3: create a consumer to read messages
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": serverUrl,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": namespace,
"sasl.password": password,
"session.timeout.ms": 6000,
"group.id": "my-group",
"auto.offset.reset": "earliest",
"isolation.level": "read_uncommitted", // Note: Ursa does not support read_committed for now
})
if err != nil {
panic(fmt.Sprintf("Failed to create consumer: %s", err))
}
defer consumer.Close()
topics := []string{topicName}
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
panic(fmt.Sprintf("Failed to subscribe topics: %s", err))
}
// read one message then exit
for {
fmt.Println("polling...")
message, err = consumer.ReadMessage(1 * time.Second)
if err == nil {
fmt.Printf("consumed from topic %s [%d] at offset %v: %+v",
*message.TopicPartition.Topic,
message.TopicPartition.Partition, message.TopicPartition.Offset, string(message.Value))
break
}
}
}