Snowflake Sink Connector

Available on
StreamNative Cloud console

Authored by
danpi,RobertIndie,dependabot[bot],streamnativebot
Support type
streamnative
License
StreamNative, Inc. All Rights Reserved.

The Snowflake sink connector pulls data from Pulsar topics and persists data to Snowflake.

Features

This section describes the features of the Snowflake sink connector. For details about how to configure these features, see How to configure.

Delivery guarantees

The Pulsar IO connector framework provides three delivery guarantees: at-most-once, at-least-once, and effectively-once.

Currently, the Snowflake sink connector supports only the at-least-once delivery guarantee.

Table name mapping

The Snowflake sink connector can automatically create a table when the table does not exist. To enable this, set the following option:

autoCreateTable=true

The Snowflake sink connector also lets you map topics to tables with the topic2table parameter. Separate each topic from its mapped table name with a colon, and separate multiple mappings with commas. For example:

topic2table=topic1:table1,topic2:table2
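
To make the topic2table format concrete, here is a minimal Python sketch that splits such a value into a topic-to-table dictionary. The function name parse_topic2table is hypothetical, and this is not the connector's own parser. In particular, splitting on the last colon, so that a fully qualified name such as persistent://public/default/topic1 keeps its own colon, is an assumption about how such names would be handled.

```python
from typing import Dict


def parse_topic2table(spec: str) -> Dict[str, str]:
    """Parse a topic2table value such as "topic1:table1,topic2:table2".

    Hypothetical helper for illustration only; the connector's real parser
    may treat whitespace, duplicates, or fully qualified topics differently.
    """
    mapping: Dict[str, str] = {}
    for pair in spec.split(","):
        pair = pair.strip()
        if not pair:
            continue  # tolerate an empty value or a trailing comma
        # Split on the LAST colon so "persistent://tenant/ns/topic:table" keeps
        # the colon that belongs to the topic URL (an assumption, see above).
        topic, sep, table = pair.rpartition(":")
        if not sep or not topic or not table:
            raise ValueError(f"expected <topic>:<table>, got {pair!r}")
        mapping[topic] = table
    return mapping


print(parse_topic2table("topic1:table1,topic2:table2"))
# {'topic1': 'table1', 'topic2': 'table2'}
```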

Metadata Fields

Each table has two fields: metadata and content. The metadata field holds ancillary information about the content, such as the topic, message ID, and publish time. By default, the following Pulsar metadata fields are written to the metadata field:

metadataField=__message_id__,__partition__,__topic__,__event_time__

Note

Currently, the Snowflake sink connector does not support custom metadata.
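
Custom metadata is not supported, so a metadataField value can contain only the supported fields listed under metadataField in How to configure. The following Python sketch splits a metadataField value and rejects any other name. The helper name is hypothetical. The field names use the double-underscore form from the default value above. That this form also applies to the fields not shown in the default value is an assumption.

```python
from typing import List

# Supported names from the metadataField description, written in the
# double-underscore form of the default value (assumed for the others).
SUPPORTED_METADATA_FIELDS = frozenset({
    "__schema_version__", "__partition__", "__event_time__", "__publish_time__",
    "__message_id__", "__sequence_id__", "__producer_name__", "__topic__",
})


def parse_metadata_fields(value: str) -> List[str]:
    """Split a comma-separated metadataField value and reject unknown names."""
    fields = [name.strip() for name in value.split(",") if name.strip()]
    unknown = [name for name in fields if name not in SUPPORTED_METADATA_FIELDS]
    if unknown:
        # Custom metadata is not supported, so any other name is an error.
        raise ValueError(f"unsupported metadata field(s): {', '.join(unknown)}")
    return fields


print(parse_metadata_fields("__message_id__,__partition__,__topic__,__event_time__"))
```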

Data format types

The following table lists which Pulsar schemas the Snowflake sink connector can convert.

Pulsar schema      Supported
AVRO               Yes
PRIMITIVE          Yes
JSON               Yes
KEY_VALUE          No
PROTOBUF           No
PROTOBUF_NATIVE    No

Batch processing

To increase write throughput, you can configure the buffer thresholds of the Snowflake sink connector: the number of buffered records, the buffered size in bytes, and the flush interval.

bufferCountRecords = 10_000
bufferSizeBytes = 5_000_000
bufferFlushTimeInSeconds = 120
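
The three settings act as independent thresholds on the in-memory buffer. The Python sketch below models them. It assumes that a flush happens as soon as any one threshold is reached, which is how the Snowflake Kafka connector documents its equivalent buffer settings. This document does not state the exact trigger rule. RecordBuffer is a hypothetical class, not part of the connector.

```python
import time
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class RecordBuffer:
    """Hypothetical model of the connector's buffer thresholds.

    Assumes a flush is due as soon as ANY threshold is reached.
    """

    max_records: int = 10_000       # bufferCountRecords
    max_bytes: int = 5_000_000      # bufferSizeBytes
    max_age_seconds: float = 120.0  # bufferFlushTimeInSeconds (120 as above; default is 60)
    records: List[bytes] = field(default_factory=list)
    size_bytes: int = 0
    last_flush: float = field(default_factory=time.monotonic)

    def add(self, payload: bytes) -> None:
        self.records.append(payload)
        self.size_bytes += len(payload)

    def should_flush(self, now: Optional[float] = None) -> bool:
        if not self.records:
            return False  # nothing buffered, nothing to write
        now = time.monotonic() if now is None else now
        return (
            len(self.records) >= self.max_records
            or self.size_bytes >= self.max_bytes
            or now - self.last_flush >= self.max_age_seconds
        )

    def flush(self, now: Optional[float] = None) -> List[bytes]:
        """Return the buffered batch and reset the counters and the timer."""
        batch, self.records, self.size_bytes = self.records, [], 0
        self.last_flush = time.monotonic() if now is None else now
        return batch
```

Under this model, higher thresholds produce larger and less frequent writes. That favors throughput at the cost of more memory and higher end-to-end latency.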

How to get

This section describes how to get the Snowflake sink connector.

Work with Function Worker

You can get the Snowflake sink connector using one of the following methods if you use Pulsar Function Worker to run the connector in a cluster.

To build the Snowflake sink connector from the source code, follow these steps.

  1. Clone the source code to your machine.

    git clone https://github.com/streamnative/pulsar-io-snowflake
    
  2. Build the connector in the pulsar-io-snowflake directory.

    mvn clean install -DskipTests
    

    After the connector is successfully built, a NAR package is generated under the target directory.

    ls target
    pulsar-io-snowflake-2.11.4.3.nar
    

Work with Function Mesh

You can pull the Snowflake sink connector Docker image (streamnative/pulsar-io-snowflake) from Docker Hub if you use Function Mesh to run the connector.

How to configure

Before using the Snowflake sink connector, you need to configure it. The following properties are available.

  • user (String, required, default: "" (empty string)): The user account name of the Snowflake service.
  • privateKey (String, required, default: "" (empty string)): The private key of the user.
  • host (String, required, default: "" (empty string)): The host URL of the Snowflake service.
  • database (String, required, default: "" (empty string)): The Snowflake database to which the connector sinks data.
  • schema (String, required, default: "" (empty string)): The Snowflake schema, which sits one level below the Snowflake database and contains a set of tables.
  • tableName (String, optional, default: "" (empty string)): The table to which the Snowflake connector persists messages when the autoCreateTable option is set to false.
  • warehouse (String, optional, default: "" (empty string)): The name of the Snowflake warehouse. By default, no warehouse is set.
  • bufferCountRecords (int, optional, default: 10_000): The number of records buffered in memory before they are ingested into Snowflake.
  • bufferSizeBytes (int, optional, default: 5_000_000, that is, 5 MB): The cumulative size, in bytes, of the records buffered in memory before they are ingested into Snowflake as data files.
  • bufferFlushTimeInSeconds (int, optional, default: 60): The number of seconds between buffer flushes, where a flush moves data from Pulsar's memory cache to the internal stage.
  • autoCreateTable (boolean, optional, default: false): Whether to automatically create a table when the table does not exist.
  • processingGuarantees (String, optional, default: "ATLEAST_ONCE"): The processing guarantee semantics. Currently, the Snowflake connector supports only ATLEAST_ONCE.
  • topic2table (String, optional, default: "" (empty string)): The mapping between topics and tables. Separate each topic from its mapped table name with a colon and separate mappings with commas, for example topic1:table1,topic2:table2.
  • metadataField (String, optional, default: "__message_id__,__partition__,__topic__,__event_time__"): The metadata fields for each Snowflake record, separated by commas. The supported metadata fields are __schema_version__, __partition__, __event_time__, __publish_time__, __message_id__, __sequence_id__, __producer_name__, and __topic__. Currently, the Snowflake sink connector does not support custom metadata.
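
Before you submit a configuration, it can help to check it against the properties described above. The following Python sketch fails fast when a required property is missing or a property name is misspelled. It then fills in the documented defaults for the optional properties. The names REQUIRED, DEFAULTS, and resolve_sink_config are hypothetical. The sketch leaves metadataField to the connector's own default. It is a client-side check only and does not replace whatever validation the connector itself performs.

```python
from typing import Any, Dict

# Required properties, as documented above.
REQUIRED = ("user", "privateKey", "host", "database", "schema")

# Documented defaults for optional properties.
DEFAULTS: Dict[str, Any] = {
    "tableName": "",
    "warehouse": "",
    "bufferCountRecords": 10_000,
    "bufferSizeBytes": 5_000_000,
    "bufferFlushTimeInSeconds": 60,
    "autoCreateTable": False,
    "processingGuarantees": "ATLEAST_ONCE",
    "topic2table": "",
}

# Optional properties whose default is left to the connector.
OTHER_OPTIONAL = ("metadataField",)


def resolve_sink_config(configs: Dict[str, Any]) -> Dict[str, Any]:
    """Validate a `configs` mapping and merge it over the documented defaults."""
    missing = [key for key in REQUIRED if not configs.get(key)]
    if missing:
        raise ValueError(f"missing required properties: {missing}")
    known = set(REQUIRED) | set(DEFAULTS) | set(OTHER_OPTIONAL)
    unknown = sorted(set(configs) - known)
    if unknown:
        raise ValueError(f"unknown properties: {unknown}")
    if configs.get("processingGuarantees", "ATLEAST_ONCE") != "ATLEAST_ONCE":
        raise ValueError("only ATLEAST_ONCE is supported")
    return {**DEFAULTS, **configs}
```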

Work with Function Worker

You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.

Example

  • JSON

     {
       "tenant": "public",
       "namespace": "default",
       "name": "snowflake-sink",
       "archive": "connectors/pulsar-io-snowflake-2.11.4.3.nar",
       "inputs": [
         "test-snowflake-pulsar"
       ],
       "parallelism": 1,
       "retainOrdering": true,
       "processingGuarantees": "ATLEAST_ONCE",
       "sourceSubscriptionName": "sf_sink_sub",
       "configs": {
         "user": "TEST",
         "host": "ry77682.us-central1.gcp.snowflakecomputing.com:443",
         "schema": "DEMO",
         "warehouse": "SNDEV",
         "database": "TESTDB",
         "privateKey": "SECRETS"
       }
     }
    
  • YAML

    tenant: public
    namespace: default
    name: snowflake-sink
    parallelism: 1
    inputs:
      - test-snowflake-pulsar
    archive: connectors/pulsar-io-snowflake-2.11.4.3.nar
    sourceSubscriptionName: sf_sink_sub
    retainOrdering: true
    processingGuarantees: ATLEAST_ONCE
    configs:
      user: TEST
      host: ry77682.us-central1.gcp.snowflakecomputing.com:443
      schema: DEMO
      warehouse: SNDEV
      database: TESTDB
      privateKey: SECRETS
    
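
You may prefer to generate the configuration file rather than write it by hand, for example to keep the private key out of version control. In that case, a small script can emit the JSON form shown above. The Python sketch below is one way to do that. The function names and the output file name are hypothetical choices, not something the connector requires.

```python
import json
from pathlib import Path
from typing import Any, Dict


def build_sink_config(private_key: str) -> Dict[str, Any]:
    """Return the Function Worker sink configuration from the JSON example."""
    return {
        "tenant": "public",
        "namespace": "default",
        "name": "snowflake-sink",
        "archive": "connectors/pulsar-io-snowflake-2.11.4.3.nar",
        "inputs": ["test-snowflake-pulsar"],
        "parallelism": 1,
        "retainOrdering": True,
        "processingGuarantees": "ATLEAST_ONCE",
        "sourceSubscriptionName": "sf_sink_sub",
        "configs": {
            "user": "TEST",
            "host": "ry77682.us-central1.gcp.snowflakecomputing.com:443",
            "schema": "DEMO",
            "warehouse": "SNDEV",
            "database": "TESTDB",
            "privateKey": private_key,  # injected at generation time
        },
    }


def write_sink_config(path: Path, config: Dict[str, Any]) -> None:
    """Write the configuration as JSON for pulsar-admin --sink-config-file."""
    path.write_text(json.dumps(config, indent=2), encoding="utf-8")
```

For example, write_sink_config(Path("snowflake-sink-config.json"), build_sink_config(os.environ["SNOWFLAKE_PRIVATE_KEY"])) writes a file that you can pass to --sink-config-file. The environment variable name SNOWFLAKE_PRIVATE_KEY is hypothetical.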

Work with Function Mesh

You can use a CustomResourceDefinition (CRD) to create a Snowflake sink connector. Using a CRD lets Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar sink CRD configurations, see sink CRD configurations.

You can define a CRD file (YAML) to set the properties as follows.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: snowflake-sink-sample
spec:
  image: streamnative/pulsar-io-snowflake:2.11.4.3
  replicas: 1
  maxReplicas: 1
  retainOrdering: true
  input:
    topics: 
      - persistent://public/default/test-snowflake-pulsar
  sinkConfig:
    user: TEST
    host: ry77682.us-central1.gcp.snowflakecomputing.com:443
    schema: DEMO
    warehouse: SNDEV
    database: TESTDB
    privateKey: SECRETS
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-snowflake-2.11.4.3.nar
  clusterName: test-pulsar
  autoAck: false

How to use

You can use the Snowflake sink connector with Function Worker or Function Mesh.

Work with Function Worker

You can use the Snowflake sink connector as a standalone connector or as a built-in connector, as follows.

Use it as a standalone connector

If you already have a Pulsar cluster, you can use the Snowflake sink connector as a standalone connector directly.

This example shows how to create a Snowflake sink connector on a Pulsar cluster using the pulsar-admin sinks create command.

PULSAR_HOME/bin/pulsar-admin sinks create \
--sink-config-file <snowflake-sink-config.yaml>

Use it as a built-in connector

You can make the Snowflake sink connector a built-in connector and use it on a standalone cluster.

This example describes how to use the Snowflake sink connector to fetch data from Pulsar topics and save data to Snowflake tables in standalone mode.

Prerequisites

Steps

  1. Copy the NAR package to the Pulsar connectors directory.

    cp pulsar-io-snowflake-2.11.4.3.nar PULSAR_HOME/connectors/pulsar-io-snowflake-2.11.4.3.nar
    
  2. Start a Pulsar cluster in standalone mode.

    PULSAR_HOME/bin/pulsar standalone
    
  3. Run the Snowflake sink connector locally.

    PULSAR_HOME/bin/pulsar-admin sinks localrun \
    --sink-config-file <snowflake-sink-config.yaml>
    

    Or, you can create a connector for the Pulsar cluster.

    PULSAR_HOME/bin/pulsar-admin sinks create \
    --sink-config-file <snowflake-sink-config.yaml>
    
  4. Send messages to a Pulsar topic.

    This example sends ten “hello” messages to the test-snowflake-pulsar topic in the default namespace of the public tenant.

    PULSAR_HOME/bin/pulsar-client produce public/default/test-snowflake-pulsar --messages hello -n 10
    
  5. Query the data from the Snowflake table. For details, see Snowflake Quick Tour.

Work with Function Mesh

This example describes how to create a Snowflake sink connector for a Kubernetes cluster using Function Mesh.

Prerequisites

Steps

  1. Define the Snowflake sink connector with a YAML file and save it as sink-sample.yaml.

    This example shows how to publish the Snowflake sink connector to Function Mesh with a Docker image.

     apiVersion: compute.functionmesh.io/v1alpha1
     kind: Sink
     metadata:
       name: snowflake-sink-sample
     spec:
       image: streamnative/pulsar-io-snowflake:2.11.4.3
       className: org.apache.pulsar.ecosystem.io.snowflake.SnowflakeSinkConnector
       replicas: 1
       maxReplicas: 1
       retainOrdering: true
       input:
         topics:
           - persistent://public/default/test-snowflake-pulsar
       sinkConfig:
         user: TEST
         host: ry77682.us-central1.gcp.snowflakecomputing.com:443
         schema: DEMO
         warehouse: SNDEV
         database: TESTDB
         privateKey: SECRETS
       pulsar:
         pulsarConfig: "test-pulsar-sink-config"
       resources:
         limits:
           cpu: "0.2"
           memory: 1.1G
         requests:
           cpu: "0.1"
           memory: 1G
       java:
         jar: connectors/pulsar-io-snowflake-2.11.4.3.nar
       clusterName: test-pulsar
       autoAck: false
    
  2. Apply the YAML file to create the Snowflake sink connector.

    Input

    kubectl apply -f <path-to-sink-sample.yaml>
    

    Output

    sink.compute.functionmesh.io/snowflake-sink-sample created
    
  3. Check whether the Snowflake sink connector is created successfully.

    Input

    kubectl get all
    

    Output

    NAME                                         READY   STATUS      RESTARTS   AGE
    pod/snowflake-sink-sample-0                   1/1    Running     0          77s
    

    After that, you can produce messages to the input topic, and the Snowflake sink connector writes them from Pulsar to Snowflake.
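
If you script step 3, you can parse the kubectl output instead of reading it by eye. The Python sketch below runs kubectl get pods and looks up the STATUS column for the sink's pod. The functions pod_status and fetch_pods are hypothetical. The sketch assumes the default column layout shown in the output above (NAME, READY, STATUS, RESTARTS, AGE). kubectl's structured output (for example -o json) is more robust than splitting on whitespace if you need this in production.

```python
import subprocess
from typing import Optional


def pod_status(kubectl_output: str, pod_name: str) -> Optional[str]:
    """Return the STATUS column for pod_name, or None if it is not listed.

    Assumes the default NAME READY STATUS RESTARTS AGE layout; names may
    carry a "pod/" prefix, as in `kubectl get all` output.
    """
    for line in kubectl_output.strip().splitlines()[1:]:  # skip the header row
        columns = line.split()
        if len(columns) >= 3 and columns[0] in (pod_name, f"pod/{pod_name}"):
            return columns[2]
    return None


def fetch_pods(namespace: Optional[str] = None) -> str:
    """Run `kubectl get pods` and return its standard output."""
    command = ["kubectl", "get", "pods"]
    if namespace:
        command += ["--namespace", namespace]
    result = subprocess.run(command, capture_output=True, text=True, check=True)
    return result.stdout
```

For example, pod_status(fetch_pods(), "snowflake-sink-sample-0") should return "Running" once the sink pod is up.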