This section describes how to develop and package NodeJs Pulsar functions for use on StreamNative Cloud.
The NodeJs runtime is in private preview. If you want to try it out or have any questions, submit a ticket to the support team.
Develop
StreamNative Cloud looks for a function named process in your NodeJs function file. Below is an example:
async function process(message, context) {
// 1. Get the logger to send logs to a Pulsar topic (when log-topic is set)
const logger = context.getLogger()
for (let word of message.split(' ')) {
// 2. Increment the counter in the state store, then read it back
await context.incrementCounter(word, 1)
let count = await context.getCounter(word)
logger.info(`got word: ${word} for ${count['value']} times`)
}
// 3. Get user-defined configurations
let cfg = context.getUserConfigValue('configKey')
// 4. Get secret configurations
let sec = context.getSecret('secretKey')
let result = `config: ${cfg}, secret: ${sec}`
// 5. Publish messages to any Pulsar topic
await context.publish(
'persistent://public/default/test-node-package-serde-extra',
result
)
// 6. Return the processing result
return message.concat('!')
}
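You may want to try the function above locally before you deploy it. One way is to call it with a stand-in context object. The sketch below uses a hypothetical in-memory mock; createMockContext and its internals are not part of any StreamNative API. It makes three assumptions:

- getCounter resolves to an object with a value field, which is what count['value'] in the example suggests.
- getUserConfigValue returns its value synchronously.
- getSecret returns its value synchronously.

```javascript
// Hypothetical in-memory stand-in for the runtime context; not a StreamNative API.
// Assumption: getCounter resolves to { value: n }, matching count['value'] in the example.
function createMockContext({ userConfig = {}, secrets = {} } = {}) {
  const counters = new Map()
  const logs = []
  const published = []
  return {
    logs,
    published,
    getLogger: () => ({ info: (msg) => logs.push(msg) }),
    incrementCounter: async (key, amount) => {
      counters.set(key, (counters.get(key) || 0) + amount)
    },
    getCounter: async (key) => ({ value: counters.get(key) || 0 }),
    getUserConfigValue: (key) => userConfig[key],
    getSecret: (key) => secrets[key],
    publish: async (topic, payload) => {
      published.push({ topic, payload })
    },
  }
}

// The word-count example from above, without its comments.
async function process(message, context) {
  const logger = context.getLogger()
  for (let word of message.split(' ')) {
    await context.incrementCounter(word, 1)
    let count = await context.getCounter(word)
    logger.info(`got word: ${word} for ${count['value']} times`)
  }
  let cfg = context.getUserConfigValue('configKey')
  let sec = context.getSecret('secretKey')
  let result = `config: ${cfg}, secret: ${sec}`
  await context.publish(
    'persistent://public/default/test-node-package-serde-extra',
    result
  )
  return message.concat('!')
}

// Usage: run the function once, then inspect what it returned, logged, and published.
const ctx = createMockContext({
  userConfig: { configKey: 'demo' },
  secrets: { secretKey: 'shh' },
})
process('hello world hello', ctx).then((result) => {
  console.log(result) // hello world hello!
  console.log(ctx.logs)
  console.log(ctx.published)
})
```

Because the mock records logs and published messages in plain arrays, you can assert on them in a unit test without a running cluster. It only approximates the real runtime, so behavior on StreamNative Cloud may differ.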
The runtime can also handle Avro and JSON schema records. For Avro schema input and output, export the Avro record definition as module.exports.definitions:
function process(params) {
params['grade']['int'] = params['grade']['int'] + 1
return params
}
const definitions = {
name: 'Student',
type: 'record',
fields: [
{ name: 'name', type: ['null', 'string'] },
{ name: 'age', type: ['null', 'int'] },
{ name: 'grade', type: ['null', 'int'] },
],
}
module.exports.definitions = definitions
For JSON schema input and output:
function process(params) {
params['grade'] = params['grade'] + 1
return params
}
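In the JSON example, grade is a plain number rather than a union wrapper. If some records omit the field or set it to null, params['grade'] + 1 evaluates to NaN or 1 respectively. The following sketch leaves such records untouched. It assumes the runtime hands each JSON record to process as a plain object:

```javascript
// Variant of the JSON example that only increments a numeric grade.
// Assumes each JSON record is passed to process as a plain object.
function process(params) {
  if (typeof params['grade'] === 'number') {
    params['grade'] = params['grade'] + 1
  }
  return params
}

// Usage:
console.log(process({ name: 'Ann', age: 12, grade: 6 }))    // { name: 'Ann', age: 12, grade: 7 }
console.log(process({ name: 'Bob', age: 13, grade: null })) // { name: 'Bob', age: 13, grade: null }
```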
For more examples, refer to the examples.
Feature Matrix
The NodeJs runtime does not yet support all the features of the Java runtime and is still under development. The following tables compare feature support across runtimes.
Input Arguments
| Input | Java | Go(Pulsar) | Python | NodeJs |
|---|---|---|---|---|
| Custom SerDe | ✅ | ❌ | ✅ | ✅ |
| Schema - Avro | ✅ | ❌ | ✅ | ✅ |
| Schema - JSON | ✅ | ❌ | ✅ | ✅ |
| Schema - Protobuf | ✅ | ❌ | ❌ | ❌ |
| Schema - KeyValue | ✅ | ❌ | ❌ | ❌ |
| Schema - AutoSchema | ✅ | ❌ | ❌ | ❌ |
| Schema - Protobuf Native | ✅ | ❌ | ❌ | ❌ |
| e-2-e encryption | ✅ | ❌ | ✅ | ✅ |
| maxMessageRetries | ✅ | ❌ | ❌ | ✅ |
| dead-letter policy | ✅ | ❌ | ❌ | ✅ |
| SubscriptionName | ✅ | ✅ | ✅ | ✅ |
| SubscriptionType | ✅ | ✅ | ✅ | ✅ |
| SubscriptionInitialPosition | ✅ | ❌ | ✅ | ✅ |
| AutoAck | ✅ | ✅ | ✅ | ✅ |
Output Arguments
| Output | Java | Go(Pulsar) | Python | NodeJs |
|---|---|---|---|---|
| Custom SerDe | ✅ | ❌ | ✅ | ✅ |
| Schema - Avro | ✅ | ❌ | ✅ | ✅ |
| Schema - JSON | ✅ | ❌ | ✅ | ✅ |
| Schema - Protobuf | ✅ | ❌ | ❌ | ❌ |
| Schema - KeyValue | ✅ | ❌ | ❌ | ❌ |
| Schema - AutoSchema | ✅ | ❌ | ❌ | ❌ |
| Schema - Protobuf Native | ✅ | ❌ | ❌ | ❌ |
| useThreadLocalProducers | ✅ | ❌ | ❌ | ✅ |
| Key-based Batcher | ✅ | ✅ | ✅ | ✅ |
| e-2-e encryption | ✅ | ❌ | ✅ | ✅ |
| Compression | ✅ | ✅ | ✅ | ✅ |
Context
| Context | Java | Go(Pulsar) | Python | NodeJs |
|---|---|---|---|---|
| InputTopics | ✅ | ✅ | ✅ | ✅ |
| OutputTopic | ✅ | ✅ | ✅ | ✅ |
| CurrentRecord | ✅ | ✅ | ✅ | ✅ |
| OutputSchemaType | ✅ | ❌ | ✅ | ✅ |
| Tenant | ✅ | ✅ | ✅ | ✅ |
| Namespace | ✅ | ✅ | ✅ | ✅ |
| FunctionName | ✅ | ✅ | ✅ | ✅ |
| FunctionId | ✅ | ✅ | ✅ | ✅ |
| InstanceId | ✅ | ✅ | ✅ | ✅ |
| NumInstances | ✅ | ❌ | ✅ | ✅ |
| FunctionVersion | ✅ | ✅ | ✅ | ✅ |
| PulsarAdminClient | ✅ | ❌ | ❌ | ❌ |
| GetLogger | ✅ | ❌ | ✅ | ✅ |
| RecordMetrics | ✅ | ✅ | ✅ | ❌ |
| UserConfig | ✅ | ✅ | ✅ | ✅ |
| Secrets | ✅ | ❌ | ✅ | ✅ |
| State | ✅ | ❌ | ❌ | ✅ |
| Publish | ✅ | ✅ | ✅ | ✅ |
| ConsumerBuilder | ✅ | ❌ | ❌ | ❌ |
| Seek / Pause / Resume | ✅ | ❌ | ❌ | ❌ |
| PulsarClient | ✅ | ❌ | ❌ | ❌ |
Other
| Other | Java | Go(Pulsar) | Python | NodeJs |
|---|---|---|---|---|
| Resources | ✅ | ✅ | ✅ | ✅ |
| At-most-once | ✅ | ✅ | ✅ | ✅ |
| At-least-once | ✅ | ✅ | ✅ | ✅ |
| Effectively-once | ✅ | ❌ | ✅ | ❌ |
Package
You can either provide a single .js file or package your function into a .zip file when creating NodeJs functions.
A .zip file should contain:
- the package.json file
- other source code files
For example:
Archive: pad.zip
Length Method Size Cmpr Date Time CRC-32 Name
-------- ------ ------- ---- ---------- ----- -------- ----
107 Defl:N 89 17% 2024-03-01 01:07 918cdb62 index.js
94 Defl:N 78 17% 2024-03-01 01:07 2ed1db00 package.json
-------- ------- --- -------
201 167 17% 2 files
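Before uploading, you might want to check that an archive has the shape described above. The helper below, checkNodePackage, is a hypothetical local pre-flight check; StreamNative does not provide it. It takes the entry names in your .zip and the parsed package.json, and reports two things: whether package.json is present, and whether the file named in main is included. It assumes package.json sits at the archive root, as in the listing above.

```javascript
// Hypothetical pre-flight check for a NodeJs function archive (not a StreamNative tool).
// Assumes package.json sits at the archive root, as in the pad.zip listing.
function checkNodePackage(entryNames, packageJson) {
  const problems = []
  const entries = new Set(entryNames.map((name) => name.replace(/^\.\//, '')))
  if (!entries.has('package.json')) {
    problems.push('package.json is missing from the archive root')
  }
  const main = packageJson && packageJson.main
  if (typeof main !== 'string' || main.length === 0) {
    problems.push('package.json does not declare a "main" file')
  } else if (!entries.has(main.replace(/^\.\//, ''))) {
    problems.push(`main file "${main}" is not in the archive`)
  }
  return problems
}

// Usage with the two entries from pad.zip and a package.json whose main is index.js:
const pkg = { name: 'test', main: 'index.js', dependencies: { 'left-pad': '1.3.0' } }
console.log(checkNodePackage(['index.js', 'package.json'], pkg)) // []
console.log(checkNodePackage(['package.json'], pkg))
// [ 'main file "index.js" is not in the archive' ]
```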
Below is an example of the package.json:
{
"name": "test",
"main": "index.js",
"dependencies": {
"left-pad": "1.3.0"
}
}
The file named in the main field is required. For the package.json above, the main file, index.js, looks like this:
function process(message) {
const leftPad = require('left-pad')
return leftPad(message, 20, 'x')
}
StreamNative Cloud installs the dependencies specified in package.json and then runs your function.
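The left-pad dependency above illustrates how dependencies are installed. For string input, the built-in String.prototype.padStart produces the same result as leftPad(message, 20, 'x'). A dependency-free version of the function could therefore be shipped as a single .js file instead of a .zip:

```javascript
// Dependency-free variant of the index.js example: padStart pads on the left
// until the string reaches the target length (20 here), using 'x' as filler.
function process(message) {
  return String(message).padStart(20, 'x')
}

console.log(process('hello')) // xxxxxxxxxxxxxxxhello
```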
Deploy
After you create a cluster, set up your environment, and develop and package your function, you can deploy it to your cluster with snctl, pulsarctl, pulsar-admin, the REST API, or Terraform.
You can create a NodeJs Pulsar function from a local .js or .zip file, or from an uploaded Pulsar functions package (recommended).
(Optional) Upload your function file to Pulsar
We recommend uploading your function file to Pulsar before you create a function, because an uploaded package can carry a version suffix (for example, @v0.1).
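Uploaded packages are referenced with URLs of the form function://tenant/namespace/package-name@version. The create examples in this section use function://public/default/node-exclamation@v0.1.

The hypothetical helper below composes such a URL. As a precaution, it rejects values that contain the / or @ separators; that rule is an assumption, not a documented constraint.

```javascript
// Hypothetical helper that builds a Pulsar package URL of the form
// function://<tenant>/<namespace>/<package-name>@<version>.
function functionPackageUrl(tenant, namespace, packageName, version) {
  for (const [label, value] of Object.entries({ tenant, namespace, packageName, version })) {
    // Precaution (assumption): separators inside a component would make the URL ambiguous.
    if (typeof value !== 'string' || value.length === 0 || /[\/@]/.test(value)) {
      throw new Error(`invalid ${label}: ${JSON.stringify(value)}`)
    }
  }
  return `function://${tenant}/${namespace}/${packageName}@${version}`
}

console.log(functionPackageUrl('public', 'default', 'node-exclamation', 'v0.1'))
// function://public/default/node-exclamation@v0.1
```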
To upload the package with snctl:
snctl pulsar admin packages upload function://${tenant}/${namespace}/${package_name} \
--path ${file_path} \
--description "${description}" \
--properties fileName=${file_name}
You should see the following output:
The package 'function://${tenant}/${namespace}/${package_name}' uploaded from path '${file_path}' successfully
If you use pulsarctl, set its context first:
# create a context
pulsarctl context set ${context-name} \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}
# activate oauth2
pulsarctl oauth2 activate
Replace the placeholder variables with the actual values that you can get when setting up client tools.
context-name: any name you want
admin-service-url: the HTTP service URL of your Pulsar cluster.
privateKey: the path to the downloaded OAuth2 key file.
issuerUrl: the URL of the OAuth2 issuer.
audience: the Uniform Resource Name (URN), which is a combination of the urn:sn:pulsar, your organization name, and your Pulsar instance name.
Then upload the package with pulsarctl:
pulsarctl packages upload function://${tenant}/${namespace}/${package_name} \
--path ${file_path} \
--description "${description}" \
--properties fileName=${file_name}
You should see the following output:
The package 'function://${tenant}/${namespace}/${package_name}' uploaded from path '${file_path}' successfully
./bin/pulsar-admin \
--admin-url "${WEB_SERVICE_URL}" \
--auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 \
--auth-params '{"privateKey":"file://${privateKey}","issuerUrl":"${issuerUrl}","audience":"urn:sn:pulsar:${orgName}:${instanceName}"}' \
packages upload function://${tenant}/${namespace}/${package_name} \
--path ${file_path} \
--description "${description}" \
--properties fileName=${file_name}
Replace the placeholder variables with the actual values that you can get when setting up client tools.
admin-url: the HTTP service URL of your Pulsar cluster.
privateKey: the path to the downloaded OAuth2 key file.
issuerUrl: the URL of the OAuth2 issuer.
audience: the Uniform Resource Name (URN), which is a combination of the urn:sn:pulsar, your organization name, and your Pulsar instance name.
You should see the following output:
The package 'function://${tenant}/${namespace}/${package_name}' uploaded from path '${file_path}' successfully
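The --auth-params value is a JSON string, and quoting it by hand is easy to get wrong. As a sketch, you can build the string with JSON.stringify from the same values. The function name buildAuthParams and the sample values below are hypothetical. The keys (privateKey, issuerUrl, audience) and the audience format are the ones used in the command above.

```javascript
// Hypothetical helper: builds the JSON string passed to pulsar-admin --auth-params
// for AuthenticationOAuth2, using the privateKey, issuerUrl, and audience keys shown above.
function buildAuthParams({ privateKeyPath, issuerUrl, orgName, instanceName }) {
  return JSON.stringify({
    privateKey: `file://${privateKeyPath}`,
    issuerUrl,
    audience: `urn:sn:pulsar:${orgName}:${instanceName}`,
  })
}

// Usage with placeholder values:
console.log(buildAuthParams({
  privateKeyPath: '/path/to/key.json',
  issuerUrl: 'https://auth.streamnative.cloud/',
  orgName: 'my-org',
  instanceName: 'my-instance',
}))
// {"privateKey":"file:///path/to/key.json","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:my-org:my-instance"}
```

The generated string contains no single quotes, so you can wrap it in single quotes when you pass it to --auth-params in a shell.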
Create
snctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-node-input \
--output persistent://public/default/test-node-output \
--classname exclamation \
--py function://public/default/node-exclamation@v0.1 \
--custom-runtime-options '{"genericKind": "nodejs"}' \
--use-service-account
Because Pulsar does not natively support a NodeJs runtime, use --py to specify the function file and set --custom-runtime-options '{"genericKind": "nodejs"}' so that the function runs on the NodeJs runtime.
You should see something like this:
Created function1 successfully
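The value passed to --custom-runtime-options is also a JSON string. The Terraform example later in this section passes an env map alongside genericKind. Assuming the CLI flag accepts the same keys (not confirmed here), you could generate the string as shown below. buildCustomRuntimeOptions is a hypothetical name.

```javascript
// Hypothetical helper: builds the --custom-runtime-options JSON string.
// genericKind: 'nodejs' selects the NodeJs runtime; env is optional and assumed
// to take the same shape as in the Terraform custom_runtime_options example.
function buildCustomRuntimeOptions(env) {
  const options = { genericKind: 'nodejs' }
  if (env && Object.keys(env).length > 0) {
    options.env = env
  }
  return JSON.stringify(options)
}

console.log(buildCustomRuntimeOptions())                   // {"genericKind":"nodejs"}
console.log(buildCustomRuntimeOptions({ HELLO: 'WORLD' })) // {"genericKind":"nodejs","env":{"HELLO":"WORLD"}}
```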
pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-node-input \
--output persistent://public/default/test-node-output \
--classname exclamation \
--py function://public/default/node-exclamation@v0.1 \
--custom-runtime-options '{"genericKind": "nodejs"}'
Because Pulsar does not natively support a NodeJs runtime, use --py to specify the function file and set --custom-runtime-options '{"genericKind": "nodejs"}' so that the function runs on the NodeJs runtime.
You should see something like this:
Created function1 successfully
./bin/pulsar-admin \
--admin-url "${WEB_SERVICE_URL}" \
--auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 \
--auth-params '{"privateKey":"file:///YOUR-KEY-FILE-PATH","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:${orgName}:${instanceName}"}' \
functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-node-input \
--output persistent://public/default/test-node-output \
--classname exclamation \
--py function://public/default/node-exclamation@v0.1 \
--custom-runtime-options '{"genericKind": "nodejs"}'
Because Pulsar does not natively support a NodeJs runtime, use --py to specify the function file and set --custom-runtime-options '{"genericKind": "nodejs"}' so that the function runs on the NodeJs runtime.
Create your Terraform configuration file (for example, main.tf):
terraform {
required_providers {
pulsar = {
version = "0.2.0"
source = "registry.terraform.io/streamnative/pulsar"
}
}
}
provider "pulsar" {
web_service_url = "${admin-url}"
api_version = "3"
audience = "urn:sn:pulsar:${orgName}:${instanceName}"
issuer_url = "${issuerUrl}"
key_file_path = "${privateKey}"
}
// Note: function resource requires v3 api.
resource "pulsar_function" "function-1" {
provider = pulsar
name = "function1"
tenant = "public"
namespace = "default"
parallelism = 1
processing_guarantees = "ATLEAST_ONCE"
py = "function://public/default/node-exclamation@v0.1"
classname = "exclamation.ExclamationFunction"
inputs = ["persistent://public/default/test-node-input"]
output = "persistent://public/default/test-node-output"
subscription_name = "test-sub"
subscription_position = "Latest"
cleanup_subscription = true
skip_to_latest = true
forward_source_message_property = true
retain_key_ordering = true
auto_ack = true
max_message_retries = 100
dead_letter_topic = "public/default/dlt"
log_topic = "public/default/lt"
timeout_ms = 6666
secrets = jsonencode(
{
"SECRET1": {
"path": "sectest",
"key": "hello"
}
})
custom_runtime_options = jsonencode(
{
"genericKind": "nodejs",
"env": {
"HELLO": "WORLD"
}
})
}
Because Pulsar does not natively support a NodeJs runtime, use the py argument to specify the function file and set genericKind to nodejs in custom_runtime_options so that the function runs on the NodeJs runtime.
If you have not already done so, initialize the Terraform provider in the directory that contains your .tf file:
terraform init
You should see something like this:
Initializing the backend...
Initializing provider plugins...
- Finding streamnative/pulsar versions matching "0.2.0"...
- Installing streamnative/pulsar v0.2.0...
- Installed streamnative/pulsar v0.2.0 (self-signed, key ID 3105E1011F3C3671)
Partner and community providers are signed by their developers.
If you'd like to know more about provider signing, you can read about it here:
https://www.terraform.io/docs/cli/plugins/signing.html
Terraform has created a lock file .terraform.lock.hcl to record the provider
selections it made above. Include this file in your version control repository
so that Terraform can guarantee to make the same selections by default when
you run "terraform init" in the future.
Terraform has been successfully initialized!
You may now begin working with Terraform. Try running "terraform plan" to see
any changes that are required for your infrastructure. All Terraform commands
should now work.
If you ever set or change modules or backend configuration for Terraform,
rerun this command to reinitialize your working directory. If you forget, other
commands will detect it and remind you to do so if necessary.
Create the function:
terraform apply
You should see something like:
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
+ create
Terraform will perform the following actions:
# pulsar_function.function-1 will be created
+ resource "pulsar_function" "function-1" {
+ auto_ack = true
+ classname = "exclamation.ExclamationFunction"
+ cleanup_subscription = true
+ cpu = 0.5
+ custom_runtime_options = jsonencode(
{
+ genericKind = "nodejs",
+ env = {
+ HELLO = "WORLD"
}
}
)
+ dead_letter_topic = "public/default/dlt"
+ disk_mb = 128
+ forward_source_message_property = true
+ id = (known after apply)
+ inputs = [
+ "persistent://public/default/test-node-input",
]
+ py = "function://public/default/node-exclamation@v0.1"
+ log_topic = "public/default/lt"
+ max_message_retries = 100
+ name = "function1"
+ namespace = "default"
+ output = "persistent://public/default/test-node-output"
+ parallelism = 1
+ processing_guarantees = "ATLEAST_ONCE"
+ ram_mb = 128
+ retain_key_ordering = true
+ secrets = jsonencode(
{
+ SECRET1 = {
+ key = "hello"
+ path = "sectest"
}
}
)
+ skip_to_latest = true
+ subscription_name = "test-sub"
+ subscription_position = "Latest"
+ tenant = "public"
+ timeout_ms = 6666
}
Plan: 1 to add, 0 to change, 0 to destroy.
Do you want to perform these actions?
Terraform will perform the actions described above.
Only 'yes' will be accepted to approve.
Enter a value:
After you enter “yes”, you should see the following:
pulsar_function.function-1: Creating...
pulsar_function.function-1: Creation complete after 1s [id=public/default/function1]
Apply complete! Resources: 1 added, 0 changed, 0 destroyed.
Replace the placeholder variables with the actual values that you can get when setting up client tools.
admin-url: the HTTP service URL of your Pulsar cluster.
privateKey: the path to the downloaded OAuth2 key file.
issuerUrl: the URL of the OAuth2 issuer.
audience: the Uniform Resource Name (URN), which is a combination of the urn:sn:pulsar, your organization name, and your Pulsar instance name.
To create a function with the REST API, you can use curl:
curl -X POST ${WEB_SERVICE_URL}/admin/v3/functions/public/default/${FUNCTION_NAME} \
-H 'Authorization: Bearer ${TOKEN}' \
-H "Content-Type: multipart/form-data" \
-F 'functionConfig={"name": "${FUNCTION_NAME}", "tenant": "public", "namespace": "default", "runtime": "PYTHON", "py": "function://public/default/${PACKAGE_NAME}@${PACKAGE_VERSION}", "output": "public/default/output-test", "inputs": ["public/default/input"], "className": "exclamation", "customRuntimeOptions": "{\"genericKind\": \"nodejs\"}"};type=application/json' \
-F 'url=function://public/default/${PACKAGE_NAME}@${PACKAGE_VERSION}'
Because Pulsar does not natively support a NodeJs runtime, the request sets runtime to PYTHON, uses py to specify the function file, and sets customRuntimeOptions to {"genericKind": "nodejs"} so that the function runs on the NodeJs runtime.
This example assumes that the function package has already been uploaded. If you have not uploaded it, change the url parameter to point to your local file path, for example:
-F 'url=file://$YOUR_LOCAL_FUNCTION_FILE'
Replace the placeholder variables with the actual values that you can get when setting up client tools.
WEB_SERVICE_URL: the HTTPS service URL of your Pulsar cluster.
TOKEN: a valid token to interact with your Pulsar cluster.
FUNCTION_NAME: the name of your function.
PACKAGE_NAME: the name of your uploaded function package.
PACKAGE_VERSION: the version of the package you want to deploy, for example, 1.12.
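You may prefer to script the REST call instead of using curl. The curl request above can be expressed with the fetch, FormData, and Blob globals available in Node.js 18 and later. This is a sketch that has not been verified against a live cluster:

- buildCreateFunctionRequest is a hypothetical helper that only assembles the request.
- It mirrors the curl form fields: functionConfig is sent as an application/json part, followed by url. It does not document the endpoint independently.
- Unlike curl, FormData attaches a default filename to Blob parts; whether the server cares has not been checked.

```javascript
// Sketch of the curl request above using Node.js 18+ globals (fetch, FormData, Blob).
// buildCreateFunctionRequest is a hypothetical helper; it only assembles the request.
function buildCreateFunctionRequest({ webServiceUrl, token, functionName, packageName, packageVersion }) {
  const packageUrl = `function://public/default/${packageName}@${packageVersion}`
  const functionConfig = {
    name: functionName,
    tenant: 'public',
    namespace: 'default',
    runtime: 'PYTHON',
    py: packageUrl,
    output: 'public/default/output-test',
    inputs: ['public/default/input'],
    className: 'exclamation',
    // customRuntimeOptions is itself a JSON-encoded string, as in the curl example.
    customRuntimeOptions: JSON.stringify({ genericKind: 'nodejs' }),
  }
  const form = new FormData()
  // Send functionConfig as an application/json part, like ';type=application/json' in curl.
  form.append('functionConfig', new Blob([JSON.stringify(functionConfig)], { type: 'application/json' }))
  form.append('url', packageUrl)
  return {
    url: `${webServiceUrl}/admin/v3/functions/public/default/${functionName}`,
    init: {
      method: 'POST',
      // No Content-Type header here: fetch sets multipart/form-data with its own boundary.
      headers: { Authorization: `Bearer ${token}` },
      body: form,
    },
  }
}

// Usage (sends the request; requires a reachable cluster and a valid token):
// const { url, init } = buildCreateFunctionRequest({
//   webServiceUrl: process.env.WEB_SERVICE_URL,
//   token: process.env.TOKEN,
//   functionName: 'function1',
//   packageName: 'node-exclamation',
//   packageVersion: 'v0.1',
// })
// fetch(url, init).then(async (res) => console.log(res.status, await res.text()))
```

Keeping request construction separate from sending it lets you inspect or unit-test the request without touching the cluster.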
For details about Pulsar function configurations, see Pulsar function configurations.
What’s next?