The Lakehouse sink connector (including the Hudi, Iceberg, and Delta Lake sink connectors) fetches data from a Pulsar topic and saves it to Lakehouse tables.
How to get
This section describes how to get the Lakehouse sink connector. You can get it using one of the following methods:
- Download the NAR package from the download page.
- Build it from the source code.
To build the Lakehouse sink connector from the source code, follow these steps.

1. Clone the source code to your machine.

   ```bash
   git clone https://github.com/streamnative/pulsar-io-lakehouse.git
   ```

2. Build the connector in the `pulsar-io-lakehouse` directory.

   - To build the NAR package for your local file system:

     ```bash
     mvn clean install -DskipTests
     ```

   - To build the NAR package for cloud storage (including the AWS, GCS, and Azure package dependencies):

     ```bash
     mvn clean install -P cloud -DskipTests
     ```

3. After the connector is successfully built, a NAR package is generated under the `target` directory.

   ```bash
   ls target
   pulsar-io-lakehouse-{{connector:version}}.nar
   ```
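If you later want to run the connector as a built-in connector (see the How to use section below), a common approach is to copy the NAR into the `connectors` directory of your Pulsar installation. This is a minimal sketch that assumes `PULSAR_HOME` points at a standard Pulsar installation layout:

```bash
# Assumption: PULSAR_HOME points at your Pulsar installation.
# Built-in connectors are loaded from its connectors/ directory.
cp target/pulsar-io-lakehouse-{{connector:version}}.nar $PULSAR_HOME/connectors/
```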
How to configure
Before using the Lakehouse sink connector, you need to configure it. This table lists the properties and their descriptions.
For a list of Hudi configurations, see Write Client Configs.
Name | Type | Required | Default | Description |
---|---|---|---|---|
type | String | true | N/A | The type of the Lakehouse sink connector. Available values: hudi, iceberg, and delta. |
maxCommitInterval | Integer | false | 120 | The maximum flush interval (in seconds) for each batch. By default, it is set to 120 seconds. |
maxRecordsPerCommit | Integer | false | 10_000_000 | The maximum number of records for each batch to commit. By default, it is set to 10_000_000. |
maxCommitFailedTimes | Integer | false | 5 | The maximum number of commit failures before the process fails. By default, it is set to 5. |
sinkConnectorQueueSize | Integer | false | 10_000 | The maximum queue size of the Lakehouse sink connector for buffering records before they are written to Lakehouse tables. |
partitionColumns | List<String> | false | Collections.emptyList() | The partition columns for Lakehouse tables. |
processingGuarantees | String | true | "" (empty string) | The processing guarantees. Currently, the Lakehouse sink connector only supports EFFECTIVELY_ONCE. |
hoodie.table.name | String | true | N/A | The name of the Hudi table that the Pulsar topic sinks data to. |
hoodie.table.type | String | false | COPY_ON_WRITE | The type of the Hudi table of the underlying data for one write. It cannot be changed between writes. |
hoodie.base.path | String | true | N/A | The base path of the lake storage where all table data is stored. It always has a specific prefix with the storage scheme (for example, hdfs://, s3://). Hudi stores all the main metadata about commits, savepoints, and cleaning audit logs in the .hoodie directory. |
hoodie.datasource.write.recordkey.field | String | false | UUID | The record key field. It is used as the recordKey component of HoodieKey. The value is obtained by invoking .toString() on the field value. You can use dot notation for nested fields, such as a.b.c. |
hoodie.datasource.write.partitionpath.field | String | true | N/A | The partition path field. It is used as the partitionPath component of HoodieKey. The value is obtained by invoking .toString(). |
Note
The Lakehouse sink connector uses the Hadoop file system to read and write data to and from cloud object storage, such as AWS S3, GCS, and Azure. If you want to configure Hadoop-related properties, you should use the `hadoop.` prefix.
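For example, to pass static credentials to the Hadoop S3A file system, you could prefix the standard Hadoop S3A properties (fs.s3a.access.key and fs.s3a.secret.key) with `hadoop.` in the `configs` block. The snippet below is an illustrative fragment rather than a complete sink configuration, and the values are placeholders:

```json
{
  "configs": {
    "type": "hudi",
    "hoodie.base.path": "s3a://bucket/path/to/hudi",
    "hadoop.fs.s3a.access.key": "<AWS_ACCESS_KEY_ID>",
    "hadoop.fs.s3a.secret.key": "<AWS_SECRET_ACCESS_KEY>"
  }
}
```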
You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.
Example
The Hudi table that is stored in the file system
{ "tenant": "public", "namespace": "default", "name": "hudi-sink", "inputs": [ "test-hudi-pulsar" ], "archive": "connectors/pulsar-io-hudi-{{connector:version}}.nar", "parallelism": 1, "configs": { "type": "hudi", "hoodie.table.name": "hudi-connector-test", "hoodie.table.type": "COPY_ON_WRITE", "hoodie.base.path": "file:///tmp/data/hudi-sink", "hoodie.datasource.write.recordkey.field": "id", "hoodie.datasource.write.partitionpath.field": "id", } }
The Hudi table that is stored in the AWS S3
{ "tenant": "public", "namespace": "default", "name": "hudi-sink", "inputs": [ "test-hudi-pulsar" ], "archive": "connectors/pulsar-io-hudi-{{connector:version}}.nar", "parallelism": 1, "configs": { "type": "hudi", "hoodie.table.name": "hudi-connector-test", "hoodie.table.type": "COPY_ON_WRITE", "hoodie.base.path": "s3a://bucket/path/to/hudi", "hoodie.datasource.write.recordkey.field": "id", "hoodie.datasource.write.partitionpath.field": "id", "hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain" } }
Data format types
The Lakehouse sink connector provides multiple output format options, including Avro and Parquet. The default format is Parquet. With the current implementation, there are some limitations for the different formats:
This table lists the Pulsar Schema types supported by the writers.
Pulsar Schema | Writer: Avro | Writer: Parquet |
---|---|---|
Primitive | ✗ | ✗ |
Avro | ✔ | ✔ |
Json | ✔ | ✔ |
Protobuf * | ✗ | ✗ |
ProtobufNative * | ✗ | ✗ |
*: The Protobuf schema is based on the Avro schema. It uses Avro as an intermediate format, so the conversion may not be optimal.
*: The ProtobufNative record holds the Protobuf descriptor and the message. When writing to Avro format, the connector uses avro-protobuf to do the conversion.
How to use
You can use the Lakehouse sink connector with the Pulsar Function Worker, either as a non-built-in connector or as a built-in connector.
If you already have a Pulsar cluster, you can use the Lakehouse sink connector as a non-built-in connector directly.
This example shows how to create a Lakehouse sink connector on a Pulsar cluster using the `pulsar-admin sinks create` command.

```bash
$PULSAR_HOME/bin/pulsar-admin sinks create \
    --sink-config-file <lakehouse-sink-config.yaml>
```
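If you prefer to pass everything on the command line instead of a config file, a sketch like the following could work. The flags shown (--name, --archive, --inputs, --parallelism, --sink-config) are standard `pulsar-admin sinks create` options; the NAR path, topic, and inline JSON mirror the Hudi file-system example above and should be adapted to your environment.

```bash
# Sketch: create the sink with inline configuration instead of a config file.
$PULSAR_HOME/bin/pulsar-admin sinks create \
    --name hudi-sink \
    --archive connectors/pulsar-io-hudi-{{connector:version}}.nar \
    --inputs test-hudi-pulsar \
    --parallelism 1 \
    --sink-config '{
        "type": "hudi",
        "hoodie.table.name": "hudi-connector-test",
        "hoodie.table.type": "COPY_ON_WRITE",
        "hoodie.base.path": "file:///tmp/data/hudi-sink",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.partitionpath.field": "id"
    }'
```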