offloader
Filesystem offloader
Use filesystem offloader with Pulsar
Authored by
ASF
Support type
StreamNative
License
Apache License 2.0

Overview

To deliver an event streaming service, Pulsar needs to manage large numbers of messages and data in real-time, and this requires keeping large amounts of data on the platform, or readily accessible. As the data amount increases, it becomes significantly more expensive to store, manage, and retrieve data, so administrators and developers look to external stores for long-term storage.

Pulsar leverages a unique tiered storage solution that addresses some of these key challenges faced by other distributed log systems. This tiered storage solution extends the storage capabilities of Pulsar by offloading data from Apache BookKeeper to scalable cloud-native storage or filesystems without adding storage. Older topic data can be offloaded to long-term storage that readily scales with the volume of data.

In this way, on the one hand, tiered storage is much cheaper than the storage in Pulsar clusters; on the other hand, there is no perceivable difference in consuming a topic no matter whether data is stored on tiered storage or on Pulsar clusters. They produce and consume messages in exactly the same way.

Additionally, Pulsar is able to retain both historic and real-time data and provides a unified view as infinite event streams, which can be easily reprocessed or backloaded into new systems. You can integrate Pulsar with a unified data processing engine (such as Apache Flink or Apache Spark) to unlock many new use cases stemming from infinite data retention.

How it works

Pulsar's segment oriented architecture allows for topic backlogs to effectively grow very large without limit. However, this can become expensive over time. One way to alleviate this cost is to use tiered storage. With tiered storage, older messages in the backlog can be moved from BookKeeper to a cheaper storage mechanism, while still allowing clients to access the backlog as if nothing had changed.

Currently, Pulsar supports AWS S3, GCS, and filesystem for long term storage. Offloading to long term storage can be triggered via REST API or CLI tools. You can pass as many as topics you want to retain on BookKeeper and brokers copy the backlog data to long term storage. The original data is deleted from BookKeeper after a configured delay.

A topic in Pulsar is backed by a log, known as a managed ledger. This log is composed of an ordered list of segments. Pulsar only writes to the final segment of the log. All previous segments are sealed. The data within the segment is immutable. This is known as a segment oriented architecture.

The tiered storage offloading mechanism takes advantage of this segment oriented architecture. When offloading is requested, the segments of the log are copied, one-by-one, to tiered storage. All segments of the log, apart from the segment currently being written to, can be offloaded.

To offload data to cloud storge, you need to configure the bucket and credentials for the cloud storage service on the broker. The configured bucket must exist before attempting to offload. Otherwise, the offload operation fails.

To offload data to filesystem, you need to configure some required parameters. For details, see configuration.

Pulsar uses multi-part objects to upload the segment data. It is possible that a broker could crash while uploading the data. It is recommended that you add a life cycle rule for your bucket to expire incomplete multi-part upload after a day or two to avoid getting charged for incomplete uploads.

Installation

This section describes how to install the filesystem offloader.

Prerequisite

  • Pulsar 2.4.2 or later versions

Step

  1. Download the Pulsar tarball using one of the following ways:

    • Download the Pulsar tarball from the Apache mirror.

    • Download the Pulsar tarball from the Pulsar download page.

    • Download the Pulsar tarball using the wget command.

      This example downloads Pulsar 2.5.1.

      wget https://archive.apache.org/dist/pulsar/pulsar-2.5.1/apache-pulsar-2.5.1-bin.tar.gz
      
  2. Download and untar the Pulsar offloaders package.

    This example downloads Pulsar offloader 2.5.1.

    wget https://downloads.apache.org/pulsar/pulsar-2.5.1/apache-pulsar-offloaders-2.5.1-bin.tar.gz
    
    tar xvfz apache-pulsar-offloaders-2.5.1-bin.tar.gz
    

    Note

    * If you run Pulsar in a bare metal cluster, make sure that the offloaders tarball is unzipped in each broker's Pulsar directory.

    • If you run Pulsar in Docker or deploying Pulsar using a Docker image (such as K8S and DCOS), you can use the apachepulsar/pulsar-all image. The apachepulsar/pulsar-all image has already bundled tiered storage offloaders.
  3. Copy the Pulsar offloaders as offloaders in the Pulsar directory.

    mv apache-pulsar-offloaders-2.5.1/offloaders apache-pulsar-2.5.1/offloaders
    
    ls offloaders
    

    Output

    tiered-storage-file-system-2.5.1.nar
    tiered-storage-jcloud-2.5.1.nar
    

    Note

    * If you run Pulsar in a bare metal cluster, make sure that the offloaders tarball is unzipped in each broker's Pulsar directory.

    • If you run Pulsar in Docker or deploying Pulsar using a Docker image (such as K8s and DCOS), you can use the apachepulsar/pulsar-all image. apachepulsar/pulsar-all image has already bundled tiered storage offloaders.

