1. Process Data Streams

Create Functions

After creating a cluster, setup your enviroment and develop&package your function, you can use the pulsarctl, pulsar-admin command or terraform to deploy a Pulsar function to your cluster.

You can create a Pulsar function by using a local JAR / Python package or an uploaded Pulsar functions package(recommend).

(Optional) Upload your function file to Pulsar

It's recommend to upload your function file to Pulsar before you create a function. Since you can add a version suffix to the package.

You need to set the context for Pulsarctl first:

# create a context
pulsarctl context set ${context-name}  \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}

# activate oauth2
pulsarctl oauth2 activate

Note

Replace the placeholder variables with the actual values that you can get when setting up client tools.

  • context-name: any name you want
  • admin-service-url: the HTTP service URL of your Pulsar cluster.
  • privateKey: the path to the downloaded OAuth2 key file.
  • issuerUrl: the URL of the OAuth2 issuer.
  • audience: the Uniform Resource Name (URN), which is a combination of the urn:sn:pulsar, your organization name, and your Pulsar instance name.

Upload packages

pulsarctl packages upload function://public/default/[email protected] \
--path exclamation-1.0-SNAPSHOT.jar \
--description "exclamation function" \
--properties fileName=exclamation-1.0-SNAPSHOT.jar

You should see the following output:

The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully

Create a Java Pulsar function

This example shows how to create a Java Pulsar function under the public tenant and default namespace.

Below is the code of the Java Pulsar function:

package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

/**
 * The classic Exclamation Function that appends an exclamation at the end
 * of the input.
 */
public class ExclamationFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        return String.format("%s!", input);
    }
}

You need to package it as a .jar or .nar file and upload it to Pulsar first.

pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-java-input \
--output persistent://public/default/test-java-output \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--jar function://public/default/[email protected]

You should see something like this:

Created function1 successfully

For details about Pulsar function configurations, see Pulsar function configurations.

Create a Python Pulsar function.

This example shows how to create a Python Pulsar function under the public tenant and default namespace.

Below is the code of the Python function:

from pulsar import Function

# The classic ExclamationFunction that appends an exclamation at the end
# of the input
class ExclamationFunction(Function):
  def __init__(self):
    pass

  def process(self, input, context):
    return input + '!'
pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-python-input \
--output persistent://public/default/test-python-output \
--classname exclamation.ExclamationFunction \
--py function://public/default/[email protected]

You should see something like this:

Created function1 successfully

For details about Pulsar function configurations, see Pulsar function configurations.

What’s next?

Previous
Package Functions