Cloud Storage Sink

The Cloud Storage Sink exports data from Pulsar topics to cloud storage (such as AWS S3 and Google GCS) in Avro, JSON, Parquet, or other formats. Depending on your environment, the Cloud Storage Sink can provide exactly-once delivery for data exported to cloud storage.

Features

The Cloud Storage Sink provides the following features:

  • Exactly-once Delivery: Records exported using a deterministic partitioner are delivered with exactly-once semantics.
  • Output Formats: Records can be exported in Bytes, JSON, Avro, or Parquet format.
  • Partitioning: The connector supports partitioning by time or by topic name. Time-based partitioning options range in precision from days to seconds.
  • Flushing: The connector supports flushing based on time and size. This means the connector uploads outstanding records after a fixed interval, even if the size limit has not been reached.
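
The flushing behavior described above can be illustrated with a small buffer that uploads when either a size limit or a time limit is reached, whichever comes first. This is a minimal sketch and not the connector's implementation: the `FlushBuffer` class, its `upload` callback, and the injectable `clock` are hypothetical names, and the parameters are only modeled on the `batchSize` and `batchTimeMs` settings used later in this guide.

```python
import time
from typing import Callable, List


class FlushBuffer:
    """Hypothetical buffer that flushes on a size limit or a time limit."""

    def __init__(
        self,
        batch_size: int,
        batch_time_ms: int,
        upload: Callable[[List[bytes]], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.batch_size = batch_size
        self.batch_time_s = batch_time_ms / 1000.0
        self.upload = upload
        self.clock = clock
        self.records: List[bytes] = []
        self.oldest = 0.0  # arrival time of the oldest outstanding record

    def add(self, record: bytes) -> None:
        # The timer starts when the first record enters an empty buffer.
        if not self.records:
            self.oldest = self.clock()
        self.records.append(record)
        self.maybe_flush()

    def maybe_flush(self) -> bool:
        """Upload outstanding records if either limit has been reached.

        Call this periodically so the time limit also fires on idle topics.
        """
        if not self.records:
            return False
        size_reached = len(self.records) >= self.batch_size
        time_reached = self.clock() - self.oldest >= self.batch_time_s
        if size_reached or time_reached:
            self.upload(self.records)
            self.records = []
            return True
        return False
```

With `batch_size=10` and `batch_time_ms=1000`, ten records trigger an upload immediately, while a single record is uploaded about one second after it arrives, provided `maybe_flush` is polled.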

For more details, see the Cloud Storage Sink documentation on the StreamNative Hub website.

Limitations

To learn about the limitations regarding Pulsar schema and output data types, see the Cloud Storage Sink documentation on the StreamNative Hub website.

Quick Start

This section describes how to create a Cloud Storage Sink using the following options: the StreamNative Cloud Console, pulsarctl CLI tool, pulsar-admin CLI tool, and REST API. The following example shows how to export records from a Pulsar topic to AWS S3. For details about exporting data to other cloud storage providers, see the Cloud Storage Sink documentation on the StreamNative Hub website.

Prerequisites

When using AWS S3, grant the connector's AWS credentials the following permissions:

  • s3:AbortMultipartUpload
  • s3:GetObject*
  • s3:PutObject*
  • s3:List*

If you do not want to provide the region in the configuration, also grant the s3:GetBucketLocation permission.
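
One way to express these permissions is an IAM policy document. The sketch below generates such a document in the standard IAM JSON policy format. The helper name `build_s3_sink_policy` and the bucket name `my-example-bucket` are illustrative assumptions, and scoping the statement to a single bucket and its objects is a design choice rather than a requirement stated in this guide.

```python
import json
from typing import Dict, List


def build_s3_sink_policy(bucket: str, include_bucket_location: bool = False) -> Dict:
    """Build an IAM policy document granting the permissions listed above.

    `bucket` is a placeholder; replace it with your own bucket name.
    """
    actions: List[str] = [
        "s3:AbortMultipartUpload",
        "s3:GetObject*",
        "s3:PutObject*",
        "s3:List*",
    ]
    if include_bucket_location:
        # Only needed when the region is not set in the connector configuration.
        actions.append("s3:GetBucketLocation")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": actions,
                # Bucket-level actions apply to the bucket ARN,
                # object-level actions apply to the objects under it.
                "Resource": [
                    f"arn:aws:s3:::{bucket}",
                    f"arn:aws:s3:::{bucket}/*",
                ],
            }
        ],
    }


print(json.dumps(build_s3_sink_policy("my-example-bucket", True), indent=2))
```

The printed JSON can be pasted into the IAM policy editor or saved to a file and attached to the user or role whose access key the connector uses.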

Using the StreamNative Cloud Console

This section describes how to create a Cloud Storage Sink using the StreamNative Cloud Console.

Step 1: Launch your Pulsar cluster

For setup instructions, see work with clusters.

Step 2: Add your connector

  1. In the left navigation menu, click Connectors.
  2. Select the Created Sinks tab.
  3. Click Create a Sink and select the Cloud Storage Sink.

Step 3: Configure your connector

On the Create Cloud Storage Sink screen, enter your desired configuration. The following is a basic example. For a full list of configuration properties, see the configuration properties section.

Field | Value
Service Account | Select the Service Account that you want to use to create the connector.
Sink name | my-cloud-storage-sink
Input Topic - Tenant | Select the tenant for the input topics.
Input Topic - Namespace | Select the namespace for the input topics.
Input Topic - Topic | Select the input topics.
Provider | aws-s3
Access Key Id | Enter your AWS Access Key ID. For details about how to get the Access Key ID, see the AWS General Reference.
Secret Access Key | Enter your AWS Secret Access Key. For details about how to get the Secret Access Key, see the AWS General Reference.
Endpoint | https://s3.us-east-1.amazonaws.com
Time Partition Pattern | MM-dd (This will create a new output folder for each day.)
Bucket | Enter your AWS bucket name.
Region | us-east-1

Step 4: Launch your connector

Click SUBMIT to launch your connector.

Using the pulsarctl CLI

This section describes how to create a Cloud Storage Sink using the pulsarctl CLI tool.

Step 1: Set up pulsarctl

  1. Install the pulsarctl CLI tool. For details, see the pulsarctl repository.
  2. Connect to your Pulsar cluster. For details, see connect to cluster through pulsarctl.

Step 2: Create the connector configuration file

Create a YAML file that contains the connector configuration properties. The following is a basic example:

tenant: "public"
namespace: "default"
name: "my-cloud-storage-sink"
inputs:
  - "my-input-topic"
sink-type: "cloud-storage"

configs:
  provider: "aws-s3"
  accessKeyId: "<your-access-key-id>"
  secretAccessKey: "<your-secret-access-key>"
  bucket: "<your-bucket-name>"
  region: "us-east-1"
  endpoint: "https://s3.us-east-1.amazonaws.com"
  formatType: "json"
  partitionerType: "time"
  timePartitionPattern: "MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10
  batchTimeMs: 1000
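
To see what a time partition pattern such as `MM-dd` produces, the sketch below formats a message timestamp into a folder name. It is only an approximation under stated assumptions: the function `partition_folder` is hypothetical, it supports just a handful of pattern letters, it assumes UTC, and the connector's real object path layout is described on the StreamNative Hub rather than here.

```python
import re
from datetime import datetime, timezone

# Subset of date pattern letters mapped to their strftime equivalents.
_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M", "ss": "%S"}


def partition_folder(publish_time_ms: int, pattern: str = "MM-dd") -> str:
    """Format a millisecond timestamp into a folder name using `pattern`."""
    strftime_pattern = re.sub(
        "|".join(_TOKENS), lambda m: _TOKENS[m.group(0)], pattern
    )
    moment = datetime.fromtimestamp(publish_time_ms / 1000, tz=timezone.utc)
    return moment.strftime(strftime_pattern)
```

With the pattern `MM-dd`, every record published on 15 March maps to the folder name `03-15`, so a new folder appears each day. A finer pattern such as `yyyy-MM-dd-HH` would yield one folder per hour.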

Step 3: Launch your connector

Run the following command to load the configuration and start the connector.

pulsarctl sinks create --sink-config-file <file-name>.yaml

You should see the following output:

Created successfully

Using the pulsar-admin CLI

This section describes how to create a Cloud Storage Sink using the pulsar-admin CLI tool.

Step 1: Set up pulsar-admin

  1. Download the Pulsar binary from the Apache Pulsar Download site.

  2. Extract the binary and navigate to the target directory.

    tar xzvf apache-pulsar-VERSION-bin.tar.gz
    cd apache-pulsar-VERSION
    
  3. Connect to your Pulsar cluster. For details, see connect to cluster through pulsar-admin.

Step 2: Create the connector configuration file

Create a YAML file that contains the connector configuration properties. The following is a basic example:

tenant: "public"
namespace: "default"
name: "my-cloud-storage-sink"
inputs:
  - "my-input-topic"
sink-type: "cloud-storage"

configs:
  provider: "aws-s3"
  accessKeyId: "<your-access-key-id>"
  secretAccessKey: "<your-secret-access-key>"
  bucket: "<your-bucket-name>"
  region: "us-east-1"
  endpoint: "https://s3.us-east-1.amazonaws.com"
  formatType: "json"
  partitionerType: "time"
  timePartitionPattern: "MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10
  batchTimeMs: 1000
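
If you prefer not to store credentials in the YAML file by hand, the file can be rendered from a template at deploy time. The following is a minimal sketch, assuming the credentials are available in a mapping such as `os.environ`. `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are the conventional AWS environment variable names, while `S3_BUCKET` and the helper `render_sink_config` are hypothetical names chosen for this example.

```python
from string import Template
from typing import Mapping

# Same settings as the example above, with placeholders for the secrets.
SINK_CONFIG_TEMPLATE = Template("""\
tenant: "public"
namespace: "default"
name: "my-cloud-storage-sink"
inputs:
  - "my-input-topic"
sink-type: "cloud-storage"

configs:
  provider: "aws-s3"
  accessKeyId: "${AWS_ACCESS_KEY_ID}"
  secretAccessKey: "${AWS_SECRET_ACCESS_KEY}"
  bucket: "${S3_BUCKET}"
  region: "us-east-1"
  endpoint: "https://s3.us-east-1.amazonaws.com"
  formatType: "json"
  partitionerType: "time"
  timePartitionPattern: "MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10
  batchTimeMs: 1000
""")


def render_sink_config(values: Mapping[str, str]) -> str:
    """Fill in the template; raises KeyError if a placeholder has no value."""
    return SINK_CONFIG_TEMPLATE.substitute(values)
```

Calling `render_sink_config(os.environ)` and writing the result to a file gives a configuration that can be passed to `--sink-config-file`. Values containing double quotes would need escaping before substitution, which this sketch does not handle.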

Step 3: Launch your connector

Enter the following command to load the configuration and start the connector.

./bin/pulsar-admin sinks create --sink-config-file <file-name>.yaml

You should see the following output:

Created successfully

Using the REST API

This section describes how to create a Cloud Storage Sink using the REST API.

Step 1: Setup

To use the REST API, you must authenticate with a service account token.

  1. Visit the StreamNative Cloud Console.
  2. On the left navigation pane, click Service Accounts.
  3. In the row of the service account you want to use, in the Token column, click Generate new token, then click expires in 7 days to copy the token to your clipboard.

Step 2: Launch your connector

To launch your connector, submit a POST request to pass your connector configuration as shown below. To learn how to retrieve YOUR-HTTP-SERVICE-URL, see get a service URL.

curl -X "POST" "<YOUR-HTTP-SERVICE-URL>/admin/v3/sinks/public/default" \
    -H "Accept: application/json" \
    -H "Authorization: Bearer <YOUR-TOKEN>" \
    -F "sinkConfig={\"tenant\":\"public\",\"namespace\":\"default\",\"name\":\"my-cloud-storage-sink\",\"inputs\":[\"my-input-topic\"],\"sinkType\":\"cloud-storage\",\"configs\":{\"provider\":\"aws-s3\",\"accessKeyId\":\"<YOUR-ACCESS-KEY-ID>\",\"secretAccessKey\":\"<YOUR-SECRET-ACCESS-KEY>\",\"bucket\":\"<YOUR-BUCKET-NAME>\",\"region\":\"us-east-1\",\"endpoint\":\"https:\/\/s3.us-east-1.amazonaws.com\",\"formatType\":\"json\",\"partitionerType\":\"time\",\"timePartitionPattern\":\"MM-dd\",\"timePartitionDuration\":\"1d\",\"batchSize\":10,\"batchTimeMs\":1000}};type=application/json" -vvv
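
Hand-escaping the JSON in the `sinkConfig` form field is error-prone. As a sketch, the same payload can be generated from a dictionary and serialized with a JSON library. The function name `build_sink_config` is hypothetical, and the keys and values simply mirror the request above.

```python
import json


def build_sink_config(access_key_id: str, secret_access_key: str, bucket: str) -> str:
    """Serialize the sink configuration used in the request above to JSON."""
    config = {
        "tenant": "public",
        "namespace": "default",
        "name": "my-cloud-storage-sink",
        "inputs": ["my-input-topic"],
        "sinkType": "cloud-storage",
        "configs": {
            "provider": "aws-s3",
            "accessKeyId": access_key_id,
            "secretAccessKey": secret_access_key,
            "bucket": bucket,
            "region": "us-east-1",
            "endpoint": "https://s3.us-east-1.amazonaws.com",
            "formatType": "json",
            "partitionerType": "time",
            "timePartitionPattern": "MM-dd",
            "timePartitionDuration": "1d",
            "batchSize": 10,
            "batchTimeMs": 1000,
        },
    }
    # Compact separators keep the value on one line for use as a form field.
    return json.dumps(config, separators=(",", ":"))


print(build_sink_config("<YOUR-ACCESS-KEY-ID>", "<YOUR-SECRET-ACCESS-KEY>", "<YOUR-BUCKET-NAME>"))
```

The printed string is the value of the `sinkConfig` field. Saving it to a file and letting your HTTP client read the field from that file avoids shell quoting issues altogether.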

Configuration Properties

For a full list of configuration properties, see the Cloud Storage Sink documentation on the StreamNative Hub website.
