
Develop Pulsar Functions in WASM (Private Preview)

This section describes how to develop and package WASM Pulsar functions for use on StreamNative Cloud.

The WASM runtime is in private preview. If you want to try it out or have any questions, submit a ticket to the support team.

Develop

The WASM runtime is based on WasmEdge, so in principle you can write functions in any language that compiles to a WASM module. The following example uses Rust:

  • Cargo.toml
[package]
name = "excla"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
apache-avro = { version = "0.15.0", features = ["bzip", "xz", "snappy", "zstandard"] }
serde = "^1.0"
serde_json = "^1.0"
wasmedge-bindgen = "0.4.1"
wasmedge-bindgen-macro = "0.4.1"
  • src/lib.rs
#[allow(unused_imports)]
use wasmedge_bindgen::*;
use wasmedge_bindgen_macro::*;

use serde::{Deserialize, Serialize};

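#[derive(Debug, Deserialize, Serialize)]
pub struct Student {
    pub name: Option<String>,
    pub age: Option<i32>,
    pub grade: Option<i32>,
}

// Pass `process_json` as the --classname value when creating the function.
// The runtime hands each message payload to this function as raw bytes
// and expects the output payload back as raw bytes.
#[wasmedge_bindgen]
pub fn process_json(s: Vec<u8>) -> Vec<u8> {
    // Panics if the payload is not a JSON-encoded Student.
    let stu: Student = serde_json::from_slice(&s).expect("input is not a valid Student JSON object");
    // Increment the grade if present; copy every other field unchanged.
    let stu = Student {
        grade: stu.grade.map(|grade| grade + 1),
        ..stu
    };
    serde_json::to_vec(&stu).expect("failed to serialize Student")
}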
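Because `process_json` panics on a malformed payload and mixes parsing with business logic, it is hard to test outside WasmEdge. One option, sketched below under our own assumptions, is to move the transformation into a pure function and keep the `#[wasmedge_bindgen]` export as a thin wrapper. The `bump_grade` name is hypothetical, and the serde derives are dropped so the sketch compiles with the standard library alone.

```rust
/// Mirrors the `Student` struct from src/lib.rs, minus the serde derives,
/// so this sketch compiles with the standard library alone.
#[derive(Debug, Clone, PartialEq)]
pub struct Student {
    pub name: Option<String>,
    pub age: Option<i32>,
    pub grade: Option<i32>,
}

/// Hypothetical pure helper holding the same logic as `process_json`:
/// a present grade is incremented by one, a missing grade stays missing,
/// and every other field is copied unchanged.
pub fn bump_grade(stu: Student) -> Student {
    Student {
        grade: stu.grade.map(|grade| grade + 1),
        ..stu
    }
}

fn main() {
    let before = Student {
        name: Some("Alice".to_string()),
        age: Some(17),
        grade: Some(11),
    };
    let after = bump_grade(before.clone());
    println!("{:?} -> {:?}", before, after);
}
```

With this split, `process_json` would only deserialize, call `bump_grade`, and serialize, so the transformation itself can be covered by ordinary `#[test]` functions that run on the host.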

Feature Matrix

The WASM runtime does not yet support all the features of the Java runtime and is still under development. The following tables compare feature support across the Java, Go (Pulsar), Python, and WASM runtimes.

Input Arguments

| Input | Java | Go (Pulsar) | Python | WASM |
|---|---|---|---|---|
| Custom SerDe | ✅ | ❌ | ✅ | ? |
| Schema - Avro | ✅ | ❌ | ✅ | ? |
| Schema - JSON | ✅ | ❌ | ✅ | ? |
| Schema - Protobuf | ✅ | ❌ | ❌ | ? |
| Schema - KeyValue | ✅ | ❌ | ❌ | ? |
| Schema - AutoSchema | ✅ | ❌ | ❌ | ? |
| Schema - Protobuf Native | ✅ | ❌ | ❌ | ? |
| e-2-e encryption | ✅ | ❌ | ✅ | ✅ |
| maxMessageRetries | ✅ | ❌ | ❌ | ✅ |
| dead-letter policy | ✅ | ❌ | ❌ | ✅ |
| SubscriptionName | ✅ | ✅ | ✅ | ✅ |
| SubscriptionType | ✅ | ✅ | ✅ | ✅ |
| SubscriptionInitialPosition | ✅ | ❌ | ✅ | ✅ |
| AutoAck | ✅ | ✅ | ✅ | ✅ |

Note

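The WASM runtime passes each message to your function as raw bytes ([]byte) and expects raw bytes back, so you can implement schema handling yourself inside the function. For this reason, the schema-related rows are marked ? for WASM.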
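Since the runtime hands your function raw bytes and expects raw bytes back, encoding and decoding are up to you. As an illustration only, the sketch below uses a hypothetical length-prefixed layout (a big-endian `u16` name length, the UTF-8 name, then a big-endian `i32` grade), increments the grade, and re-encodes the record. The layout and the `Record`, `decode`, `encode`, and `process` names are assumptions for this example; in practice you would match whatever format your producers write, for example JSON via serde_json as in src/lib.rs.

```rust
/// Hypothetical record layout, used only for this sketch:
/// [name_len: u16 big-endian][name: UTF-8 bytes][grade: i32 big-endian]
#[derive(Debug, PartialEq)]
pub struct Record {
    pub name: String,
    pub grade: i32,
}

/// Decodes the hypothetical layout. Returns None on truncated input,
/// trailing bytes, or an invalid UTF-8 name instead of panicking.
pub fn decode(bytes: &[u8]) -> Option<Record> {
    if bytes.len() < 2 {
        return None;
    }
    let name_len = u16::from_be_bytes([bytes[0], bytes[1]]) as usize;
    let name_end = 2 + name_len;
    // The name must be followed by exactly four grade bytes.
    if bytes.len() != name_end + 4 {
        return None;
    }
    let name = std::str::from_utf8(&bytes[2..name_end]).ok()?.to_string();
    let g = &bytes[name_end..];
    let grade = i32::from_be_bytes([g[0], g[1], g[2], g[3]]);
    Some(Record { name, grade })
}

/// Encodes a record into the same layout.
/// Assumes the UTF-8 name is at most u16::MAX bytes long.
pub fn encode(rec: &Record) -> Vec<u8> {
    let name = rec.name.as_bytes();
    let mut out = Vec::with_capacity(2 + name.len() + 4);
    out.extend_from_slice(&(name.len() as u16).to_be_bytes());
    out.extend_from_slice(name);
    out.extend_from_slice(&rec.grade.to_be_bytes());
    out
}

/// Bytes in, bytes out, in the same shape as `process_json`.
/// In this sketch, malformed input is passed through unchanged.
pub fn process(input: Vec<u8>) -> Vec<u8> {
    match decode(&input) {
        Some(rec) => encode(&Record {
            grade: rec.grade.saturating_add(1),
            ..rec
        }),
        None => input,
    }
}

fn main() {
    let input = encode(&Record { name: "Alice".to_string(), grade: 11 });
    let output = process(input);
    println!("{:?}", decode(&output));
}
```

Passing malformed input through is only one possible policy; panicking, as `process_json` does, is another.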

Output Arguments

| Output | Java | Go (Pulsar) | Python | WASM |
|---|---|---|---|---|
| Custom SerDe | ✅ | ❌ | ✅ | ? |
| Schema - Avro | ✅ | ❌ | ✅ | ? |
| Schema - JSON | ✅ | ❌ | ✅ | ? |
| Schema - Protobuf | ✅ | ❌ | ❌ | ? |
| Schema - KeyValue | ✅ | ❌ | ❌ | ? |
| Schema - AutoSchema | ✅ | ❌ | ❌ | ? |
| Schema - Protobuf Native | ✅ | ❌ | ❌ | ? |
| useThreadLocalProducers | ✅ | ❌ | ❌ | ✅ |
| Key-based Batcher | ✅ | ✅ | ✅ | ✅ |
| e-2-e encryption | ✅ | ❌ | ✅ | ✅ |
| Compression | ✅ | ✅ | ✅ | ✅ |

Context

The WASM runtime does not currently support any Context features.

Other

| Other | Java | Go (Pulsar) | Python | WASM |
|---|---|---|---|---|
| Resources | ✅ | ✅ | ✅ | ✅ |
| At-most-once | ✅ | ✅ | ✅ | ✅ |
| At-least-once | ✅ | ✅ | ✅ | ✅ |
| Effectively-once | ✅ | ❌ | ✅ | ❌ |

Package

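Before creating a Pulsar function, compile your function to a .wasm module. If the wasm32-wasi target is not installed, add it first with rustup target add wasm32-wasi.

cargo build --target wasm32-wasi --release

The build writes the module to target/wasm32-wasi/release/excla.wasm, named after the crate in Cargo.toml. Recent Rust toolchains (1.84 and later) name this target wasm32-wasip1 instead, so substitute that name in the commands and in the output path.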
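Before uploading, you can sanity-check that the build produced a WebAssembly binary. Every binary-format WASM module starts with the magic bytes `\0asm` followed by format version 1 (`01 00 00 00`). The sketch below checks only that 8-byte header; the default path `target/wasm32-wasi/release/excla.wasm` is an assumption based on the `excla` crate name and the build command, and `has_wasm_header` is a hypothetical helper name.

```rust
use std::env;
use std::fs;

/// Every binary WebAssembly module starts with the magic bytes "\0asm"
/// followed by binary format version 1, encoded as a little-endian u32.
const WASM_HEADER: [u8; 8] = [0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00];

/// Returns true if `bytes` begins with the WASM binary header.
/// Only the 8-byte preamble is checked, not the rest of the module.
pub fn has_wasm_header(bytes: &[u8]) -> bool {
    bytes.starts_with(&WASM_HEADER)
}

fn main() {
    // Assumed default output path for the `excla` crate; pass another path as an argument.
    let path = env::args()
        .nth(1)
        .unwrap_or_else(|| "target/wasm32-wasi/release/excla.wasm".to_string());
    match fs::read(&path) {
        Ok(bytes) if has_wasm_header(&bytes) => println!("{}: WASM header found", path),
        Ok(_) => println!("{}: not a WASM binary (missing header)", path),
        Err(e) => println!("{}: cannot read file: {}", path, e),
    }
}
```

A JAR, for example, is a ZIP archive that starts with the bytes `PK`, so uploading one by mistake would fail this check.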

Deploy

After you create a cluster, set up your environment, and develop and package your function, you can deploy the function to your cluster with pulsarctl, the pulsar-admin CLI, the REST API, or Terraform.

You can create a WASM Pulsar function from a local .wasm file or from an uploaded Pulsar function package (recommended).

(Optional) Upload your function file to Pulsar

Uploading your function file to Pulsar before you create the function is recommended, because a package lets you add a version suffix to the file.
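Pulsar package URLs follow the form `type://tenant/namespace/name@version`, where the type is `function`, `sink`, or `source`. If you publish a new version on every release, a small helper can build and validate these URLs. The sketch below is a hypothetical helper, not part of pulsarctl; it requires an explicit version, and the `excla` name and `0.1.0` version (taken from Cargo.toml) are illustrative values.

```rust
use std::fmt;

/// Components of a Pulsar package URL: `type://tenant/namespace/name@version`.
#[derive(Debug, Clone, PartialEq)]
pub struct PackageUrl {
    pub kind: String, // package type: "function", "sink", or "source"
    pub tenant: String,
    pub namespace: String,
    pub name: String,
    pub version: String,
}

impl fmt::Display for PackageUrl {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "{}://{}/{}/{}@{}",
            self.kind, self.tenant, self.namespace, self.name, self.version
        )
    }
}

/// Parses a package URL. This sketch requires an explicit, non-empty
/// `@version` suffix and returns None for anything it does not recognize.
pub fn parse_package_url(url: &str) -> Option<PackageUrl> {
    let (kind, rest) = url.split_once("://")?;
    if !matches!(kind, "function" | "sink" | "source") {
        return None;
    }
    let (path, version) = rest.rsplit_once('@')?;
    let parts: Vec<&str> = path.split('/').collect();
    if parts.len() != 3 || version.is_empty() || parts.iter().any(|p| p.is_empty()) {
        return None;
    }
    Some(PackageUrl {
        kind: kind.to_string(),
        tenant: parts[0].to_string(),
        namespace: parts[1].to_string(),
        name: parts[2].to_string(),
        version: version.to_string(),
    })
}

fn main() {
    // Hypothetical package: the version suffix mirrors the crate version in Cargo.toml.
    let url = PackageUrl {
        kind: "function".to_string(),
        tenant: "public".to_string(),
        namespace: "default".to_string(),
        name: "excla".to_string(),
        version: "0.1.0".to_string(),
    };
    println!("{}", url); // function://public/default/excla@0.1.0
}
```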

First, set up a pulsarctl context:

# create a context
pulsarctl context set ${context-name}  \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}

# activate oauth2
pulsarctl oauth2 activate

Note

Replace the placeholder variables with the actual values, which you obtain when setting up client tools.

  • context-name: any name you want
  • admin-service-url: the HTTP service URL of your Pulsar cluster.
  • privateKey: the path to the downloaded OAuth2 key file.
  • issuerUrl: the URL of the OAuth2 issuer.
  • audience: the Uniform Resource Name (URN) passed to --audience. It combines urn:sn:pulsar, your organization name (orgName), and your Pulsar instance name (instanceName).
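The audience value is a simple concatenation, so it can be derived from the organization and instance names instead of being typed by hand. The sketch below is a hypothetical helper following the `urn:sn:pulsar:${orgName}:${instanceName}` pattern from the command; rejecting empty parts and parts that contain `:` is this sketch's own validation choice, since a stray `:` would shift the URN's segments.

```rust
/// Hypothetical helper: builds the OAuth2 audience passed to
/// `pulsarctl context set --audience`, i.e. `urn:sn:pulsar:<org>:<instance>`.
/// Rejects empty parts and parts containing ':' (which would add URN segments).
pub fn audience_urn(org_name: &str, instance_name: &str) -> Result<String, String> {
    for &(label, value) in &[("orgName", org_name), ("instanceName", instance_name)] {
        if value.is_empty() {
            return Err(format!("{} must not be empty", label));
        }
        if value.contains(':') {
            return Err(format!("{} must not contain ':'", label));
        }
    }
    Ok(format!("urn:sn:pulsar:{}:{}", org_name, instance_name))
}

fn main() {
    // "my-org" and "my-instance" are placeholder values.
    match audience_urn("my-org", "my-instance") {
        Ok(urn) => println!("--audience {}", urn),
        Err(e) => eprintln!("invalid input: {}", e),
    }
}
```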

Upload packages

pulsarctl packages upload function://public/default/[email protected] \
--path target/wasm32-wasi/release/excla.wasm \
--description "exclamation function" \
--properties fileName=excla.wasm

You should see the following output:

The package 'function://public/default/[email protected]' uploaded from path 'target/wasm32-wasi/release/excla.wasm' successfully

Create

pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-wasm-input \
--output persistent://public/default/test-wasm-output \
--classname process_json \
--py function://public/default/[email protected] \
--custom-runtime-options '{"genericKind": "wasm"}'

Note

Because Apache Pulsar does not provide a WASM runtime, use --py to specify the function package and set --custom-runtime-options '{"genericKind": "wasm"}' so that the function runs on the WASM runtime.

You should see something like this:

Created function1 successfully
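If you script deployments, the same `pulsarctl functions create` invocation can be assembled programmatically. The sketch below only builds a `std::process::Command` and prints it; calling `.status()` on the result would actually run pulsarctl. The helper name `create_function_command` and the `excla@0.1.0` package URL are hypothetical, and the class name is `process_json`, the export from src/lib.rs.

```rust
use std::process::Command;

/// Hypothetical helper that assembles the `pulsarctl functions create`
/// invocation for a WASM function. It builds the command but does not run it.
pub fn create_function_command(
    name: &str,
    input: &str,
    output: &str,
    classname: &str,
    package_url: &str,
) -> Command {
    let mut cmd = Command::new("pulsarctl");
    cmd.args(&["functions", "create"])
        .args(&["--tenant", "public", "--namespace", "default"])
        .args(&["--name", name])
        .args(&["--inputs", input])
        .args(&["--output", output])
        .args(&["--classname", classname])
        // Pulsar has no native WASM runtime, so the package goes through --py ...
        .args(&["--py", package_url])
        // ... and genericKind selects the WASM runtime.
        .args(&["--custom-runtime-options", r#"{"genericKind": "wasm"}"#]);
    cmd
}

fn main() {
    let cmd = create_function_command(
        "function1",
        "persistent://public/default/test-wasm-input",
        "persistent://public/default/test-wasm-output",
        "process_json",
        "function://public/default/excla@0.1.0", // hypothetical package URL
    );
    // Print the command line instead of running it.
    let args: Vec<String> = cmd.get_args().map(|a| a.to_string_lossy().into_owned()).collect();
    println!("{} {}", cmd.get_program().to_string_lossy(), args.join(" "));
}
```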

For details about Pulsar function configurations, see Pulsar function configurations.
