Cloud Storage Sink
The Cloud Storage sink connector pulls messages from Pulsar topics and persists messages to Cloud Storage.

Available on: StreamNative Cloud console

Authored by: ASF
Support type: StreamNative
License: Apache License 2.0

Overview

The Cloud Storage sink connector periodically polls data from Pulsar and writes it to objects in cloud storage (such as AWS S3 or Google GCS) in Avro, JSON, or Parquet format without duplicates. Depending on your environment, the Cloud Storage sink connector can export data with exactly-once delivery semantics.

The Cloud Storage sink connector provides partitioners that support default partitioning based on Pulsar partitions and time-based partitioning in days or hours. A partitioner is used to split the data of every Pulsar partition into chunks. Each chunk of data acts as an object whose virtual path encodes the Pulsar partition and the start offset of this data chunk. The size of each data chunk is determined by the number of records written to objects in cloud storage and by schema compatibility. If no partitioner is specified in the configuration, the default partitioner, which preserves Pulsar partitioning, is used.

The Cloud Storage sink connector provides the following features:

  • Ensure exactly-once delivery. Records that are exported using a deterministic partitioner are delivered with exactly-once semantics regardless of the eventual consistency of cloud storage.
  • Support data formats with or without a schema. The Cloud Storage sink connector supports writing data to objects in cloud storage in Avro, JSON, or Parquet format. More generally, the Cloud Storage sink connector can accept any data format that provides an implementation of the Format interface.
  • Support time-based partitioner. The Cloud Storage sink connector supports the TimeBasedPartitioner class based on the publishTime timestamp of Pulsar messages. Time-based partitioning options are daily or hourly.
  • Support more kinds of object storage. The Cloud Storage sink connector uses jclouds as its cloud storage implementation. You can use the JAR package of a jclouds object storage provider to connect to more types of object storage. If you need to customize credentials, you can register `org.apache.pulsar.io.jcloud.credential.JcloudsCredential` via the Service Provider Interface (SPI).
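
To make the time-based partitioning feature more concrete, the following is a minimal sketch of how a publishTime timestamp could be mapped to a daily or hourly partition label. It is not the connector's TimeBasedPartitioner; the class name, the method names, the use of UTC, and the "yyyy-MM-dd-HH" pattern are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration only; not part of the connector.
class TimePartitionSketch {

    // Parse a duration such as "1d" or "1h" into milliseconds.
    // Only the day and hour suffixes mentioned in the text are handled.
    static long parseDurationMs(String duration) {
        long amount = Long.parseLong(duration.substring(0, duration.length() - 1));
        char unit = Character.toLowerCase(duration.charAt(duration.length() - 1));
        switch (unit) {
            case 'd': return amount * 24L * 60 * 60 * 1000;
            case 'h': return amount * 60L * 60 * 1000;
            default: throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    // Round the publish time down to the start of its bucket, then format it.
    // UTC is an assumption; the connector may use a different time zone.
    static String partitionFor(long publishTimeMs, String duration, String pattern) {
        long durationMs = parseDurationMs(duration);
        long bucketStart = publishTimeMs - Math.floorMod(publishTimeMs, durationMs);
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(bucketStart));
    }

    public static void main(String[] args) {
        long publishTime = Instant.parse("2020-09-14T10:37:25Z").toEpochMilli();
        System.out.println(partitionFor(publishTime, "1d", "yyyy-MM-dd"));    // 2020-09-14
        System.out.println(partitionFor(publishTime, "1h", "yyyy-MM-dd-HH")); // 2020-09-14-10
    }
}
```

With this approach, all messages published within the same day (or hour) share one partition label, which is what allows them to be grouped under a common path prefix.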

Installation

Download the NAR file of the Cloud Storage sink connector. You can then use the connector with Pulsar IO. For details, see Usage.

You can also build the Cloud Storage sink connector from the source code. For details, see here.

Configuration

The Cloud Storage sink connector supports the following properties.

Cloud Storage sink connector configuration

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| provider | String | True | null | The Cloud Storage type, such as aws-s3 or gcs. |
| accessKeyId | String | True | null | The Cloud Storage access key ID. |
| secretAccessKey | String | True | null | The Cloud Storage secret access key. |
| role | String | False | null | The Cloud Storage role. |
| roleSessionName | String | False | null | The Cloud Storage role session name. |
| bucket | String | True | null | The Cloud Storage bucket. |
| endpoint | String | False | null | The Cloud Storage endpoint. |
| formatType | String | False | "json" | The data format type. Available options are JSON, Avro, and Parquet. By default, it is set to JSON. |
| partitionerType | String | False | "partition" | The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions. |
| timePartitionPattern | String | False | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
| timePartitionDuration | String | False | "1d" | The time interval for time-based partitioning, such as 1d or 1h. |
| batchSize | int | False | 10 | The number of records submitted in batch. |
| batchTimeMs | long | False | 1000 | The interval for batch submission, in milliseconds. |
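
The table does not spell out how batchSize and batchTimeMs interact. One common reading, assumed here and not confirmed by the source, is that a batch is submitted as soon as either the record count reaches batchSize or batchTimeMs has elapsed since the last submission. The sketch below illustrates that assumed behavior with a hypothetical BatchBufferSketch class; it is not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of size-or-time batching; not the connector's code.
class BatchBufferSketch {
    private final int batchSize;
    private final long batchTimeMs;
    private final List<String> pending = new ArrayList<>();
    private long lastFlushMs;

    BatchBufferSketch(int batchSize, long batchTimeMs, long nowMs) {
        this.batchSize = batchSize;
        this.batchTimeMs = batchTimeMs;
        this.lastFlushMs = nowMs;
    }

    // Buffers a record. Returns the flushed batch, or an empty list
    // when neither the size nor the time threshold has been reached.
    List<String> add(String record, long nowMs) {
        pending.add(record);
        boolean full = pending.size() >= batchSize;
        boolean due = nowMs - lastFlushMs >= batchTimeMs;
        if (!full && !due) {
            return new ArrayList<>();
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        lastFlushMs = nowMs;
        return batch;
    }

    public static void main(String[] args) {
        // Defaults from the table: batchSize = 10, batchTimeMs = 1000.
        BatchBufferSketch buffer = new BatchBufferSketch(10, 1000, 0);
        for (int i = 1; i <= 10; i++) {
            List<String> flushed = buffer.add("record-" + i, i);
            if (!flushed.isEmpty()) {
                System.out.println("Flushed " + flushed.size() + " records");
            }
        }
    }
}
```

Under this reading, a larger batchSize produces fewer, larger objects, while batchTimeMs bounds how long a record can wait on a quiet topic.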

Configure Cloud Storage sink connector

Before using the Cloud Storage sink connector, you need to create a configuration file in one of the following formats.

  • JSON

    {
       "tenant": "public",
       "namespace": "default",
       "name": "cloud-storage-sink",
       "inputs": [
          "user-avro-topic"
       ],
       "archive": "connectors/pulsar-io-cloud-storage-0.0.1.nar",
       "parallelism": 1,
       "configs": {
          "provider": "aws-s3",
          "accessKeyId": "accessKeyId",
          "secretAccessKey": "secretAccessKey",
          "role": "none",
          "roleSessionName": "none",
          "bucket": "testBucket",
          "region": "local",
          "endpoint": "us-standard",
          "formatType": "parquet",
          "partitionerType": "time",
          "timePartitionPattern": "yyyy-MM-dd",
          "timePartitionDuration": "1d",
          "batchSize": 10,
          "batchTimeMs": 1000
       }
    }
    
  • YAML

    tenant: "public"
    namespace: "default"
    name: "cloud-storage-sink"
    inputs: 
      - "user-avro-topic"
    archive: "connectors/pulsar-io-cloud-storage-0.0.1.nar"
    parallelism: 1
    
    configs:
      provider: "aws-s3"
      accessKeyId: "accessKeyId"
      secretAccessKey: "secretAccessKey"
      role: "none"
      roleSessionName: "none"
      bucket: "testBucket"
      region: "local"
      endpoint: "us-standard"
      formatType: "parquet"
      partitionerType: "time"
      timePartitionPattern: "yyyy-MM-dd"
      timePartitionDuration: "1d"
      batchSize: 10
      batchTimeMs: 1000
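
Before submitting either file, it can help to check the configs section against the property table above. The following is a minimal sketch that reports missing required properties and fills in the documented defaults. The class and method names are hypothetical, and the connector performs its own validation independently of this code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; mirrors the property table, not the connector's validation.
class SinkConfigCheckSketch {

    // Properties marked as required in the configuration table.
    static final String[] REQUIRED = {"provider", "accessKeyId", "secretAccessKey", "bucket"};

    // Returns the names of required properties that are absent or blank.
    static List<String> missingRequired(Map<String, Object> configs) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            Object value = configs.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Returns a copy of configs with the documented defaults applied
    // to any optional property the user did not set.
    static Map<String, Object> withDefaults(Map<String, Object> configs) {
        Map<String, Object> merged = new HashMap<>();
        merged.put("formatType", "json");
        merged.put("partitionerType", "partition");
        merged.put("timePartitionPattern", "yyyy-MM-dd");
        merged.put("timePartitionDuration", "1d");
        merged.put("batchSize", 10);
        merged.put("batchTimeMs", 1000L);
        merged.putAll(configs); // user-supplied values win over defaults
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("provider", "aws-s3");
        configs.put("bucket", "testBucket");
        System.out.println("Missing: " + missingRequired(configs));
        System.out.println("formatType: " + withDefaults(configs).get("formatType"));
    }
}
```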
    

Usage

  1. Prepare the Cloud Storage service. This example uses s3mock.

    docker pull apachepulsar/s3mock:latest
    docker run -p 9090:9090 -e initialBuckets=pulsar-integtest apachepulsar/s3mock:latest
    
  2. Copy pulsar-io-cloud-storage-2.5.1.nar to the Pulsar connectors directory.

    cp pulsar-io-cloud-storage-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-cloud-storage-2.5.1.nar
    
  3. Start Pulsar in the standalone mode.

    $PULSAR_HOME/bin/pulsar standalone
    
  4. Run the Cloud Storage sink connector locally.

    $PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file cloud-storage-sink-config.yaml
    
  5. Send Pulsar messages. Currently, only Avro or JSON schema is supported.

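    import java.util.Arrays;
    import java.util.List;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;

    // TestRecord is not defined in this document; it is assumed to be a
    // POJO class from which Pulsar derives the Avro schema.
    try (
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Producer<TestRecord> producer = pulsarClient.newProducer(Schema.AVRO(TestRecord.class))
                .topic("public/default/test-parquet-avro")
                .create()
    ) {
        List<TestRecord> testRecords = Arrays.asList(
                new TestRecord("key1", 1, null),
                new TestRecord("key2", 1, new TestRecord.TestSubRecord("aaa"))
        );
        // Send each record synchronously to the input topic.
        for (TestRecord record : testRecords) {
            producer.send(record);
        }
    }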
    
  6. Validate Cloud Storage data.

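    You can use jclouds to verify that the object exists at the expected path, as shown below.

    import java.io.File;
    import java.util.Properties;
    import org.apache.pulsar.functions.utils.FunctionCommon;
    import org.jclouds.ContextBuilder;
    import org.jclouds.blobstore.BlobStore;
    import org.jclouds.blobstore.BlobStoreContext;
    import org.junit.Assert;

    Properties overrides = new Properties();
    // Use path-style bucket addressing, which s3mock expects.
    overrides.setProperty("jclouds.s3.virtual-host-buckets", "false");
    BlobStoreContext blobStoreContext = ContextBuilder.newBuilder("aws-s3")
            .credentials(
                    "accessKeyId",
                    "secretAccessKey"
            )
            .endpoint("http://localhost:9090") // replace with the s3mock URL
            .overrides(overrides)
            .buildView(BlobStoreContext.class);
    BlobStore blobStore = blobStoreContext.getBlobStore();
    // "message" is a Pulsar message that was previously sent to the input topic.
    final long sequenceId = FunctionCommon.getSequenceId(message.getMessageId());
    final String path = "public/default/test-parquet-avro" + File.separator + "2020-09-14" + File.separator + sequenceId + ".parquet";
    final boolean blobExists = blobStore.blobExists("testBucket", path);
    Assert.assertTrue("the sink record does not exist", blobExists);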
    

    You can find the data in your testBucket bucket. The path looks like public/default/test-parquet-avro/2020-09-14/1234.parquet. The path consists of three parts: the basic part of the topic, the partition information, and the format suffix.

    • Basic part of topic: public/default/test-parquet-avro/ This part consists of the tenant, namespace, and topic name of the input topic.
    • Partition information: 2020-09-14/${messageSequenceId} The date is generated based on the partitionerType parameter in the configuration, and ${messageSequenceId} is generated by FunctionCommon.getSequenceId(message.getMessageId()).
    • Format suffix: .parquet This part is generated based on the formatType parameter in the configuration.
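
The three parts described above can be combined as in the following minimal sketch. The ObjectPathSketch class and its objectPath method are hypothetical names used for illustration, the sketch assumes "/" as the object key separator, and the sequence ID 1234 is the placeholder value from the example path rather than a real message ID.

```java
// Hypothetical illustration of how the object path is assembled; not the connector's code.
class ObjectPathSketch {

    // Joins the topic base, the partition information, and the format suffix.
    static String objectPath(String tenant, String namespace, String topic,
                             String datePartition, long sequenceId, String formatType) {
        String basePart = tenant + "/" + namespace + "/" + topic;   // basic part of topic
        String partitionPart = datePartition + "/" + sequenceId;    // partition information
        String suffix = "." + formatType;                           // format suffix
        return basePart + "/" + partitionPart + suffix;
    }

    public static void main(String[] args) {
        String path = objectPath("public", "default", "test-parquet-avro",
                "2020-09-14", 1234L, "parquet");
        System.out.println(path); // public/default/test-parquet-avro/2020-09-14/1234.parquet
    }
}
```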