Prerequisites
- Set up the Iceberg catalog.
- Create a control topic for the Iceberg connector. The control topic is dedicated to this connector and cannot be used by other connectors.
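Creating the dedicated control topic can be scripted. The sketch below only builds the command. It assumes the standard Apache Kafka CLI script `kafka-topics.sh` is on your PATH. The topic name `control-iceberg-events`, the broker address, and the partition and replication values are hypothetical placeholders, not recommendations from this guide.

```python
import shlex


def control_topic_create_cmd(topic, bootstrap_servers, partitions=1, replication_factor=1):
    """Build a kafka-topics.sh argument list that creates a dedicated control topic.

    Assumes the Apache Kafka CLI is installed; partition and replication
    values are placeholders, not recommendations.
    """
    if not topic:
        raise ValueError("control topic name must not be empty")
    return [
        "kafka-topics.sh",
        "--bootstrap-server", bootstrap_servers,
        "--create",
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]


if __name__ == "__main__":
    # Hypothetical topic name and broker address.
    cmd = control_topic_create_cmd("control-iceberg-events", "localhost:9092")
    print(shlex.join(cmd))
```

To actually create the topic, pass the list to `subprocess.run(cmd, check=True)`. Keeping the topic name in one variable makes it easy to reuse the same value for `iceberg.control.topic`.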
Quick Start
- Set up the kcctl client (see the kcctl documentation).
- Set up the Iceberg catalog. You can use the YAML file below to create a local Iceberg catalog in Kubernetes:
- Initialize the Iceberg table:
- Create a JSON file like the following:
- Run the following command to create the connector:
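The following Python sketch roughly illustrates the connector JSON file step. It assembles a definition in the `{"name": ..., "config": {...}}` shape that Kafka Connect's REST API accepts for `POST /connectors`, and it uses the required properties from the Configuration section. Several values are assumptions: the `connector.class` value (the Apache Iceberg sink class), the `iceberg.catalog.uri` key, and all names and URLs in the usage example. Adjust them to your deployment.

```python
import json
from pathlib import Path


def iceberg_sink_definition(name, topics, control_topic, tables, catalog_uri):
    """Assemble a connector definition from the required properties.

    connector.class and iceberg.catalog.uri are assumptions; verify them
    against the connector version you deploy.
    """
    return {
        "name": name,
        "config": {
            # Assumed class name of the Apache Iceberg Kafka Connect sink.
            "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
            "topics": ",".join(topics),
            "iceberg.control.topic": control_topic,  # must not be shared
            "iceberg.catalog.type": "REST",
            "iceberg.catalog.uri": catalog_uri,  # assumed REST catalog endpoint key
            "iceberg.tables": ",".join(tables),  # {namespace}.{table} entries
        },
    }


def write_definition(definition, path):
    """Write the definition as pretty-printed JSON and return the path."""
    Path(path).write_text(json.dumps(definition, indent=2) + "\n")
    return path


if __name__ == "__main__":
    # All names and the catalog URL below are hypothetical.
    definition = iceberg_sink_definition(
        name="iceberg-sink",
        topics=["events"],
        control_topic="control-iceberg-events",
        tables=["my_namespace.events"],
        catalog_uri="http://iceberg-rest:8181",
    )
    print(json.dumps(definition, indent=2))
```

After writing the file with `write_definition`, you can submit it with kcctl's `apply -f` command. Check the kcctl documentation for the exact usage in your version.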
Quick Start 2 - Write to AWS S3 Tables
This is a real-world example that sinks data into an Iceberg table stored in AWS S3 Tables. The steps are as follows:
- Create an AWS S3 table bucket. This is a new bucket type; a regular S3 bucket won't work with S3 Tables. Here we use `kc-test-iceberg-table` as an example.
- Create a new database in the table bucket. You can do this in the AWS Lake Formation console under the Data Catalog > Databases section. Here we use `my_s3_namespace` as an example.
- Create an AWS IAM user and attach the following policy to it. Replace `eu-north-1`, `account-id`, `kc-test-iceberg-table`, and `my_s3_namespace` with your real region, account ID, table bucket, and database, respectively:
- Create an access key for the IAM user. You will need the Access Key ID and Secret Access Key later.
- In the Lake Formation console, grant the user database permissions (Create table, Describe) on `[account-id]:s3tablescatalog/kc-test-iceberg-table/my_s3_namespace`. Also grant table permissions (Super on ALL_TABLES, or at least on the table you will write to).
- Create a JSON file like the following. Replace fields such as `region`, `account-id`, `table_bucket`, `access_key_id`, and `secret_access_key` with your real values:
- Run the following command to create the connector:
- Produce some test data to the `events` topic, for example with the following command:
- Wait until the data is written to the Iceberg table. You can check the connector logs for output like the following:
- Check the Iceberg table in the AWS Athena console. You should see the `events` table in the `my_s3_namespace` database, and you can run a query like the following to see the data:
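Several steps in this walkthrough ask you to substitute the example region, account ID, table bucket, and database. The hypothetical helpers below sketch that substitution. They also derive two identifiers from the same four values:
- the S3 table bucket ARN, in the `arn:aws:s3tables:<region>:<account-id>:bucket/<bucket>` format;
- the Lake Formation database resource used in the permissions step.

The 12-digit account ID and the other values in the usage example are made up.

```python
import re


def replace_examples(text, replacements):
    """Swap the guide's example values for real ones, e.g. 'eu-north-1' -> your region."""
    for example, real in replacements.items():
        text = text.replace(example, real)
    return text


def s3_tables_identifiers(region, account_id, table_bucket, database):
    """Derive identifiers used in this walkthrough from four input values."""
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError("account_id should be a 12-digit AWS account ID")
    return {
        # ARN of the S3 table bucket.
        "table_bucket_arn": f"arn:aws:s3tables:{region}:{account_id}:bucket/{table_bucket}",
        # Database resource that the Lake Formation permissions are granted on.
        "lake_formation_database": f"{account_id}:s3tablescatalog/{table_bucket}/{database}",
    }


if __name__ == "__main__":
    # Made-up values for illustration only.
    ids = s3_tables_identifiers("eu-north-1", "123456789012", "kc-test-iceberg-table", "my_s3_namespace")
    print(ids["table_bucket_arn"])
    print(ids["lake_formation_database"])
    print(replace_examples(
        "bucket/kc-test-iceberg-table in eu-north-1",
        {"kc-test-iceberg-table": "prod-table-bucket", "eu-north-1": "us-east-1"},
    ))
```

`replace_examples` does plain substring replacement. Review the result if one of your real values could contain an example value.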
Limitations
- Each Iceberg sink connector must have its own control topic.
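A control topic cannot be shared, so it can help to check connector configurations for collisions before deploying them. This minimal sketch assumes you already have each connector's configuration as a Python dict, for example loaded from the JSON files you submit. The connector and topic names in the usage example are hypothetical.

```python
from collections import defaultdict


def shared_control_topics(connectors):
    """Return {control_topic: [connector names]} for topics used by more than one connector.

    `connectors` maps each connector name to its configuration dict.
    """
    users = defaultdict(list)
    for name, config in connectors.items():
        topic = config.get("iceberg.control.topic")
        if topic:
            users[topic].append(name)
    return {topic: sorted(names) for topic, names in users.items() if len(names) > 1}


if __name__ == "__main__":
    # Hypothetical connectors: the first two collide on the same control topic.
    configs = {
        "sink-orders": {"iceberg.control.topic": "control-iceberg"},
        "sink-events": {"iceberg.control.topic": "control-iceberg"},
        "sink-clicks": {"iceberg.control.topic": "control-iceberg-clicks"},
    }
    for topic, names in shared_control_topics(configs).items():
        print(f"control topic {topic!r} is shared by {', '.join(names)}")
```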
Configuration
The following required properties are used to configure the connector.

Parameter | Description |
---|---|
topics | Comma-separated list of the Kafka topics you want to replicate. (You can define either the topics or the topics.regex setting, but not both.) |
topics.regex | Java regular expression of topics to replicate. (You can define either the topics or the topics.regex setting, but not both.) |
iceberg.control.topic | The name of the control topic. It cannot be used by other Iceberg connectors. |
iceberg.catalog.type | The type of Iceberg catalog. Allowed values: REST, HIVE, HADOOP. |
iceberg.tables | Comma-separated list of Iceberg table names, each specified in the format {namespace}.{table}. |
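The rules in the table above can be checked before you submit a configuration. This minimal sketch mirrors the table as written. It checks for:
- exactly one of `topics` or `topics.regex`;
- a non-empty control topic;
- a catalog type from the allowed list;
- at least one `{namespace}.{table}` entry.

Two limits apply. Comparing the catalog type case-insensitively is an assumption. The Java regular expression in `topics.regex` is not validated, because Python's `re` syntax differs from Java's.

```python
import re

ALLOWED_CATALOG_TYPES = {"REST", "HIVE", "HADOOP"}
# {namespace}.{table}: namespace may itself contain dots; the table is the last segment.
TABLE_NAME = re.compile(r"[^,\s]+\.[^,\s.]+")


def validate_required(config):
    """Return a list of problems with the required properties (empty if none)."""
    errors = []
    has_topics = bool(config.get("topics"))
    has_regex = bool(config.get("topics.regex"))
    if has_topics == has_regex:  # both set, or neither set
        errors.append("define exactly one of topics or topics.regex")
    if not config.get("iceberg.control.topic"):
        errors.append("iceberg.control.topic is required")
    # Case-insensitive comparison is an assumption of this sketch.
    catalog_type = str(config.get("iceberg.catalog.type", "")).upper()
    if catalog_type not in ALLOWED_CATALOG_TYPES:
        errors.append("iceberg.catalog.type must be one of REST, HIVE, HADOOP")
    tables = [t.strip() for t in config.get("iceberg.tables", "").split(",") if t.strip()]
    if not tables:
        errors.append("iceberg.tables is required")
    for table in tables:
        if not TABLE_NAME.fullmatch(table):
            errors.append(f"{table!r} is not in {{namespace}}.{{table}} format")
    return errors


if __name__ == "__main__":
    config = {
        "topics": "events",
        "topics.regex": "ev.*",  # conflicts with topics
        "iceberg.control.topic": "control-iceberg-events",
        "iceberg.catalog.type": "REST",
        "iceberg.tables": "events",  # missing the namespace
    }
    for problem in validate_required(config):
        print(problem)
```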

The following optional properties can also be set.

Parameter | Description |
---|---|
iceberg.control.commit.timeout-ms | Commit timeout interval in ms. The default is 30000 (30 sec). |
iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables. Required when iceberg.tables.dynamic-enabled is set to true. |
iceberg.tables.cdc-field | Name of the field containing the CDC operation (I, U, or D). Default: none. |
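The optional properties can be handled in the same way. The sketch below does three things:
- applies the documented 30000 ms commit-timeout default;
- requires `iceberg.tables.route-field` when `iceberg.tables.dynamic-enabled` is `true`;
- maps the `I`, `U`, and `D` codes read from the configured CDC field to operation names.

Two choices here are this sketch's own, not documented connector behavior: an unset `dynamic-enabled` is treated as false, and unknown CDC codes are rejected. The field name `op` in the usage example is hypothetical.

```python
DEFAULT_COMMIT_TIMEOUT_MS = 30000  # documented default of iceberg.control.commit.timeout-ms
CDC_OPERATIONS = {"I": "insert", "U": "update", "D": "delete"}


def commit_timeout_ms(config):
    """Effective commit timeout in milliseconds, falling back to the default."""
    return int(config.get("iceberg.control.commit.timeout-ms", DEFAULT_COMMIT_TIMEOUT_MS))


def check_optional(config):
    """Return problems with the optional properties (empty list if none)."""
    errors = []
    # Treating an unset value as false is an assumption of this sketch.
    dynamic = str(config.get("iceberg.tables.dynamic-enabled", "false")).lower() == "true"
    if dynamic and not config.get("iceberg.tables.route-field"):
        errors.append("iceberg.tables.route-field is required when iceberg.tables.dynamic-enabled is true")
    return errors


def cdc_operation(record, cdc_field=None):
    """Map a record's CDC code to an operation name; None when no CDC field is configured."""
    if cdc_field is None:  # cdc-field defaults to none
        return None
    code = record.get(cdc_field)
    if code not in CDC_OPERATIONS:
        # Rejecting unknown codes is this sketch's choice, not documented behavior.
        raise ValueError(f"unexpected CDC operation {code!r}")
    return CDC_OPERATIONS[code]


if __name__ == "__main__":
    config = {"iceberg.tables.dynamic-enabled": "true"}  # route-field missing
    print(commit_timeout_ms(config))
    print(check_optional(config))
    print(cdc_operation({"id": 1, "op": "U"}, cdc_field="op"))  # hypothetical field name "op"
```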