- Process Data Streams
- Develop Functions
Develop Pulsar Functions in Golang(Private Preview)
This section introduces how to develop and pacakge Golang Pulsar functions to use on StreamNative cloud.
We provide a different Golang runtime other than the community one, and it's still in private preview stage, If you want to try it out or have any questions, please submit a ticket to the support team.
Develop
We provide a GO SDK for developing Golang Pulsar Functions.
The following examples use Pulsar Functions SDK for the Golang language.
package main
import (
"context"
"fmt"
"strings"
"github.com/streamnative/pulsar-function-go/pf"
"github.com/sirupsen/logrus"
)
func HandleExclamation(ctx context.Context, in []byte) ([]byte, error) {
// 1. unmarshal []byte to your struct, use any schema you want
payload := string(in)
// 2. do your logic
if fc, ok := pf.FromContext(ctx); ok {
for _, word := range strings.Split(payload, " ") {
// 2.1 Incr and Get Counter from state store
_ = fc.IncrCounter(word, 1)
count, _ := fc.GetCounter(word)
// 2.2 Sending logs to a Pulsar topic
logrus.Infof("got word: %s for %d times", word, count)
}
// 2.3 Get user-defined configurations
cfg := fc.GetUserConfValue("configKey")
// 2.4 Get secret configurations
sec, err := fc.GetSecret("secretKey")
if err == nil {
msg := fmt.Sprintf("config: %v, secret: %s", cfg, *sec)
// 2.5 Publish to any topic
_, _ = fc.Publish("persistent://public/default/test-exec-package-serde-extra", []byte(msg))
}
}
data := payload + "!"
// 3. marshal your struct to []byte
return []byte(data), nil
}
func main() {
pf.Start(HandleExclamation)
}
To get more examples, please refer to examples
Feature Matrix
The StreamNative's Golang runtime doesn't support full features comparing to Java runtime, and it's still in developing, below is the matrix:
Input Arguments
Input | Java | Go(Pulsar) | Python | Go(StreamNative) |
---|---|---|---|---|
Custom SerDe | ✅ | ❌ | ✅ | ? |
Schema - Avro | ✅ | ❌ | ✅ | ? |
Schema - JSON | ✅ | ❌ | ✅ | ? |
Schema - Protobuf | ✅ | ❌ | ❌ | ? |
Schema - KeyValue | ✅ | ❌ | ❌ | ? |
Schema - AutoSchema | ✅ | ❌ | ❌ | ? |
Scehma - Protobuf Native | ✅ | ❌ | ❌ | ? |
e-2-e encryption | ✅ | ❌ | ✅ | ✅ |
maxMessageRetries | ✅ | ❌ | ❌ | ✅ |
dead-letter policy | ✅ | ❌ | ❌ | ✅ |
SubscriptionName | ✅ | ✅ | ✅ | ✅ |
SubscriptionType | ✅ | ✅ | ✅ | ✅ |
SubscriptionInitialPosition | ✅ | ❌ | ✅ | ✅ |
AutoAck | ✅ | ✅ | ✅ | ✅ |
Note
Users can implement the Schema themselves since we are passing and expecting []byte to/from the users' function, so leave ? here.
Output Arguments
Output | Java | Go(Pulsar) | Python | Go(StreamNative) |
---|---|---|---|---|
Custom SerDe | ✅ | ❌ | ✅ | ? |
Schema - Avro | ✅ | ❌ | ✅ | ? |
Schema - JSON | ✅ | ❌ | ✅ | ? |
Schema - Protobuf | ✅ | ❌ | ❌ | ? |
Schema - KeyValue | ✅ | ❌ | ❌ | ? |
Schema - AutoSchema | ✅ | ❌ | ❌ | ? |
Schema - Protobuf Native | ✅ | ❌ | ❌ | ? |
useThreadLocalProducers | ✅ | ❌ | ❌ | ✅ |
Key-based Batcher | ✅ | ✅ | ✅ | ✅ |
e-2-e encryption | ✅ | ❌ | ✅ | ✅ |
Compression | ✅ | ✅ | ✅ | ✅ |
Context
Context | Java | Go(Pulsar) | Python | Go(StreamNative) |
---|---|---|---|---|
InputTopics | ✅ | ✅ | ✅ | ✅ |
OutputTopic | ✅ | ✅ | ✅ | ✅ |
CurrentRecord | ✅ | ✅ | ✅ | ✅ |
OutputSchemaType | ✅ | ❌ | ✅ | ✅ |
Tenant | ✅ | ✅ | ✅ | ✅ |
Namespace | ✅ | ✅ | ✅ | ✅ |
FunctionName | ✅ | ✅ | ✅ | ✅ |
FunctionId | ✅ | ✅ | ✅ | ✅ |
InstanceId | ✅ | ✅ | ✅ | ✅ |
NumInstances | ✅ | ❌ | ✅ | ✅ |
FunctionVersion | ✅ | ✅ | ✅ | ✅ |
PulsarAdminClient | ✅ | ❌ | ❌ | ❌ |
GetLogger | ✅ | ❌ | ✅ | ✅ |
RecordMetrics | ✅ | ✅ | ✅ | ❌ |
UserConfig | ✅ | ✅ | ✅ | ✅ |
Secrets | ✅ | ❌ | ✅ | ✅ |
State | ✅ | ❌ | ❌ | ✅ |
Publish | ✅ | ✅ | ✅ | ✅ |
ConsumerBuilder | ✅ | ❌ | ❌ | ❌ |
Seek / Pause / Resume | ✅ | ❌ | ❌ | ❌ |
PulsarClient | ✅ | ❌ | ❌ | ❌ |
Other
Other | Java | Go(Pulsar) | Python | Go(StreamNative) |
---|---|---|---|---|
Resources | ✅ | ✅ | ✅ | ✅ |
At-most-once | ✅ | ✅ | ✅ | ✅ |
At-least-once | ✅ | ✅ | ✅ | ✅ |
Effectively-once | ✅ | ❌ | ✅ | ❌ |
Package
For Golang, we need to compile the function file to an executable one:
Prepare your function file:
package main import ( "context" "github.com/jiangpengcheng/pulsar-function-go/pf" ) func HandleExclamation(ctx context.Context, in []byte) ([]byte, error) { return []byte(string(in) + "!"), nil } func main() { pf.Start(HandleExclamation) }
Compile
go mod init func go mod tidy GO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -o exclamation exclamation.go
Deploy
After creating a cluster, set up your environment and develop&package your function, you can use the pulsarctl
, pulsar-admin
command, the REST API, or terraform
to deploy a Pulsar function to your cluster.
You can create a Golang Pulsar function by using a local compiled Golang file or an uploaded Pulsar functions package(recommend).
(Optional) Upload your function file to Pulsar
It's recommend to upload your function file to Pulsar before you create a function. Since you can add a version suffix to the package.
You need to set the context for Pulsarctl first:
# create a context
pulsarctl context set ${context-name} \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}
# activate oauth2
pulsarctl oauth2 activate
Note
Replace the placeholder variables with the actual values that you can get when setting up client tools.
context-name
: any name you wantadmin-service-url
: the HTTP service URL of your Pulsar cluster.privateKey
: the path to the downloaded OAuth2 key file.issuerUrl
: the URL of the OAuth2 issuer.audience
: the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar
, your organization name, and your Pulsar instance name.${orgName}
: the name of your organization.${instanceName}
: the name of your instance.
Upload packages
pulsarctl packages upload function://public/default/[email protected] \
--path exclamation-1.0-SNAPSHOT.jar \
--description "exclamation function" \
--properties fileName=exclamation-1.0-SNAPSHOT.jar
You should see the following output:
The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully
Create
pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-go-input \
--output persistent://public/default/test-go-output \
--classname exclamation \
--go function://public/default/[email protected] \
--custom-runtime-options '{"genericKind": "executable"}'
Note
We are using a different Golang runtime, need to specify the --custom-runtime-options '{"genericKind": "executable"}'
to make it work.
You should see something like this:
Created function1 successfully
For details about Pulsar function configurations, see Pulsar function configurations.
What’s next?
- Learn how to develop NodeJs functions.
- Learn how to develop WASM functions.
- Learn how to manage functions.
- Learn how to monitor functions.
- Reference common configurations.