1. Configure Private Cloud
  2. Private Preview
  3. Lakehouse Storage

Configure Lakehouse Storage

Note

This feature is currently in private preview. If you want to try it out or have any questions, submit a ticket to the support team.

Overview

This tutorial offers a detailed guide on configuring Lakehouse Storage within a private cloud environment.

Supported Features

The Lakehouse Storage supports the following features:

  • Storage options: HDFS, and Delta
  • Data production and consumption with AVRO schema and Pulsar primitive schema
  • Prerequisites: JDK 17+ and Pulsar 3.0.0+

Configuration Steps

To set up Lakehouse tiered storage, create an offload.conf file in the conf directory. The configuration parameters include:

Offload Framework Configurations

ConfigurationDescriptionRequiredDefault Value
metadataServiceUriThe metadata service uri for bookkeeper client, for example, zk://localhost:2181/ledgersYN/A
pulsarWebServiceUrlThe pulsar web service URL, for example, http://localhost:8080YN/A
pulsarServiceUrlThe pulsar protocol service URL, for example, pulsar://localhost:6650YN/A
offloadProviderThe offloader driver's name, for example, delta or icebergYN/A
maxContainerCountPerTopicThe maximum containers will be used by a single topic.N2
containerSizeInBytesThe maximum bytes for each container. By default, the container can fill with 24 MB messages to write.N25165824
enableSharedMessageContainerEnable shared message container or not. Once the shared message container is enabled, all the offload processors will get the container from the shared message container poolNtrue
sharedMessageContainerPoolMemoryThe max shared message container pool memory size. Default: 0.125 * maxDirectMemorySizeN0.125 * maxDirectMemory
offloadSubNameoffload subscription nameN__OFFLOAD
initialDelayTimeThe initial delay time in millisecondsN10000
maxRetryTimesThe max retry times of the delay, when the backoff reaches the max retry number, the delay time won't increase anymoreN5
maxContainerSwitchTimeThe max time that message container waiting to switch. Time unit: second. Default: 60 seconds.N60
maxReadCacheSizeAmount of memory to use for caching data payload in offload framework. This memory is allocated from JVM direct memory and it's shared across all the topics running in the same broker. By default, uses 1/8th of available direct memory.NMath.max(64MB, (int) (0.125 * maxDirectMemory()))
readContainerBatchSizeThe bytes size of entries one read container hold to read. Default is 5MB.N5242880
offloadWriterMaxRetryTimesOffload writer the max retry times of the delay to retry for retryable write error.N3
embeddedStatServiceEnabledEnable the embedded stat serviceNtrue
statServicePortStat service default portN8090
offloadWriterThreadsThe number of thread pool threads for lakehouse writerN2
offloadReaderThreadsThe number of thread pool threads for Lakehouse readerN2
offloadCoreThreadsThe number of thread pool threads for the core processorN2
offloadHighPriorityTaskThreadsMaximum number of thread pool threads for the core processor high-priority taskN2
clusterThe tiered storage offloading cluster name, and it will be shown on the Prometheus metrics label. If you run the offloader within the Pulsar broker, you can set it the same with the Pulsar cluster name. Default: pulsarNpulsar
schemaRegistryTypeThe schema registry type, we support 'pulsar','kop','fisher', default is 'pulsar'.Npulsar
messageFormatThe msg's format, we support 'avro','json','csv','kop', default is 'avro'.Navro
skipDirtyDataWhether to skip the dirty data, if skip, it will also ack the msg although encode failed.Nfalse
appendPulsarMetaFieldWhether to append pulsar meta filed(such as: **ledgerId, **entryId). If disabled, can't read the msg from tiered storage.Ntrue
partitionKeyThe custom partition key in the lakehouse table, support multi partition key split by ','. It only works when 'appendPulsarMetaField' is disabled. Default value is 'none'Nnone
dataManagedByPulsarWhether the data lifecycle is managed by Pulsar or not. If setting it to false, all the delete request from Pulsar won't be processed to the tiered storageNtrue

Offload Writer/Reader Configurations

ConfigurationDescriptionRequiredDefault Value
storagePathThe data stored path. For example, gs://offload-dev (GCS offload) or s3a://offload-dev (S3 offload)Ndata
hdfsResourceFileHadoop profile path. The configuration file is stored in the Hadoop profile path. It contains various settings for Hadoop performance tuning.Nconf/filesystem_offload_core_site.xml
maxCommitIntervalThe max commit interval before committing data to the Lakehouse table. Set it to -1 to disable the time-based flush. Default: -1N-1
maxCommitSizeThe max commits size before committing data to the Lakehouse table. Set it to -1 to disable the message size based flush. Default: -1N-1
googleCloudProjectIDGCS offload project configuration. For example: example-projectRequired if offloading data to GCSN/A
googleCloudServiceAccountFileGCS offload authentication. For example: /Users/user-name/Downloads/project-804d5e6a6f33.jsonRequired if offloading data to GCSN/A
hadoop.xxxAll the Hadoop-related configurations can be set with hadoop. prefixNN/A
delta.compressTypeThe compression type for the delta table, Available options are: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD. Default: SNAPPYNSNAPPY
delta.fileTypeThe file type for the delta table, Available options are: PARQUET. Default: PARQUETNPARQUET
delta.rowGroupSizeThe row group size for the delta table. Default: 10MBN10485760
delta.maxLedgerNumOnOneIndexThe delta max ledger num on one index, default is 1024.N1024
delta.reader.countThe max opened delta reader count, default is 50.N50
delta.compactThresholdSizeThe delta compact threshold size, default is 100MB.N104857600

If you want to offload data to S3, you need to export AWS_ACCESS_KEY_ID and AWS_SECRET_KEY before starting up the broker service and setting the storagePath with s3a:// prefix.

Limitations

The limitations of Lakehouse Storage include:

  • Schema evolution does not support field deletion
  • Supported schema types vary for different table formats
    • We only support AVRO and Pulsar primitive schema for Delta Lake tables
  • Inability to configure different table formats for distinct namespaces or topics

Delta Lake-specific limitations:

  • Avoid using Spark for small file compaction to prevent interference with streaming reads from Delta tables. Separate support for the compaction feature is available.

Notice

In Lakehouse Storage, the RawReader interface is utilized to retrieve messages from Pulsar topics and write them to the Lakehouse. A critical component of this process is the offload cursor, which marks the progress of the offloading operation. It's crucial to note that prematurely advancing the offload cursor can lead to data loss in writing to lakehouse tables.

When configuring Time-to-Live (TTL) settings at the namespace or topic level, consider the following two key options:

  1. Do Not Configure TTL at Namespace or Topic Level:
  • This option ensures that no TTL constraints are imposed at the namespace or topic level, allowing data to persist without automatic expiration based on time.
  1. Configure TTL at Namespace or Topic Level:
  • If TTL settings are implemented at the namespace or topic level, it is crucial to ensure that the TTL value is greater than the retention policy set for the topic or namespace.

By carefully considering these options and aligning TTL configurations with retention policies, organizations can effectively manage the data lifecycle and retention within the Lakehouse Storage, thereby safeguarding against unintended data loss scenarios.

Previous
Get Started