Prerequisites
- A running Snowflake instance
Quick Start
Set up the kcctl client: doc
Create a Snowflake instance
Set up the database and user in Snowflake; refer to: Snowflake Documentation
Set up a key pair; refer to: Using key pair authentication & key rotation
Create a secret in StreamNative Console and save the private key's content and passphrase to it; refer to: doc. For example, assume the secret name is gcp and the key is auth.
Create a JSON file like the following:
{
  "name": "snowflake-demo",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.role.name": "kafka_connector_role_1",
    "snowflake.user.name": "kafka_connector_user_1",
    "snowflake.url.name": "${SNOWFLAKE_URL}:443",
    "snowflake.private.key": "${snsecret:snowflake-demo:snowflake.private.key}",
    "snowflake.private.key.passphrase": "${snsecret:snowflake-demo:snowflake.private.key.passphrase}",
    "topics": "snowflake-input",
    "snowflake.database.name": "kafka_db",
    "snowflake.schema.name": "public",
    "tasks.max": "1"
  }
}
Run the following command to create the connector:
kcctl create -f <filename>.json
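The connector expects the private key as a single line without the PEM header or footer (see the snowflake.private.key property under Configuration). The following is a minimal Python sketch of how the key content could be prepared before saving it to the secret. The function name normalize_private_key and the sample text are hypothetical; the sample is placeholder data, not a real key.

```python
def normalize_private_key(pem_text):
    """Return the key body as one line, without PEM header/footer lines."""
    body_lines = []
    for line in pem_text.splitlines():
        stripped = line.strip()
        # Skip blank lines and the "-----BEGIN ...-----" / "-----END ...-----" markers.
        if not stripped or stripped.startswith("-----"):
            continue
        body_lines.append(stripped)
    # Joining without a separator removes the line breaks inside the key body.
    return "".join(body_lines)


# Placeholder content for illustration only (assumption: PEM text input).
sample_pem = (
    "-----BEGIN ENCRYPTED PRIVATE KEY-----\n"
    "AAAABBBBCCCC\n"
    "DDDDEEEEFFFF\n"
    "-----END ENCRYPTED PRIVATE KEY-----\n"
)
single_line_key = normalize_private_key(sample_pem)
```

The resulting single_line_key string is what would be pasted into the secret as the private key value.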
Configuration
The Snowflake Kafka sink connector is configured using the following required properties:
Parameter | Description |
---|---|
name | The name of the connector. |
connector.class | The connector class: com.snowflake.kafka.connector.SnowflakeSinkConnector. |
topics | A list of Kafka topics that the sink connector watches. (You can define either the topics or the topics.regex setting, but not both.) |
topics.regex | A regular expression that matches the Kafka topics that the sink connector watches. (You can define either the topics or the topics.regex setting, but not both.) |
snowflake.url.name | The URL for accessing your Snowflake account. |
snowflake.user.name | User login name for the Snowflake account. |
snowflake.private.key | The private key to authenticate the user. Include only the key, not the header or footer. If the key is split across multiple lines, remove the line breaks. |
snowflake.database.name | The name of the database that contains the table to insert rows into. |
snowflake.schema.name | The name of the schema that contains the table to insert rows into. |
header.converter | Required only if the records are formatted in Avro and include a header. |
key.converter | Kafka record's key converter. |
value.converter | Kafka record's value converter. |
For the full list of configuration properties, see the official Snowflake Kafka Connect documentation.
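The required properties in the table above can be checked before a connector file is submitted. The sketch below is one possible pre-flight check in Python, not part of the connector or kcctl. The function name find_config_problems is hypothetical, and the sketch assumes the file layout from the Quick Start (a top-level name plus a config object). It does not check header.converter, which the table marks as required only for Avro records with headers.

```python
# Properties the table lists as always required inside "config".
REQUIRED_CONFIG_KEYS = [
    "connector.class",
    "snowflake.url.name",
    "snowflake.user.name",
    "snowflake.private.key",
    "snowflake.database.name",
    "snowflake.schema.name",
    "key.converter",
    "value.converter",
]


def find_config_problems(connector):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    if not connector.get("name"):
        problems.append("missing top-level 'name'")
    config = connector.get("config", {})
    for key in REQUIRED_CONFIG_KEYS:
        if not config.get(key):
            problems.append("missing '" + key + "'")
    # The table allows either topics or topics.regex, but not both.
    has_topics = bool(config.get("topics"))
    has_regex = bool(config.get("topics.regex"))
    if has_topics == has_regex:
        problems.append("set exactly one of 'topics' or 'topics.regex'")
    return problems


# A trimmed-down version of the Quick Start example.
example = {
    "name": "snowflake-demo",
    "config": {
        "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "snowflake.user.name": "kafka_connector_user_1",
        "snowflake.url.name": "${SNOWFLAKE_URL}:443",
        "snowflake.private.key": "${snsecret:snowflake-demo:snowflake.private.key}",
        "topics": "snowflake-input",
        "snowflake.database.name": "kafka_db",
        "snowflake.schema.name": "public",
    },
}
problems = find_config_problems(example)
```

For a real file, the dictionary could be loaded with json.load from the standard library and passed to the same function.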