1. Process Data Streams

Configuration Reference

Pulsar function configurations

This table lists all fields available for creating a Pulsar function.

FieldDescriptionDefault
auto-ackWhether or not the framework acknowledges messages automatically.true
classnameThe class name of a Pulsar function.
CPUThe CPU in cores that need to be allocated per function instance (applicable only to docker runtime).
custom-runtime-optionsA string that encodes options to customize the runtime.
custom-schema-inputsThe map of input topics to Schema class names (as a JSON string).
custom-serde-inputsThe map of input topics to SerDe class names (as a JSON string).
dead-letter-topicThe topic where all messages that were not processed successfully are sent. This parameter is not supported in Python Functions.
diskThe disk in bytes that need to be allocated per function instance (applicable only to docker runtime).
fqfnThe Fully Qualified Function Name (FQFN) for the function.
function-config-fileThe path to a YAML config file specifying the configuration of a Pulsar function.
goPath to the main Go executable binary for the function (if the function is written in Go). Go Functions are not supported in StreamNative Cloud.
inputsThe input topic or topics of a Pulsar function (multiple topics can be specified as a comma-separated list).
jarPath to the jar file for the function (if the function is written in Java). It also supports URL-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.
log-topicThe topic to which the logs of a Pulsar function are produced.
max-message-retriesHow many times should we try to process a message before giving up.
nameThe name of a Pulsar function.
namespaceThe namespace of a Pulsar function.
outputThe output topic of a Pulsar function (If none is specified, no output is written).
output-serde-classnameThe SerDe class to be used for messages output by the function.
parallelismThe parallelism factor of a Pulsar function (i.e. the number of function instances to run).
processing-guaranteesThe processing guarantees (delivery semantics) applied to the function. Available values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE].ATLEAST_ONCE
pyPath to the main Python file/Python Wheel file for the function (if the function is written in Python).
ramThe ram in bytes that need to be allocated per function instance (applicable only to process/docker runtime).
retain-orderingFunction consumes and processes messages in order.
schema-typeThe builtin schema type or custom schema class name to be used for messages output by the function.<empty string>
sliding-interval-countThe number of messages after which the window slides.
sliding-interval-duration-msThe time duration after which the window slides.
subs-namePulsar source subscription name if user wants a specific subscription-name for the input-topic consumer.
tenantThe tenant of a Pulsar function.
timeout-msThe message timeout in milliseconds.
topics-patternThe topic pattern to consume from a list of topics under a namespace that matches the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (only supported in Java Pulsar function).
user-configUser-defined config key/values.
window-length-countThe number of messages per window.
window-length-duration-msThe time duration of the window is milliseconds.

StreamNative Cloud custom runtime options

To facilitate submitting Pulsar functions based on your requirements, Function on Cloud service provides some custom options via custom-runtime-options.

This table lists all fields available for custom options.

NameTypeDefaultDescription
clusterNameStringN/AThe Pulsar cluster of a Pulsar function, source, or sink.
inputTypeClassNameString[BThe map of input topics to Java class names.
outputTypeClassNameString[BThe map of output topics to Java class names.
maxReplicasInteger0The maximum number of Pulsar instances that you want to run for this Pulsar Function. When the value of the maxReplicas parameter is greater than the value of replicas, it indicates that the Functions controller automatically scales the Pulsar Functions based on the CPU usage. By default, maxReplicas is set to 0, which indicates that auto-scaling is disabled.
envMap < String, String >N/AThe environment variables being attached to a Pod that is created by the Function Mesh Operator for the cluster.
imagePullSecretsList < String >N/AA list of references to secrets in the same namespace for pulling any of the images used by a Pod.
logLevelStringinfoThe log levels for Pulsar functions. For details, see log levels.
logRotationPolicyStringN/AThe log rotation policies for Pulsar functions. You can set the log rotation policies based on the time or the log file size. For details, see log rotation policies.
runnerImageTagStringN/AThe tag of the runner image that is used to submit a function, source, or sink.
hpaSpec (Preview)HPASpecN/AThe Kubernetes HorizontalPodAutoscaler settings. For details, see Kubernetes documentation. This feature may not ready for your cluster enviroment, please file a ticket if you want this feature enabled.
logFormatStringtextThe log format that defines how the content of a log file should be interpreted. Available options are json and text. The log format configurations are only available for the Java and Python runtimes.
logTopicStringN/Aused for sinks/sources since they don't have a log topic argument like functions
logTopicAgentStringruntimeThe log agent that defines how StreamNative cloud redirects your functions / connectors log into a Pulsar topic. Available options are runtime and sidecar. When use sidecar all logs (including instance logs) will be sent to the log topic by filebeat (better performance than runtime), else will use the Pulsar Functions runtime's log-topic implementation instead.

User could compose the custom runtime options as a JSON string and pass it to custom-runtime-options field. For example:

{
  "inputTypeClassName": "java.lang.String",
  "outputTypeClassName": "java.lang.String",
  "maxReplicas": 0
}

Then it could be passed to custom-runtime-options field as follows:

pulsarctl functions create --custom-runtime-options '{"inputTypeClassName":"java.lang.String","outputTypeClassName":"java.lang.String","maxReplicas":0}' ...
Previous
Monitor and Troubleshoot