- Serverless Functions
Work with Pulsar Functions using pulsarctl CLI tool
This document describes how to create, update, and delete a Pulsar function using the pulsarctl
CLI tool. For a full list of operations supported by the pulsarctl
CLI tool, see pulsarctl command reference.
Note
- Currently, Functions are only available for BYOC clusters. If you want to try Functions on your Hosted clusters, please contact us.
- The following sections assume you use OAuth2 authentication.
Prerequisites
- Install and configure the
snctl
CLI tool. - Install the
pulsarctl
CLI tool. - Log in to StreamNative Cloud Console.
- Create a Pulsar cluster and connect to your Pulsar cluster using the
pulsarctl
CLI tool. - Create a tenant and a namespace.
Prepare your environment
You need to prepare your environment before you submit a function to your Pulsar cluster.
Prepare service accounts
You need to create two service accounts. One is a normal service account granted with proper permissions (functions
, produce
, and consume
). The other one must have the Super Admin access. The Super Admin service account is used to create a role binding for the normal service account.
Note
Currently, you can't edit a service account. If you need a service account to have Super Admin access, make sure to enable it when creating the service account. By default, service accounts do not have Super Admin enabled.
To create a service account, follow these steps.
On the left navigation pane, click Service Accounts.
Click Create Service Account.
(Optional) Select Super Admin to grant the service account with Super admin access to a namespace or tenant.
Enter a name for the service account, and then click Confirm.
Get OAuth2 credential files of your service accounts
To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.
On the left navigation pane, click Service Accounts.
In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
The OAuth2 credential file should be something like this:
{ "type": "SN_SERVICE_ACCOUNT", "client_id": "CLIENT_ID", "client_secret": "CLIENT_SECRET", "client_email": "[email protected]", "issuer_url": "https://auth.streamnative.cloud" }
Create a role binding for your normal service account
To grant the underlying infrastructure with access to the newly created service account's OAuth2 key file, you need to create a service account binding via UI.
Go to the Service Accounts
tab and choose the service account you want to use for running the connector. Clicking on the right button and there willbe a Edit service account bindings
option.
Click the Edit service account bindings
, choose the desired pool member and confirm.
Now your connector is ready to use the service account in StreamNative environments.
Authorize your normal service account
To grant the service account permissions on the namespace level, follow these steps:
On the left navigation pane, in the Admin section, click Tenants/Namespaces.
On the Tenants page, select your tenant and namespace.
On your Namespace page, select the POLICY tab.
In the Authorization area, click ADD ROLE, and select the service account that you just created in the previous section.
On the drop-down menu below the service account, select the proper permissions to assign to the newly created service account. There are six permissions in total:
consume
: allow the service account to consume messages.produce
: allow the service account to publish messages.functions
: allow the service account to submit and manage functions.sinks
: allow the service account to create and manage sink connectors.sources
: allow the service account to create and manage source connectors.packages
: allow the service account to upload and manage pulsar packages. If you want to submit a customized connector, then you will need to upload the connector’s JAR/NAR file first, which requires thepackages
permission.
Create a Pulsar function
After creating a cluster, you can use the pulsarctl functions create
command to deploy a Pulsar function to your cluster. You can create a Pulsar function by using a local JAR package or an uploaded Pulsar package.
Create a Pulsar function using a local package
This example shows how to create a Pulsar function under the public
tenant and default
namespace by specifying a local package.
pulsarctl functions create
--tenant public
--namespace default
--name function1
--inputs persistent://public/default/test-java-input
--output persistent://public/default/test-java-output
--classname org.example.functions.Exclamation
--jar exclamation-1.0-SNAPSHOT.jar
You should see something like this:
Created function1 successfully
For details about Pulsar function configurations, see Pulsar function configurations.
Create a Pulsar function using an uploaded Pulsar package
This example shows how to create a Pulsar function under the public
tenant and default
namespace by an uploaded Pulsar package.
Upload your JAR package to the Pulsar Pulsar.
pulsarctl packages upload function://public/default/[email protected] --path exclamation-1.0-SNAPSHOT.jar --description "Java Exclamation function" -P fileName=exclamation-1.0-SNAPSHOT.jar # Required. Otherwise, Pulsar cannot know which format this package is using.
You should see something like this:
The package 'function://public/default/[email protected]' uploaded from path 'exclamation-1.0-SNAPSHOT.jar' successfully
Create a Pulsar function.
pulsarctl functions create --tenant public --namespace default --name exclamation-from-pkg --inputs persistent://public/default/test-java-input --output persistent://public/default/test-java-output --classname org.example.functions.Exclamation --jar function://public/default/[email protected]
You should see something like this:
Created exclamation-from-pkg successfully
(Optional) Get the function status.
pulsarctl functions status --tenant public --namespace default --name exclamation-from-pkg
You should see something like this:
{ "numInstances": 1, "numRunning": 1, "instances": [ { "instanceId": 0, "status": { "running": true, "error": "", "numRestarts": 0, "numReceived": 0, "numSuccessfullyProcessed": 0, "numUserExceptions": 0, "latestUserExceptions": [], "numSystemExceptions": 0, "latestSystemExceptions": [], "averageLatency": 0.0, "lastInvocationTime": 0, "workerId": "sn-platform-pulsar" } } ] }
For details about Pulsar function configurations, see Pulsar function configurations.
Update a Pulsar function
When you want to update the configuration options of a Pulsar function, you can use the pulsarctl functions update
command to delete it.
This example shows how to update the output topic of
function1
for thedefault
namespace under thepublic
tenant.Input
pulsarctl functions update --tenant public --namespace default --name function1 --output test-output-topic
Output
Updated function1 successfully
This example shows how to update the log topic of
function1
for thedefault
namespace under thepublic
tenant.Input
pulsarctl functions update --log-topic persistent://public/default/test-log-topic // other function parameters
Output
Updated function1 successfully
Delete a Pulsar function
When you want to remove a Pulsar function, you can use the pulsarctl functions delete
command to delete it. This example shows how to delete function1
from the default
namespace under the public
tenant.
Input
pulsarctl functions delete
--tenant public
--namespace default
--name function1
Output
Deleted successfully
Pulsar function configurations
This table lists all fields available for creating a Pulsar function.
Field | Description | Default |
---|---|---|
auto-ack | Whether or not the framework acknowledges messages automatically. | true |
classname | The class name of a Pulsar function. | |
CPU | The CPU in cores that need to be allocated per function instance (applicable only to docker runtime). | |
custom-runtime-options | A string that encodes options to customize the runtime. | |
custom-schema-inputs | The map of input topics to Schema class names (as a JSON string). | |
custom-serde-inputs | The map of input topics to SerDe class names (as a JSON string). | |
dead-letter-topic | The topic where all messages that were not processed successfully are sent. This parameter is not supported in Python Functions. | |
disk | The disk in bytes that need to be allocated per function instance (applicable only to docker runtime). | |
fqfn | The Fully Qualified Function Name (FQFN) for the function. | |
function-config-file | The path to a YAML config file specifying the configuration of a Pulsar function. | |
go | Path to the main Go executable binary for the function (if the function is written in Go). Go Functions are not supported in StreamNative Cloud. | |
inputs | The input topic or topics of a Pulsar function (multiple topics can be specified as a comma-separated list). | |
jar | Path to the jar file for the function (if the function is written in Java). It also supports URL-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package. | |
log-topic | The topic to which the logs of a Pulsar function are produced. | |
max-message-retries | How many times should we try to process a message before giving up. | |
name | The name of a Pulsar function. | |
namespace | The namespace of a Pulsar function. | |
output | The output topic of a Pulsar function (If none is specified, no output is written). | |
output-serde-classname | The SerDe class to be used for messages output by the function. | |
parallelism | The parallelism factor of a Pulsar function (i.e. the number of function instances to run). | |
processing-guarantees | The processing guarantees (delivery semantics) applied to the function. Available values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]. | ATLEAST_ONCE |
py | Path to the main Python file/Python Wheel file for the function (if the function is written in Python). | |
ram | The ram in bytes that need to be allocated per function instance (applicable only to process/docker runtime). | |
retain-ordering | Function consumes and processes messages in order. | |
schema-type | The builtin schema type or custom schema class name to be used for messages output by the function. | <empty string> |
sliding-interval-count | The number of messages after which the window slides. | |
sliding-interval-duration-ms | The time duration after which the window slides. | |
subs-name | Pulsar source subscription name if user wants a specific subscription-name for the input-topic consumer. | |
tenant | The tenant of a Pulsar function. | |
timeout-ms | The message timeout in milliseconds. | |
topics-pattern | The topic pattern to consume from a list of topics under a namespace that matches the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (only supported in Java Pulsar function). | |
user-config | User-defined config key/values. | |
window-length-count | The number of messages per window. | |
window-length-duration-ms | The time duration of the window is milliseconds. |