
Develop Pulsar Functions in Golang (Private Preview)

This section describes how to develop and package Golang Pulsar functions for use on StreamNative Cloud.

StreamNative provides its own Golang runtime, which differs from the community one and is still in private preview. If you want to try it out or have any questions, submit a ticket to the support team.

Develop

We provide a Go SDK for developing Golang Pulsar functions.

The following example uses the Pulsar Functions SDK for Golang.

package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/streamnative/pulsar-function-go/pf"
	"github.com/sirupsen/logrus"
)

func HandleExclamation(ctx context.Context, in []byte) ([]byte, error) {
	// 1. unmarshal []byte to your struct, use any schema you want
	payload := string(in)

	// 2. do your logic
	if fc, ok := pf.FromContext(ctx); ok {
		for _, word := range strings.Split(payload, " ") {
			// 2.1 Incr and Get Counter from state store
			_ = fc.IncrCounter(word, 1)
			count, _ := fc.GetCounter(word)
			// 2.2 Sending logs to a Pulsar topic
			logrus.Infof("got word: %s for %d times", word, count)
		}
		// 2.3 Get user-defined configurations
		cfg := fc.GetUserConfValue("configKey")
		// 2.4 Get secret configurations
		sec, err := fc.GetSecret("secretKey")
		if err == nil {
			msg := fmt.Sprintf("config: %v, secret: %s", cfg, *sec)
			// 2.5 Publish to any topic
			_, _ = fc.Publish("persistent://public/default/test-exec-package-serde-extra", []byte(msg))
		}
	}
	data := payload + "!"

	// 3. marshal your struct to []byte
	return []byte(data), nil
}

func main() {
	pf.Start(HandleExclamation)
}

To get more examples, please refer to examples
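
Because a handler is an ordinary Go function, its core logic can be exercised locally before it is packaged. The following is a minimal, stdlib-only sketch that does not use the SDK. The names `counterStore`, `memStore`, and `countWords` are hypothetical, and the method signatures are assumptions loosely modeled on the counter calls in the example above, not the SDK's actual API.

```go
package main

import (
	"fmt"
	"strings"
)

// counterStore is a hypothetical interface loosely modeled on the counter
// calls in the example above. It is NOT defined by the SDK, and the
// parameter and return types here are assumptions.
type counterStore interface {
	IncrCounter(key string, amount int64) error
	GetCounter(key string) (int64, error)
}

// memStore is an in-memory stand-in for the state store, for local runs only.
type memStore map[string]int64

func (m memStore) IncrCounter(key string, amount int64) error {
	m[key] += amount
	return nil
}

func (m memStore) GetCounter(key string) (int64, error) {
	return m[key], nil
}

// countWords increments one counter per whitespace-separated word and
// returns the payload with "!" appended. It uses strings.Fields, so runs of
// spaces do not produce empty words.
func countWords(store counterStore, payload string) (string, error) {
	for _, word := range strings.Fields(payload) {
		if err := store.IncrCounter(word, 1); err != nil {
			return "", fmt.Errorf("increment counter for %q: %w", word, err)
		}
	}
	return payload + "!", nil
}

func main() {
	store := memStore{}
	out, err := countWords(store, "hello hello world")
	if err != nil {
		panic(err)
	}
	n, _ := store.GetCounter("hello")
	fmt.Printf("%s (hello seen %d times)\n", out, n)
}
```

Keeping the logic behind a small interface like this is one possible design choice: the real handler can pass the function context's state calls through, while local runs use the in-memory map.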

Feature Matrix

StreamNative's Golang runtime does not yet support every feature of the Java runtime and is still under development. The matrices below compare the runtimes:

Input Arguments

Input | Java | Go (Pulsar) | Python | Go (StreamNative)
Custom SerDe?
Schema - Avro?
Schema - JSON?
Schema - Protobuf?
Schema - KeyValue?
Schema - AutoSchema?
Schema - Protobuf Native?
e-2-e encryption
maxMessageRetries
dead-letter policy
SubscriptionName
SubscriptionType
SubscriptionInitialPosition
AutoAck

Note

Because the runtime passes []byte to the user's function and expects []byte back, users can implement any schema themselves. The schema rows are therefore marked with ?.
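
As an illustration of handling a schema yourself, the sketch below decodes a JSON payload from the input bytes and encodes a JSON result. It uses only the standard library; the `Order` and `OrderTotal` types and the `transformOrder` helper are hypothetical names chosen for this example, and the handler only mirrors the signature used in this section.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Order is a hypothetical input payload used only for illustration.
type Order struct {
	ID       string  `json:"id"`
	Quantity int     `json:"quantity"`
	Price    float64 `json:"price"`
}

// OrderTotal is a hypothetical output payload.
type OrderTotal struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// transformOrder decodes the input bytes as JSON, computes the total,
// and encodes the result as JSON.
func transformOrder(in []byte) ([]byte, error) {
	var order Order
	if err := json.Unmarshal(in, &order); err != nil {
		return nil, fmt.Errorf("decode order: %w", err)
	}
	total := OrderTotal{ID: order.ID, Total: float64(order.Quantity) * order.Price}
	return json.Marshal(total)
}

// HandleOrder has the same signature as the handlers in this section and
// could be passed to the SDK's start function in a real function.
func HandleOrder(_ context.Context, in []byte) ([]byte, error) {
	return transformOrder(in)
}

func main() {
	out, err := HandleOrder(context.Background(), []byte(`{"id":"o-1","quantity":3,"price":2.5}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The same pattern should apply to other encodings: swap the `encoding/json` calls for the codec of your choice while keeping the []byte-in, []byte-out contract.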

Output Arguments

Output | Java | Go (Pulsar) | Python | Go (StreamNative)
Custom SerDe?
Schema - Avro?
Schema - JSON?
Schema - Protobuf?
Schema - KeyValue?
Schema - AutoSchema?
Schema - Protobuf Native?
useThreadLocalProducers
Key-based Batcher
e-2-e encryption
Compression

Context

Context | Java | Go (Pulsar) | Python | Go (StreamNative)
InputTopics
OutputTopic
CurrentRecord
OutputSchemaType
Tenant
Namespace
FunctionName
FunctionId
InstanceId
NumInstances
FunctionVersion
PulsarAdminClient
GetLogger
RecordMetrics
UserConfig
Secrets
State
Publish
ConsumerBuilder
Seek / Pause / Resume
PulsarClient

Other

Other | Java | Go (Pulsar) | Python | Go (StreamNative)
Resources
At-most-once
At-least-once
Effectively-once

Package

For Golang, compile the function source file into an executable:

  1. Prepare your function file:

    package main
    
    import (
        "context"
        "github.com/jiangpengcheng/pulsar-function-go/pf"
    )
    
    func HandleExclamation(ctx context.Context, in []byte) ([]byte, error) {
        return []byte(string(in) + "!"), nil
    }
    
    func main() {
        pf.Start(HandleExclamation)
    }
    
  2. Compile

    go mod init func
    go mod tidy
    CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -o exclamation exclamation.go
    

Deploy

After you create a cluster, set up your environment, and develop and package your function, you can use pulsarctl, pulsar-admin, the REST API, or Terraform to deploy a Pulsar function to your cluster.

You can create a Golang Pulsar function from a locally compiled Golang executable or from an uploaded Pulsar functions package (recommended).

(Optional) Upload your function file to Pulsar

We recommend uploading your function file to Pulsar before you create the function, because you can then add a version suffix to the package.
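
The version suffix is the part after `@` in a package URL, which in Pulsar package management takes the form `function://<tenant>/<namespace>/<name>@<version>`. As a minimal sketch, the helper below assembles such a URL. The function name `packageURL`, the validation rule, and the sample values (including the version `v1`) are assumptions for illustration only.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// packageURL builds a function package URL of the form
// function://<tenant>/<namespace>/<name>@<version>.
// The check below is a simple illustrative rule, not Pulsar's own validation.
func packageURL(tenant, namespace, name, version string) (string, error) {
	for _, part := range []string{tenant, namespace, name, version} {
		if part == "" || strings.ContainsAny(part, "/@") {
			return "", errors.New("package URL parts must be non-empty and must not contain '/' or '@'")
		}
	}
	return fmt.Sprintf("function://%s/%s/%s@%s", tenant, namespace, name, version), nil
}

func main() {
	// "v1" is an illustrative version string.
	url, err := packageURL("public", "default", "exclamation", "v1")
	if err != nil {
		panic(err)
	}
	fmt.Println(url)
}
```

Uploading each build under a new version keeps earlier packages available if a function needs to be pointed back at one.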

Set the context for pulsarctl first:

# create a context
pulsarctl context set ${context-name}  \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}

# activate oauth2
pulsarctl oauth2 activate

Note

Replace the placeholder variables with the actual values that you can get when setting up client tools.

  • context-name: any name you want
  • admin-service-url: the HTTP service URL of your Pulsar cluster.
  • privateKey: the path to the downloaded OAuth2 key file.
  • issuerUrl: the URL of the OAuth2 issuer.
  • audience: the Uniform Resource Name (URN), which combines the prefix urn:sn:pulsar, your organization name, and your Pulsar instance name, separated by colons.
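
To make the audience format concrete, the sketch below builds the URN from its two variable parts, following the `urn:sn:pulsar:${orgName}:${instanceName}` pattern in the command above. The function name `audienceURN` and the sample values `my-org` and `my-instance` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// audienceURN joins the fixed prefix urn:sn:pulsar with the organization
// name and the Pulsar instance name.
func audienceURN(orgName, instanceName string) (string, error) {
	if orgName == "" || instanceName == "" {
		return "", errors.New("organization and instance names must not be empty")
	}
	return fmt.Sprintf("urn:sn:pulsar:%s:%s", orgName, instanceName), nil
}

func main() {
	// "my-org" and "my-instance" are placeholder values.
	aud, err := audienceURN("my-org", "my-instance")
	if err != nil {
		panic(err)
	}
	fmt.Println(aud)
}
```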

Upload packages

pulsarctl packages upload function://public/default/[email protected] \
--path exclamation \
--description "exclamation function" \
--properties fileName=exclamation

You should see output similar to the following:

The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully

Create

pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-go-input \
--output persistent://public/default/test-go-output \
--classname exclamation \
--go function://public/default/[email protected] \
--custom-runtime-options '{"genericKind": "executable"}'

Note

Because StreamNative uses a different Golang runtime, you must specify --custom-runtime-options '{"genericKind": "executable"}' for the function to work.

You should see something like this:

Created function1 successfully

For details about Pulsar function configurations, see Pulsar function configurations.
