- Process Data Streams
Create Functions
After creating a cluster, setup your enviroment and develop&package your function, you can use the pulsarctl
, pulsar-admin
command or terraform
to deploy a Pulsar function to your cluster.
You can create a Pulsar function by using a local JAR / Python package or an uploaded Pulsar functions package(recommend).
(Optional) Upload your function file to Pulsar
It's recommend to upload your function file to Pulsar before you create a function. Since you can add a version suffix to the package.
You need to set the context for Pulsarctl first:
# create a context
pulsarctl context set ${context-name} \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}
# activate oauth2
pulsarctl oauth2 activate
Note
Replace the placeholder variables with the actual values that you can get when setting up client tools.
context-name
: any name you wantadmin-service-url
: the HTTP service URL of your Pulsar cluster.privateKey
: the path to the downloaded OAuth2 key file.issuerUrl
: the URL of the OAuth2 issuer.audience
: the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar
, your organization name, and your Pulsar instance name.${orgName}
: the name of your organization.${instanceName}
: the name of your instance.
Upload packages
pulsarctl packages upload function://public/default/[email protected] \
--path exclamation-1.0-SNAPSHOT.jar \
--description "exclamation function" \
--properties fileName=exclamation-1.0-SNAPSHOT.jar
You should see the following output:
The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully
Create a Java Pulsar function
This example shows how to create a Java Pulsar function under the public
tenant and default
namespace.
Below is the code of the Java Pulsar function:
package org.apache.pulsar.functions.api.examples;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
/**
* The classic Exclamation Function that appends an exclamation at the end
* of the input.
*/
public class ExclamationFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return String.format("%s!", input);
}
}
You need to package it as a .jar
or .nar
file and upload it to Pulsar first.
pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-java-input \
--output persistent://public/default/test-java-output \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--jar function://public/default/[email protected]
You should see something like this:
Created function1 successfully
For details about Pulsar function configurations, see Pulsar function configurations.
Create a Python Pulsar function.
This example shows how to create a Python Pulsar function under the public
tenant and default
namespace.
Below is the code of the Python function:
from pulsar import Function
# The classic ExclamationFunction that appends an exclamation at the end
# of the input
class ExclamationFunction(Function):
def __init__(self):
pass
def process(self, input, context):
return input + '!'
pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-python-input \
--output persistent://public/default/test-python-output \
--classname exclamation.ExclamationFunction \
--py function://public/default/[email protected]
You should see something like this:
Created function1 successfully
For details about Pulsar function configurations, see Pulsar function configurations.
What’s next?
- Learn how to manage functions.
- Learn how to monitor functions.
- Reference common configurations.