The Snowflake sink connector is used to write messages from Apache Pulsar topics to Snowflake tables with Snowpipe Streaming.
To find your account URL, go to the Admin - Accounts page and click the account link. It should be in a format like `https://<account_identifier>.snowflakecomputing.com`.
Generate an RSA key pair for key-pair authentication and save `rsa_key.p8` (the private key) and `rsa_key.pub` (the public key) locally.
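If you have not generated the key pair yet, a common way to do so (a sketch, not necessarily the exact commands from the original guide) is with OpenSSL:

```bash
# Generate an unencrypted 2048-bit RSA private key in PKCS#8 format.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt -out rsa_key.p8

# Derive the matching public key.
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```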
The `-a` option is followed by an account identifier, which is the substring of the account URL above (before `.snowflakecomputing.com`). The `-u` option is followed by your username. After logging in, set the public key for your user. You can get the public key content (MIIBIjA…) by running the following command:
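The command itself is missing from the extracted text; a minimal sketch that strips the PEM header and footer and joins the remaining lines is:

```bash
# Print the base64 body of the public key as a single line.
grep -v "PUBLIC KEY" rsa_key.pub | tr -d '\n'
```

The resulting string is what you assign to the user, for example with `ALTER USER <user> SET RSA_PUBLIC_KEY='MIIBIjA…';`.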
Next, run the `grant.sql` script. The script creates a user `snservice` that will be used in the sink config later and associates it with a role `snrole` that is only used internally in Snowflake. Then it grants the necessary permissions.
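The contents of `grant.sql` are not reproduced here; a rough sketch of the kind of statements it contains (the exact privileges are an assumption based on the description above, while the database and schema names come from the tutorial) might be:

```sql
-- Sketch only: the real grant.sql may differ.
CREATE ROLE IF NOT EXISTS snrole;
CREATE USER IF NOT EXISTS snservice DEFAULT_ROLE = snrole;
GRANT ROLE snrole TO USER snservice;

-- Privileges the connector needs on the target database and schema.
GRANT USAGE ON DATABASE st_tuts TO ROLE snrole;
GRANT USAGE ON SCHEMA st_tuts.demo TO ROLE snrole;
GRANT CREATE TABLE ON SCHEMA st_tuts.demo TO ROLE snrole;
```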
When you create the sink with `--sink-type snowflake`, it runs as a builtin connector. If you want to create a non-builtin connector, replace `--sink-type snowflake` with `--archive /path/to/pulsar-io-snowflake.nar`. You can find the button to download the NAR package at the beginning of this document.
The value passed to `--sink-config` is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own; a full create command is sketched below.
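A sketch of the create command (the sink name, input topic, and config values are illustrative placeholders, not values from the original guide):

```bash
bin/pulsar-admin sinks create \
  --sink-type snowflake \
  --name snowflake-sink \
  --inputs persistent://public/default/input-snowflake \
  --retain-ordering \
  --sink-config '{
    "url": "https://<account_identifier>.snowflakecomputing.com",
    "user": "snservice",
    "role": "snrole",
    "database": "st_tuts",
    "schema": "demo",
    "privateKey": "<base64-encoded-private-key>"
  }'
```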
You can get the private key content (MIIBIjA…) by running the following command:
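The command is missing from the extracted text; a minimal sketch that strips the PEM header and footer from the private key file is:

```bash
# Print the base64 body of the private key as a single line.
grep -v "PRIVATE KEY" rsa_key.p8 | tr -d '\n'
```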
The parameters of `pulsar-admin` are similar to those of `pulsarctl`. You can find an example in the StreamNative Cloud documentation.

To view the data in Snowflake, grant the `SNROLE` role to the user you logged in with.
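A minimal sketch of the grant (run it as a user with sufficient privileges, for example `ACCOUNTADMIN`, and substitute your own username):

```sql
GRANT ROLE SNROLE TO USER <your_username>;
USE ROLE SNROLE;
```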
After switching to `SNROLE`, under Data - Database - ST_TUTS - DEMO - Tables you will find that the table `PERSISTENT___PUBLIC_DEFAULT_INPUT_SNOWFLAKE_1118738946` has been created and records the messages produced above.
Name | Type | Required | Sensitive | Default | Description |
---|---|---|---|---|---|
url | String | Yes | false | "" (empty string) | The URL for accessing your Snowflake account. This URL must include your account identifier. The protocol (https://) and port number are optional. |
user | String | Yes | false | "" (empty string) | User login name for the Snowflake account. |
database | String | Yes | false | "" (empty string) | The database in Snowflake where the connector will sink data. |
schema | String | Yes | false | "" (empty string) | The schema in Snowflake where the connector will sink data. |
role | String | Yes | false | "" (empty string) | Access control role to use when inserting rows into the table. |
warehouse | String | No | false | "" (empty string) | The warehouse name in Snowflake. Defaults to empty. |
privateKey | String | Yes | true | "" (empty string) | The private key of the user. This is sensitive information used for authentication. |
topic2table | String | No | false | "" (empty string) | Optional parameter to map topics to tables. Separate each topic and table with a colon. Use fully qualified Pulsar topic names or a regex. For example: "persistent://public/default/test:table". Each topic must match only one table. |
metadataFields | String | No | false | "message_id,partition,topic,publish_time" | The metadata fields for each Snowflake record. Separate multiple fields with commas. Supported fields: schema_version, partition, event_time, publish_time, message_id, sequence_id, producer_name, topic. |
icebergEnabled | boolean | No | false | false | Enable the Iceberg table format. Defaults to false. |
maxClientLag | long | No | false | 1 | Specifies how often the Snowflake Ingest SDK flushes data to Snowflake, in seconds. Set it to 0 or leave it empty to let the connector decide the value. For how the connector decides this value, see the Ingestion Latency section. |
checkCommittedMessageIntervalMs | long | No | false | 1000 | Specifies how often the connector checks for committed messages, in milliseconds. |
enableSchematization | boolean | No | false | true | Enable schema detection and evolution. Defaults to true. |
The connector needs to be created with `retainOrdering=true`. If you create the connector on SNCloud, retain ordering is enabled by default and cannot be disabled. For why retain ordering is necessary, see the Data sink workflow section.

`privateKey`: If you are setting up the connector on the SNCloud UI, you need to create a secret to store the private key. The private key must not have any headers or footers; it must be a base64-encoded string. You can get your private key using the following command:
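The command is not included in the extracted text; a minimal sketch that strips the PEM header and footer and joins the remaining lines is:

```bash
grep -v "PRIVATE KEY" rsa_key.p8 | tr -d '\n'
```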
Note that if `icebergEnabled` is true, the connector won't create the table automatically and will throw an exception if the table does not exist.
The connector converts the topic name into a valid Snowflake table name using the following rules:

- Characters that are not valid in a Snowflake identifier (such as `-` or `+`) are replaced with underscores, and the name is upper-cased. For example, `test-topic` would become `TEST_TOPIC`.
- Because of this replacement, different topics can map to the same name: `numbers+x` and `numbers-x` would both become `NUMBERS_X`. To prevent duplication, the connector appends a unique suffix to the table name, consisting of an underscore and a generated hash code.
You can use the configuration parameter topic2table
to specify a mapping between topics and tables. The parameter
value is a comma-separated list of topic-table pairs, where each pair is separated by a colon. For example:
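The exact example from the original document is not preserved here; an illustrative value (topic and table names are placeholders) would be:

```
persistent://public/default/topic-a:table_a,persistent://public/default/topic-b:table_b
```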
Schema detection and evolution work when `enableSchematization` is true and the table has the `ENABLE_SCHEMA_EVOLUTION` parameter enabled. You can set it by running the following SQL command:
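The command itself is missing from the extracted text; setting the parameter on a table looks like this (replace `<table_name>` with your table):

```sql
ALTER TABLE <table_name> SET ENABLE_SCHEMA_EVOLUTION = TRUE;
```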
`maxClientLag`: This parameter specifies how often the Snowflake Ingest SDK flushes data to Snowflake, in seconds. If the delay before data loaded by the connector appears in the Snowflake table is too long, you can reduce this value. However, setting it too low results in more frequent flush operations and worse query performance. For more guidance on configuring `maxClientLag`, see the Snowflake Latency recommendations.
`checkCommittedMessageIntervalMs`: This parameter specifies how often the connector checks for committed messages and acknowledges them in Pulsar, in milliseconds. The default value is 1 second (1000 ms).

The connector creates one TopicPartitionChannel per partition of each input topic. For example, for a topic `test-topic` with 3 partitions, the connector will create 3 TopicPartitionChannels: `test-topic-partition-0`, `test-topic-partition-1`, and `test-topic-partition-2`.
The connector provides an exactly-once delivery guarantee. Exactly-once semantics ensure the delivery of Pulsar messages without duplication or data loss. The connector uses a one-to-one mapping between partitions and channels, utilizing the Pulsar Message ID and the Snowflake Channel Offset Token to achieve exactly-once semantics. For more information, see Snowflake Offset Token.
The connector uses the Message ID as the offset token. For instance, the message ID `0:4:1:3` (Ledger ID: 0; Entry ID: 4; Partition Index: 1; Batch Index: 3) is converted to the offset token `0:4:3`, discarding the partition index. This offset token is used each time messages are inserted into Snowflake. When the Snowflake SDK flushes the data, the last committed offset token of the channel is updated, and the connector retrieves it from the SDK.
With `retainOrdering` enabled, the connector ensures that messages in each TopicPartitionChannel are received in order without skips.

When a TopicPartitionChannel is opened, the Snowflake Streaming Ingest channel is opened and the last committed offset token is retrieved and stored as `committedMessageId`. Another variable, `processedMessageId`, is updated after each data insert. The connector discards all messages before the current `processedMessageId`. It periodically checks the last committed offset token and updates `lastCommittedMessageId`, acknowledging all committed messages whenever `lastCommittedMessageId` changes.
Here is an example of how it works. Suppose the current `processedMessageId` and `lastCommittedMessageId` are both `0:4:0`.

1. When message `0:4:1` arrives, its ID is greater than the current `processedMessageId`, so the connector inserts it into the Snowflake channel and updates `processedMessageId` to `0:4:1`.
2. If message `0:4:0` comes in again (which can happen due to consumer redelivery or other retry operations), the connector discards it because its message ID is less than `processedMessageId`.
3. Subsequent messages are processed the same way, and `processedMessageId` advances to `0:4:2`. Meanwhile, a dedicated timer periodically checks the last committed offset token and updates `lastCommittedMessageId`; the period is determined by the `checkCommittedMessageIntervalMs` configuration.
4. Once the Snowflake channel flushes data, the last committed offset token is updated to `0:4:2`. The timer picks up the new token and updates `lastCommittedMessageId`. If it has changed, the connector acknowledges all messages up to `lastCommittedMessageId` using cumulative acknowledgment, as sketched below.
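The acknowledgment call is not shown in the extracted text; in terms of the Pulsar client API, acknowledging everything up to a message ID in one call looks roughly like this (assuming the connector holds a reference to the partition's consumer):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;

// Sketch: cumulatively acknowledge every message of this partition
// up to and including lastCommittedMessageId in a single call.
void ackCommitted(Consumer<byte[]> consumer, MessageId lastCommittedMessageId) {
    consumer.acknowledgeCumulativeAsync(lastCommittedMessageId);
}
```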
Pulsar Schema | Supported |
---|---|
AVRO | Yes |
JSON | Yes |
PRIMITIVE | Yes |
KEY_VALUE | No |
PROTOBUF | No |
PROTOBUF_NATIVE | No |
Each Snowflake record consists of `RECORD_CONTENT` and `RECORD_METADATA`.

- `RECORD_CONTENT`: The content of the message. The type of the content is `VARIANT`.
- `RECORD_METADATA`: The metadata of the message. The type of the metadata is `VARIANT`.

You can use the configuration parameter `metadataFields` to specify the metadata fields for each Snowflake record. The default value is `message_id,partition,topic,publish_time`, and the supported fields are `schema_version`, `partition`, `event_time`, `publish_time`, `message_id`, `sequence_id`, `producer_name`, and `topic`.
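As an illustration (the field values are made up, and the exact representation, for example whether `publish_time` is an epoch value or a formatted timestamp, is an assumption), the default `RECORD_METADATA` for a message could look like:

```json
{
  "message_id": "0:4:1:3",
  "partition": 1,
  "topic": "persistent://public/default/input-snowflake",
  "publish_time": 1700000000000
}
```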
If `enableSchematization` is true and the message is a primitive type, the connector will store the message in the `RECORD_CONTENT` column.
Field Avro Schema | Snowflake Table Type |
---|---|
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
STRING | VARCHAR |
BYTES | BINARY |
ARRAY | ARRAY |
ENUM | VARIANT |
UNION | VARIANT |
RECORD | VARIANT |
Logical Type | Snowflake Table Type |
---|---|
decimal | VARCHAR |
date | DATE |
time-millis, time-micros | TIME(6) |
timestamp-millis, timestamp-micros | TIMESTAMP(6) |
local-timestamp-millis | TIMESTAMP_LTZ |
local-timestamp-micros | TIMESTAMP_LTZ(6) |
If `icebergEnabled` is true and schema detection and evolution are enabled, the connector will alter the Iceberg table based on the Avro schema type of the message's fields. The following table shows the mapping between the field's schema and the Snowflake table type.
Field Avro Schema | Snowflake Table Type |
---|---|
INT | INT |
LONG | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
STRING | VARCHAR |
BYTES | BINARY |
Logical Type | Snowflake Table Type |
---|---|
decimal | VARCHAR |
date | DATE |
time-millis, time-micros | TIME(6) |
timestamp-millis, timestamp-micros | TIMESTAMP(6) |
local-timestamp-millis | TIMESTAMP_LTZ |
local-timestamp-micros | TIMESTAMP_LTZ(6) |
To use the Iceberg table format, set the `icebergEnabled` configuration to `true`. When `icebergEnabled` is set to `true` and schema evolution is enabled (see below), the connector will automatically update the Iceberg table based on the Avro schema of the message fields. If not, you need to manually update the table to match the Pulsar Avro schema types. For the schema mapping for the Iceberg table, please refer to the Iceberg table type mapping above. Here is a simple example of creating an Iceberg table:
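The original example is not preserved in the extracted text; the sketch below uses standard Snowflake Iceberg syntax, and the structure of the `RECORD_METADATA` column is an assumption based on the default metadata fields, so it may differ from the connector's actual requirements:

```sql
-- Sketch only: table, volume, and column details are illustrative.
CREATE ICEBERG TABLE demo_iceberg_table (
    RECORD_METADATA OBJECT(
        message_id STRING,
        partition INT,
        topic STRING,
        publish_time TIMESTAMP
    )
)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'my_external_volume'
BASE_LOCATION = 'demo_iceberg_table';
```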
The Iceberg table must have `ENABLE_SCHEMA_EVOLUTION` enabled if you want the connector to automatically evolve the schema. You can set it by running the following SQL command:
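The command is missing from the extracted text; it presumably follows the standard Snowflake syntax:

```sql
ALTER ICEBERG TABLE demo_iceberg_table SET ENABLE_SCHEMA_EVOLUTION = TRUE;
```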
When creating the Iceberg table manually, you must define the `RECORD_METADATA` column and match the types as shown in the example.