Develop Pulsar Functions in NodeJs (Private Preview)

This section describes how to develop and package NodeJs Pulsar functions for use on StreamNative Cloud.

The NodeJs runtime is still in private preview. If you want to try it out or have any questions, please submit a ticket to the support team.

Develop

StreamNative Cloud looks for the process method in the NodeJs function file. Below is an example:

async function process(message, context) {
  // 1. Get the logger; logs are sent to a Pulsar topic when log-topic is set
  const logger = context.getLogger()
  for (let word of message.split(' ')) {
    // 2. Increment the counter in the state store, then read it back
    await context.incrementCounter(word, 1)
    let count = await context.getCounter(word)
    logger.info(`got word: ${word} for ${count['value']} times`)
  }
  // 3. Get user-defined configurations
  let cfg = context.getUserConfigValue('configKey')
  // 4. Get secret configurations
  let sec = context.getSecret('secretKey')
  let result = `config: ${cfg}, secret: ${sec}`
  // 5. Publish messages to any Pulsar topic
  await context.publish(
    'persistent://public/default/test-node-package-serde-extra',
    result
  )
  // 6. Return the processing result
  return message.concat('!')
}
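
To try the logic above without a cluster, the context can be replaced by an in-memory stand-in. The sketch below is only an illustration: `createMockContext` is a hypothetical helper, not a StreamNative API, and it mocks just the methods the example calls. The return shape of `getCounter` is assumed from the `count['value']` access above. The function is named `wordCount` here so that it does not shadow Node's global `process` object in a standalone script; in a deployed file it would be named `process` as shown above.

```javascript
// Hypothetical in-memory stand-in for the function context. It mocks only the
// methods used by the example above and is not part of any StreamNative SDK.
function createMockContext(userConfig = {}, secrets = {}) {
  const counters = new Map()
  const logs = []
  const published = []
  return {
    logs,
    published,
    getLogger: () => ({ info: (line) => logs.push(line) }),
    incrementCounter: async (key, amount) => {
      counters.set(key, (counters.get(key) || 0) + amount)
    },
    // Mirrors the count['value'] access in the example above (assumed shape).
    getCounter: async (key) => ({ value: counters.get(key) || 0 }),
    getUserConfigValue: (key) => userConfig[key],
    getSecret: (key) => secrets[key],
    publish: async (topic, payload) => {
      published.push({ topic, payload })
    },
  }
}

// Same logic as the example above, renamed so it does not shadow Node's
// global `process` object in this standalone script.
async function wordCount(message, context) {
  const logger = context.getLogger()
  for (const word of message.split(' ')) {
    await context.incrementCounter(word, 1)
    const count = await context.getCounter(word)
    logger.info(`got word: ${word} for ${count['value']} times`)
  }
  const cfg = context.getUserConfigValue('configKey')
  const sec = context.getSecret('secretKey')
  await context.publish(
    'persistent://public/default/test-node-package-serde-extra',
    `config: ${cfg}, secret: ${sec}`
  )
  return message.concat('!')
}

// Drive the function once and print what it produced.
async function demo() {
  const context = createMockContext({ configKey: 'demo' }, { secretKey: 'hidden' })
  const result = await wordCount('hello hello world', context)
  console.log(result)
  console.log(context.logs)
  console.log(context.published)
}

demo()
```

Keeping the mock this small makes it easy to see which context calls a function depends on. It says nothing about how the real state store or publisher behave.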

A function can also handle Avro and JSON schema records. For Avro schema input and output:

function process(params) {
  params['grade']['int'] = params['grade']['int'] + 1
  return params
}

const definitions = {
  name: 'Student',
  type: 'record',
  fields: [
    { name: 'name', type: ['null', 'string'] },
    { name: 'age', type: ['null', 'int'] },
    { name: 'grade', type: ['null', 'int'] },
  ],
}

module.exports.definitions = definitions
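
Because every field in this schema is a union with null, `params['grade']` may be null, in which case `params['grade']['int']` would throw. The following is a minimal sketch of a guarded version. It assumes, based only on the example above, that a non-null union value arrives wrapped as `{ int: <number> }`. The name `incrementGrade` and the sample records are hypothetical; in a deployed file the function would be named `process`.

```javascript
// Assumption: a ['null', 'int'] field arrives either as null or wrapped as
// { int: <number> }, which is the shape the example above relies on.
function incrementGrade(record) {
  const grade = record['grade']
  if (grade === null || grade === undefined) {
    // Nothing to increment; pass the record through unchanged.
    return record
  }
  grade['int'] = grade['int'] + 1
  return record
}

// Hypothetical sample records for a quick local check.
const withGrade = { name: { string: 'Ann' }, age: { int: 12 }, grade: { int: 5 } }
const withoutGrade = { name: { string: 'Bob' }, age: null, grade: null }

console.log(incrementGrade(withGrade))
console.log(incrementGrade(withoutGrade))
```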

For JSON input and output:

function process(params) {
  params['grade'] = params['grade'] + 1
  return params
}
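
One detail worth guarding against in the JSON case: if `grade` is missing, `undefined + 1` evaluates to `NaN`, and `JSON.stringify` serializes `NaN` as `null`. The sketch below shows one possible guard. The name `incrementGradeJson` and the sample records are hypothetical, and returning the record unchanged is a design choice for illustration, not documented runtime behavior.

```javascript
// Hypothetical variant of the JSON example; in a deployed file this function
// would be named `process`.
function incrementGradeJson(record) {
  if (typeof record['grade'] !== 'number') {
    // undefined + 1 is NaN, and JSON.stringify turns NaN into null,
    // so leave records without a numeric grade untouched.
    return record
  }
  // Return a copy instead of mutating the input record.
  return { ...record, grade: record['grade'] + 1 }
}

console.log(JSON.stringify(incrementGradeJson({ name: 'Ann', grade: 5 })))
console.log(JSON.stringify(incrementGradeJson({ name: 'Bob' })))
// For comparison, the unguarded arithmetic on a missing field:
console.log(JSON.stringify({ grade: undefined + 1 }))
```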

To get more examples, please refer to examples

Feature Matrix

The NodeJs runtime is still under development and does not yet support every feature of the Java runtime. The matrices below compare the runtimes:

Input Arguments

Input    Java    Go (Pulsar)    Python    NodeJs
Custom SerDe
Schema - Avro
Schema - JSON
Schema - Protobuf
Schema - KeyValue
Schema - AutoSchema
Schema - Protobuf Native
e-2-e encryption
maxMessageRetries
dead-letter policy
SubscriptionName
SubscriptionType
SubscriptionInitialPosition
AutoAck

Output Arguments

Output    Java    Go (Pulsar)    Python    NodeJs
Custom SerDe
Schema - Avro
Schema - JSON
Schema - Protobuf
Schema - KeyValue
Schema - AutoSchema
Schema - Protobuf Native
useThreadLocalProducers
Key-based Batcher
e-2-e encryption
Compression

Context

Context    Java    Go (Pulsar)    Python    NodeJs
InputTopics
OutputTopic
CurrentRecord
OutputSchemaType
Tenant
Namespace
FunctionName
FunctionId
InstanceId
NumInstances
FunctionVersion
PulsarAdminClient
GetLogger
RecordMetrics
UserConfig
Secrets
State
Publish
ConsumerBuilder
Seek / Pause / Resume
PulsarClient

Other

Other    Java    Go (Pulsar)    Python    NodeJs
Resources
At-most-once
At-least-once
Effectively-once

Package

You can either provide a single .js file or package your function into a .zip file when creating NodeJs functions.

A .zip file should contain:

  1. the package.json file
  2. the other source code files

For example:

Archive:  pad.zip
 Length   Method    Size  Cmpr    Date    Time   CRC-32   Name
--------  ------  ------- ---- ---------- ----- --------  ----
     107  Defl:N       89  17% 2024-03-01 01:07 918cdb62  index.js
      94  Defl:N       78  17% 2024-03-01 01:07 2ed1db00  package.json
--------          -------  ---                            -------
     201              167  17%                            2 files

Below is an example of the package.json:

{
  "name": "test",
  "main": "index.js",
  "dependencies": {
    "left-pad": "1.3.0"
  }
}

The file referenced by "main" is required. An example index.js for this package.json:

function process(message) {
  const leftPad = require('left-pad')
  return leftPad(message, 20, 'x')
}

StreamNative Cloud installs the dependencies specified in package.json and then runs your function.
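
Before zipping, it can help to confirm locally that the directory matches the layout described above. The script below is a minimal sketch of such a check: `checkPackageDir` is a hypothetical helper, not part of any StreamNative tooling, and it verifies only that package.json exists, parses, and names a "main" file that is present.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical pre-packaging check; returns a list of problems found.
function checkPackageDir(dir) {
  const problems = []
  const manifestPath = path.join(dir, 'package.json')
  if (!fs.existsSync(manifestPath)) {
    return ['package.json is missing']
  }
  let manifest
  try {
    manifest = JSON.parse(fs.readFileSync(manifestPath, 'utf8'))
  } catch (err) {
    return [`package.json is not valid JSON: ${err.message}`]
  }
  if (typeof manifest.main !== 'string' || manifest.main === '') {
    problems.push('package.json has no "main" entry')
  } else if (!fs.existsSync(path.join(dir, manifest.main))) {
    problems.push(`"main" file not found: ${manifest.main}`)
  }
  return problems
}

// Demo on a throwaway directory that mirrors the pad.zip layout above.
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), 'pad-'))
fs.writeFileSync(
  path.join(demoDir, 'package.json'),
  JSON.stringify({ name: 'test', main: 'index.js', dependencies: { 'left-pad': '1.3.0' } })
)
// index.js has not been written yet, so one problem is reported here.
console.log(checkPackageDir(demoDir))
fs.writeFileSync(
  path.join(demoDir, 'index.js'),
  'function process(message) { return message }\n'
)
console.log(checkPackageDir(demoDir))
```

An empty list means the two basic requirements are met. The check does not install dependencies or load the function.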

Deploy

After you create a cluster, set up your environment, and develop and package your function, you can use pulsarctl, pulsar-admin, the REST API, or Terraform to deploy a Pulsar function to your cluster.

You can create a NodeJs Pulsar function from a local .js or .zip file, or from an uploaded Pulsar functions package (recommended).

(Optional) Upload your function file to Pulsar

It's recommended to upload your function file to Pulsar before you create a function, because you can then add a version suffix to the package.

You need to set the context for pulsarctl first:

# create a context
pulsarctl context set ${context-name}  \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}

# activate oauth2
pulsarctl oauth2 activate

Note

Replace the placeholder variables with the actual values that you can get when setting up client tools.

  • context-name: any name you want
  • admin-service-url: the HTTP service URL of your Pulsar cluster.
  • privateKey: the path to the downloaded OAuth2 key file.
  • issuerUrl: the URL of the OAuth2 issuer.
  • audience: the Uniform Resource Name (URN), which combines urn:sn:pulsar, your organization name (orgName), and your Pulsar instance name (instanceName).

Upload packages

pulsarctl packages upload function://public/default/[email protected] \
--path exclamation-1.0-SNAPSHOT.jar \
--description "exclamation function" \
--properties fileName=exclamation-1.0-SNAPSHOT.jar

You should see the following output:

The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully

Create

pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-node-input \
--output persistent://public/default/test-node-output \
--classname exclamation \
--py function://public/default/[email protected] \
--custom-runtime-options '{"genericKind": "nodejs"}'

Note

Because Pulsar doesn't support a NodeJs runtime, use --py to specify the function file and set --custom-runtime-options '{"genericKind": "nodejs"}' so that it runs as a NodeJs function.

You should see something like this:

Created function1 successfully

For details about Pulsar function configurations, see Pulsar function configurations.
