1. Connect to External Systems
  2. Pulsar IO

Configuration Reference

This section lists all the common configuration options for the built-in source and sink connectors. For connector-specific configurations, see StreamNative Hub.

Source connector configurations

This table lists all the common configurations for a source connector.

FieldDescription
-a, --archiveThe path to the NAR archive for the source.
It supports the file-URL-path (file://) which assumes that the NAR file already exists on the worker host from which the worker can download the package.
For a built-in connector, it should be set to builtin//<connector_name>.
--classnameThe source's class name if the archive is set to a file-URL-path (file://).
--cpuThe CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
--deserialization-classnameThe SerDe classname for the source.
--destination-topic-nameThe Pulsar topic to which data is sent.
--diskThe disk (in bytes) that needs to be allocated per source instance (applicable only to Docker runtime).
--nameThe source's name.
--namespaceThe source's namespace.
--parallelismThe source's parallelism factor, that is, the number of source instances to run.
--processing-guaranteesThe processing guarantees (also named as delivery semantics) applied to the source. A source connector receives messages from the external system and writes messages to a Pulsar topic. The --processing-guarantees ensures the processing guarantees for writing messages to the Pulsar topic.
The available values are ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE.
--ramThe RAM (in bytes) that needs to be allocated per source instance (applicable only to the process and Docker runtimes).
-st, --schema-typeThe schema type.
Either a built-in schema (for example, AVRO and JSON) or a custom schema class name to be used to encode messages emitted from source.
--source-configThe key/values configurations of the source. For example: '{"sleepBetweenMessages": 60}'; For configuration details, refer to the documentation of each connector
--source-config-fileThe path to a YAML config file that specifies the source's configuration.
-t, --source-typeThe source's connector provider.
--tenantThe source's tenant.
--producer-configThe custom producer configuration (as a JSON string).

Sink connector configurations

This table lists all the common configurations for a sink connector.

FieldDescription
-a, --archiveThe path to the archive file for the sink.
It supports the file-URL-path (file://) which assumes that the NAR file already exists on the worker host from which the worker can download the package.
For a built-in connector, it should be set to builtin//<connector_name>.
--classnameThe sink's class name if the archive is set to a file-URL-path (file://).
--cpuThe CPU (in cores) that needs to be allocated per sink instance (applicable only to Docker runtime).
--custom-schema-inputsThe map of input topics to schema types or class names (as a JSON string).
--custom-serde-inputsThe map of input topics to SerDe class names (as a JSON string).
--diskThe disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime).
-i, --inputsThe sink's input topic or topics (multiple topics can be specified as a comma-separated list).
--nameThe sink's name.
--namespaceThe sink's namespace.
--parallelismThe sink's parallelism factor, that is, the number of sink instances to run.
--processing-guaranteesThe processing guarantees (also known as delivery semantics) applied to the sink. The --processing-guarantees implementation in Pulsar also relies on sink implementation.
The available values are ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE.
--ramThe RAM (in bytes) that needs to be allocated per sink instance (applicable only to the process and Docker runtimes).
--retain-orderingSink consumes messages in order.
--sink-configThe key/values configurations of the sink. For example: '{"sleepBetweenMessages": 60}'; For configuration details, refer to the documentation of each connector
--sink-config-fileThe path to a YAML config file specifying the sink's configuration.
-t, --sink-typeThe sink's connector provider. The sink-type parameter of the currently built-in connectors is determined by the setting of the name parameter. You can use the pulsar-admin sinks available-sinks command to get all built-in sink connectors.
--subs-namePulsar source subscription name if you want to specify a subscription name for the input-topic consumer.
--tenantThe sink's tenant.
--timeout-msThe message timeout in milliseconds.
--topics-patternThe topic pattern to consume from a list of topics under a namespace that matches the pattern.
--input and --topics-Pattern are mutually exclusive.
Add SerDe class name for a pattern in --customSerdeInputs.

StreamNative Cloud custom runtime options

To facilitate submitting Pulsar functions based on your requirements, Function on Cloud service provides some custom options via custom-runtime-options.

Note

When update functions/sinks/sources with custom runtime options, the original custom runtime options will be replaced by the new ones, so make sure all the wanted fields are passed in the custom runtime options when you do the update.

This table lists all fields available for custom options.

NameTypeDefaultDescription
clusterNameStringN/AThe Pulsar cluster of a Pulsar function, source, or sink.
inputTypeClassNameString[BThe map of input topics to Java class names.
outputTypeClassNameString[BThe map of output topics to Java class names.
maxReplicasInteger0The maximum number of Pulsar instances that you want to run for this Pulsar Function. When the value of the maxReplicas parameter is greater than the value of replicas, it indicates that the Functions controller automatically scales the Pulsar Functions based on the CPU usage. By default, maxReplicas is set to 0, which indicates that auto-scaling is disabled.
envMap<String, String>N/AThe environment variables being attached to a Pod that is created by the Function Mesh Operator for the cluster.
imagePullSecretsList<String>N/AA list of references to secrets in the same namespace for pulling any of the images used by a Pod.
logLevelStringinfoThe log levels for Pulsar functions. For details, see log levels.
logRotationPolicyStringN/AThe log rotation policies for Pulsar functions. You can set the log rotation policies based on the time or the log file size. For details, see log rotation policies.
runnerImageTagStringN/AThe tag of the runner image that is used to submit a function, source, or sink.
hpaSpec (Preview)HPASpecN/AThe Kubernetes HorizontalPodAutoscaler settings. For details, see Kubernetes documentation. This feature may not ready for your cluster enviroment, please file a ticket if you want this feature enabled.
logFormatStringtextThe log format that defines how the content of a log file should be interpreted. Available options are json and text. The log format configurations are only available for the Java and Python runtimes.
logTopicStringN/AUsed for sinks/sources since they don't have a log topic argument like functions
logTopicAgentStringruntimeThe log agent that defines how StreamNative cloud redirects your functions / connectors log into a Pulsar topic. Available options are runtime and sidecar. When use sidecar all logs (including instance logs) will be sent to the log topic by filebeat (better performance than runtime), else will use the Pulsar Functions runtime's log-topic implementation instead.
genericKindStringN/AUsed for functions written in languages other than Java and Python, available values are executable, nodejs, wasm
terminationGracePeriodSecondsLongN/AThe amount of time that kubernetes will give for a pod before terminating it.
pauseRolloutBooleanN/AWhether to pause the rollout of functions/connectors, when set to true, running functions&connectors will not restart during the upgrade of backend function-mesh operator.

User could compose the custom runtime options as a JSON string and pass it to custom-runtime-options field. For example:

{
  "inputTypeClassName": "java.lang.String",
  "logTopic": "log-topic-name"
}

Then it could be passed to custom-runtime-options field as follows:

pulsarctl sinks create --custom-runtime-options '{"inputTypeClassName":"java.lang.String","logTopic":"log-topic-name"}' ...
Previous
Monitor and Troubleshoot