
Create Kafka Connectors

Prerequisites

Before deploying a Kafka connector to StreamNative Cloud, make sure the following prerequisites are met:

Create a built-in Kafka connector

Tip

Before creating a Kafka connector, it is highly recommended that you do the following:

  1. Check Kafka Connect availability to ensure that the version of the connector you want to create is supported on StreamNative Cloud.
  2. Go to StreamNative Hub and find the connector-specific documentation for your version as a configuration reference.

Note

You may see the following error in the logs the first time you create a connector:

org.apache.kafka.common.config.ConfigException: Topic '__kafka_connect_offset_storage' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.

To resolve this error, set the cleanup.policy of the __kafka_connect_offset_storage topic to compact with the following command:

./bin/kafka-configs.sh --bootstrap-server xxxx:9093 --command-config ~/kafka/kafka-token.properties --alter --topic __kafka_connect_offset_storage --add-config cleanup.policy=compact
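
If you want to script this fix, the following is a minimal Python sketch, not an official tool. It looks for the error text quoted above in a log line and, if found, assembles the same kafka-configs.sh arguments for the topic named in the message. The function name and the regular expression are illustrative assumptions; the flags are the ones used in the command above.

```python
import re

# Pattern derived from the ConfigException message quoted above (an assumption
# about its stable parts; adjust it if your log format differs).
_COMPACT_ERROR = re.compile(
    r"Topic '(?P<topic>[^']+)' supplied via the '[^']+' property "
    r"is required to have 'cleanup\.policy=compact'"
)


def compaction_fix_argv(log_line, bootstrap_server, command_config):
    """Return kafka-configs.sh arguments for the topic named in log_line, or None."""
    match = _COMPACT_ERROR.search(log_line)
    if match is None:
        return None
    return [
        "./bin/kafka-configs.sh",
        "--bootstrap-server", bootstrap_server,
        "--command-config", command_config,
        "--alter",
        "--topic", match.group("topic"),
        "--add-config", "cleanup.policy=compact",
    ]
```

The returned list can be passed to subprocess.run from the Kafka installation directory. Returning a list rather than a single string avoids shell quoting issues.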

The following example shows how to create a Datagen source connector named test on StreamNative Cloud using kcctl.

To create the connector, define it in datagen.json and apply the file:

> cat datagen.json
{
  "name": "test",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "testusers",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 10000000,
    "tasks.max": "1"
  }
}
> kcctl apply -f datagen.json

You should see the following output:

Created connector test

To verify that the Datagen source connector was created successfully, run the following command:

kcctl get connectors

You should see the following output:


 NAME                  TYPE     STATE        TASKS
 test                  source   RUNNING      0: RUNNING
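
A malformed definition file only fails when you apply it, so it can help to check the file first. The sketch below is one possible pre-flight check in Python, assuming the file layout shown above (a top-level name and a config object). The function name is hypothetical, and the check covers only JSON syntax and the presence of connector.class; it does not replace the validation done by Kafka Connect itself.

```python
import json


def check_connector_file(text):
    """Parse a connector definition and return a list of problems (empty if none)."""
    try:
        doc = json.loads(text)
    except json.JSONDecodeError as err:
        return [f"invalid JSON: {err.msg} at line {err.lineno}"]
    if not isinstance(doc, dict):
        return ["top level must be a JSON object"]
    problems = []
    name = doc.get("name")
    if not isinstance(name, str) or not name:
        problems.append("missing connector name")
    config = doc.get("config")
    if not isinstance(config, dict):
        problems.append("missing config object")
        return problems
    if "connector.class" not in config:
        problems.append("config is missing connector.class")
    return problems


if __name__ == "__main__":
    # Assumes datagen.json is in the current directory, as in the example above.
    with open("datagen.json", encoding="utf-8") as handle:
        for problem in check_connector_file(handle.read()):
            print(problem)
```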

Create a Kafka connector with SMT

StreamNative Cloud supports Single Message Transformations (SMTs) for Kafka Connect. You can use SMTs to transform messages before they are written to the target system.

The following example shows how to create a Datagen source connector named test, with an SMT applied, on StreamNative Cloud using kcctl.

Refer to Kafka Connect SMTs for the list of SMTs supported on StreamNative Cloud.

To create the connector, define it in datagen.json and apply the file:

> cat datagen.json
{
  "name": "test",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "testusers",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 10000000,
    "tasks.max": "1",
    "transforms": "InsertSource",
    "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertSource.static.field": "data_source",
    "transforms.InsertSource.static.value": "test-file-source"
  }
}
> kcctl apply -f datagen.json

You should see the following output:

Created connector test

To verify that the Datagen source connector was created successfully, run the following command:

kcctl get connectors

You should see the following output:


 NAME                  TYPE     STATE        TASKS
 test                  source   RUNNING      0: RUNNING
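
To make the effect of the InsertSource transform concrete, the following Python sketch imitates what the configuration above asks for: each record value gains a data_source field with the static value test-file-source. This is only a simulation for illustration, not the Kafka Connect implementation, and the sample record is an assumed shape for a users record rather than captured output.

```python
def insert_static_field(value, field, static_value):
    """Return a copy of a record value with one extra static field added."""
    updated = dict(value)  # copy, so the input record is left unchanged
    updated[field] = static_value
    return updated


if __name__ == "__main__":
    # Hypothetical record value; real Datagen output may differ.
    sample = {"userid": "User_1", "regionid": "Region_5", "gender": "MALE"}
    print(insert_static_field(sample, "data_source", "test-file-source"))
```

Because value.converter.schemas.enable is false in this example, the record value reaches the topic as plain JSON with the added field alongside the generated ones.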

Create a Kafka connector with a secret

Some connectors require sensitive information, such as passwords or tokens, and you may not want to expose these values in the connector configuration. To avoid this, store the value in a secret and reference the secret from the connector configuration:

  • Create a secret

    For example, the Milvus sink connector requires a token to be passed to the connector. You can create a secret in the console UI and pass the secret name to the connector configuration.

    screenshot of creating secret

    Tip

    The location should be the same as the region of your Pulsar cluster.

    The awsAccessKey and awsSecretKey are the field names, and lambda-sink-secret can be any unique name you want to give to the secret.

    For a Milvus sink, create a secret with a token field.

  • Pass secrets to the connector configuration

    The following example shows how to create a Milvus sink connector named test on StreamNative Cloud using kcctl.

    To create the connector, define it in milvus.json and apply the file:

    > cat milvus.json
    {
      "name": "test",
      "config": {
        "connector.class": "com.milvus.io.kafka.MilvusSinkConnector",
        "public.endpoint": "http://dockerhost.default.svc.cluster.local:19530",
        "collection.name": "demo",
        "token": "${snsecret:miluvs-sec:token}",
        "topics": "kafka-milvus-input",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "tasks.max": "1"
      }
    }
    > kcctl apply -f milvus.json
    

    Tip

    miluvs-sec is the name of the secret you created, and token is the field within that secret.

    You should see the following output:

    Created connector test
    

    To verify that the Milvus sink connector was created successfully, run the following command:

    kcctl get connectors
    

    You should see the following output:

    
    NAME                  TYPE     STATE        TASKS
    test                  sink     RUNNING      0: RUNNING
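
The secret reference in the example above has the form ${snsecret:<secret-name>:<field>}. As a hedged illustration, the Python sketch below scans a connector config for values of that form so you can confirm which keys rely on secrets before applying the file. The pattern is inferred from the single example above, not from a published grammar, and the function name is hypothetical. Resolving the reference to the real value is done by StreamNative Cloud, not by this code.

```python
import re

# Inferred from "${snsecret:miluvs-sec:token}"; treat the exact grammar as an assumption.
SECRET_REF = re.compile(r"\$\{snsecret:(?P<secret>[^:}]+):(?P<field>[^:}]+)\}")


def find_secret_refs(config):
    """Map each config key whose value is a secret reference to (secret name, field)."""
    refs = {}
    for key, value in config.items():
        if not isinstance(value, str):
            continue
        match = SECRET_REF.fullmatch(value)
        if match:
            refs[key] = (match.group("secret"), match.group("field"))
    return refs
```

For the Milvus configuration above, only the token key would be reported, pointing at the token field of the miluvs-sec secret.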
    

Create a custom Kafka connector

Tip

Before creating a Kafka connector, it is highly recommended that you do the following:

  1. Check Kafka Connect availability to ensure that the version of the connector you want to create is supported on StreamNative Cloud.
  2. Go to StreamNative Hub and find the connector-specific documentation for your version as a configuration reference.

To create a custom Kafka connector, first upload the connector JAR or ZIP file to the StreamNative Cloud Package service, as described in the following steps.

Upload your connector file to Pulsar

You need to set the context for Pulsarctl first:

# create a context
pulsarctl context set ${context-name}  \
--admin-service-url ${admin-service-url} \
--issuer-endpoint ${issuerUrl} \
--audience urn:sn:pulsar:${orgName}:${instanceName} \
--key-file ${privateKey}

# activate oauth2
pulsarctl oauth2 activate

Note

Replace the placeholder variables with the actual values that you can get when setting up client tools.

  • context-name: any name you want
  • admin-service-url: the HTTP service URL of your Pulsar cluster.
  • privateKey: the path to the downloaded OAuth2 key file.
  • issuerUrl: the URL of the OAuth2 issuer.
  • audience: the Uniform Resource Name (URN), which is a combination of the urn:sn:pulsar, your organization name, and your Pulsar instance name.
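
The audience value and the context command can also be assembled programmatically. The sketch below is a minimal Python example under the assumption that the audience is exactly urn:sn:pulsar followed by the organization and instance names, as described in the list above. The function names and the sample values in the usage note are hypothetical; the pulsarctl flags are the ones shown in the command above.

```python
def build_audience(org_name, instance_name):
    """Combine the urn:sn:pulsar prefix with the organization and instance names."""
    if not org_name or not instance_name:
        raise ValueError("organization name and instance name are both required")
    return f"urn:sn:pulsar:{org_name}:{instance_name}"


def context_set_argv(context_name, admin_service_url, issuer_url,
                     org_name, instance_name, key_file):
    """Return the 'pulsarctl context set' arguments as a list for subprocess.run."""
    return [
        "pulsarctl", "context", "set", context_name,
        "--admin-service-url", admin_service_url,
        "--issuer-endpoint", issuer_url,
        "--audience", build_audience(org_name, instance_name),
        "--key-file", key_file,
    ]
```

For example, build_audience("my-org", "my-instance") yields urn:sn:pulsar:my-org:my-instance, where my-org and my-instance stand in for your own names.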

Upload packages

pulsarctl packages upload function://public/default/mongo-connect-zip@v1 \
--path /tmp/mongodb-kafka-connect-mongodb-1.12.0.zip \
--description "mongodb kafka connect in zip format" \
--properties fileName=mongo-kafka-connect.zip \
--properties libDir=mongodb-kafka-connect-mongodb-1.12.0/lib

You should see the following output:

The package 'function://public/default/mongo-connect-zip@v1' uploaded from path '/tmp/mongodb-kafka-connect-mongodb-1.12.0.zip' successfully

Tip

The libDir property specifies the directory inside the zip file where the third-party libraries are located.
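
A libDir value that does not match the archive layout is easy to miss, so you may want to inspect the archive before uploading it. The following Python sketch lists the JAR files found under a given directory inside a zip file, using only the standard zipfile module. The function name is hypothetical, and the check is a local convenience, not something the Package service requires.

```python
import zipfile


def jars_in_lib_dir(zip_file, lib_dir):
    """Return the sorted .jar entries located under lib_dir inside the archive."""
    prefix = lib_dir.strip("/") + "/"
    with zipfile.ZipFile(zip_file) as archive:
        return sorted(
            name for name in archive.namelist()
            if name.startswith(prefix) and name.endswith(".jar")
        )


if __name__ == "__main__":
    # Path and libDir taken from the upload command above.
    jars = jars_in_lib_dir(
        "/tmp/mongodb-kafka-connect-mongodb-1.12.0.zip",
        "mongodb-kafka-connect-mongodb-1.12.0/lib",
    )
    print(f"{len(jars)} jar file(s) found under libDir")
```

An empty result suggests that the libDir value does not point at the directory that holds the connector libraries.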

The following example shows how to create a custom MongoDB source connector named mongo-source on StreamNative Cloud using kcctl.

To create the connector, define it in mongo.json and apply the file:

> cat mongo.json
{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo.default.svc.cluster.local:27017/?authSource=admin",
    "database": "kafka-mongo",
    "collection": "source",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "sn.pulsar.package.url": "function://public/default/mongo-connect-zip@v1",
    "tasks.max": "1"
  }
}
> kcctl apply -f mongo.json

Tip

sn.pulsar.package.url is the URL of the package you uploaded to the StreamNative Cloud Package service.

You should see the following output:

Created connector mongo-source

To verify that the MongoDB source connector was created successfully, run the following command:

kcctl get connectors

You should see the following output:


 NAME                  TYPE     STATE        TASKS
 mongo-source          source   RUNNING      0: RUNNING
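
The value of sn.pulsar.package.url must match the package name used at upload time, so a quick structural check can catch typos. The sketch below splits a URL of the form used in this example (type://tenant/namespace/name@version) into its parts. The regular expression is inferred from function://public/default/mongo-connect-zip@v1 and is an assumption rather than a complete grammar; the class and function names are hypothetical.

```python
import re
from typing import NamedTuple


class PackageUrl(NamedTuple):
    kind: str
    tenant: str
    namespace: str
    name: str
    version: str


# Inferred from the example URL above; stricter or looser rules may apply in practice.
_PACKAGE_URL = re.compile(
    r"(?P<kind>[a-z]+)://(?P<tenant>[^/]+)/(?P<namespace>[^/]+)/"
    r"(?P<name>[^/@]+)@(?P<version>[^/@]+)"
)


def parse_package_url(url):
    """Split a package URL into its parts, raising ValueError if it does not match."""
    match = _PACKAGE_URL.fullmatch(url)
    if match is None:
        raise ValueError(f"not a package URL: {url!r}")
    return PackageUrl(**match.groupdict())
```

Comparing the parsed result for the URL in mongo.json with the one passed to pulsarctl packages upload confirms that both refer to the same package and version.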