Develop
StreamNative supports all features for the Python functions, please refer to: Develop Functions to learn how to develop a Python Function.Package
Please refer to: Pacakge Python FunctionsDeploy
After creating a cluster, set up your environment and develop&package your function, you can use thesnctl, pulsarctl, pulsar-admin command, the REST API, or terraform to deploy a Pulsar function to your cluster.
You can create a Python Pulsar function by using a local .py, .pip, or .zip file or an uploaded Pulsar functions package(recommend).
(Optional) Upload your function file to Pulsar
It’s recommend to upload your function file to Pulsar before you create a function. Since you can add a version suffix to the package.- snctl
- Pulsarctl
- Pulsar-admin
Upload packagesYou should see the following output:
Copy
Ask AI
snctl pulsar admin packages upload function://public/default/[email protected] \
--path exclamation-1.0-SNAPSHOT.jar \
--description "exclamation function" \
--properties fileName=exclamation-1.0-SNAPSHOT.jar
Copy
Ask AI
The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully
You need to set the context for Pulsarctl first:Upload packagesYou should see the following output:
Copy
Ask AI
# create a context
pulsarctl context set ${context-name} \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}
# activate oauth2
pulsarctl oauth2 activate
Replace the placeholder variables with the actual values that you can get when setting up client tools.
context-name: any name you wantadmin-service-url: the HTTP service URL of your Pulsar cluster.privateKey: the path to the downloaded OAuth2 key file.issuerUrl: the URL of the OAuth2 issuer.audience: the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar, your organization name, and your Pulsar instance name.${orgName}: the name of your organization.${instanceName}: the name of your instance.
Copy
Ask AI
pulsarctl packages upload function://public/default/[email protected] \
--path exclamation-1.0-SNAPSHOT.jar \
--description "exclamation function" \
--properties fileName=exclamation-1.0-SNAPSHOT.jar
Copy
Ask AI
The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully
Copy
Ask AI
./bin/pulsar-admin \
--admin-url "${WEB_SERVICE_URL}" \
--auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 \
--auth-params '{"privateKey":"file://${privateKey}","issuerUrl":"${issuerUrl}","audience":"urn:sn:pulsar:${orgName}:${instanceName}}' \
packages upload function://public/default/[email protected] \
--path exclamation.py \
--description "exclamation function" \
--properties fileName=exclamation.py
Replace the placeholder variables with the actual values that you can get when setting up client tools.
admin-url: the HTTP service URL of your Pulsar cluster.privateKey: the path to the downloaded OAuth2 key file.issuerUrl: the URL of the OAuth2 issuer.audience: the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar, your organization name, and your Pulsar instance name.${orgName}: the name of your organization.${instanceName}: the name of your instance.
Copy
Ask AI
The package 'function://public/default/[email protected]' uploaded from path 'examples/api-examples.jar' successfully
Create
- snctl
- Pulsarctl
- Pulsar-admin
- Terraform
- REST API
Copy
Ask AI
snctl pulsar admin functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-python-input \
--output persistent://public/default/test-python-output \
--classname exclamation.ExclamationFunction \
--py function://public/default/[email protected]
--use-service-account
Copy
Ask AI
Created function1 successfully
Copy
Ask AI
pulsarctl functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-python-input \
--output persistent://public/default/test-python-output \
--classname exclamation.ExclamationFunction \
--py function://public/default/[email protected]
Copy
Ask AI
Created function1 successfully
Copy
Ask AI
./bin/pulsar-admin \
--admin-url "${WEB_SERVICE_URL}" \
--auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 \
--auth-params '{"privateKey":"file:///YOUR-KEY-FILE-PATH","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:${orgName}:${instanceName}}' \
functions create \
--tenant public \
--namespace default \
--name function1 \
--inputs persistent://public/default/test-python-input \
--output persistent://public/default/test-python-output \
--classname exclamation.ExclamationFunction \
--py function://public/default/[email protected]
Copy
Ask AI
Created successfully
Create your terraform yaml file:Init the terraform provider in the same dir of your You should see something like this:Create the function:You should see something like:After enter “yes”, you should see the following:
Copy
Ask AI
terraform {
required_providers {
pulsar = {
version = "0.2.0"
source = "registry.terraform.io/streamnative/pulsar"
}
}
}
provider "pulsar" {
web_service_url = "{$admin-url}"
api_version = "3"
audience = "urn:sn:pulsar:${orgName}:${instanceName}}"
issuer_url = "${issuerUrl}"
key_file_path = "${privateKey}"
}
// Note: function resource requires v3 api.
resource "pulsar_function" "function-1" {
provider = pulsar
name = "function1"
tenant = "public"
namespace = "default"
parallelism = 1
processing_guarantees = "ATLEAST_ONCE"
py = "function://public/default/[email protected]"
classname = "exclamation.ExclamationFunction"
inputs = ["persistent://public/default/test-python-input"]
output = "persistent://public/default/test-python-output"
subscription_name = "test-sub"
subscription_position = "Latest"
cleanup_subscription = true
skip_to_latest = true
forward_source_message_property = true
retain_key_ordering = true
auto_ack = true
max_message_retries = 100
dead_letter_topic = "public/default/dlt"
log_topic = "public/default/lt"
timeout_ms = 6666
secrets = jsonencode(
{
"SECRET1": {
"path": "sectest",
"key": "hello"
}
})
custom_runtime_options = jsonencode(
{
"env": {
"HELLO": "WORLD"
}
})
}
.tf file if you haven’t done it:Copy
Ask AI
terraform init
Copy
Ask AI
Initializing the backend...
Initializing provider plugins...
- Finding streamnative/pulsar versions matching "0.2.0"...
- Installing streamnative/pulsar v0.2.0...
- Installed streamnative/pulsar v0.2.0 (self-signed, key ID 3105E1011F3C3671)
Partner and community providers are signed by their developers.
If you'd like to know more about provider signing, you can read about it here:
https://www.terraform.io/docs/cli/plugins/signing.html
Terraform has created a lock file .terraform.lock.hcl to record the provider
selections it made above. Include this file in your version control repository
so that Terraform can guarantee to make the same selections by default when
you run "terraform init" in the future.
Terraform has been successfully initialized!
You may now begin working with Terraform. Try running "terraform plan" to see
any changes that are required for your infrastructure. All Terraform commands
should now work.
If you ever set or change modules or backend configuration for Terraform,
rerun this command to reinitialize your working directory. If you forget, other
commands will detect it and remind you to do so if necessary.
Copy
Ask AI
terraform apply
Copy
Ask AI
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
+ create
Terraform will perform the following actions:
# pulsar_function.function-1 will be created
+ resource "pulsar_function" "function-1" {
+ auto_ack = true
+ classname = "exclamation.ExclamationFunction"
+ cleanup_subscription = true
+ cpu = 0.5
+ custom_runtime_options = jsonencode(
{
+ env = {
+ HELLO = "WORLD"
}
}
)
+ dead_letter_topic = "public/default/dlt"
+ disk_mb = 128
+ forward_source_message_property = true
+ id = (known after apply)
+ inputs = [
+ "persistent://public/default/test-python-input",
]
+ py = "function://public/default/[email protected]"
+ log_topic = "public/default/lt"
+ max_message_retries = 100
+ name = "function1"
+ namespace = "default"
+ output = "persistent://public/default/test-python-output"
+ parallelism = 1
+ processing_guarantees = "ATLEAST_ONCE"
+ ram_mb = 128
+ retain_key_ordering = true
+ secrets = jsonencode(
{
+ SECRET1 = {
+ key = "hello"
+ path = "sectest"
}
}
)
+ skip_to_latest = true
+ subscription_name = "test-sub"
+ subscription_position = "Latest"
+ tenant = "public"
+ timeout_ms = 6666
}
Plan: 1 to add, 0 to change, 0 to destroy.
Do you want to perform these actions?
Terraform will perform the actions described above.
Only 'yes' will be accepted to approve.
Enter a value:
Copy
Ask AI
pulsar_function.function-1: Creating...
pulsar_function.function-1: Creation complete after 1s [id=public/default/function1]
Apply complete! Resources: 1 added, 0 changed, 0 destroyed.
Replace the placeholder variables with the actual values that you can get when setting up client tools.
admin-url: the HTTP service URL of your Pulsar cluster.privateKey: the path to the downloaded OAuth2 key file.issuerUrl: the URL of the OAuth2 issuer.audience: the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar, your organization name, and your Pulsar instance name.${orgName}: the name of your organization.${instanceName}: the name of your instance.
If you would like to create a function configuration using the REST API you can
do so using CURL.You should see something like this:
Copy
Ask AI
curl -X POST ${WEB_SERVICE_URL}/admin/v3/functions/public/default/${FUNCTION_NAME} \
-H 'Authorization: Bearer ${TOKEN}' \
-H "Content-Type: multipart/form-data" \
-F 'functionConfig={"name": "${FUNCTION_NAME}", "tenant": "public", "namespace": "default", "runtime": "PYTHON", "py": "function://public/default/${PACKAGE_NAME}@${PACKAGE_VERSION}", "output": "public/default/output-test", "inputs": ["public/default/input"], "className": "exclamation.ExclamationFunction"};type=application/json' \
-F 'url=function://public/default/${PACKAGE_NAME}@${PACKAGE_VERSION}'
The function is assumed to be already uploaded at this point. If you have not
uploaded the function, change the
url parameter to be your local filepath.
This will look something like the following.Copy
Ask AI
-F 'url=file://$YOUR_LCOAL_PYTHON_FILE'
Copy
Ask AI
Created successfully
Replace the placeholder variables with the actual values that you can get when setting up client tools.
WEB_SERVICE_URL: the HTTPS service URL of your Pulsar cluster.TOKEN: a valid token to interact with your Pulsar cluster.FUNCTION_NAME: the name of your function.FUNCTION_VERSION: the version of your function you want to deploy e.g. 1.12.
What’s next?
- Learn how to develop Golang functions.
- Learn how to develop NodeJs functions.
- Learn how to develop WASM functions.
- Learn how to manage functions.
- Learn how to configure stateful functions.
- Learn how to monitor functions.
- Reference common configurations.