- Connect to External Systems
- Pulsar IO
Configuration Reference
This section lists all the common configuration options for the built-in source and sink connectors. For connector-specific configurations, see StreamNative Hub.
Source connector configurations
This table lists all the common configurations for a source connector.
Field | Description |
---|---|
-a , --archive | The path to the NAR archive for the source. It supports the file-URL-path (file://) which assumes that the NAR file already exists on the worker host from which the worker can download the package. For a built-in connector, it should be set to builtin//<connector_name> . |
--classname | The source's class name if the archive is set to a file-URL-path (file://). |
--cpu | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime). |
--deserialization-classname | The SerDe classname for the source. |
--destination-topic-name | The Pulsar topic to which data is sent. |
--disk | The disk (in bytes) that needs to be allocated per source instance (applicable only to Docker runtime). |
--name | The source's name. |
--namespace | The source's namespace. |
--parallelism | The source's parallelism factor, that is, the number of source instances to run. |
--processing-guarantees | The processing guarantees (also named as delivery semantics) applied to the source. A source connector receives messages from the external system and writes messages to a Pulsar topic. The --processing-guarantees ensures the processing guarantees for writing messages to the Pulsar topic. The available values are ATLEAST_ONCE , ATMOST_ONCE , EFFECTIVELY_ONCE . |
--ram | The RAM (in bytes) that needs to be allocated per source instance (applicable only to the process and Docker runtimes). |
-st , --schema-type | The schema type. Either a built-in schema (for example, AVRO and JSON) or a custom schema class name to be used to encode messages emitted from source. |
--source-config | The key/values configurations of the source. For example: '{"sleepBetweenMessages": 60}'; For configuration details, refer to the documentation of each connector |
--source-config-file | The path to a YAML config file that specifies the source's configuration. |
-t , --source-type | The source's connector provider. |
--tenant | The source's tenant. |
--producer-config | The custom producer configuration (as a JSON string). |
Sink connector configurations
This table lists all the common configurations for a sink connector.
Field | Description |
---|---|
-a , --archive | The path to the archive file for the sink. It supports the file-URL-path (file://) which assumes that the NAR file already exists on the worker host from which the worker can download the package. For a built-in connector, it should be set to builtin//<connector_name> . |
--classname | The sink's class name if the archive is set to a file-URL-path (file://). |
--cpu | The CPU (in cores) that needs to be allocated per sink instance (applicable only to Docker runtime). |
--custom-schema-inputs | The map of input topics to schema types or class names (as a JSON string). |
--custom-serde-inputs | The map of input topics to SerDe class names (as a JSON string). |
--disk | The disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime). |
-i, --inputs | The sink's input topic or topics (multiple topics can be specified as a comma-separated list). |
--name | The sink's name. |
--namespace | The sink's namespace. |
--parallelism | The sink's parallelism factor, that is, the number of sink instances to run. |
--processing-guarantees | The processing guarantees (also known as delivery semantics) applied to the sink. The --processing-guarantees implementation in Pulsar also relies on sink implementation. The available values are ATLEAST_ONCE , ATMOST_ONCE , EFFECTIVELY_ONCE . |
--ram | The RAM (in bytes) that needs to be allocated per sink instance (applicable only to the process and Docker runtimes). |
--retain-ordering | Sink consumes messages in order. |
--sink-config | The key/values configurations of the sink. For example: '{"sleepBetweenMessages": 60}'; For configuration details, refer to the documentation of each connector |
--sink-config-file | The path to a YAML config file specifying the sink's configuration. |
-t , --sink-type | The sink's connector provider. The sink-type parameter of the currently built-in connectors is determined by the setting of the name parameter. You can use the pulsar-admin sinks available-sinks command to get all built-in sink connectors. |
--subs-name | Pulsar source subscription name if you want to specify a subscription name for the input-topic consumer. |
--tenant | The sink's tenant. |
--timeout-ms | The message timeout in milliseconds. |
--topics-pattern | The topic pattern to consume from a list of topics under a namespace that matches the pattern. --input and --topics-Pattern are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs . |
StreamNative Cloud custom runtime options
To facilitate submitting Pulsar functions based on your requirements, Function on Cloud service provides some custom options via custom-runtime-options
.
Note
When update functions/sinks/sources with custom runtime options, the original custom runtime options will be replaced by the new ones, so make sure all the wanted fields are passed in the custom runtime options when you do the update.
This table lists all fields available for custom options.
Name | Type | Default | Description |
---|---|---|---|
clusterName | String | N/A | The Pulsar cluster of a Pulsar function, source, or sink. |
inputTypeClassName | String | [B | The map of input topics to Java class names. |
outputTypeClassName | String | [B | The map of output topics to Java class names. |
maxReplicas | Integer | 0 | The maximum number of Pulsar instances that you want to run for this Pulsar Function. When the value of the maxReplicas parameter is greater than the value of replicas , it indicates that the Functions controller automatically scales the Pulsar Functions based on the CPU usage. By default, maxReplicas is set to 0, which indicates that auto-scaling is disabled. |
env | Map<String, String> | N/A | The environment variables being attached to a Pod that is created by the Function Mesh Operator for the cluster. |
imagePullSecrets | List<String> | N/A | A list of references to secrets in the same namespace for pulling any of the images used by a Pod. |
logLevel | String | info | The log levels for Pulsar functions. For details, see log levels. |
logRotationPolicy | String | N/A | The log rotation policies for Pulsar functions. You can set the log rotation policies based on the time or the log file size. For details, see log rotation policies. |
runnerImageTag | String | N/A | The tag of the runner image that is used to submit a function, source, or sink. |
hpaSpec (Preview) | HPASpec | N/A | The Kubernetes HorizontalPodAutoscaler settings. For details, see Kubernetes documentation. This feature may not ready for your cluster enviroment, please file a ticket if you want this feature enabled. |
logFormat | String | text | The log format that defines how the content of a log file should be interpreted. Available options are json and text . The log format configurations are only available for the Java and Python runtimes. |
logTopic | String | N/A | Used for sinks/sources since they don't have a log topic argument like functions |
logTopicAgent | String | runtime | The log agent that defines how StreamNative cloud redirects your functions / connectors log into a Pulsar topic. Available options are runtime and sidecar . When use sidecar all logs (including instance logs) will be sent to the log topic by filebeat (better performance than runtime ), else will use the Pulsar Functions runtime's log-topic implementation instead. |
genericKind | String | N/A | Used for functions written in languages other than Java and Python, available values are executable , nodejs , wasm |
terminationGracePeriodSeconds | Long | N/A | The amount of time that kubernetes will give for a pod before terminating it. |
pauseRollout | Boolean | N/A | Whether to pause the rollout of functions/connectors, when set to true, running functions&connectors will not restart during the upgrade of backend function-mesh operator. |
User could compose the custom runtime options as a JSON string and pass it to custom-runtime-options
field. For example:
{
"inputTypeClassName": "java.lang.String",
"logTopic": "log-topic-name"
}
Then it could be passed to custom-runtime-options
field as follows:
pulsarctl sinks create --custom-runtime-options '{"inputTypeClassName":"java.lang.String","logTopic":"log-topic-name"}' ...