Configuration

Before offloading data from BookKeeper to filesystem, you need to configure some properties of the filesystem offloader driver.

Besides, you can also configure the filesystem offloader to run it automatically or trigger it manually.

Configure filesystem offloader driver

You can configure the filesystem offloader driver in the broker.conf or standalone.conf configuration file.

  • Required configurations are as below.

    ParameterDescriptionExample value
    managedLedgerOffloadDriverOffloader driver name, which is case-insensitive.filesystem
    fileSystemURIConnection address, which is the URI to access the default Hadoop distributed file system.hdfs://127.0.0.1:9000
    offloadersDirectoryOffloader directoryoffloaders
    fileSystemProfilePathHadoop profile path. The configuration file is stored in the Hadoop profile path. It contains various settings for Hadoop performance tuning.conf/filesystem_offload_core_site.xml
  • Optional configurations are as below.

    ParameterDescriptionExample value
    managedLedgerMinLedgerRolloverTimeMinutesMinimum time between ledger rollover for a topic. <br><br>Note: it is not recommended to set this parameter in the production environment.2
    managedLedgerMaxEntriesPerLedgerMaximum number of entries to append to a ledger before triggering a rollover.<br><br>Note: it is not recommended to set this parameter in the production environment.5000

Run filesystem offloader automatically

Namespace policy can be configured to offload data automatically once a threshold is reached. The threshold is based on the size of data that a topic has stored on a Pulsar cluster. Once the topic reaches the threshold, an offload operation is triggered automatically.

Threshold valueAction
> 0It triggers the offloading operation if the topic storage reaches its threshold.
= 0It causes a broker to offload data as soon as possible.
< 0It disables automatic offloading operation.

Automatic offload runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, offload does not work until the current segment is full.

You can configure the threshold size using CLI tools, such as pulsarctl or puslar-admin.

Example

This example sets the filesystem offloader threshold size to 10 MB using pulsarctl.

bin/pulsarctl namespaces set-offload-threshold --size 10M my-tenant/my-namespace

Tip

For more information about the pulsarctl namespaces set-offload-threshold options command, including flags, descriptions, default values, and shorthands, see here.

Run filesystem offloader manually

For individual topics, you can trigger filesystem offloader manually using the following methods:

  • Use REST endpoint.

  • Use CLI tools (such as pulsarctl or pulsar-admin).

To trigger via CLI tools, you need to specify the maximum amount of data (threshold) that should be retained on a Pulsar cluster for a topic. If the size of the topic data on the Pulsar cluster exceeds this threshold, segments from the topic are offloaded to filesystem until the threshold is no longer exceeded. Older segments are offloaded first.

Example

  • This example triggers the filesystem offloader to run manually using pulsarctl.

    bin/pulsarctl topic offload persistent://my-tenant/my-namespace/topic1 10M
    

    Output

    Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1
    

    Tip

    For more information about the pulsarctl topic offload options command, including flags, descriptions, default values, and shorthands, see here.

  • This example checks filesystem offloader status using pulsarctl.

    bin/pulsarctl topic offload-status persistent://my-tenant/my-namespace/topic1
    

    Output

    Offload is currently running
    

    To wait for filesystem to complete the job, add the -w flag.

    bin/pulsarctl topic offload-status -w persistent://my-tenant/my-namespace/topic1
    

    Output

    Offload was a success
    

    If there is an error in the offloading operation, the error is propagated to the pulsarctl topic offload-status command.

    bin/pulsarctl topic offload-status persistent://my-tenant/my-namespace/topic1
    

    Output

    Error in offload
    null
    
    Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: com.amazonaws.services.s3.model.AmazonS3Exception: Anonymous users cannot initiate multipart uploads.  Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 798758DE3F1776DF; S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=), S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=
    

    Tip

    For more information about the pulsarctl topic offload-status options command, including flags, descriptions, default values, and shorthands, see here.

Usage

This section provides step-by-step instructions on how to use the filesystem offloader to move data from Pulsar to Hadoop Distributed File System (HDFS) or Network File system (NFS).

To move data from Pulsar to HDFS, follow these steps.

Step1: Prepare the HDFS environment

This tutorial sets up a Hadoop single node cluster and uses Hadoop 3.2.1.

Tip

