Configure Lakehouse Storage
Note
This feature is currently in private preview. If you want to try it out or have any questions, submit a ticket to the support team.
Overview
This tutorial offers a detailed guide on configuring Lakehouse Storage within a private cloud environment.
Supported Features
Lakehouse Storage supports the following features:
- Storage options: HDFS and Delta
- Data production and consumption with AVRO schema and Pulsar primitive schema
- Prerequisites: JDK 17+ and Pulsar 3.0.0+
Configuration Steps
To set up Lakehouse tiered storage, create an offload.conf file in the conf directory. The configuration parameters include:
Offload Framework Configurations
Configuration | Description | Required | Default Value |
---|---|---|---|
metadataServiceUri | The metadata service URI for the BookKeeper client, for example, zk://localhost:2181/ledgers | Y | N/A |
pulsarWebServiceUrl | The Pulsar web service URL, for example, http://localhost:8080 | Y | N/A |
pulsarServiceUrl | The Pulsar protocol service URL, for example, pulsar://localhost:6650 | Y | N/A |
offloadProvider | The offloader driver's name, for example, delta or iceberg | Y | N/A |
maxContainerCountPerTopic | The maximum number of containers that a single topic can use. | N | 2 |
containerSizeInBytes | The maximum size in bytes of each container. By default, a container can hold up to 24 MB of messages to write. | N | 25165824 |
enableSharedMessageContainer | Whether to enable the shared message container. When enabled, all offload processors obtain containers from the shared message container pool. | N | true |
sharedMessageContainerPoolMemory | The maximum shared message container pool memory size. | N | 0.125 * maxDirectMemory |
offloadSubName | The offload subscription name. | N | __OFFLOAD |
initialDelayTime | The initial delay time in milliseconds | N | 10000 |
maxRetryTimes | The maximum number of delay retries. Once the backoff reaches this number, the delay time no longer increases. | N | 5 |
maxContainerSwitchTime | The maximum time, in seconds, that a message container waits before switching. | N | 60 |
maxReadCacheSize | The amount of memory used for caching data payloads in the offload framework. This memory is allocated from JVM direct memory and is shared across all the topics running in the same broker. By default, it uses 1/8th of the available direct memory. | N | Math.max(64MB, (int) (0.125 * maxDirectMemory())) |
readContainerBatchSize | The size in bytes of the entries that one read container holds for reading. Default is 5 MB. | N | 5242880 |
offloadWriterMaxRetryTimes | The maximum number of retries the offload writer performs for a retryable write error. | N | 3 |
embeddedStatServiceEnabled | Enable the embedded stat service | N | true |
statServicePort | Stat service default port | N | 8090 |
offloadWriterThreads | The number of thread pool threads for the Lakehouse writer | N | 2 |
offloadReaderThreads | The number of thread pool threads for the Lakehouse reader | N | 2 |
offloadCoreThreads | The number of thread pool threads for the core processor | N | 2 |
offloadHighPriorityTaskThreads | Maximum number of thread pool threads for the core processor high-priority task | N | 2 |
cluster | The tiered storage offloading cluster name, shown as a label on the Prometheus metrics. If you run the offloader within the Pulsar broker, you can set it to the same value as the Pulsar cluster name. | N | pulsar |
schemaRegistryType | The schema registry type. Supported values: 'pulsar', 'kop', 'fisher'. | N | pulsar |
messageFormat | The message format. Supported values: 'avro', 'json', 'csv', 'kop'. | N | avro |
skipDirtyData | Whether to skip dirty data. If enabled, the message is acknowledged even when encoding fails. | N | false |
appendPulsarMetaField | Whether to append Pulsar metadata fields (such as __ledgerId and __entryId). If disabled, messages cannot be read back from tiered storage. | N | true |
partitionKey | The custom partition key in the Lakehouse table; multiple partition keys can be specified, separated by ','. This only takes effect when 'appendPulsarMetaField' is disabled. | N | none |
dataManagedByPulsar | Whether the data lifecycle is managed by Pulsar. If set to false, delete requests from Pulsar are not applied to the tiered storage. | N | true |
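For reference, here is a minimal offload.conf sketch that sets the required framework parameters. The host names, ports, and provider shown below are illustrative placeholders; replace them with values for your own environment.

```properties
# Metadata service used by the BookKeeper client
metadataServiceUri=zk://localhost:2181/ledgers

# Pulsar endpoints the offloader connects to
pulsarWebServiceUrl=http://localhost:8080
pulsarServiceUrl=pulsar://localhost:6650

# Offloader driver, for example delta
offloadProvider=delta

# Optional: metrics label; often set to the same value as the Pulsar cluster name
cluster=pulsar
```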
Offload Writer/Reader Configurations
Configuration | Description | Required | Default Value |
---|---|---|---|
storagePath | The path where data is stored. For example, gs://offload-dev (GCS offload) or s3a://offload-dev (S3 offload) | N | data |
hdfsResourceFile | The Hadoop profile path. The configuration file stored at this path contains various settings for Hadoop performance tuning. | N | conf/filesystem_offload_core_site.xml |
maxCommitInterval | The maximum commit interval before committing data to the Lakehouse table. Set it to -1 to disable time-based flushing. | N | -1 |
maxCommitSize | The maximum commit size before committing data to the Lakehouse table. Set it to -1 to disable message-size-based flushing. | N | -1 |
googleCloudProjectID | GCS offload project configuration. For example: example-project | Required if offloading data to GCS | N/A |
googleCloudServiceAccountFile | GCS offload authentication. For example: /Users/user-name/Downloads/project-804d5e6a6f33.json | Required if offloading data to GCS | N/A |
hadoop.xxx | All Hadoop-related configurations can be set with the hadoop. prefix | N | N/A |
delta.compressType | The compression type for the Delta table. Available options: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD. | N | SNAPPY |
delta.fileType | The file type for the Delta table. Available options: PARQUET. | N | PARQUET |
delta.rowGroupSize | The row group size for the Delta table. Default: 10 MB. | N | 10485760 |
delta.maxLedgerNumOnOneIndex | The maximum number of ledgers on one index for the Delta table. | N | 1024 |
delta.reader.count | The maximum number of open Delta readers. | N | 50 |
delta.compactThresholdSize | The Delta compaction threshold size. Default: 100 MB. | N | 104857600 |
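Continuing the sketch above, the writer/reader settings can be appended to the same offload.conf. The bucket name, project ID, key file path, and tuning values below are placeholders, not recommended settings.

```properties
# Where offloaded data is written; use the gs:// prefix for GCS or s3a:// for S3
storagePath=gs://offload-dev

# Required only when offloading to GCS (placeholder values)
googleCloudProjectID=example-project
googleCloudServiceAccountFile=/path/to/service-account.json

# Delta table tuning; ZSTD is one of the supported compression types
delta.compressType=ZSTD
delta.rowGroupSize=10485760

# Hadoop-related settings can be passed through with the hadoop. prefix
```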
If you want to offload data to S3, you need to export AWS_ACCESS_KEY_ID and AWS_SECRET_KEY before starting the broker service, and set the storagePath with the s3a:// prefix.
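For example, assuming a bucket named offload-dev, the S3 setup could look like the following sketch (the credentials are placeholders):

```bash
# Export the S3 credentials before starting the broker service
export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_KEY=<your-secret-key>

# Then point storagePath in conf/offload.conf at the bucket using the s3a:// prefix:
#   storagePath=s3a://offload-dev
```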
Limitations
The limitations of Lakehouse Storage include:
- Schema evolution does not support field deletion
- Supported schema types vary for different table formats
- Only the AVRO schema and Pulsar primitive schemas are supported for Delta Lake tables
- Different table formats cannot be configured for distinct namespaces or topics
Delta Lake-specific limitations:
- Avoid using Spark for small file compaction to prevent interference with streaming reads from Delta tables. Separate support for the compaction feature is available.
Notice
In Lakehouse Storage, the RawReader interface is used to retrieve messages from Pulsar topics and write them to the Lakehouse. A critical component of this process is the offload cursor, which marks the progress of the offloading operation. Note that prematurely advancing the offload cursor can lead to data loss when writing to Lakehouse tables.
When configuring Time-to-Live (TTL) settings at the namespace or topic level, consider the following two key options:
- Do Not Configure TTL at Namespace or Topic Level:
- This option ensures that no TTL constraints are imposed at the namespace or topic level, allowing data to persist without automatic expiration based on time.
- Configure TTL at Namespace or Topic Level:
- If TTL settings are implemented at the namespace or topic level, it is crucial to ensure that the TTL value is greater than the retention policy set for the topic or namespace.
By carefully considering these options and aligning TTL configurations with retention policies, organizations can effectively manage the data lifecycle and retention within the Lakehouse Storage, thereby safeguarding against unintended data loss scenarios.
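As an illustration of the second option, the following pulsar-admin commands sketch one way to keep the TTL above the retention period, assuming a namespace named public/default and purely illustrative values (3-day retention, 7-day TTL); adjust both to your own policies.

```bash
# Retain acknowledged messages for 3 days, with no size limit
bin/pulsar-admin namespaces set-retention public/default --time 3d --size -1

# Expire unacknowledged messages after 7 days (TTL is specified in seconds)
bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 604800
```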