Overview
To deliver an event streaming service, Pulsar needs to manage large volumes of messages and data in real time, and this requires keeping large amounts of data on the platform, or readily accessible. As the amount of data grows, it becomes significantly more expensive to store, manage, and retrieve, so administrators and developers look to external stores for long-term storage.
Pulsar leverages a unique tiered storage solution that addresses some of these key challenges faced by other distributed log systems. This tiered storage solution extends the storage capabilities of Pulsar by offloading data from Apache BookKeeper to scalable cloud-native storage or file systems without adding storage. Older topic data can be offloaded to long term storage that readily scales with the volume of data.
Tiered storage uses Apache jclouds to support AWS S3 and GCS (Google Cloud Storage) for long term storage.
With jclouds, it is easy to add support for more cloud storage providers in the future.
Tiered storage uses Apache Hadoop to support file systems for long term storage.
With Hadoop, it is easy to add support for more file systems in the future.
In this way, on the one hand, tiered storage is much cheaper than the storage in Pulsar clusters; on the other hand, there is no perceivable difference in consuming a topic no matter whether data is stored on tiered storage or on Pulsar clusters. Clients produce and consume messages in exactly the same way.
Additionally, Pulsar is able to retain both historic and real-time data and provides a unified view as infinite event streams, which can be easily reprocessed or backloaded into new systems. You can integrate Pulsar with a unified data processing engine (such as Apache Flink or Apache Spark) to unlock many new use cases stemming from infinite data retention.
Installation
Follow the steps below to install the AWS S3 offloader.
Prerequisite
- Apache jclouds: 2.2.0 or later versions
Steps
Download the Pulsar tarball using one of the following ways:
- download from the Apache mirror
- download from the Pulsar downloads page
- use wget:
wget https://archive.apache.org/dist/pulsar/pulsar-2.5.1/apache-pulsar-2.5.1-bin.tar.gz
Download and untar the Pulsar offloaders package.
wget https://downloads.apache.org/pulsar/pulsar-2.5.1/apache-pulsar-offloaders-2.5.1-bin.tar.gz
tar xvfz apache-pulsar-offloaders-2.5.1-bin.tar.gz
Copy the Pulsar offloaders as offloaders in the Pulsar directory.

mv apache-pulsar-offloaders-2.5.1/offloaders apache-pulsar-2.5.1/offloaders
ls offloaders
Output
tiered-storage-file-system-2.5.1.nar
tiered-storage-jcloud-2.5.1.nar

As shown in the output, Pulsar uses Apache jclouds to support AWS S3 and GCS for long term storage.
Note
- If you are running Pulsar in a bare-metal cluster, make sure that the offloaders tarball is unzipped in every broker's Pulsar directory.
- If you are running Pulsar in Docker or deploying Pulsar using a Docker image (such as K8s or DCOS), you can use the apachepulsar/pulsar-all image instead of the apachepulsar/pulsar image. The apachepulsar/pulsar-all image already bundles the tiered storage offloaders.
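For example, a quick way to confirm that the offloaders are bundled in the pulsar-all image, assuming the image's Pulsar home directory is /pulsar:

docker run --rm apachepulsar/pulsar-all:2.5.1 ls /pulsar/offloaders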
Configuration
Note
Before offloading data from BookKeeper to AWS S3, you need to configure some properties of the AWS S3 offload driver.
You can also configure the AWS S3 offloader to run automatically, or trigger it manually.
Configure AWS S3 offloader driver
You can configure the AWS S3 offloader driver in the configuration file broker.conf or standalone.conf.
Required configurations are as below.
Required configuration | Description | Example value |
---|---|---|
managedLedgerOffloadDriver | Offloader driver name, which is case-insensitive.<br><br>Note: there is a third driver type, S3, which is identical to AWS S3, though S3 requires that you specify an endpoint URL using s3ManagedLedgerOffloadServiceEndpoint. This is useful if using an S3-compatible data store other than AWS S3. | aws-s3 |
offloadersDirectory | Offloader directory | offloaders |
s3ManagedLedgerOffloadBucket | Bucket | pulsar-topic-offload |

Optional configurations are as below.

Optional configuration | Description | Example value |
---|---|---|
s3ManagedLedgerOffloadRegion | Bucket region | eu-west-3 |
s3ManagedLedgerOffloadReadBufferSizeInBytes | Size of block read | 1 MB |
s3ManagedLedgerOffloadMaxBlockSizeInBytes | Size of block write | 64 MB |
managedLedgerMinLedgerRolloverTimeMinutes | Minimum time between ledger rollovers for a topic.<br><br>Note: it is not recommended that you set this configuration in the production environment. | 2 |
managedLedgerMaxEntriesPerLedger | Maximum number of entries to append to a ledger before triggering a rollover.<br><br>Note: it is not recommended that you set this configuration in the production environment. | 5000 |
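Taken together, the three required settings form a minimal broker.conf sketch (the bucket name is just an example):

managedLedgerOffloadDriver=aws-s3
offloadersDirectory=offloaders
s3ManagedLedgerOffloadBucket=pulsar-topic-offload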
Bucket (required)
A bucket is a basic container that holds your data. Everything you store in AWS S3 must be contained in a bucket. You can use a bucket to organize your data and control access to your data, but unlike directories and folders, you cannot nest buckets.
Example
This example names the bucket as pulsar-topic-offload.
s3ManagedLedgerOffloadBucket=pulsar-topic-offload
Bucket region
A bucket region is the region where a bucket is located. If a bucket region is not specified, the default region (US East (N. Virginia)) is used.
Tip
For more information about AWS regions and endpoints, see here.
Example
This example sets the bucket region as eu-west-3.
s3ManagedLedgerOffloadRegion=eu-west-3
Authentication (required)
To be able to access AWS S3, you need to authenticate with AWS S3.
Pulsar does not provide any direct methods of configuring authentication for AWS S3, but relies on the mechanisms supported by the DefaultAWSCredentialsProviderChain.
Once you have created a set of credentials in the AWS IAM console, you can configure credentials using one of the following methods.
- Use EC2 instance metadata credentials.

  If you are on an AWS instance with an instance profile that provides credentials, Pulsar uses these credentials if no other mechanism is provided.

- Set the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in conf/pulsar_env.sh. "export" is important so that the variables are made available in the environment of spawned processes.

  export AWS_ACCESS_KEY_ID=ABC123456789
  export AWS_SECRET_ACCESS_KEY=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c

- Add the Java system properties aws.accessKeyId and aws.secretKey to PULSAR_EXTRA_OPTS in conf/pulsar_env.sh.

  PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS} ${PULSAR_MEM} ${PULSAR_GC} -Daws.accessKeyId=ABC123456789 -Daws.secretKey=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"

- Set the access credentials in ~/.aws/credentials.

  [default]
  aws_access_key_id=ABC123456789
  aws_secret_access_key=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c

- Assume an IAM role. This example uses the DefaultAWSCredentialsProviderChain for assuming this role.

  s3ManagedLedgerOffloadRole=<aws role arn>
  s3ManagedLedgerOffloadRoleSessionName=pulsar-s3-offload

Note

The broker must be rebooted for credentials specified in pulsar_env to take effect.
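As a quick sanity check, assuming you used the environment-variable method above, you can verify in a throwaway shell that both variables are exported before starting the broker:

# Prints "set" for each variable that is defined; an empty value means it is missing
source conf/pulsar_env.sh
echo "AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:+set}"
echo "AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:+set}"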
Size of block read/write
You can configure the size of a request sent to or read from AWS S3 in the configuration file broker.conf or standalone.conf.
Configuration | Description | Default value |
---|---|---|
s3ManagedLedgerOffloadReadBufferSizeInBytes | Block size for each individual read when reading back data from AWS S3. | 1 MB |
s3ManagedLedgerOffloadMaxBlockSizeInBytes | Maximum size of a "part" sent during a multipart upload to AWS S3. It cannot be smaller than 5 MB. | 64 MB |
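For example, a sketch that doubles the read buffer and enlarges upload parts; both properties take values in bytes, and the numbers below are illustrative, not recommendations:

# 2 MB read buffer (2 * 1024 * 1024 bytes)
s3ManagedLedgerOffloadReadBufferSizeInBytes=2097152
# 128 MB upload parts (128 * 1024 * 1024 bytes)
s3ManagedLedgerOffloadMaxBlockSizeInBytes=134217728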
Configure AWS S3 offloader to run automatically
Namespace policy can be configured to offload data automatically once a threshold is reached. The threshold is based on the size of data that a topic has stored on a Pulsar cluster. Once the topic reaches the threshold, an offloading operation is triggered automatically.
Threshold value | Action |
---|---|
> 0 | It triggers the offloading operation if the topic storage reaches its threshold. |
= 0 | It causes a broker to offload data as soon as possible. |
< 0 | It disables automatic offloading operation. |
Automatic offloading runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, offload does not work until the current segment is full.
You can configure the threshold size using CLI tools, such as pulsarctl or pulsar-admin.
The offload configurations in broker.conf and standalone.conf are used for the namespaces that do not have namespace-level offload policies. Each namespace can have its own offload policy. If you want to set an offload policy for a namespace, use the pulsar-admin namespaces set-offload-policies command.
Example
This example sets the AWS S3 offloader threshold size to 10 MB using pulsar-admin.
bin/pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace
Tip
For more information about the pulsar-admin namespaces set-offload-threshold options command, including flags, descriptions, and default values, see here.
Configure AWS S3 offloader to run manually
For individual topics, you can trigger the AWS S3 offloader manually using the following methods:
- Use REST endpoint.
- Use CLI tools (such as pulsarctl or pulsar-admin).
To trigger it via CLI tools, you need to specify the maximum amount of data (threshold) that should be retained on a Pulsar cluster for a topic. If the size of the topic data on the Pulsar cluster exceeds this threshold, segments from the topic are moved to AWS S3 until the threshold is no longer exceeded. Older segments are moved first.
Example
This example triggers AWS S3 offloader to run manually using pulsar-admin.
bin/pulsar-admin topics offload --size-threshold 10M my-tenant/my-namespace/topic1
Output
Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1
Tip
For more information about the pulsar-admin topics offload options command, including flags, descriptions, and default values, see here.

This example checks the AWS S3 offloader status using pulsar-admin.
bin/pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
Output
Offload is currently running
To wait for the AWS S3 offloader to complete the job, add the -w flag.

bin/pulsar-admin topics offload-status -w persistent://my-tenant/my-namespace/topic1
Output
Offload was a success
If there is an error in offloading, the error is propagated to the pulsar-admin topics offload-status command.

bin/pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
Output
Error in offload null Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: com.amazonaws.services.s3.model.AmazonS3Exception: Anonymous users cannot initiate multipart uploads. Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 798758DE3F1776DF; S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=), S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=
Tip
For more information about the pulsar-admin topics offload-status options command, including flags, descriptions, and default values, see here.
Usage
This tutorial provides step-by-step instructions on how to use AWS S3 offloader with Pulsar.
Step 1: configure AWS S3 offloader driver
As indicated in the configuration chapter, you need to configure some properties for the AWS S3 offloader driver before using it. This tutorial assumes that you have configured the AWS S3 offloader driver as below and run Pulsar in standalone mode.
Set the following configurations in conf/standalone.conf.

managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=test-pulsar-offload
s3ManagedLedgerOffloadRegion=us-west-2
Note
For testing purposes, you can set the following two configurations to speed up ledger rollover, but it is not recommended that you set them in the production environment.

managedLedgerMinLedgerRolloverTimeMinutes=1
managedLedgerMaxEntriesPerLedger=5000
Set the following configurations in conf/pulsar_env.sh.

export AWS_ACCESS_KEY_ID=ABCDEFG123456789
export AWS_SECRET_ACCESS_KEY=QWERYHBDSSGJJBVCCDCCC
Step 2: create AWS S3 bucket
Before uploading data to AWS S3, you need to create a bucket in one of the AWS regions to store your data. After creating a bucket, you can upload an unlimited number of data objects to the bucket.
Buckets have configuration properties, including geographical region, access settings for the objects in the bucket, and other metadata.
Sign in to the AWS Management Console and open the Amazon S3 console.
Click Create bucket.
Set your Bucket name and Region.
Note
After creating a bucket, you cannot change its name. For information about naming buckets, see rules for bucket naming.
The bucket name should be the same as the value of s3ManagedLedgerOffloadBucket and the region should be the same as the value of s3ManagedLedgerOffloadRegion that you configured in Step 1: configure AWS S3 offloader driver.

In Bucket settings for Block Public Access, choose the block public access settings that you want to apply to the bucket.
Click Create bucket. Now you have successfully created a bucket.
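If you prefer the command line over the console, a rough equivalent sketch with the AWS CLI (assuming the CLI is installed and configured, and using the example bucket and region from Step 1) is:

# Create the offload bucket; the name must match s3ManagedLedgerOffloadBucket
aws s3api create-bucket --bucket test-pulsar-offload --region us-west-2 --create-bucket-configuration LocationConstraint=us-west-2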
Step 3: create a group
Sign in to the AWS Management Console and open the IAM console.
In the navigation pane, click Groups > Create New Group.
In the Group Name box, type the name of the group and click Next Step.
In the list of policies, select the check box for each policy that you want to apply to all members of the group and click Next Step.
Check all of the choices you made up to this point. When you are ready to proceed, choose Create Group.
You have successfully created a group.
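The console steps above can also be scripted. A minimal AWS CLI sketch follows; the group name and the attached policy are example choices, not requirements of the offloader:

# Create a group and attach an S3 access policy to it
aws iam create-group --group-name pulsar-offload-group
aws iam attach-group-policy --group-name pulsar-offload-group --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess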
Step 4: create a user
Sign in to the AWS Management Console and open the IAM console.
In the navigation pane, click Users > Add user.
Type your user name (not case-sensitive) and select AWS access type.
Click Next: Permissions.
On the Set permissions page, specify how you want to assign permissions to your user.
Click Next: Tags.
(Optional) Set tags for your user and click Next: Review.
Tip
For more information about using tags in IAM, see tagging IAM users and roles.
Check all of the choices you made up to this point. When you are ready to proceed, choose Create user.
To view the users' access keys (access key IDs and secret access keys), click Show next to each password and access key that you want to see.

The access key IDs should be the same as the value of AWS_ACCESS_KEY_ID and the secret access keys should be the same as the value of AWS_SECRET_ACCESS_KEY that you configured in Step 1: configure AWS S3 offloader driver.

Note
To save the access keys, click Download .csv and then save the file to a safe location. This is your only opportunity to view or download the secret access keys, and you must provide this information to your users before they can use the AWS API. Make sure you have saved the user's new access key ID and secret access key in a safe and secure place. You do not have access to the secret keys again after this step.
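Likewise, the user can be created from the command line. A minimal AWS CLI sketch, using the example group from Step 3; create-access-key prints the access key ID and secret access key that go into conf/pulsar_env.sh:

# Create a user, add it to the group, and generate its access keys
aws iam create-user --user-name pulsar-offload-user
aws iam add-user-to-group --user-name pulsar-offload-user --group-name pulsar-offload-group
aws iam create-access-key --user-name pulsar-offload-user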
Step 5: offload data from BookKeeper to AWS S3
Execute the following commands in the directory where you downloaded the Pulsar tarball, for example, ~/path/to/apache-pulsar-2.5.1.
Start Pulsar standalone.
./bin/pulsar standalone -a 127.0.0.1
To ensure the generated data is not deleted immediately, it is recommended to set a retention policy, which can be either a size limit or a time limit. The larger the value you set for the retention policy, the longer the data can be retained.
./bin/pulsar-admin namespaces set-retention public/default --size -10G --time 3d
Tip
For more information about the pulsar-admin namespaces set-retention options command, including flags, descriptions, and default values, see here.

Produce data using pulsar-perf.
./bin/pulsar-perf produce -r 1000 -s 2048 test-topic
The offloading operation starts after a ledger rollover is triggered. To ensure the data is offloaded successfully, it is recommended that you wait until several ledger rollovers are triggered. In this case, you might need to wait a second. You can check the ledger status using pulsar-admin.
./bin/pulsar-admin topics stats-internal test-topic
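A rollover has happened once the output lists more than one ledger. As a shortcut, assuming jq is installed, you can count the ledgers directly:

# The internal stats are JSON; "ledgers" is the list of segments backing the topic
./bin/pulsar-admin topics stats-internal test-topic | jq '.ledgers | length'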
After ledger rollover, trigger the offloading operation manually.
You can also trigger the offloading operation automatically. For more information, see Configure AWS S3 offloader to run automatically.
./bin/pulsar-admin topics offload --size-threshold 10M public/default/test-topic
Output
Offload triggered for persistent://public/default/test-topic for messages before 12:0:-1
Check the offloading operation status.
./bin/pulsar-admin topics offload-status -w public/default/test-topic
You might need to wait for a while until the offloading operation finishes.
Output
Offload was a success
Finally, you can see that the data is offloaded to AWS S3 successfully.
How it works
Pulsar's segment oriented architecture allows for topic backlogs to effectively grow very large without limit. However, this can become expensive over time. One way to alleviate this cost is to use tiered storage. With tiered storage, older messages in the backlog can be moved from BookKeeper to a cheaper storage mechanism, while still allowing clients to access the backlog as if nothing had changed.
Currently, Pulsar supports AWS S3, GCS, and filesystem for long term storage. Offloading to long term storage can be triggered via REST API or CLI tools. You specify how much topic data you want to retain on BookKeeper, and brokers copy the rest of the backlog data to long term storage. The original data is deleted from BookKeeper after a configured delay.
A topic in Pulsar is backed by a log, known as a managed ledger. This log is composed of an ordered list of segments. Pulsar only writes to the final segment of the log. All previous segments are sealed. The data within the segment is immutable. This is known as a segment oriented architecture. The tiered storage offloading mechanism takes advantage of this segment oriented architecture. When offloading is requested, the segments of the log are copied, one-by-one, to tiered storage. All segments of the log, apart from the segment currently being written to, can be offloaded.
On the broker, you need to configure the bucket and credentials for the cloud storage service. The configured bucket must exist before attempting to offload. If it does not exist, the offload operation fails.
Pulsar uses multipart objects to upload the segment data. It is possible that a broker could crash while uploading the data. It is recommended that you add a lifecycle rule to your bucket to expire incomplete multipart uploads after a day or two, to avoid getting charged for incomplete uploads.
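A sketch of such a rule with the AWS CLI; the rule ID is arbitrary and two days after initiation is an example cutoff:

# Abort multipart uploads that stay incomplete for more than 2 days
aws s3api put-bucket-lifecycle-configuration --bucket pulsar-topic-offload --lifecycle-configuration '{
  "Rules": [{
    "ID": "abort-incomplete-multipart",
    "Status": "Enabled",
    "Filter": {"Prefix": ""},
    "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 2}
  }]
}'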
Reference
For more information about tiered storage for Pulsar topics, see here.