For more information about setting up a Hadoop single node cluster, see here.

  1. Download and uncompress Hadoop 3.2.1.

    wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz  
    
    tar -zxvf hadoop-3.2.1.tar.gz -C $HADOOP_HOME
    
  2. Configure Hadoop.

    # $HADOOP_HOME/etc/hadoop/core-site.xml
    <configuration>
        <property>
            <name>fs.defaultFS</name>
            <value>hdfs://localhost:9000</value>
        </property>
    </configuration>
    
    # $HADOOP_HOME/etc/hadoop/hdfs-site.xml
    <configuration>
        <property>
            <name>dfs.replication</name>
            <value>1</value>
        </property>
    </configuration>
    
  3. Set passphraseless ssh.

    # Now check that you can ssh to the localhost without a passphrase:
    $ ssh localhost
    # If you cannot ssh to localhost without a passphrase, execute the following commands
    $ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
    $ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
    $ chmod 0600 ~/.ssh/authorized_keys
    
  4. Start HDFS.

    # don't execute this command repeatedly, repeat execute will cauld the clusterId of the datanode is not consistent with namenode
    $HADOOP_HOME/bin/hadoop namenode -format
    $HADOOP_HOME/sbin/start-dfs.sh
    
  5. Navigate to the HDFS website.

    You can see the Overview page.

    1. At the top navigation bar, click Datanodes to check DataNode information.

    1. Click HTTP Address to get more detailed information about localhost:9866.

    As can be seen below, the size of Capacity Used is 4 KB, which is the initial value.

Step 2: Install the filesystem offloader

For details, see installation.

Step 3: Configure the filesystem offloader

As indicated in the configuration section, you need to configure some properties for the filesystem offloader driver before using it. This tutorial assumes that you have configured the filesystem offloader driver as below and run Pulsar in standalone mode.

Set the following configurations in the conf/standalone.conf file.

managedLedgerOffloadDriver=filesystem
fileSystemURI=hdfs://127.0.0.1:9000
fileSystemProfilePath=conf/filesystem_offload_core_site.xml

Note

For testing purposes, you can set the following two configurations to speed up ledger rollover, but it is not recommended that you set them in the production environment.

managedLedgerMinLedgerRolloverTimeMinutes=1
managedLedgerMaxEntriesPerLedger=100

Step 4: Offload data from BookKeeper to filesystem

Execute the following commands in the repository where you download Pulsar tarball. For example, ~/path/to/apache-pulsar-2.5.1.

  1. Start Pulsar standalone.

    ./bin/pulsar standalone -a 127.0.0.1
    
  2. To ensure the data generated is not deleted immediately, it is recommended to set the retention policy, which can be either a size limit or a time limit. The larger value you set for the retention policy, the longer the data can be retained.

    ./bin/pulsarctl namespaces set-retention public/default --size 100M --time 2d
    

    Tip

    For more information about the pulsarctl namespaces set-retention options command, including flags, descriptions, default values, and shorthands, see here.

  3. Produce data using pulsar-client.

    ./bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test
    
  4. The offloading operation starts after a ledger rollover is triggered. To ensure offload data successfully, it is recommended that you wait until several ledger rollovers are triggered. In this case, you might need to wait for a second. You can check the ledger status using pulsarctl.

    ./bin/pulsarctl topics internal-stats public/default/fs-test
    

    Output

    The data of the ledger 696 is not offloaded.

    {
    "version": 1,
    "creationDate": "2020-06-16T21:46:25.807+08:00",
    "modificationDate": "2020-06-16T21:46:25.821+08:00",
    "ledgers": [
    {
        "ledgerId": 696,
        "isOffloaded": false
    }
    ],
    "cursors": {}
    }
    
  5. Wait a second and send more messages to the topic.

    ./bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test
    
  6. Check the ledger status using pulsarctl.

    ./bin/pulsarctl topics internal-stats public/default/fs-test
    

    Output

    The ledger 696 is rollovered.

    {
    "version": 2,
    "creationDate": "2020-06-16T21:46:25.807+08:00",
    "modificationDate": "2020-06-16T21:48:52.288+08:00",
    "ledgers": [
    {
        "ledgerId": 696,
        "entries": 1001,
        "size": 81695,
        "isOffloaded": false
    },
    {
        "ledgerId": 697,
        "isOffloaded": false
    }
    ],
    "cursors": {}
    }
    
  7. Trigger the offloading operation manually using pulsarctl.

    ./bin/pulsarctl topic offload -s 0 public/default/fs-test
    

    Output

    Data in ledgers before the ledge 697 is offloaded.

    # offload info, the ledgers before 697 will be offloaded
    Offload triggered for persistent://public/default/fs-test3 for messages before 697:0:-1
    
  8. Check the ledger status using pulsarctl.

    ./bin/pulsarctl topic internal-info public/default/fs-test
    

    Output

    The data of the ledger 696 is offloaded.

    {
    "version": 4,
    "creationDate": "2020-06-16T21:46:25.807+08:00",
    "modificationDate": "2020-06-16T21:52:13.25+08:00",
    "ledgers": [
    {
        "ledgerId": 696,
        "entries": 1001,
        "size": 81695,
        "isOffloaded": true
    },
    {
        "ledgerId": 697,
        "isOffloaded": false
    }
    ],
    "cursors": {}
    }
    

    And the Capacity Used is changed from 4 KB to 116.46 KB.

Reference

For more information about tiered storage for Pulsar topics, see here.