How to get
This section describes how to get the Lakehouse source connector. You can get it using one of the following methods:
- Download the NAR package from the download page.
- Build it from the source code.
  - Clone the source code to your machine.
  - Build the connector in the `pulsar-io-lakehouse` directory.
    - Build the NAR package for your local file system.
    - Build the NAR package for your cloud storage (including AWS, GCS, and Azure-related package dependencies).
How to configure
Before using the Lakehouse source connector, you need to configure it. This table lists the properties and their descriptions.
- Delta Lake

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `type` | String | true | N/A | The type of the Lakehouse source connector. Available values: `delta`. |
| `checkpointInterval` | int | false | 30 | The checkpoint interval (in units of seconds). By default, it is set to 30s. |
| `queueSize` | int | false | 10_000 | The buffer queue size of the Lakehouse source connector. The buffer queue is used to store records before they are sent to Pulsar topics. By default, it is set to 10_000. |
| `fetchHistoryData` | bool | false | false | Configure whether to fetch the history data of the table. By default, it is set to `false`. |
| `startSnapshotVersion` | long | false | -1 | The Delta snapshot version to start capturing data changes from. Available values: [-1: LATEST, -2: EARLIEST]. The `startSnapshotVersion` and `startTimestamp` are mutually exclusive. |
| `startTimestamp` | long | false | N/A | The Delta snapshot timestamp (in units of seconds) to start capturing data changes from. The `startSnapshotVersion` and `startTimestamp` are mutually exclusive. |
| `tablePath` | String | true | N/A | The path of the Delta table. |
| `parquetParseThreads` | int | false | `Runtime.getRuntime().availableProcessors()` | The parallelism of parsing Delta Parquet files. By default, it is set to `Runtime.getRuntime().availableProcessors()`. |
| `maxReadBytesSizeOneRound` | long | false | Total memory * 0.2 | The maximum number of bytes read from Parquet files in one fetch round. By default, it is set to 20% of the heap memory. |
| `maxReadRowCountOneRound` | int | false | 100_000 | The maximum number of rows processed in one round. By default, it is set to 100_000. |
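
The constraints in the table can be checked before a configuration is submitted. The following is a minimal Python sketch, not part of the connector: the helper name `validate_delta_config` and the sample table path are hypothetical, and the checks only encode rules stated in the table (`type` and `tablePath` are required, `type` must be `delta`, and `startSnapshotVersion` and `startTimestamp` are mutually exclusive).

```python
def validate_delta_config(configs):
    """Return a list of problems found in a Delta Lake source config (empty if none).

    Hypothetical helper for illustration; the connector performs its own validation.
    """
    problems = []
    # `type` and `tablePath` are the two properties marked as required in the table.
    for key in ("type", "tablePath"):
        if key not in configs:
            problems.append(f"missing required property: {key}")
    # The only documented value for `type` is `delta`.
    if "type" in configs and configs["type"] != "delta":
        problems.append("type must be 'delta'")
    # `startSnapshotVersion` and `startTimestamp` must not be set together.
    if "startSnapshotVersion" in configs and "startTimestamp" in configs:
        problems.append("startSnapshotVersion and startTimestamp are mutually exclusive")
    return problems


# Sample values below are assumptions chosen for the example.
ok = validate_delta_config({"type": "delta", "tablePath": "file:///tmp/data/delta-source"})
bad = validate_delta_config({
    "type": "delta",
    "tablePath": "file:///tmp/data/delta-source",
    "startSnapshotVersion": -1,
    "startTimestamp": 1700000000,
})
print(ok)
print(bad)
```

Returning a list of problems rather than raising on the first one makes it easier to report every issue in a configuration at once.
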
The Lakehouse source connector uses the Hadoop file system to read and write data to and from cloud object storage, such as AWS, GCS, and Azure. To configure Hadoop-related properties, use the prefix `hadoop.`.

Examples
If you use Pulsar Function Worker to run connectors in a cluster, you can create a configuration file (JSON or YAML) to set the properties.
- Delta Lake
  - The Delta table that is stored in the file system
  - The Delta table that is stored in cloud storage (AWS S3, GCS, or Azure)
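
As a rough sketch of what such configuration files might contain, the Python snippet below assembles one configuration per case and renders it as JSON. The top-level layout (`tenant`, `namespace`, `name`, `topicName`, `parallelism`, `archive`, `configs`) follows the usual Pulsar source configuration file. All values are illustrative assumptions, not values from this document: the archive path, the table paths, the bucket name, and the `hadoop.fs.s3a.aws.credentials.provider` entry, which is only one possible `hadoop.`-prefixed property.

```python
import json


def source_config(table_path, extra_configs=None):
    """Build a source configuration dict for a Delta table (illustrative values only)."""
    configs = {
        "type": "delta",
        "checkpointInterval": 30,
        "queueSize": 10000,
        "fetchHistoryData": False,
        "startSnapshotVersion": -1,
        "tablePath": table_path,
    }
    # Hadoop-related properties are passed through with the `hadoop.` prefix.
    configs.update(extra_configs or {})
    return {
        "tenant": "public",            # assumed tenant
        "namespace": "default",        # assumed namespace
        "name": "delta_source",        # assumed connector name
        "topicName": "delta_source",   # assumed output topic
        "parallelism": 1,
        "archive": "connectors/pulsar-io-lakehouse.nar",  # hypothetical NAR path
        "configs": configs,
    }


# Delta table stored in the local file system (hypothetical path).
local_config = source_config("file:///tmp/data/delta-source")

# Delta table stored in cloud storage (hypothetical S3 bucket and credentials provider).
cloud_config = source_config(
    "s3a://example-bucket/delta-source",
    {"hadoop.fs.s3a.aws.credentials.provider":
         "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"},
)

print(json.dumps(local_config, indent=2))
print(json.dumps(cloud_config, indent=2))
```

The printed JSON can be saved to a file and adapted. The only difference between the two cases in this sketch is the table path scheme and the extra `hadoop.`-prefixed entry.
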
Data format types
Currently, the Lakehouse source connector only supports reading Delta table changelogs, which use the Parquet storage format.
How to use
You can use the Lakehouse source connector with Function Worker, either as a non built-in connector or as a built-in connector.
- Use it as a non built-in connector
- Use it as a built-in connector
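
For the non built-in case, the connector is created with `pulsar-admin sources create`. As an illustration only, the Python sketch below assembles such an invocation from a NAR archive and a configuration file without running it. The helper name and both file paths are hypothetical. `--archive` and `--source-config-file` are existing `pulsar-admin sources create` options, but check them against the Pulsar version you run.

```python
import shlex


def sources_create_command(archive, config_file):
    """Return the argument list for creating a source from a NAR and a config file."""
    return [
        "pulsar-admin", "sources", "create",
        "--archive", archive,                 # path to the connector NAR package
        "--source-config-file", config_file,  # file holding the source configuration
    ]


# Both paths are placeholders chosen for the example.
command = sources_create_command(
    "connectors/pulsar-io-lakehouse.nar",
    "delta-source-config.yaml",
)
print(shlex.join(command))
```

Keeping the command as a list makes it safe to pass to `subprocess.run(command, check=True)` on a machine where `pulsar-admin` is installed.
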
If you already have a Pulsar cluster, you can use the Lakehouse source connector as a non built-in connector directly.

This example shows how to create a Lakehouse source connector on a Pulsar cluster using the `pulsar-admin sources create` command.

Demos
This table lists demos that show how to run the Delta Lake, Hudi, and Iceberg source connectors with other external systems. Currently, only the demo for the Delta Lake source connector is available.

| Connector | Link |
|---|---|
| Delta Lake | For details, see the Delta Lake demo. |
| Hudi | |
| Iceberg | |

































