GCS offloader
Use GCS offloader with Pulsar
Authored by: ASF
Support type: StreamNative
License: Apache License 2.0

Overview

To deliver an event streaming service, Pulsar needs to manage large numbers of messages in real time, which requires keeping large amounts of data on the platform or readily accessible. As the amount of data increases, it becomes significantly more expensive to store, manage, and retrieve, so administrators and developers look to external stores for long-term storage.

Pulsar leverages a unique tiered storage solution that addresses some of these key challenges faced by other distributed log systems. This tiered storage solution extends the storage capabilities of Pulsar by offloading data from Apache BookKeeper to scalable cloud-native storage or filesystems without adding storage. Older topic data can be offloaded to long-term storage that readily scales with the volume of data.

In this way, tiered storage is much cheaper than the storage in Pulsar clusters, and there is no perceivable difference when consuming a topic whether its data is stored in tiered storage or in a Pulsar cluster: clients produce and consume messages in exactly the same way.

Additionally, Pulsar is able to retain both historic and real-time data and provides a unified view as infinite event streams, which can be easily reprocessed or backloaded into new systems. You can integrate Pulsar with a unified data processing engine (such as Apache Flink or Apache Spark) to unlock many new use cases stemming from infinite data retention.

Installation

Follow the steps below to install the GCS offloader.

Prerequisite

  • Apache jclouds: 2.2.0 or later versions

Steps

  1. Download the Pulsar tarball in one of the following ways:

    • download the Pulsar tarball from the Apache mirror

    • download from the Pulsar download page

    • use wget

      wget https://archive.apache.org/dist/pulsar/pulsar-2.5.1/apache-pulsar-2.5.1-bin.tar.gz
      
  2. Download and untar the Pulsar offloaders package.

    wget https://downloads.apache.org/pulsar/pulsar-2.5.1/apache-pulsar-offloaders-2.5.1-bin.tar.gz
    
    tar xvfz apache-pulsar-offloaders-2.5.1-bin.tar.gz
    

    Note

    • If you are running Pulsar in a bare metal cluster, make sure that the offloaders tarball is untarred into every broker's Pulsar directory.

    • If you are running Pulsar in Docker or deploying Pulsar using a Docker image (such as on K8s or DC/OS), you can use the apachepulsar/pulsar-all image instead of the apachepulsar/pulsar image. The apachepulsar/pulsar-all image already bundles the tiered storage offloaders.
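
      To pull the all-in-one image, a minimal sketch (use the tag that matches your Pulsar version):

      docker pull apachepulsar/pulsar-all:2.5.1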

  3. Move the offloaders directory into the Pulsar directory.

    mv apache-pulsar-offloaders-2.5.1/offloaders apache-pulsar-2.5.1/offloaders
    
    ls offloaders
    

    Output

    As shown in the output, Pulsar uses Apache jclouds to support GCS and AWS S3 for long-term storage.

    tiered-storage-file-system-2.5.1.nar
    tiered-storage-jcloud-2.5.1.nar
    

Configuration

Note

Before offloading data from BookKeeper to GCS, you need to configure some properties of the GCS offloader driver.

In addition, you can configure the GCS offloader to run automatically or trigger it manually.

Configure GCS offloader driver

You can configure the GCS offloader driver in the configuration file broker.conf or standalone.conf.

  • Required configurations are as below.

    Required configuration | Description | Example value
    managedLedgerOffloadDriver | Offloader driver name, which is case-insensitive. | google-cloud-storage
    offloadersDirectory | Offloader directory | offloaders
    gcsManagedLedgerOffloadBucket | Bucket | pulsar-topic-offload
    gcsManagedLedgerOffloadRegion | Bucket region | europe-west3
    gcsManagedLedgerOffloadServiceAccountKeyFile | Authentication | /Users/user-name/Downloads/project-804d5e6a6f33.json
  • Optional configurations are as below.

    Optional configuration | Description | Example value
    gcsManagedLedgerOffloadReadBufferSizeInBytes | Size of block read | 1 MB
    gcsManagedLedgerOffloadMaxBlockSizeInBytes | Size of block write | 64 MB
    managedLedgerMinLedgerRolloverTimeMinutes | Minimum time between ledger rollovers for a topic. | 2
    managedLedgerMaxEntriesPerLedger | Maximum number of entries to append to a ledger before triggering a rollover. | 5000
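
Putting the required settings together, a minimal broker.conf sketch looks like the following (the bucket, region, and key-file path reuse the example values from the tables above):

managedLedgerOffloadDriver=google-cloud-storage
offloadersDirectory=offloaders
gcsManagedLedgerOffloadBucket=pulsar-topic-offload
gcsManagedLedgerOffloadRegion=europe-west3
gcsManagedLedgerOffloadServiceAccountKeyFile=/Users/user-name/Downloads/project-804d5e6a6f33.json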

Bucket (required)

A bucket is a basic container that holds your data. Everything you store in GCS must be contained in a bucket. You can use buckets to organize your data and control access to it, but unlike directories and folders, buckets cannot be nested.

Example

This example names the bucket pulsar-topic-offload.

gcsManagedLedgerOffloadBucket=pulsar-topic-offload

Bucket region (required)

Bucket region is the region where a bucket is located. If a bucket region is not specified, the default region (us multi-regional location) is used.

Tip

For more information about bucket location, see here.

Example

This example sets the bucket region as europe-west3.

gcsManagedLedgerOffloadRegion=europe-west3

Authentication (required)

To enable a broker to access GCS, you need to configure gcsManagedLedgerOffloadServiceAccountKeyFile in the configuration file broker.conf.

gcsManagedLedgerOffloadServiceAccountKeyFile points to a JSON file containing the GCS credentials of a service account.

Example

To generate service account credentials, or to view the public credentials that you've already generated, follow these steps.

  1. Navigate to the Service accounts page.

  2. Select a project or create a new one.

  3. Click Create service account.

  4. In the Create service account window, type a name for the service account and select Furnish a new private key.

    If you want to grant G Suite domain-wide authority to the service account, select Enable G Suite Domain-wide Delegation.

  5. Click Create.

    Note

    Make sure the service account you create has permission to operate GCS. You need to assign the Storage Admin permission to your service account here.

  6. Set the path of the downloaded JSON key file in broker.conf, as shown below.

    gcsManagedLedgerOffloadServiceAccountKeyFile=/Users/user-name/Downloads/project-804d5e6a6f33.json
    

    Tip

    • For more information about how to create gcsManagedLedgerOffloadServiceAccountKeyFile, see here.

    • For more information about Google Cloud IAM, see here.

Size of block read/write

You can configure the size of a request sent to or read from GCS in the configuration file broker.conf.

Configuration | Description
gcsManagedLedgerOffloadReadBufferSizeInBytes | Block size for each individual read when reading back data from GCS. The default value is 1 MB.
gcsManagedLedgerOffloadMaxBlockSizeInBytes | Maximum size of a "part" sent during a multipart upload to GCS. It cannot be smaller than 5 MB. The default value is 64 MB.
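
For example, a sketch making the defaults explicit in broker.conf (values are in bytes):

# Read buffer: 1 MB (the default).
gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
# Maximum upload block: 64 MB (the default); it cannot be smaller than 5 MB.
gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864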

Configure GCS offloader to run automatically

A namespace policy can be configured to offload data automatically once a threshold is reached. The threshold is based on the size of data that a topic has stored on a Pulsar cluster. Once the topic reaches the threshold, an offload operation is triggered automatically.

Threshold value | Action
> 0 | It triggers the offloading operation if the topic storage reaches its threshold.
= 0 | It causes a broker to offload data as soon as possible.
< 0 | It disables automatic offloading operation.

Automatic offloading runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, offload does not work until the current segment is full.

You can configure the threshold size using CLI tools, such as pulsarctl or pulsar-admin.

The offload configurations in broker.conf and standalone.conf are used for the namespaces that do not have namespace-level offload policies. Each namespace can have its own offload policy. If you want to set an offload policy for a namespace, use the pulsar-admin namespaces set-offload-policies command, as sketched below.
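
For example, a hedged sketch of a namespace-level GCS policy with pulsar-admin (flag names can vary across Pulsar versions; run pulsar-admin namespaces set-offload-policies --help to confirm):

bin/pulsar-admin namespaces set-offload-policies \
  --driver google-cloud-storage \
  --bucket pulsar-topic-offload \
  --region europe-west3 \
  my-tenant/my-namespace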

Example

This example sets the GCS offloader threshold size to 10 MB using pulsarctl.

bin/pulsarctl namespaces set-offload-threshold --size 10M my-tenant/my-namespace

Tip

For more information about the pulsarctl namespaces set-offload-threshold options command, including flags, descriptions, default values, and shorthands, see here.
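
The same threshold can also be set with pulsar-admin; a sketch:

bin/pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace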

Configure GCS offloader to run manually

For individual topics, you can trigger GCS offloader manually using the following methods:

  • Use REST endpoint (see the curl sketch after this list)

  • Use CLI tools (such as pulsarctl or pulsar-admin).

    To trigger the GCS offloader via CLI tools, you need to specify the maximum amount of data (threshold) that should be retained on a Pulsar cluster for a topic. If the size of the topic data on the Pulsar cluster exceeds this threshold, segments from the topic are moved to GCS until the threshold is no longer exceeded. Older segments are moved first.
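
For reference, a hedged curl sketch of the REST approach (it assumes the admin service listens on localhost:8080 and that offloading is triggered by a PUT carrying the message ID up to which data is offloaded):

# Trigger offloading up to a given message ID.
curl -X PUT http://localhost:8080/admin/v2/persistent/my-tenant/my-namespace/topic1/offload \
  -H 'Content-Type: application/json' \
  -d '{"ledgerId": 12, "entryId": 0, "partitionIndex": -1}'

# Poll the offload status on the same path.
curl http://localhost:8080/admin/v2/persistent/my-tenant/my-namespace/topic1/offload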

Example

  • This example triggers the GCS offloader to run manually using pulsarctl with the command pulsarctl topics offload (topic-name) (threshold).

    bin/pulsarctl topics offload persistent://my-tenant/my-namespace/topic1 10M
    

    Output

    Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1
    

    Tip

    For more information about the pulsarctl topics offload options command, including flags, descriptions, default values, and shorthands, see here.

  • This example checks the GCS offloader status using pulsarctl with the command pulsarctl topics offload-status options.

    bin/pulsarctl topics offload-status persistent://my-tenant/my-namespace/topic1
    

    Output

    Offload is currently running
    

    To wait for GCS to complete the job, add the -w flag.

    bin/pulsarctl topics offload-status -w persistent://my-tenant/my-namespace/topic1
    

    Output

    Offload was a success
    

    If there is an error in offloading, the error is propagated to the pulsarctl topics offload-status command.

    bin/pulsarctl topics offload-status persistent://my-tenant/my-namespace/topic1
    

    Output

    Error in offload
    null
    
    Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: com.amazonaws.services.s3.model.AmazonS3Exception: Anonymous users cannot initiate multipart uploads.  Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 798758DE3F1776DF; S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=), S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=
    

    Tip

    For more information about the pulsarctl topics offload-status options command, including flags, descriptions, default values, and shorthands, see here.

Usage

This tutorial provides step-by-step instructions on how to use GCS with Pulsar.

Step 1: configure GCS offloader driver

As indicated in the configuration chapter, before using the GCS offloader you need to configure some properties for the GCS offloader driver. This tutorial assumes that you have configured the GCS offloader driver in standalone.conf as below and that you run Pulsar in standalone mode.

managedLedgerOffloadDriver=google-cloud-storage
gcsManagedLedgerOffloadBucket=pulsar-topic-offload-1
gcsManagedLedgerOffloadRegion=europe-west3
gcsManagedLedgerOffloadServiceAccountKeyFile=/Users/user-name/Downloads/affable-ray-226821-6251d04987e9.json
offloadersDirectory=offloaders
managedLedgerMinLedgerRolloverTimeMinutes=2
managedLedgerMaxEntriesPerLedger=5000

Step 2: create GCS bucket

  1. Navigate to Google Cloud Console, and select Storage at the left navigation panel.

  2. To create a GCS bucket, click Browser > CREATE BUCKET.

    Note

    To ensure the broker can access the bucket, you need to assign the Storage Object Creator and Storage Object Viewer roles to your service account. For how to assign roles to your service account, see Step 4: grant access to GCS service account.

  3. Name your bucket.

    The bucket name should be the same as the value of gcsManagedLedgerOffloadBucket that you configured in Step 1: configure GCS offloader driver.

  4. Set your bucket region.

    The bucket region should be the same as the value of gcsManagedLedgerOffloadRegion that you configured in Step 1: configure GCS offloader driver.

  5. Click Create.

    Now you have successfully created a GCS bucket.
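
Alternatively, you can create the bucket from the command line; a hedged gsutil sketch (the bucket name and region must match the values configured in Step 1):

gsutil mb -l europe-west3 gs://pulsar-topic-offload-1/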

Step 3: create GCS service account

  1. Navigate to Google Cloud Console, and select IAM & Admin at the left navigation panel.

  2. To create a new service account, click Service Accounts > CREATE SERVICE ACCOUNT.

  3. Name your service account.

    The service account ID is automatically generated.

  4. Click Create.

  5. Grant privilege to your service account.

    This tutorial skips this task here and completes it in Step 4: grant access to GCS service account.

    Click Continue.

  6. Click CREATE KEY.

  7. Click JSON > Create, and save this JSON file to your local machine.

    The JSON file is the value you need to set for gcsManagedLedgerOffloadServiceAccountKeyFile in broker.conf.

  8. Copy the key ID in the JSON file to the key ID field and click Done.
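
Alternatively, a hedged gcloud sketch covers creating the account and downloading its key (pulsar-offload and my-project are hypothetical names; replace them with your own):

# Create the service account.
gcloud iam service-accounts create pulsar-offload --project my-project

# Generate a JSON key and save it locally; this is the file referenced by
# gcsManagedLedgerOffloadServiceAccountKeyFile.
gcloud iam service-accounts keys create ~/Downloads/pulsar-offload-key.json \
  --iam-account pulsar-offload@my-project.iam.gserviceaccount.com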

Step 4: grant access to GCS service account

  1. On IAM and Admin homepage, click IAM > Add.

  2. Fill in your GCS service account name that you created in Step 3.

  3. Assign Storage Object Creator and Storage Object Viewer roles to your service account.

  4. Click Save.
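
The same grants can be scripted with gcloud; a sketch assuming the hypothetical account name from Step 3:

# Allow the service account to create objects.
gcloud projects add-iam-policy-binding my-project \
  --member serviceAccount:pulsar-offload@my-project.iam.gserviceaccount.com \
  --role roles/storage.objectCreator

# Allow it to read objects back.
gcloud projects add-iam-policy-binding my-project \
  --member serviceAccount:pulsar-offload@my-project.iam.gserviceaccount.com \
  --role roles/storage.objectViewer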

Step 5: offload data from BookKeeper to GCS

Execute the following commands in the directory where you downloaded the Pulsar tarball, for example, ~/path/to/apache-pulsar-2.5.1.

  1. Start Pulsar standalone.

    ./bin/pulsar standalone -a 127.0.0.1
    
  2. To ensure the generated data is not deleted immediately, it is recommended to set a retention policy, which can be either a size limit or a time limit. The larger the value you set for the retention policy, the longer the data can be retained.

    ./bin/pulsarctl namespaces set-retention public/default --size 10G --time 3d
    

    Tip

    For more information about the pulsarctl namespaces set-retention options command, including flags, descriptions, default values, and shorthands, see here.

  3. Produce data using pulsar-perf.

    ./bin/pulsar-perf produce -r 1000 -s 2048 test-topic
    
  4. The offloading operation starts after a ledger rollover is triggered. To ensure the data is offloaded successfully, it is recommended that you wait until several ledger rollovers are triggered. In this case, you might need to wait for a while. You can check the ledger status using pulsarctl.

    ./bin/pulsarctl topics internal-stats test-topic
    

    Output

    As shown below, there are ledger 10, ledger 11, and ledger 12 in the output.

    "entriesAddedCounter" : 107982,
    "numberOfEntries" : 107982,
    "totalSize" : 508276193,
    "currentLedgerEntries" : 1953,
    "currentLedgerSize" : 9167863,
    "lastLedgerCreatedTimestamp" : "2020-05-12T00:07:27.273+08:00",
    "waitingCursorsCount" : 0,
    "pendingAddEntriesCount" : 1,
    "lastConfirmedEntry" : "12:1951",
    "state" : "LedgerOpened",
    "ledgers" : [ {
        "ledgerld" : 10,
        "entries" : 52985,
        "size" : 249500259,
        "offloaded" : false
    }, {
        "ledgerld" : 11,
        "entries" : 53045,
        "size" : 249614295,
        "offloaded" : false
    }, {
        "ledgerId" : 12,
        "entries" : 0,
        "size" : 0,
        "offloaded" : false
    } ],
    "cursors" : { }
    

    Tip

    For more information about the pulsarctl topics internal-stats options command, including flags, descriptions, default values, and shorthands, see here.

  5. After ledger rollover, trigger the offloading operation manually.

    You can also trigger the offloading operation automatically. For more information, see Configure GCS offloader to run automatically.

    ./bin/pulsarctl topics offload --size-threshold 10M public/default/test-topic
    

    Output

    Offload triggered for persistent://public/default/test-topic for messages before 12:0:-1
    

    Tip

    For more information about the pulsarctl topics offload options command, including flags, descriptions, default values, and shorthands, see here.

  6. Check the offloading operation status.

    ./bin/pulsarctl topics offload-status -w public/default/test-topic
    

    You might need to wait for a while until the offloading operation finishes.

    Output

    Offload was a success
    

    Tip

    For more information about the pulsarctl topics offload-status options command, including flags, descriptions, default values, and shorthands, see here.

    Finally, you can see that the data has been offloaded to GCS successfully.

How it works

Pulsar's segment-oriented architecture allows topic backlogs to grow effectively without limit. However, this can become expensive over time. One way to alleviate this cost is to use tiered storage. With tiered storage, older messages in the backlog can be moved from BookKeeper to a cheaper storage mechanism, while still allowing clients to access the backlog as if nothing had changed.

Currently, Pulsar supports AWS S3, GCS, and filesystem for long-term storage. Offloading to long-term storage can be triggered via REST API or CLI tools. You specify how much topic data you want to retain on BookKeeper, and the broker copies the backlog data to long-term storage. The original data is deleted from BookKeeper after a configured delay.

A topic in Pulsar is backed by a log, known as a managed ledger. This log is composed of an ordered list of segments. Pulsar only writes to the final segment of the log; all previous segments are sealed, and the data within them is immutable. This is known as a segment-oriented architecture.

The tiered storage offloading mechanism takes advantage of this segment-oriented architecture. When offloading is requested, the segments of the log are copied, one-by-one, to tiered storage. All segments of the log, apart from the segment currently being written to, can be offloaded.

On the broker, you need to configure the bucket and credentials for the cloud storage service. The configured bucket must exist before attempting to offload; if it does not, the offload operation fails. Pulsar uses multipart objects to upload the segment data, and a broker could crash while uploading. It is recommended that you add a lifecycle rule to your bucket to expire incomplete multipart uploads after a day or two, to avoid being charged for incomplete uploads.
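
For example, a hedged gsutil sketch of such a rule (it assumes your bucket supports the AbortIncompleteMultipartUpload lifecycle action; age is in days):

# Write a lifecycle rule that aborts incomplete multipart uploads after 2 days.
cat > lifecycle.json <<'EOF'
{
  "rule": [
    {
      "action": {"type": "AbortIncompleteMultipartUpload"},
      "condition": {"age": 2}
    }
  ]
}
EOF

# Apply the rule to the offload bucket.
gsutil lifecycle set lifecycle.json gs://pulsar-topic-offload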

Reference

For more information about tiered storage for Pulsar topics, see here.