
Work with Pulsar Functions using pulsarctl CLI tool

This document describes how to create, update, and delete a Pulsar function using the pulsarctl CLI tool. For a full list of operations supported by the pulsarctl CLI tool, see the pulsarctl command reference.

Note

  • Currently, Functions are only available for BYOC clusters. If you want to try Functions on your Hosted clusters, please contact us.
  • The following sections assume you use OAuth2 authentication.

Prerequisites

Prepare your environment

You need to prepare your environment before you submit a function to your Pulsar cluster.

Prepare service accounts

You need to create two service accounts. One is a normal service account that is granted the required permissions (functions, produce, and consume). The other must have Super Admin access. The Super Admin service account is used to create a role binding for the normal service account.

Note

Currently, you can't edit a service account. If you need a service account to have Super Admin access, make sure to enable it when creating the service account. By default, service accounts do not have Super Admin enabled.

To create a service account, follow these steps.

  1. On the left navigation pane, click Service Accounts.

  2. Click Create Service Account.

  3. (Optional) Select Super Admin to grant the service account Super Admin access to a namespace or tenant.

  4. Enter a name for the service account, and then click Confirm.

Get OAuth2 credential files of your service accounts

To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.

  1. On the left navigation pane, click Service Accounts.

  2. In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.

    The OAuth2 credential file should be something like this:

    {
      "type": "SN_SERVICE_ACCOUNT",
      "client_id": "CLIENT_ID",
      "client_secret": "CLIENT_SECRET",
      "client_email": "[email protected]",
      "issuer_url": "https://auth.streamnative.cloud"
    }
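Before using a downloaded key file, you may want to confirm that it contains every field shown above. The following is a minimal shell sketch, not a StreamNative tool: check_key_file is a hypothetical helper name, and it only checks that each quoted key name appears in the file with grep rather than fully parsing the JSON.

```shell
#!/bin/sh
# Hypothetical helper (not part of pulsarctl or the StreamNative Console):
# checks that an OAuth2 key file contains each field from the example above.
# It only looks for the quoted key names; it does not validate JSON syntax.
check_key_file() {
  key_file="$1"
  if [ ! -r "$key_file" ]; then
    echo "cannot read key file: $key_file" >&2
    return 1
  fi
  for field in type client_id client_secret client_email issuer_url; do
    # Match "<field>" followed by optional whitespace and a colon.
    if ! grep -q "\"$field\"[[:space:]]*:" "$key_file"; then
      echo "missing field: $field" >&2
      return 1
    fi
  done
  echo "key file looks complete: $key_file"
}
```

For example, run check_key_file against the path where you saved the downloaded file; a non-zero exit status means a field is missing or the file is unreadable.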
    

Create a role binding for your normal service account

To grant the underlying infrastructure access to the newly created service account's OAuth2 key file, you need to create a service account binding in the StreamNative Console.

Go to the Service Accounts tab and select the service account you want to use for running the function. Click the button on the right of its row to display the Edit service account bindings option.

Click Edit service account bindings, select the desired pool member, and confirm.

Your function is now ready to use the service account in StreamNative environments.

Authorize your normal service account

To grant the service account permissions at the namespace level, follow these steps:

  1. On the left navigation pane, in the Admin section, click Tenants/Namespaces.

  2. On the Tenants page, select your tenant and namespace.

  3. On your Namespace page, select the POLICY tab.

  4. In the Authorization area, click ADD ROLE, and select the service account that you just created in the previous section.

  5. From the drop-down menu below the service account, select the permissions to assign to the service account. There are six permissions in total:

  • consume: allow the service account to consume messages.
  • produce: allow the service account to publish messages.
  • functions: allow the service account to submit and manage functions.
  • sinks: allow the service account to create and manage sink connectors.
  • sources: allow the service account to create and manage source connectors.
  • packages: allow the service account to upload and manage Pulsar packages. If you want to submit a customized connector, you need to upload the connector’s JAR/NAR file first, which requires the packages permission.


Create a Pulsar function

After creating a cluster, you can use the pulsarctl functions create command to deploy a Pulsar function to your cluster. You can create a Pulsar function by using a local JAR package or an uploaded Pulsar package.

Create a Pulsar function using a local package

This example shows how to create a Pulsar function under the public tenant and default namespace by specifying a local package.

pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-java-input \
--output persistent://public/default/test-java-output \
--classname org.example.functions.Exclamation \
--jar exclamation-1.0-SNAPSHOT.jar

You should see something like this:

Created function1 successfully

For details about Pulsar function configurations, see Pulsar function configurations.
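If you create several functions in the same namespace, you may prefer to keep the tenant and namespace in variables so that topic names are built consistently. The following is a minimal shell sketch of the create command above; TENANT, NAMESPACE, DRY_RUN, topic, and create_function are illustrative names, not part of pulsarctl.

```shell
#!/bin/sh
# Illustrative wrapper around the create command above (not part of pulsarctl).
TENANT="${TENANT:-public}"
NAMESPACE="${NAMESPACE:-default}"

# Prints a fully qualified topic name: persistent://<tenant>/<namespace>/<topic>
topic() {
  printf 'persistent://%s/%s/%s\n' "$TENANT" "$NAMESPACE" "$1"
}

# Usage: create_function <name> <jar> <classname> <input-topic> <output-topic>
# With DRY_RUN=1, the command is printed instead of executed.
create_function() {
  fn_name="$1"; fn_jar="$2"; fn_class="$3"; fn_in="$4"; fn_out="$5"
  set -- pulsarctl functions create \
    --tenant "$TENANT" \
    --namespace "$NAMESPACE" \
    --name "$fn_name" \
    --inputs "$(topic "$fn_in")" \
    --output "$(topic "$fn_out")" \
    --classname "$fn_class" \
    --jar "$fn_jar"
  if [ "${DRY_RUN:-0}" = 1 ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}
```

For example, create_function function1 exclamation-1.0-SNAPSHOT.jar org.example.functions.Exclamation test-java-input test-java-output runs the same command as the example above. Setting DRY_RUN=1 first lets you review the command before submitting it.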

Create a Pulsar function using an uploaded Pulsar package

This example shows how to create a Pulsar function under the public tenant and default namespace by using an uploaded Pulsar package.

  1. Upload your JAR package to Pulsar.

    pulsarctl packages upload function://public/default/[email protected] \
    --path exclamation-1.0-SNAPSHOT.jar \
    --description "Java Exclamation function" \
    -P fileName=exclamation-1.0-SNAPSHOT.jar # Required; otherwise, Pulsar cannot determine the package format.
    

    You should see something like this:

    The package 'function://public/default/[email protected]' uploaded from path 'exclamation-1.0-SNAPSHOT.jar' successfully
    
  2. Create a Pulsar function.

    pulsarctl functions create \
    --tenant public \
    --namespace default \
    --name exclamation-from-pkg \
    --inputs persistent://public/default/test-java-input \
    --output persistent://public/default/test-java-output \
    --classname org.example.functions.Exclamation \
    --jar function://public/default/[email protected]
    

    You should see something like this:

    Created exclamation-from-pkg successfully
    
  3. (Optional) Get the function status.

    pulsarctl functions status --tenant public --namespace default --name exclamation-from-pkg
    

    You should see something like this:

    {
      "numInstances": 1,
      "numRunning": 1,
      "instances": [
        {
          "instanceId": 0,
          "status": {
            "running": true,
            "error": "",
            "numRestarts": 0,
            "numReceived": 0,
            "numSuccessfullyProcessed": 0,
            "numUserExceptions": 0,
            "latestUserExceptions": [],
            "numSystemExceptions": 0,
            "latestSystemExceptions": [],
            "averageLatency": 0.0,
            "lastInvocationTime": 0,
            "workerId": "sn-platform-pulsar"
          }
        }
      ]
    }
    

For details about Pulsar function configurations, see Pulsar function configurations.
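The upload, create, and status steps above can be scripted. The following is a minimal shell sketch under stated assumptions: package_url and all_running are hypothetical helper names, package URLs are assumed to follow the function://tenant/namespace/name@version form used by Pulsar package management, and all_running relies on the pretty-printed status layout shown in step 3 (one field per line) instead of a real JSON parser.

```shell
#!/bin/sh
# Hypothetical helpers for the package-based workflow (not part of pulsarctl).

# Prints a package URL: function://<tenant>/<namespace>/<name>@<version>
package_url() {
  printf 'function://%s/%s/%s@%s\n' "$1" "$2" "$3" "$4"
}

# Reads "pulsarctl functions status" output on stdin and succeeds only if
# numRunning equals numInstances (and at least one instance exists).
# Assumes the pretty-printed layout with one field per line.
all_running() {
  status_json="$(cat)"
  total="$(printf '%s\n' "$status_json" \
    | sed -n 's/.*"numInstances": *\([0-9][0-9]*\).*/\1/p' | head -n 1)"
  running="$(printf '%s\n' "$status_json" \
    | sed -n 's/.*"numRunning": *\([0-9][0-9]*\).*/\1/p' | head -n 1)"
  [ -n "$total" ] && [ "$total" -gt 0 ] && [ "$total" = "$running" ]
}
```

For example, with a hypothetical package name exclamation and version v1, set pkg="$(package_url public default exclamation v1)", pass "$pkg" to pulsarctl packages upload and to --jar in pulsarctl functions create, and then pipe pulsarctl functions status --tenant public --namespace default --name exclamation-from-pkg into all_running to check that every instance is running.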

Update a Pulsar function

When you want to update the configuration options of a Pulsar function, you can use the pulsarctl functions update command to update it.

  • This example shows how to update the output topic of function1 for the default namespace under the public tenant.

    Input

    pulsarctl functions update \
    --tenant public \
    --namespace default \
    --name function1 \
    --output test-output-topic
    

    Output

    Updated function1 successfully
    
  • This example shows how to update the log topic of function1 for the default namespace under the public tenant.

    Input

    pulsarctl functions update \
    --tenant public \
    --namespace default \
    --name function1 \
    --log-topic persistent://public/default/test-log-topic
    

    Output

    Updated function1 successfully
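Both update examples identify the function with the same tenant, namespace, and name flags and then pass only the fields to change. The following minimal shell sketch captures that pattern; update_function and DRY_RUN are illustrative names, not part of pulsarctl.

```shell
#!/bin/sh
# Illustrative wrapper (not part of pulsarctl): identifies the function,
# then forwards only the flags you want to change.
# Usage: update_function <tenant> <namespace> <name> [update flags...]
# With DRY_RUN=1, the command is printed instead of executed.
update_function() {
  if [ "$#" -lt 3 ]; then
    echo "usage: update_function <tenant> <namespace> <name> [flags...]" >&2
    return 1
  fi
  fn_tenant="$1"; fn_namespace="$2"; fn_name="$3"
  shift 3
  set -- pulsarctl functions update \
    --tenant "$fn_tenant" \
    --namespace "$fn_namespace" \
    --name "$fn_name" \
    "$@"
  if [ "${DRY_RUN:-0}" = 1 ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}
```

For example, update_function public default function1 --log-topic persistent://public/default/test-log-topic runs the same command as the second example.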
    

Delete a Pulsar function

When you want to remove a Pulsar function, you can use the pulsarctl functions delete command to delete it. This example shows how to delete function1 from the default namespace under the public tenant.

Input

pulsarctl functions delete \
--tenant public \
--namespace default \
--name function1

Output

Deleted successfully
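A function can also be identified by its Fully Qualified Function Name (the fqfn field in Pulsar function configurations), which has the form tenant/namespace/name. The following minimal shell sketch builds that name; fqfn and delete_function are hypothetical helpers, and the sketch assumes your pulsarctl version accepts --fqfn for the functions delete command.

```shell
#!/bin/sh
# Hypothetical helpers (not part of pulsarctl).

# Prints the FQFN: <tenant>/<namespace>/<name>
fqfn() {
  printf '%s/%s/%s\n' "$1" "$2" "$3"
}

# Usage: delete_function <tenant> <namespace> <name>
# Assumption: pulsarctl functions delete accepts --fqfn in your version.
# With DRY_RUN=1, the command is printed instead of executed.
delete_function() {
  set -- pulsarctl functions delete --fqfn "$(fqfn "$1" "$2" "$3")"
  if [ "${DRY_RUN:-0}" = 1 ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}
```

For example, delete_function public default function1 targets the same function as the delete example above.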

Pulsar function configurations

This table lists all fields available for creating a Pulsar function.

| Field | Description | Default |
| --- | --- | --- |
| auto-ack | Whether the framework acknowledges messages automatically. | true |
| classname | The class name of a Pulsar function. | |
| cpu | The CPU, in cores, to allocate per function instance (applicable only to the Docker runtime). | |
| custom-runtime-options | A string that encodes options to customize the runtime. | |
| custom-schema-inputs | The map of input topics to Schema class names (as a JSON string). | |
| custom-serde-inputs | The map of input topics to SerDe class names (as a JSON string). | |
| dead-letter-topic | The topic to which all messages that are not processed successfully are sent. This parameter is not supported in Python Functions. | |
| disk | The disk space, in bytes, to allocate per function instance (applicable only to the Docker runtime). | |
| fqfn | The Fully Qualified Function Name (FQFN) for the function. | |
| function-config-file | The path to a YAML config file specifying the configuration of a Pulsar function. | |
| go | Path to the main Go executable binary for the function (if the function is written in Go). Go Functions are not supported in StreamNative Cloud. | |
| inputs | The input topic or topics of a Pulsar function (specify multiple topics as a comma-separated list). | |
| jar | Path to the JAR file for the function (if the function is written in Java). It also supports a URL path (http, https, or file; the file protocol assumes that the file already exists on the worker host) from which the worker can download the package. | |
| log-topic | The topic to which the logs of a Pulsar function are produced. | |
| max-message-retries | The number of times to try processing a message before giving up. | |
| name | The name of a Pulsar function. | |
| namespace | The namespace of a Pulsar function. | |
| output | The output topic of a Pulsar function. If none is specified, no output is written. | |
| output-serde-classname | The SerDe class to be used for messages output by the function. | |
| parallelism | The parallelism factor of a Pulsar function, that is, the number of function instances to run. | |
| processing-guarantees | The processing guarantees (delivery semantics) applied to the function. Available values: ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE. | ATLEAST_ONCE |
| py | Path to the main Python file or Python Wheel file for the function (if the function is written in Python). | |
| ram | The RAM, in bytes, to allocate per function instance (applicable only to the process and Docker runtimes). | |
| retain-ordering | Whether the function consumes and processes messages in order. | |
| schema-type | The built-in schema type or custom schema class name to be used for messages output by the function. | <empty string> |
| sliding-interval-count | The number of messages after which the window slides. | |
| sliding-interval-duration-ms | The time duration, in milliseconds, after which the window slides. | |
| subs-name | The subscription name for the input topic consumer, if you want to use a specific subscription name. | |
| tenant | The tenant of a Pulsar function. | |
| timeout-ms | The message timeout in milliseconds. | |
| topics-pattern | The topic pattern used to consume from all topics under a namespace that match the pattern. --inputs and --topics-pattern are mutually exclusive. Add the SerDe class name for a pattern in --custom-serde-inputs (supported only in Java Pulsar functions). | |
| user-config | User-defined config key/values. | |
| window-length-count | The number of messages per window. | |
| window-length-duration-ms | The time duration of the window in milliseconds. | |
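Instead of passing every field on the command line, you can collect them in a YAML file and pass it with function-config-file. The following is a minimal shell sketch that writes such a file for the function1 example. It is an assumption that the camelCase keys below (className, processingGuarantees, autoAck, and so on) follow Apache Pulsar's function configuration naming and are accepted by your pulsarctl version; write_function_config is a hypothetical helper.

```shell
#!/bin/sh
# Hypothetical helper (not part of pulsarctl): writes a YAML function config.
# Assumption: key names follow Apache Pulsar's FunctionConfig (camelCase);
# verify them against your pulsarctl version before relying on this.
# Usage: write_function_config <output-path>
write_function_config() {
  config_path="$1"
  cat > "$config_path" <<'EOF'
tenant: public
namespace: default
name: function1
className: org.example.functions.Exclamation
jar: exclamation-1.0-SNAPSHOT.jar
inputs:
  - persistent://public/default/test-java-input
output: persistent://public/default/test-java-output
parallelism: 1
processingGuarantees: ATLEAST_ONCE
autoAck: true
EOF
}
```

For example, run write_function_config function1.yaml and then pulsarctl functions create --function-config-file function1.yaml.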