- Go
- Tutorial
Build Producer
Let's create the producer application by pasting the following Go code into a file named producer.go
.
package main
import (
"fmt"
"math/rand"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{
// User-specific properties that you must set
"bootstrap.servers": "<BOOTSTRAP SERVERS>",
"sasl.username": "unused",
"sasl.password": "token:<API KEY>",
// Fixed properties
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"acks": "all"})
if err != nil {
fmt.Printf("Failed to create producer: %s", err)
os.Exit(1)
}
// Go-routine to handle message delivery reports and
// possibly other event types (errors, stats, etc)
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
}
}
}
}()
users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}
items := [...]string{"book", "alarm clock", "t-shirts", "gift card", "batteries"}
topic := "purchases"
for n := 0; n < 10; n++ {
key := users[rand.Intn(len(users))]
data := items[rand.Intn(len(items))]
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(key),
Value: []byte(data),
}, nil)
}
// Wait for all messages to be delivered
p.Flush(15 * 1000)
p.Close()
}
Fill in the appropriate <BOOTSTRAP SERVERS>
endpoint and <API KEY>
in the bootstrap.servers
and sasl.password
properties where the producer is instantiated using the kafka.NewProducer
method.
Compile the producer with the following:
go build -o out/producer producer.go
If you get any errors during the build make sure that you initialized the module correctly per the instructions in the previous step.