Kafka Connect Snowflake Sink
The official Snowflake Kafka Connect Sink connector.

Available on: StreamNative Cloud console
Authored by: Snowflake Computing
Support type: Community
License: Apache License 2.0

Prerequisites

Quick Start

  1. Set up the kcctl client: doc

  2. Create a Snowflake instance

  3. Set up the database and user in Snowflake. For details, refer to: Snowflake Documentation

  4. Set up key pair authentication. Refer to: Using key pair authentication & key rotation

  5. Create a secret in the StreamNative Console and save the private key's content and passphrase to it (refer to: doc). The example in the next step assumes the secret is named snowflake-demo, with the keys snowflake.private.key and snowflake.private.key.passphrase.

  6. Create a JSON file like the following:

    {
        "name": "snowflake-demo",
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "key.converter.schemas.enable": "false",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
            "snowflake.role.name": "kafka_connector_role_1",
            "snowflake.user.name": "kafka_connector_user_1",
            "snowflake.url.name": "${SNOWFLAKE_URL}:443",
            "snowflake.private.key": "${snsecret:snowflake-demo:snowflake.private.key}",
            "snowflake.private.key.passphrase": "${snsecret:snowflake-demo:snowflake.private.key.passphrase}",
            "topics": "snowflake-input",
            "snowflake.database.name": "kafka_db",
            "snowflake.schema.name": "public",
            "tasks.max": "1"
        }
    }
    
  7. Run the following command to create the connector:

    kcctl create -f <filename>.json
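Steps 6 and 7 can also be scripted. The following is a minimal sketch, not an official tool: it builds the same JSON structure as the Quick Start example and writes it to a file that can then be passed to kcctl. The function name build_connector_config, the output file name snowflake-demo.json, and the account URL example.snowflakecomputing.com:443 are assumptions made for illustration.

```python
import json
from pathlib import Path


def build_connector_config(name, topics, snowflake_url, user, role,
                           database, schema, secret_name):
    """Return a connector definition shaped like the Quick Start example."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "key.converter.schemas.enable": "false",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
            "snowflake.role.name": role,
            "snowflake.user.name": user,
            "snowflake.url.name": snowflake_url,
            # Secret references use the ${snsecret:<secret>:<key>} form from the
            # Quick Start example, so the key material never appears in this file.
            "snowflake.private.key":
                f"${{snsecret:{secret_name}:snowflake.private.key}}",
            "snowflake.private.key.passphrase":
                f"${{snsecret:{secret_name}:snowflake.private.key.passphrase}}",
            "topics": ",".join(topics),
            "snowflake.database.name": database,
            "snowflake.schema.name": schema,
            "tasks.max": "1",
        },
    }


if __name__ == "__main__":
    definition = build_connector_config(
        name="snowflake-demo",
        topics=["snowflake-input"],
        snowflake_url="example.snowflakecomputing.com:443",  # hypothetical account URL
        user="kafka_connector_user_1",
        role="kafka_connector_role_1",
        database="kafka_db",
        schema="public",
        secret_name="snowflake-demo",
    )
    # Hypothetical file name; pass it to kcctl as in step 7.
    Path("snowflake-demo.json").write_text(json.dumps(definition, indent=4))
```

Generating the file this way keeps the secret name in one place, so both secret references stay consistent if the secret is renamed.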
    

Configuration

The Snowflake Kafka sink connector is configured using the following required properties:

| Parameter | Description |
| --- | --- |
| name | The name of the connector. |
| connector.class | com.snowflake.kafka.connector.SnowflakeSinkConnector |
| topics | A list of Kafka topics that the sink connector watches. (You can define either the topics or the topics.regex setting, but not both.) |
| topics.regex | A regular expression that matches the Kafka topics that the sink connector watches. (You can define either the topics or the topics.regex setting, but not both.) |
| snowflake.url.name | The URL for accessing your Snowflake account. |
| snowflake.user.name | User login name for the Snowflake account. |
| snowflake.private.key | The private key to authenticate the user. Include only the key, not the header or footer. If the key is split across multiple lines, remove the line breaks. |
| snowflake.database.name | The name of the database that contains the table to insert rows into. |
| snowflake.schema.name | The name of the schema that contains the table to insert rows into. |
| header.converter | Required only if the records are formatted in Avro and include a header. |
| key.converter | Kafka record's key converter. |
| value.converter | Kafka record's value converter. |
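The snowflake.private.key requirement (key body only, no header or footer, no line breaks) is easy to get wrong by hand. The sketch below shows one way to prepare the value before saving it to the secret. The helper name normalize_private_key is hypothetical, and the sample text is a made-up placeholder, not a real key.

```python
def normalize_private_key(pem_text: str) -> str:
    """Drop PEM header/footer lines and join the key body into a single line."""
    body_lines = [
        line.strip()
        for line in pem_text.splitlines()
        # Skip blank lines and the "-----BEGIN ...-----" / "-----END ...-----" markers.
        if line.strip() and not line.strip().startswith("-----")
    ]
    return "".join(body_lines)


# Placeholder content for illustration only; not a valid key.
sample = """-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFHzBJBgkq
hkiG9w0BBQ0w
-----END ENCRYPTED PRIVATE KEY-----
"""

print(normalize_private_key(sample))  # MIIFHzBJBgkqhkiG9w0BBQ0w
```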

For the full list of configs, see the official Snowflake Kafka Connect documentation.
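Before creating the connector, it can help to check the "config" object against the required properties listed above. The following is a rough pre-flight sketch under stated assumptions: validate_config and REQUIRED_KEYS are hypothetical names, the check covers only the properties named on this page, and it leaves out header.converter because that property is required only for Avro records with headers. It does not replace the connector's own validation.

```python
# Properties that this page lists as always required inside "config".
REQUIRED_KEYS = (
    "connector.class",
    "snowflake.url.name",
    "snowflake.user.name",
    "snowflake.private.key",
    "snowflake.database.name",
    "snowflake.schema.name",
    "key.converter",
    "value.converter",
)


def validate_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means no issue was found."""
    problems = [
        f"missing required property: {key}"
        for key in REQUIRED_KEYS
        if not config.get(key)
    ]
    has_topics = bool(config.get("topics"))
    has_regex = bool(config.get("topics.regex"))
    # topics and topics.regex are mutually exclusive, but one of them is needed.
    if has_topics and has_regex:
        problems.append("set either topics or topics.regex, not both")
    elif not has_topics and not has_regex:
        problems.append("set one of topics or topics.regex")
    return problems


if __name__ == "__main__":
    example = {"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
               "topics": "snowflake-input", "topics.regex": "snowflake-.*"}
    for problem in validate_config(example):
        print(problem)
```

Returning a list instead of raising on the first error lets all problems be reported in one pass.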