1. Go
  2. Tutorial

Build Consumer

Next, create the consumer application by pasting the following Go code into a file named consumer.go.

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// main connects to the Pulsar broker, subscribes to the "purchases" topic
// starting at the earliest position, and prints and acknowledges each message
// it receives until the process gets SIGINT or SIGTERM.
func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            "<Broker Service URL>",
		Authentication: pulsar.NewAuthenticationToken("<API KEY>"),
	})
	if err != nil {
		fmt.Printf("Failed to create Pulsar client: %s\n", err)
		os.Exit(1)
	}
	defer client.Close()

	topic := "purchases"

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            "pulsar-go-getting-started",
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	})
	if err != nil {
		fmt.Printf("Failed to create Pulsar consumer: %s\n", err)
		// os.Exit does not run deferred calls, so release the client explicitly.
		client.Close()
		os.Exit(1)
	}
	defer consumer.Close()

	// Set up a channel for handling Ctrl-C, etc
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Process messages
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			// Bound each wait so the loop gets back to the signal check regularly.
			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
			msg, err := consumer.Receive(ctx)
			cancel() // Clean up the context
			if err != nil {
				// A deadline error only means no message arrived within the
				// polling window; anything else is reported so it is not lost.
				if !errors.Is(err, context.DeadlineExceeded) {
					fmt.Printf("Failed to receive message: %s\n", err)
				}
				continue
			}
			fmt.Printf("Consumed event from topic %s: ID = %s, key = %-10s value = %s\n",
				topic, msg.ID(), msg.Key(), msg.Payload())

			consumer.Ack(msg)
		}
	}
}

Fill in the appropriate <Broker Service URL> and <API KEY> in the URL and Authentication properties where the Pulsar client is created via the pulsar.NewClient function. Compile the consumer as follows:

go build -o out/consumer consumer.go
Previous
Build Producer