Develop Pulsar Functions in NodeJs (Private Preview)
This section describes how to develop and package NodeJs Pulsar functions for use on StreamNative Cloud.
The NodeJs runtime is still in private preview. If you want to try it out or have any questions, please submit a ticket to the support team.
Develop
StreamNative Cloud looks for the process function in the NodeJs function file. Below is an example:
async function process(message, context) {
  // 1. Get the logger, which sends logs to a Pulsar topic (when log-topic is set)
  const logger = context.getLogger()
  for (let word of message.split(' ')) {
    // 2. Increment the counter in the state store, then read it back
    await context.incrementCounter(word, 1)
    let count = await context.getCounter(word)
    logger.info(`got word: ${word} for ${count['value']} times`)
  }
  // 3. Get user-defined configurations
  let cfg = context.getUserConfigValue('configKey')
  // 4. Get secret configurations
  let sec = context.getSecret('secretKey')
  let result = `config: ${cfg}, secret: ${sec}`
  // 5. Publish messages to any Pulsar topic
  await context.publish(
    'persistent://public/default/test-node-package-serde-extra',
    result
  )
  // 6. Return the processing result
  return message.concat('!')
}
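To try a function like the one above without deploying it, one option is to call it with a stand-in context. The following is a minimal sketch, not StreamNative's implementation: createMockContext is a hypothetical helper that mimics only the context methods used in the example, and it assumes getCounter resolves to an object with a value field, as count['value'] above suggests.

```javascript
// Hypothetical in-memory stand-in for the context object, for local testing only.
// It mimics just the methods used in the example above; the real runtime may differ.
function createMockContext({ userConfig = {}, secrets = {} } = {}) {
  const counters = new Map()
  const published = []
  const logs = []
  return {
    published,
    logs,
    getLogger: () => ({ info: (msg) => logs.push(msg) }),
    incrementCounter: async (key, amount) => {
      counters.set(key, (counters.get(key) || 0) + amount)
    },
    // Assumption: counters come back as an object with a `value` field.
    getCounter: async (key) => ({ value: counters.get(key) || 0 }),
    getUserConfigValue: (key) => userConfig[key],
    getSecret: (key) => secrets[key],
    publish: async (topic, payload) => {
      published.push({ topic, payload })
    },
  }
}

// A shortened version of the function above. It is written as a named function
// expression so that this local script does not shadow Node's global `process`.
const wordCount = async function process(message, context) {
  const logger = context.getLogger()
  for (const word of message.split(' ')) {
    await context.incrementCounter(word, 1)
    const count = await context.getCounter(word)
    logger.info(`got word: ${word} for ${count.value} times`)
  }
  const cfg = context.getUserConfigValue('configKey')
  await context.publish(
    'persistent://public/default/test-node-package-serde-extra',
    `config: ${cfg}`
  )
  return message.concat('!')
}

async function main() {
  const context = createMockContext({ userConfig: { configKey: 'demo' } })
  const result = await wordCount('hello hello world', context)
  console.log(result)
  console.log(context.logs)
  console.log(context.published)
  return { result, context }
}

const run = main()
```

Because the mock records log lines and published messages in plain arrays, a unit test can assert on them directly.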
The function can also handle Avro and JSON schema records. For Avro schema input and output:
function process(params) {
  // grade is a ['null', 'int'] union, so its value is read through the 'int' branch
  params['grade']['int'] = params['grade']['int'] + 1
  return params
}
const definitions = {
  name: 'Student',
  type: 'record',
  fields: [
    { name: 'name', type: ['null', 'string'] },
    { name: 'age', type: ['null', 'int'] },
    { name: 'grade', type: ['null', 'int'] },
  ],
}
module.exports.definitions = definitions
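Every field in the Student record is a union with 'null', so a record may arrive without a grade, in which case params['grade']['int'] would throw. The sketch below shows one way to guard against that. It assumes a null branch is delivered as null and an int branch as an object such as { int: 3 }, which is how the example above reads the field; incrementGrade is a hypothetical helper that the body of process could delegate to.

```javascript
// Assumption: a ['null', 'int'] union arrives either as null or as { int: <number> }.
function incrementGrade(student) {
  const grade = student.grade
  if (grade === null || grade === undefined) {
    // No grade set: pass the record through unchanged instead of throwing.
    return student
  }
  // Copy the record rather than mutating the input.
  return { ...student, grade: { int: grade.int + 1 } }
}

console.log(incrementGrade({ name: { string: 'Ada' }, age: { int: 12 }, grade: { int: 3 } }))
console.log(incrementGrade({ name: { string: 'Bob' }, age: null, grade: null }))
```

Passing the record through unchanged is only one possible policy. Starting the grade at a default value would be equally valid if that suits the data.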
For JSON input and output:
function process(params) {
  params['grade'] = params['grade'] + 1
  return params
}
For more examples, refer to the examples.
Feature Matrix
The NodeJs runtime does not yet support every feature of the Java runtime and is still under development. The matrices below compare the runtimes:
Input Arguments
Input | Java | Go(Pulsar) | Python | NodeJs |
---|---|---|---|---|
Custom SerDe | ✅ | ❌ | ✅ | ✅ |
Schema - Avro | ✅ | ❌ | ✅ | ✅ |
Schema - JSON | ✅ | ❌ | ✅ | ✅ |
Schema - Protobuf | ✅ | ❌ | ❌ | ❌ |
Schema - KeyValue | ✅ | ❌ | ❌ | ❌ |
Schema - AutoSchema | ✅ | ❌ | ❌ | ❌ |
Schema - Protobuf Native | ✅ | ❌ | ❌ | ❌ |
e-2-e encryption | ✅ | ❌ | ✅ | ✅ |
maxMessageRetries | ✅ | ❌ | ❌ | ✅ |
dead-letter policy | ✅ | ❌ | ❌ | ✅ |
SubscriptionName | ✅ | ✅ | ✅ | ✅ |
SubscriptionType | ✅ | ✅ | ✅ | ✅ |
SubscriptionInitialPosition | ✅ | ❌ | ✅ | ✅ |
AutoAck | ✅ | ✅ | ✅ | ✅ |
Output Arguments
Output | Java | Go(Pulsar) | Python | NodeJs |
---|---|---|---|---|
Custom SerDe | ✅ | ❌ | ✅ | ✅ |
Schema - Avro | ✅ | ❌ | ✅ | ✅ |
Schema - JSON | ✅ | ❌ | ✅ | ✅ |
Schema - Protobuf | ✅ | ❌ | ❌ | ❌ |
Schema - KeyValue | ✅ | ❌ | ❌ | ❌ |
Schema - AutoSchema | ✅ | ❌ | ❌ | ❌ |
Schema - Protobuf Native | ✅ | ❌ | ❌ | ❌ |
useThreadLocalProducers | ✅ | ❌ | ❌ | ✅ |
Key-based Batcher | ✅ | ✅ | ✅ | ✅ |
e-2-e encryption | ✅ | ❌ | ✅ | ✅ |
Compression | ✅ | ✅ | ✅ | ✅ |
Context
Context | Java | Go(Pulsar) | Python | NodeJs |
---|---|---|---|---|
InputTopics | ✅ | ✅ | ✅ | ✅ |
OutputTopic | ✅ | ✅ | ✅ | ✅ |
CurrentRecord | ✅ | ✅ | ✅ | ✅ |
OutputSchemaType | ✅ | ❌ | ✅ | ✅ |
Tenant | ✅ | ✅ | ✅ | ✅ |
Namespace | ✅ | ✅ | ✅ | ✅ |
FunctionName | ✅ | ✅ | ✅ | ✅ |
FunctionId | ✅ | ✅ | ✅ | ✅ |
InstanceId | ✅ | ✅ | ✅ | ✅ |
NumInstances | ✅ | ❌ | ✅ | ✅ |
FunctionVersion | ✅ | ✅ | ✅ | ✅ |
PulsarAdminClient | ✅ | ❌ | ❌ | ❌ |
GetLogger | ✅ | ❌ | ✅ | ✅ |
RecordMetrics | ✅ | ✅ | ✅ | ❌ |
UserConfig | ✅ | ✅ | ✅ | ✅ |
Secrets | ✅ | ❌ | ✅ | ✅ |
State | ✅ | ❌ | ❌ | ✅ |
Publish | ✅ | ✅ | ✅ | ✅ |
ConsumerBuilder | ✅ | ❌ | ❌ | ❌ |
Seek / Pause / Resume | ✅ | ❌ | ❌ | ❌ |
PulsarClient | ✅ | ❌ | ❌ | ❌ |
Other
Other | Java | Go(Pulsar) | Python | NodeJs |
---|---|---|---|---|
Resources | ✅ | ✅ | ✅ | ✅ |
At-most-once | ✅ | ✅ | ✅ | ✅ |
At-least-once | ✅ | ✅ | ✅ | ✅ |
Effectively-once | ✅ | ❌ | ✅ | ❌ |
Package
You can provide either a single .js file or a .zip package of your function when creating NodeJs functions.
A zip file should contain:
- the package.json file
- other source code files
For example:
Archive:  pad.zip
 Length   Method    Size  Cmpr    Date    Time   CRC-32   Name
--------  ------  ------- ---- ---------- ----- --------  ----
     107  Defl:N       89  17% 2024-03-01 01:07 918cdb62  index.js
      94  Defl:N       78  17% 2024-03-01 01:07 2ed1db00  package.json
--------          -------  ---                            -------
     201              167  17%                            2 files
Below is an example of the package.json:
{
  "name": "test",
  "main": "index.js",
  "dependencies": {
    "left-pad": "1.3.0"
  }
}
The file named in "main" is required for such a package.json. Here, index.js contains the function:
function process(message) {
  const leftPad = require('left-pad')
  return leftPad(message, 20, 'x')
}
StreamNative Cloud installs the dependencies specified in package.json and runs your function.
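Before zipping, it can help to confirm that package.json parses and that its "main" file is actually among the files being packaged. The sketch below is a hypothetical pre-flight check, not part of StreamNative's tooling. checkPackage and its messages are made up for illustration, and it compares file names literally, so a "main" value such as ./index.js would need normalizing first.

```javascript
// Hypothetical pre-flight check for a function package; returns a list of problems.
function checkPackage(packageJsonText, fileNames) {
  let pkg
  try {
    pkg = JSON.parse(packageJsonText)
  } catch (err) {
    return [`package.json is not valid JSON: ${err.message}`]
  }
  const problems = []
  if (!fileNames.includes('package.json')) {
    problems.push('package.json is missing from the archive')
  }
  if (typeof pkg.main !== 'string' || pkg.main === '') {
    problems.push('package.json has no "main" entry')
  } else if (!fileNames.includes(pkg.main)) {
    problems.push(`main file "${pkg.main}" is missing from the archive`)
  }
  return problems
}

const packageJsonText = JSON.stringify({
  name: 'test',
  main: 'index.js',
  dependencies: { 'left-pad': '1.3.0' },
})

console.log(checkPackage(packageJsonText, ['index.js', 'package.json'])) // no problems
console.log(checkPackage(packageJsonText, ['package.json'])) // index.js is missing
```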
Deploy
After you create a cluster, set up your environment, and develop and package your function, you can use the snctl, pulsarctl, or pulsar-admin command, the REST API, or terraform to deploy a Pulsar function to your cluster.
You can create a NodeJs Pulsar function from a local .js or .zip file, or from an uploaded Pulsar functions package (recommended).
(Optional) Upload your function file to Pulsar
It is recommended to upload your function file to Pulsar before you create a function, because you can then add a version suffix to the package.
Upload packages
snctl pulsar admin packages upload function://public/default/[email protected] \
--path exclamation-1.0-SNAPSHOT.jar \
--description "exclamation function" \
--properties fileName=exclamation-1.0-SNAPSHOT.jar
You should see the following output:
The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully
Create
snctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-node-input \
--output persistent://public/default/test-node-output \
--classname exclamation \
--py function://public/default/[email protected] \
--custom-runtime-options '{"genericKind": "nodejs"}' \
--use-service-account
Note
Because Pulsar does not support a NodeJs runtime, use --py to specify the function file and set --custom-runtime-options '{"genericKind": "nodejs"}' so that the function runs on the NodeJs runtime.
You should see something like this:
Created function1 successfully
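When the create command is driven from a script, the JSON passed to --custom-runtime-options is easy to misquote. The sketch below builds the same argument list programmatically, using only the flags shown above. buildCreateArgs is a hypothetical helper, and the pad.zip value is just an illustrative local file name taken from the Package section.

```javascript
// Hypothetical helper: assembles the arguments of the create command shown above.
function buildCreateArgs({ tenant, namespace, name, inputs, output, classname, file }) {
  return [
    'functions', 'create',
    '--tenant', tenant,
    '--namespace', namespace,
    '--name', name,
    '--inputs', inputs,
    '--output', output,
    '--classname', classname,
    // --py carries the NodeJs function file, as described in the note above.
    '--py', file,
    // JSON.stringify guarantees well-formed JSON for the runtime options.
    '--custom-runtime-options', JSON.stringify({ genericKind: 'nodejs' }),
    '--use-service-account',
  ]
}

const args = buildCreateArgs({
  tenant: 'public',
  namespace: 'default',
  name: 'function1',
  inputs: 'persistent://public/default/test-node-input',
  output: 'persistent://public/default/test-node-output',
  classname: 'exclamation',
  file: 'pad.zip', // illustrative local package
})

console.log(['snctl', ...args].join(' '))
```

An array like this can be handed to Node's child_process.execFile('snctl', args), which does not go through a shell, so the JSON value needs no extra quoting.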
For details about Pulsar function configurations, see Pulsar function configurations.
What’s next?
- Learn how to develop WASM functions.
- Learn how to manage functions.
- Learn how to monitor functions.
- Reference common configurations.