The Snowflake streaming sink connector pulls data from Pulsar topics and persists data to
Snowflake based on the SnowPipe Streaming feature. For more information about connectors,
see Connector Overview.
This connector is available as a built-in connector on StreamNative Cloud.
-
For a quick start on setting up and using the connector, refer to the Quick Start section. It provides
a step-by-step guide to get you up and running quickly.
-
To understand the internal workings of the connector, see the Data Sink Workflow. This section
explains how data flows from Pulsar to Snowflake.
-
The Pulsar Snowflake Streaming Sink Connector supports exactly-once semantics, ensuring data is processed without
duplication. Learn how this is achieved in the Exactly-Once Semantics section.
-
For insights into how the connector maintains reliability and handles errors, check
the Fault Tolerance section. It describes mechanisms for recovering from failures.
-
To learn how to configure the connector to suit your needs, see the Configuration section. It
provides details on all available settings and options.
-
For instructions on how to sink data into an Iceberg table, visit
the Sink Data into the Iceberg Table section. This part covers the steps required
to integrate with Iceberg tables.
Quick start
This section introduces how to get started with creating a Snowflake streaming sink connector and get it up and
running.
Prerequisites
The prerequisites for connecting a Snowflake streaming sink connector to external systems include:
-
Prepare a snowflake account
-
Get the account URL from the
Admin - Accounts page and click the link. It should be the format like
https://<account_identifier>.snowflakecomputing.com.
-
Generate the public key and private key for the authentication. For more details, please
check this guide
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
It will generate rsa_key.p8 (the private key) and rsa_key.pub (the public key) locally.
-
Log in and configure the public key.
See Installing SnowSQL to install the SnowSQL.
snowsql -a ${account_identifier} -u ${user_name}
The -a is followed by an account identifier, which is a substring of the account URL before. The -u is followed
by your username. After logging in, set the public key passphrase:
ALTER USER ${user_name} SET RSA_PUBLIC_KEY='MIIBIjA...';
You can get the public key passphrase (MIIBIjA…) by running the following command:
grep -v "\-\-\-" rsa_key.pub | tr -d '\n'
1. Create Snowflake objects and grant permission
Before creating the connector, you need to grant the permissions in Snowflake. Write the following content into a file,
e.g. name it with grant.sql. The script creates a user snservice that will be used in the sink config later and
associate it with a role snrole that is only used in Snowflake internally. Then it grants the necessary permissions.
CREATE DATABASE st_tuts;
CREATE SCHEMA st_tuts.demo;
CREATE ROLE snrole;
GRANT ALL PRIVILEGES ON DATABASE st_tuts TO ROLE snrole;
GRANT ALL PRIVILEGES ON SCHEMA st_tuts.demo TO ROLE snrole;
CREATE USER snservice;
GRANT ROLE snrole TO USER snservice;
ALTER USER snservice SET DEFAULT_ROLE = snrole;
ALTER USER snservice SET RSA_PUBLIC_KEY = 'MIIBI...';
Then run the following command to execute the SQL script above.
snowsql -a ${account_identifier} -u ${user_name} -f grant.sql
2. Create a connector
The following command shows how to use pulsarctl to create a builtin
connector. If you want to create a non-builtin connector,
you need to replace --sink-type snowflake with --archive /path/to/pulsar-io-snowflake.nar. You can find the button
to download the nar package at the beginning of the document.
pulsarctl sinks create \
--sink-type snowflake-streaming \
--name snowflake \
--tenant public \
--namespace default \
--retain-ordering \
--inputs "Your topic name" \
--parallelism 1 \
--sink-config \
'{
"user": "SNSERVICE",
"url": "https://<account_identifier>.snowflakecomputing.com",
"schema": "demo",
"database": "st_tuts",
"privateKey": "...",
"role": "snrole",
"warehouse": "compute_wh"
}'
The --sink-config is the minimum necessary configuration for starting this connector, and it is a JSON string. You
need to substitute the relevant parameters with your own.
You can get the private key passphrase (MIIBIjA…) by running the following command:
grep -v '\-\-\-' rsa_key.p8 | tr -d '\n'
If you want to configure more parameters, see Configuration for reference.
You can also choose to use a variety of other tools to create a connector:
3. Send messages to the topic
@Data
public static class TestMessage {
private long index;
}
PulsarClient client = PulsarClient.builder()
.serviceUrl("{{Your Pulsar URL}}")
.build();
var producer = client.newProducer(Schema.JSON(TestMessage.class))
.topic("{{Your topic name}}")
.create();
for (int i = 0; i < 10; i++) {
TestMessage msg = new TestMessage();
msg.index = i;
MessageId msgID = producer.send(msg);
System.out.println("Publish " + message + " and message ID " + msgID);
}
producer.flush();
producer.close();
client.close();
4. Check the data on Snowflake table
First, you need to execute the following SQL command to grant the role SNROLE to the user you logged in.
GRANT ROLE SNROLE TO USER ${account_name};
Then, switch the role to SNROLE, under Data - Database - ST_TUTS - DEMO - Tables you will find table
PERSISTENT___PUBLIC_DEFAULT_INPUT_SNOWFLAKE_1118738946 is created and records the messages produced above.
Configuration
This table outlines all the properties and the descriptions.
| Name | Type | Required | Sensitive | Default | Description |
|---|
url | String | Yes | false | "" (empty string) | The URL for accessing your Snowflake account. This URL must include your account identifier. The protocol (https://) and port number are optional. |
user | String | Yes | false | "" (empty string) | User login name for the Snowflake account. |
database | String | Yes | false | "" (empty string) | The database in Snowflake where the connector will sink data. |
schema | String | Yes | false | "" (empty string) | The schema in Snowflake where the connector will sink data. |
role | String | Yes | false | "" (empty string) | Access control role to use when inserting rows into the table. |
warehouse | String | No | false | "" (empty string) | The warehouse name in Snowflake. Defaults to empty. |
privateKey | String | Yes | true | "" (empty string) | The private key of the user. This is sensitive information used for authentication. |
topic2table | String | No | false | "" (empty string) | Optional parameter to map topics to tables. Separate each topic and table with a colon. Use fully qualified Pulsar topic names or regex. For example: “persistent://public/default/test:table” Each topic must match only one table.” |
metadataFields | String | No | false | ”message_id,partition,topic,publish_time” | The metadata fields for each Snowflake record. Separate multiple fields with commas. Supported fields: schema_version, partition, event_time, publish_time, message_id, sequence_id, producer_name, topic. |
icebergEnabled | boolean | No | false | false | Enable the Iceberg table format. Defaults to false. |
maxClientLag | long | No | false | 1 | Specifies how often Snowflake Ingest SDK flushes data to Snowflake, in seconds. Specify it to 0 or leave it empty to let the connector decide the value. For how the connector decide this value, see section Ingestion Latency. |
checkCommittedMessageIntervalMs | long | No | false | 1000 | Specifies how often the connector checks for committed messages, in milliseconds. |
enableSchematization | boolean | No | false | true | Enable schema detection and evolution. Defaults to true. |
While running the connector, please make sure the retain ordering is enabled: retainOrdering=true. If you create the
connector on SNCloud, retain ordering is enabled by default and cannot be
disabled. For why retain ordering is necessary, see Data sink workflow.
Authentication
The Snowflake streaming sink connector supports authenticating with Snowflake using a private key. Please complete the
key pair authentication instructions described
in Snowflake key pair rotation. After that, you can get copy
the entire private key and set it into the configuration privateKey.
If you are setting up the connector on the SNCloud UI, you need to create a secret to store the private key. The private
key should not have any headers or footers; it must be a base64 encoded string.
You can get your private key using the following command:
grep -v '\-\-\-' rsa_key.p8 | tr -d '\n'
Table name mapping
When using the Pulsar Snowflake Streaming Connector, Pulsar topics can be mapped to existing Snowflake tables. If no
mapping is specified, the connector will create a new table for each topic based on the topic’s name. Note that if the
icebergEnabled is true, the connector won’t create the table and will throw an exception if the table is not exist.
The connector converts the topic name into a valid Snowflake table name using the following rules:
-
Lowercase topic names are converted to uppercase for table names.
-
If the first character of the topic name is not a letter (a-z, A-Z) or an underscore (_), an underscore is added at
the beginning of the table name.
-
Any character in the topic name that is not valid for a Snowflake table name is replaced with an underscore. For more
information on valid characters,
see Snowflake’s identifier requirements. For
example, the topic name
test-topic would become TEST_TOPIC.
Be aware that if the connector modifies the topic name to create a table, there could be duplicate table names within
the same schema. For example, topics named numbers+x and numbers-x would both become NUMBERS_X. To prevent
duplication, the connector appends a unique suffix to the table name, consisting of an underscore and a generated hash
code.
You can use the configuration parameter topic2table to specify a mapping between topics and tables. The parameter
value is a comma-separated list of topic-table pairs, where each pair is separated by a colon. For example:
topic2table: "persistent://public/default/topic1:table1,persistent://public/default/topic2:table2"
Also, you can use the regex to match multiple topics to one table. For example:
Schema detection and evolution
The Connector offers support for schema detection and evolution. Snowflake
table structures can be automatically defined and adjusted to accommodate new data structures
loaded by the connector. To enable schema detection and evolution for this setup, configure the following configuration:
enableSchematization: true
The schema detection and evolution is only enabled if both the enableSchematization is true and the table has the
ENABLE_SCHEMA_EVOLUTION enabled. You can set it by running the following SQL command:
ALTER TABLE <your-table> SET ENABLE_SCHEMA_EVOLUTION = TRUE;
For more information of this configuration, please
see Table schema evolution.
Without schema detection and evolution, the Snowflake table loaded by the Pulsar connector contains only two VARIANT
columns: RECORD_CONTENT and RECORD_METADATA. When schema detection and evolution is enabled, Snowflake can
automatically identify the schema of streaming data and load it into tables that align with any user-defined schema.
Additionally, Snowflake allows for the addition of new columns or the removal of the NOT NULL constraint from columns
absent in new data files.
Please see more information about the schema in the Schema section.
Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and
RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row | RECORD_METADATA | RECORD_CONTENT |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1 |{"message_id": "691:0:0:0", "partition": 0, "topic": "...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2 |{"message_id": "691:0:0:1", "partition": 0, "topic": "...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3 |{"message_id": "691:0:0:2", "partition": 0, "topic": "...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4 |{"message_id": "691:0:0:3", "partition": 0, "topic": "...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5 |{"message_id": "691:0:0:4", "partition": 0, "topic": "...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|
After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The
table can also automatically evolve to support the structure of new data loaded by the connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1 |{"message_id": "691:0:0:0", "partition": 0, "topic": "...| ABC123 | ZTEST | BUY | 3572 |
| 2 |{"message_id": "691:0:0:1", "partition": 0, "topic": "...| XYZ789 | ZABZX | SELL | 3024 |
| 3 |{"message_id": "691:0:0:2", "partition": 0, "topic": "...| XYZ789 | ZTEST | SELL | 799 |
| 4 |{"message_id": "691:0:0:3", "partition": 0, "topic": "...| ABC123 | ZABZX | BUY | 2033 |
| 5 |{"message_id": "691:0:0:4", "partition": 0, "topic": "...| ABC123 | ZTEST | BUY | 1558 |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Ingestion Latency
There are two configuration parameters determining the ingestion latency of the Snowflake streaming sink connector:
maxClientLag: This parameter specifies how often the Snowflake Ingest SDK flushes data to Snowflake, in seconds. If
you find the delay for the connector to load data to the Snowflake table too long,
you can reduce this value. However, setting it too low will result in more frequent data flush operations and result
in a worse query performance. For more instruction for configuring the maxClientLag, please
see Latency recommendations¶.
- Specify it to 0 or leave it empty to let the connector decide the value based on the table type:
- For the standard snowflake table, the default value is 1 second.
- For the Iceberg table, the default value is 30 seconds.
checkCommittedMessageIntervalMs: This parameter specifies how often the connector checks for committed messages and
acknowledge them in the Pulsar, in milliseconds. The default value is 1 second.
Data sink workflow
The diagram below illustrates the data sink flow for the Pulsar Snowflake Streaming connector:
- The connector subscribes to multiple user-specified topics and consumes messages from them.
- It filters messages by topic-partition and forwards them to the corresponding TopicPartitionChannel. Each
TopicPartitionChannel corresponds to a topic-partition. The connector creates a Snowflake Streaming Ingest channel
for
each TopicPartitionChannel to ingest data into Snowflake. For example, if there is a partitioned topic
test-topic
with 3 partitions, the connector will create 3 TopicPartitionChannels: test-topic-partition-0,
test-topic-partition-1, and test-topic-partition-2.
- Within each TopicPartitionChannel, the RecordService converts messages into Snowflake records and inserts them into
the Snowflake Streaming Ingest channel, which then ingests the data into Snowflake.
Please note that this workflow requires message order to be maintained, ensuring that a topic-partition’s
TopicPartitionChannel exists only in one connector instance. Therefore, the connector must have the retain ordering
configuration enabled. If you create the connector on SNCloud, retain ordering is enabled by default and cannot be
disabled.
For more information for how Snowflake Channel works, please
see Snowflake Channel.
Exactly-once semantics
Currently, the Snowflake streaming sink connector supports the exactly-once delivery guarantee semantic.
Exactly-once semantics ensure the delivery of Pulsar messages without duplication or data loss.
The connector uses a one-to-one mapping between partitions and channels, utilizing the Message ID and the Snowflake
Channel Offset Token to achieve exactly-once semantics. For more information,
see Snowflake Offset Token.
The connector uses the Message ID as the offset token. For instance, the message ID 0:4:1:3 (Ledger ID: 0; Entry ID:
4; Partition Index: 1; Batch Index: 3) is converted to the offset token 0:4:1, discarding the partition index. This
offset token is used each time messages are inserted into Snowflake. When the Snowflake SDK flushes the data, the last
committed offset token of the channel is updated, and the connector retrieves it from the SDK.
With retainOrdering enabled, the connector ensures that messages in each TopicPartitionChannel are received in order
without skips.
When opening each TopicPartitionChannel, the Snowflake Streaming Ingest channel is opened, retrieving the last committed
offset token, which is stored as committedMessageId. Another variable, processedMessageId, is updated after each
data insert. The connector discards all messages before the current processedMessageId. It periodically checks the
last committed offset token and updates lastCommittedMessageId, acknowledging all committed messages if
lastCommittedMessageId changes.
Here is an example diagram to show how it works:
Suppose the current processMessageId and the lastCommittedMessageId are both 0:4:0.
A new record arrives, and since its message ID is greater than processedMessageId, the connector inserts it into the
Snowflake channel and updates processedMessageId to 0:4:1.:
However, now there is a duplicated message 0:4:0 comes in again, it will happen due to the consumer redelivery or some
retry operations. The connector discards it as its message id is less than processedMessageId.
The snowflake channel will flush the data to the snowflake table, and the last committed offset token will be updated to
0:4:2. And a dedicated timer will periodically check the last committed offset token and update the
lastCommittedMessageId. The periodicity is determined by the configuration checkCommittedMessageIntervalMs.
Once the Snowflake channel flushes data, the last committed offset token updates to 0:4:2. A timer periodically checks
this token, updating lastCommittedMessageId. If it changes, the connector acknowledges all messages before
lastCommittedMessageId using the cumulative acknowledgment:
Schema
Supported Pulsar Schema
The snowflake streaming sink connector supports converting some of Pulsar schemas, as listed in the following table.
| Pulsar Schema | Supported |
|---|
| AVRO | Yes |
| JSON | Yes |
| PRIMITIVE | Yes |
| KEY_VALUE | No |
| PROTOBUF | No |
| PROTOBUF_NATIVE | No |
If you don’t enable the schema detection and evolution, the Snowflake table loaded by the Pulsar connector contains only
two VARIANT columns: RECORD_CONTENT and RECORD_METADATA.
RECORD_CONTENT: The content of the message. The type of the content is
VARIANT.
RECORD_METADATA: The metadata of the message. The type of the metadata is
VARIANT..
If Snowflake creates the table, then the table contains only these two columns. If the user creates the table for the
Connector to add rows to, then the table can contain more than these two columns (any additional columns must allow NULL
values because data from the connector does not include values for those columns).
You can also configure the metadataFields to specify the metadata fields for each Snowflake record. The default value
is message_id,partition,topic,publish_time. And the supported fields are schema_version, partition, event_time,
publish_time, message_id, sequence_id, producer_name, topic.
If enableSchematization is true and the message is a primitive type, the connector will store the message in the
RECORD_CONTENT column.
Standard snowflake table type mapping
If the schema detection and evolution are enabled, the connector will create and alter the table based on the avro
schema type of the message’s field. The following table shows the mapping between the field’s schema and the Snowflake
table.
| Field Avro Schema | Snowflake Table Type |
|---|
| INT | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
| STRING | VARCHAR |
| BYTES | BINARY |
| ARRAY | ARRAY |
| ENUM | VARIANT |
| UNION | VARIANT |
| RECORD | VARIANT |
Here is the mapping for the Avro logical type:
| Logical Type | Snowflake Table Type |
|---|
decimal | VARCHAR |
date | DATE |
time-millis, time-micros | TIME(6) |
timestamp-millis,timestamp-micros | TIMESTAMP(6) |
local-timestamp-millis | TIMESTAMP_LTZ |
local-timestamp-micros | TIMESTAMP_LTZ(6) |
Other types are not supported and will throw an exception.
Iceberg table type mapping
If the icebergEnabled is true and schema detection and evolution are enabled, the connector will alter the Iceberg
table based on the avro schema type of the message’s field. The following table shows the mapping between the field’s
schema and the Snowflake table.
| Field Avro Schema | Snowflake Table Type |
|---|
| INT | INT |
| LONG | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
| STRING | VARCHAR |
| BYTES | BINARY |
The Iceberg table doesn’t support nested types like ARRAY, RECORD, and UNION.
Here is the mapping for the Avro logical type:
| Logical Type | Snowflake Table Type |
|---|
decimal | VARCHAR |
date | DATE |
time-millis, time-micros | TIME(6) |
timestamp-millis,timestamp-micros | TIMESTAMP(6) |
local-timestamp-millis | TIMESTAMP_LTZ |
local-timestamp-micros | TIMESTAMP_LTZ(6) |
Other types are not supported and will throw an exception.
Fault tolerance
The connector can guarantee messages are neither duplicated nor silently dropped. If there are any errors during the
messages processing, the connector will simply get restarted and reprocess the messages from the last committed
messages.
Sink data into the Iceberg table
The Snowflake streaming sink connector supports sinking data into the Iceberg table. To enable the Iceberg table format,
set the icebergEnabled configuration to true.
The connector cannot create Iceberg tables for you, so you must create them manually.
You can follow this
tutorial Create your first Apache Iceberg™ table
to create an Iceberg table.
When icebergEnabled is set to true, the connector will automatically update the Iceberg table based on the Avro
schema of the message fields. If not, you need to manually update the table to match the Pulsar Avro Schema Type.
For the schema mapping for the Iceberg table, please refer to
the Iceberg table type mapping.
Here is a simple example to create an Iceberg table:
create or replace ICEBERG TABLE TEST_DB.TEST_SCHEMA.ICEBERG_TABLE (
RECORD_METADATA OBJECT()
)
EXTERNAL_VOLUME = 'ICEBERG_EXTERNAL_VOLUME'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'ICEBERG_TABLE/';
And please make sure the Iceberg table has the ENABLE_SCHEMA_EVOLUTION enabled if you wants the connector to
automatically evolve the schema. You can set it by running the following SQL command:
ALTER ICEBERG TABLE TEST_DB.TEST_SCHEMA.ICEBERG_TABLE SET ENABLE_SCHEMA_EVOLUTION = TRUE;
If you want to manually alter the Iceberg table, you can refer to the following example:
CREATE OR REPLACE ICEBERG TABLE TABLE TEST_DB.TEST_SCHEMA.ICEBERG_TABLE (
RECORD_METADATA OBJECT(partition INT, topic STRING, key STRING, schema_version INT, event_time DATE, publish_time DATE, message_id STRING, sequence INT, producer_name STRING),
RECORD_CONTENT OBJECT (
name STRING,
age INTEGER
)
)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'ICEBERG_EXTERNAL_VOLUME'
ENABLE_SCHEMA_EVOLUTION = 'true
BASE_LOCATION = 'ICEBERG_TABLE';
Remember to create the RECORD_METADATA column and match the types as shown in the example.