Prerequisites
Before deploying a Kafka connector to StreamNative Cloud, make sure the following prerequisites have been met:
- A running external data system service.
- A running Pulsar cluster with the KoP feature enabled on StreamNative Cloud, and the required environment set up.
Create a built-in Kafka connector
Before creating a Kafka connector, it is highly recommended to do the following:
- Check Kafka connector availability to ensure the version of the connector you want to create is supported on StreamNative Cloud.
- Go to StreamNative Hub and find the connector-specific docs for your version for configuration reference.
You may see error logs the first time you create a connector. To avoid this error, set the `cleanup.policy` of the `__kafka_connect_offset_storage` topic to `compact` with the command below.
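A minimal sketch of that change using the standard Kafka CLI; the bootstrap address is a placeholder, and the exact tool available on your cluster may differ:

```shell
# Set cleanup.policy=compact on the Connect offset-storage topic.
# <bootstrap-server> is a placeholder for your cluster's Kafka endpoint.
bin/kafka-configs.sh --bootstrap-server <bootstrap-server> \
  --alter --entity-type topics \
  --entity-name __kafka_connect_offset_storage \
  --add-config cleanup.policy=compact
```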
The following example shows how to create a Datagen source connector named `test` on StreamNative Cloud using different tools.
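As a sketch of what those tools submit, a Datagen source configuration might look like the following; the `kafka.topic` value and the `quickstart` dataset are illustrative assumptions, and `DatagenConnector` is the Confluent data generator connector class:

```json
{
  "name": "test",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "tasks.max": "1",
    "kafka.topic": "datagen-test",
    "quickstart": "users",
    "max.interval": "1000"
  }
}
```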
To create a data generator source connector named `test`, run the following command. To verify that the connector has been created successfully, run the verification command and check its output.

Create a Kafka connector with SMT
StreamNative Cloud supports Single Message Transformations (SMTs) for Kafka Connect. You can use SMTs to transform messages before they are written to the target system. The following example shows how to create a Datagen source connector named `test` on StreamNative Cloud using different tools.
Refer to the Kafka Connect SMTs documentation to check which SMTs are supported in StreamNative Cloud.
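For illustration, an SMT is chained onto a connector configuration through the standard Kafka Connect `transforms` properties; the `InsertField$Value` transform and the field names below are illustrative assumptions:

```json
{
  "name": "test",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "tasks.max": "1",
    "kafka.topic": "datagen-test",
    "quickstart": "users",
    "transforms": "InsertSource",
    "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertSource.static.field": "data_source",
    "transforms.InsertSource.static.value": "datagen"
  }
}
```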
To create a data generator source connector named `test` with an SMT, run the following command. To verify that the connector has been created successfully, run the verification command and check its output.

Create a Kafka connector with a secret
Some connectors require sensitive information, such as passwords or tokens, and you may not want to expose this information in the connector configuration. To solve this problem, you can use the following steps to pass sensitive information to the connector:
- Create a secret
For example, the Milvus sink connector requires a token to be passed to the connector.
You can create a secret in the console UI and pass the secret name to the connector configuration.
The `location` should be the same as the region of your Pulsar cluster. The `awsAccessKey` and `awsSecretKey` are the field names, and `lambda-sink-secret` can be any unique name you want to give to the secret. For a Milvus sink, create a secret with a `token` field.
- Pass secrets to the connector configuration
The following example shows how to create a Milvus sink connector named `test` on StreamNative Cloud using different tools.
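As a hypothetical sketch, the sink configuration references the secret by name instead of embedding the token; the connector class and the token placeholder below are assumptions, not the documented syntax — check the connector-specific docs for the exact form:

```json
{
  "name": "test",
  "config": {
    "connector.class": "<Milvus sink connector class>",
    "tasks.max": "1",
    "topics": "test-topic",
    "token": "<reference to the token field of your secret>"
  }
}
```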
To create a Milvus sink connector named `test`, run the following command. To verify that the sink connector has been created successfully, run the verification command and check its output. The `miluvs-sec` is the name of the secret you created.

Create a custom Kafka connector
Before creating a Kafka connector, it is highly recommended to do the following:
- Check Kafka connector availability to ensure the version of the connector you want to create is supported on StreamNative Cloud.
- Go to StreamNative Hub and find the connector-specific docs for your version for configuration reference.
Upload your connector file to Pulsar
Upload the package, then check the command output.
The property `libDir` specifies the directory in the zip file where the third-party libraries are located.

The following example shows how to create a custom MongoDB source connector named `mongo-source` on StreamNative Cloud using different tools.
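As a sketch, a custom MongoDB source configuration combines the standard MongoDB connector properties with the StreamNative package URL; the connection URI, database, collection, and package path below are illustrative assumptions:

```json
{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",
    "connection.uri": "mongodb://<user>:<password>@<mongo-host>:27017",
    "database": "testdb",
    "collection": "orders",
    "sn.pulsar.package.url": "function://public/default/mongo-connector@v1"
  }
}
```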
To create a custom MongoDB source connector named `mongo-source`, run the following command. To verify that the connector has been created successfully, run the verification command and check its output. The `sn.pulsar.package.url` is the URL of the package you uploaded to the StreamNative Cloud package service.

Set resources for a Kafka connector
You can use the following two configs to set resources for the connector and control its CPU and memory usage:
- `sn.cpu`: the number of CPU cores to allocate to the connector. Defaults to 0.5.
- `sn.memory`: the bytes of memory to allocate to the connector. Defaults to 2147483648 (2 GB).
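For example, a connector that needs one core and 4 GB of memory would add the two properties to its configuration; the surrounding Datagen properties are illustrative:

```json
{
  "name": "test",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "datagen-test",
    "sn.cpu": "1",
    "sn.memory": "4294967296"
  }
}
```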
You need to upgrade your Pulsar cluster to v3.0.8.4+, v3.3.3.4+, or v4.0.1.3+ to use the `sn.cpu` and `sn.memory` configs.

Schema Registry Support
Kafka Connect supports using a schema registry to store Avro/Protobuf/JSON schemas for the value and key. StreamNative provides an internal schema registry that can be used without complex configuration. To use it, set the following options in the connector configuration:
- `value.converter.schema.registry.internal: true`: use the internal schema registry for the value converter.
- `key.converter.schema.registry.internal: true`: use the internal schema registry for the key converter.
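As a sketch, a connector serializing values as Avro against the internal schema registry might set the following; the `AvroConverter` class name is the standard Confluent converter and its applicability here is an assumption:

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.internal": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```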