- Go
- Tutorial
Build Consumer
Next, create the consumer application by pasting the following Go code into a file named consumer.go.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// main creates a Pulsar client, subscribes to the "purchases" topic with the
// initial position set to pulsar.SubscriptionPositionEarliest, and then prints
// and acknowledges every message it receives until SIGINT or SIGTERM arrives.
func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            "<Broker Service URL>",
		Authentication: pulsar.NewAuthenticationToken("<API KEY>"),
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create Pulsar client: %s\n", err)
		os.Exit(1)
	}
	defer client.Close()

	topic := "purchases"
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            "pulsar-go-getting-started",
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create Pulsar consumer: %s\n", err)
		// os.Exit does not run deferred calls, so close the client explicitly.
		client.Close()
		os.Exit(1)
	}
	defer consumer.Close()

	// Set up a channel for handling Ctrl-C, etc
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Process messages
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			// Bound each receive so the signal channel is polled regularly.
			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
			msg, err := consumer.Receive(ctx)
			// Record whether the deadline passed before cancel() marks the
			// context as canceled.
			timedOut := ctx.Err() != nil
			cancel() // Clean up the context
			if err != nil {
				if !timedOut {
					// Not caused by the poll timeout: report it and stop
					// instead of spinning on an error that returns at once.
					fmt.Fprintf(os.Stderr, "Failed to receive message: %s\n", err)
					run = false
				}
				continue
			}
			fmt.Printf("Consumed event from topic %s: ID = %s, key = %-10s value = %s\n",
				topic, msg.ID(), msg.Key(), msg.Payload())
			consumer.Ack(msg)
		}
	}
}
Fill in the appropriate <Broker Service URL> and <API KEY> in the URL and Authentication fields where the client is created via the pulsar.NewClient function. Compile the consumer as follows:
go build -o out/consumer consumer.go