> The official Apache Iceberg Kafka Connect Sink connector.

# Kafka Connect Iceberg Sink

The Apache Iceberg Kafka Connect Sink connector is a Kafka Connect connector that writes data from Kafka topics to Apache Iceberg tables.

<Note title="✅ Available on StreamNative Cloud">
  This connector is available as a built-in connector on StreamNative Cloud.
</Note>

### Prerequisites

* Set up the [Iceberg Catalog](https://iceberg.apache.org/concepts/catalog/)
* Create the Iceberg connector control topic, which cannot be used by other connectors.
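
The control topic can be created ahead of time with the standard Kafka CLI. The following is a minimal sketch, assuming the Kafka command-line tools are on your `PATH`; the bootstrap address `localhost:9092`, the topic name `control-iceberg`, the partition count, and the `DRY_RUN` switch are all assumptions for illustration. Your cluster may also require client authentication properties, which are not shown here.

```bash
# Sketch: create a dedicated control topic for one Iceberg sink connector.
# BOOTSTRAP and CONTROL_TOPIC are placeholder values (assumptions); adjust them.
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
CONTROL_TOPIC="${CONTROL_TOPIC:-control-iceberg}"

create_control_topic() {
  # Build the kafka-topics.sh invocation as the function's argument list.
  set -- kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --create --if-not-exists \
    --topic "$CONTROL_TOPIC" --partitions 1
  if [ "${DRY_RUN:-0}" = "1" ]; then
    # Only print the command so it can be reviewed first.
    echo "$*"
  else
    "$@"
  fi
}

# Print the command without contacting the cluster.
DRY_RUN=1 create_control_topic
```

Calling `create_control_topic` without `DRY_RUN=1` runs the command against the cluster. Use a different `CONTROL_TOPIC` value for each Iceberg sink connector.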

### Quick Start

1. Set up the kcctl client by following the [setup guide](https://docs.streamnative.io/docs/kafka-connect-setup).

2. Set up the Iceberg catalog. The following YAML manifest creates a local Iceberg catalog (Spark, an Iceberg REST catalog, and MinIO) in Kubernetes. Save it as `iceberg-spark.yaml`:
   ```yaml theme={null}
   apiVersion: v1
   data:
     spark-defaults.conf: |
       #
       # Licensed to the Apache Software Foundation (ASF) under one or more
       # contributor license agreements.  See the NOTICE file distributed with
       # this work for additional information regarding copyright ownership.
       # The ASF licenses this file to You under the Apache License, Version 2.0
       # (the "License"); you may not use this file except in compliance with
       # the License.  You may obtain a copy of the License at
       #
       #    http://www.apache.org/licenses/LICENSE-2.0
       #
       # Unless required by applicable law or agreed to in writing, software
       # distributed under the License is distributed on an "AS IS" BASIS,
       # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       # See the License for the specific language governing permissions and
       # limitations under the License.
       #

       # Default system properties included when running spark-submit.
       # This is useful for setting default environmental settings.

       # Example:
       spark.sql.extensions                   org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
       spark.sql.catalog.demo                 org.apache.iceberg.spark.SparkCatalog
       spark.sql.catalog.demo.type            rest
       spark.sql.catalog.demo.uri             http://iceberg-rest.default.svc.cluster.local:8181
       spark.sql.catalog.demo.io-impl         org.apache.iceberg.aws.s3.S3FileIO
       spark.sql.catalog.demo.warehouse       s3://warehouse/
       spark.sql.catalog.demo.s3.endpoint     http://minio.default.svc.cluster.local:9000
       spark.sql.defaultCatalog               demo
       spark.eventLog.enabled                 true
       spark.eventLog.dir                     /home/iceberg/spark-events
       spark.history.fs.logDirectory          /home/iceberg/spark-events
       spark.sql.catalogImplementation        in-memory
   kind: ConfigMap
   metadata:
     name: spark-config
     namespace: default
   ---
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: spark-iceberg
     namespace: default
   spec:
     replicas: 1
     selector:
       matchLabels:
         app: spark-iceberg
     template:
       metadata:
         labels:
           app: spark-iceberg
       spec:
         containers:
         - name: spark-iceberg
           image: tabulario/spark-iceberg
           ports:
             - name: one
               containerPort: 8888
             - name: two
               containerPort: 8080
             - name: three
               containerPort: 10000
             - name: four
               containerPort: 10001
           env:
             - name: AWS_ACCESS_KEY_ID
               value: admin
             - name: AWS_SECRET_ACCESS_KEY
               value: password
             - name: AWS_REGION
               value: us-east-1
           volumeMounts:
             - name: spark-config
               mountPath: /opt/spark/conf/spark-defaults.conf
               subPath: spark-defaults.conf
         volumes:
           - name: spark-config
             configMap:
               name: spark-config
   ---
   apiVersion: v1
   kind: Service
   metadata:
     name: spark-iceberg
     namespace: default
   spec:
     selector:
       app: spark-iceberg
     ports:
       - protocol: TCP
         name: one
         port: 8888
         targetPort: 8888
       - protocol: TCP
         name: two
         port: 8080
         targetPort: 8080
       - protocol: TCP
         name: three
         port: 10000
         targetPort: 10000
       - protocol: TCP
         port: 10001
         name: four
         targetPort: 10001
   ---
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: iceberg-rest
     namespace: default
   spec:
     replicas: 1
     selector:
       matchLabels:
         app: iceberg-rest
     template:
       metadata:
         labels:
           app: iceberg-rest
       spec:
         containers:
         - name: iceberg-rest
           image: tabulario/iceberg-rest
           ports:
             - name: one
               containerPort: 8181
           env:
             - name: AWS_ACCESS_KEY_ID
               value: admin
             - name: AWS_SECRET_ACCESS_KEY
               value: password
             - name: AWS_REGION
               value: us-east-1
             - name: CATALOG_WAREHOUSE
               value: s3://warehouse/
             - name: CATALOG_IO__IMPL
               value: org.apache.iceberg.aws.s3.S3FileIO
             - name: CATALOG_S3_ENDPOINT
               value: http://minio.default.svc.cluster.local:9000
   ---
   apiVersion: v1
   kind: Service
   metadata:
     name: iceberg-rest
     namespace: default
   spec:
     selector:
       app: iceberg-rest
     ports:
       - protocol: TCP
         name: one
         port: 8181
         targetPort: 8181
   ---
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: minio
     namespace: default
   spec:
     replicas: 1
     selector:
       matchLabels:
         app: minio
     template:
       metadata:
         labels:
           app: minio
       spec:
         hostname: warehouse
         subdomain: minio
         containers:
         - name: minio
           image: minio/minio
           args:
             - server
             - /data
             - --console-address
             - ":9001"
           ports:
             - name: one
               containerPort: 9000
             - name: two
               containerPort: 9001
           env:
             - name: MINIO_ROOT_USER
               value: admin
             - name: MINIO_ROOT_PASSWORD
               value: password
             - name: MINIO_DOMAIN
               value: minio.default.svc.cluster.local
   ---
   apiVersion: v1
   kind: Service
   metadata:
     name: minio
     namespace: default
   spec:
     selector:
       app: minio
     ports:
       - protocol: TCP
         name: one
         port: 9000
         targetPort: 9000
       - protocol: TCP
         name: two
         port: 9001
         targetPort: 9001
   ```

3. Apply the manifest, wait for the pods to become ready, and create the `warehouse` bucket in MinIO:

   ```bash theme={null}
   kubectl apply -f iceberg-spark.yaml
   kubectl wait -l app=spark-iceberg --for=condition=Ready pod --timeout=5m
   kubectl wait -l app=iceberg-rest --for=condition=Ready pod --timeout=5m
   kubectl wait -l app=minio --for=condition=Ready pod --timeout=5m

   sleep 30

   # initialize the bucket
   minio_pod_name=$(kubectl get pods -l app=minio -o=jsonpath='{.items[0].metadata.name}')
   kubectl exec $minio_pod_name -- /usr/bin/mc config host add minio http://minio.default.svc.cluster.local:9000 admin password
   kubectl exec $minio_pod_name -- /usr/bin/mc rm -r --force minio/warehouse || true
   kubectl exec $minio_pod_name -- /usr/bin/mc mb minio/warehouse
   kubectl exec $minio_pod_name -- /usr/bin/mc policy set public minio/warehouse
   ```

4. Create a JSON file like the following:

   ```json theme={null}
   {
       "name": "iceberg-sink",
       "config": {
           "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
           "topics": "kafka-iceberg-input",
           "iceberg.tables": "sink.kafka",
           "iceberg.catalog": "demo",
           "iceberg.catalog.type": "rest",
           "iceberg.catalog.uri": "http://iceberg-rest.default.svc.cluster.local:8181",
           "iceberg.catalog.client.region": "us-east-1",
           "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
           "iceberg.catalog.warehouse": "s3://warehouse",
           "iceberg.catalog.s3.endpoint": "http://minio.default.svc.cluster.local:9000",
           "iceberg.catalog.s3.path-style-access": "true",
           "iceberg.catalog.s3.access-key-id": "admin",
           "iceberg.catalog.s3.secret-access-key": "password",
           "iceberg.tables.auto-create-enabled": "true",
           "iceberg.tables.evolve-schema-enabled": "true",
           "iceberg.control.commit.interval-ms": "1000",
           "key.converter": "org.apache.kafka.connect.storage.StringConverter",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "value.converter.schemas.enable": "false",
           "tasks.max": "1"
       }
   }
   ```

5. Run the following command to create the connector:

   ```bash theme={null}
   kcctl apply -f <filename>.json
   ```
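
Once the connector is created, you can inspect it with `kcctl describe connector iceberg-sink` and send a few records to the `kafka-iceberg-input` topic. The following is a minimal sketch of a test-record generator. The field names (`id`, `name`, `message`) are made up for illustration, and the producer command in the comment assumes a local Kafka installation and a broker at `localhost:9092`.

```bash
# Sketch: emit N schemaless JSON records, one per line, matching the
# JsonConverter setup above ("value.converter.schemas.enable": "false").
# The field names are illustrative only.
gen_records() {
  local count="${1:-3}"
  local i=1
  while [ "$i" -le "$count" ]; do
    printf '{"id": %d, "name": "user-%d", "message": "hello iceberg"}\n' "$i" "$i"
    i=$((i + 1))
  done
}

# Print three sample records to the terminal.
gen_records 3

# To send them to the topic instead (assumed local Kafka tools and broker):
#   gen_records 3 | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka-iceberg-input
```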

### Quick Start 2 - Write to AWS S3 Table

This example sinks data from a Kafka topic into an Iceberg table stored in an AWS S3 table bucket.

1. Create an AWS S3 **table bucket**. This is a **new bucket type**; a regular S3 bucket won't work for S3 Tables. This example uses `kc-test-iceberg-table`.

2. Create a new database in the table bucket. In the **AWS Lake Formation** console, go to the **Data Catalog**/**Databases** section to create it. This example uses `my_s3_namespace`.

3. Create an AWS IAM user and attach the following policy to it. Replace `eu-north-1`, `{account_id}`, `kc-test-iceberg-table`, and `my_s3_namespace` with your actual region, account ID, table bucket, and database, respectively:
   ```json theme={null}
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "VisualEditor0",
               "Effect": "Allow",
               "Action": "lakeformation:GetDataAccess",
               "Resource": "*"
           },
           {
               "Sid": "GlueAndCatalogForS3Tables",
               "Effect": "Allow",
               "Action": [
                   "glue:GetCatalog",
                   "glue:GetDatabase",
                   "glue:GetDatabases",
                   "glue:GetTable",
                   "glue:GetTables",
                   "glue:CreateTable",
                   "glue:UpdateTable"
               ],
               "Resource": [
                   "arn:aws:glue:eu-north-1:{account_id}:catalog",
                   "arn:aws:glue:eu-north-1:{account_id}:catalog/s3tablescatalog",
                   "arn:aws:glue:eu-north-1:{account_id}:catalog/s3tablescatalog/kc-test-iceberg-table",
                   "arn:aws:glue:eu-north-1:{account_id}:database/s3tablescatalog/kc-test-iceberg-table/my_s3_namespace",
                   "arn:aws:glue:eu-north-1:{account_id}:table/s3tablescatalog/kc-test-iceberg-table/my_s3_namespace/*"
               ]
           }
       ]
   }
   ```

4. Create an access key for the IAM user. You will need the `Access Key ID` and `Secret Access Key` later.

5. In the **Lake Formation** console, grant the user database permissions (**Create table**, **Describe**) on `[account-id]:s3tablescatalog/kc-test-iceberg-table/my_s3_namespace`, and table permissions (**Super** on **ALL\_TABLES**, or at least on the table you'll write to).

6. Create a JSON file like the following. Replace placeholders such as `{region}`, `{account-id}`, `{table_bucket}`, `{access_key_id}`, and `{secret_access_key}` with your actual values:
   ```json theme={null}
   {
       "name": "test-ice",
       "config": {
           "name": "test-ice",
           "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
           "topics": "events",
           "tasks.max": "1",
           "iceberg.tables.auto-create-enabled": "true",
           "iceberg.tables": "my_s3_namespace.events",
           "iceberg.tables.evolve-schema-enabled": "true",
           "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "value.converter.schemas.enable": "false",
           "key.converter": "org.apache.kafka.connect.storage.StringConverter",
           "key.converter.schemas.enable": "false",
           "iceberg.catalog": "iceberg",
           "iceberg.catalog.type": "rest",
           "iceberg.catalog.uri": "https://glue.{region}.amazonaws.com/iceberg",
           "iceberg.catalog.warehouse": "{account-id}:s3tablescatalog/{table_bucket}",
           "iceberg.catalog.rest.access-key-id": "{access_key_id}",
           "iceberg.catalog.rest.secret-access-key": "{secret_access_key}",
           "iceberg.catalog.rest.sigv4-enabled": "true",
           "iceberg.catalog.rest.signing-name": "glue",
           "iceberg.catalog.rest.signing-region": "{region}",
           "iceberg.control.topic": "control-iceberg",
           "sn.connector.image.opts": "-Daws.region={region}"
       }
   }
   ```

7. Run the following command to create the connector:

   ```bash theme={null}
   kcctl apply -f <filename>.json
   ```

8. Produce some test data to the `events` topic, for example with the following command:

   ```bash theme={null}
   echo '{"id": 1, "name": "Jack", "message": "hello iceberg", "email": "Jack@test.com"}' | ~/kafka/kafka_2.13-3.1.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic events
   ```

9. Wait until the data is written to the Iceberg table. The connector logs should contain a line like the following:

   ```
   2025-08-25T08:05:27,622+0000 [iceberg-coord] INFO  io.tabular.iceberg.connect.channel.Coordinator - Commit fa40ffe5-9424-4994-8439-cebe46919ff4 complete, committed to 1 table(s), vtts null
   ```

10. Check the Iceberg table in the AWS Athena console. You should see the `events` table in the `my_s3_namespace` database, and you can run a query like the following to see the data:

    ```sql theme={null}
    SELECT * FROM "my_s3_namespace"."events" limit 10;
    ```
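
The placeholder substitution in step 6 can be scripted so that values are not hand-edited into the file. The following is a minimal sketch, assuming the template uses exactly the `{...}` placeholders shown in that step. The variable names and all example values (including the account ID and keys) are hypothetical.

```bash
# Placeholder values (assumptions); replace them with your own.
AWS_REGION="eu-north-1"
AWS_ACCOUNT_ID="123456789012"
TABLE_BUCKET="kc-test-iceberg-table"
ACCESS_KEY_ID="AKIAEXAMPLE"
SECRET_ACCESS_KEY="example-secret"

# Sketch: read a connector template on stdin and write the rendered JSON to
# stdout. Uses "|" as the sed delimiter so "/" in values needs no escaping;
# values containing "|", "&", or "\" would need extra escaping.
render_config() {
  sed \
    -e "s|{region}|${AWS_REGION}|g" \
    -e "s|{account-id}|${AWS_ACCOUNT_ID}|g" \
    -e "s|{table_bucket}|${TABLE_BUCKET}|g" \
    -e "s|{access_key_id}|${ACCESS_KEY_ID}|g" \
    -e "s|{secret_access_key}|${SECRET_ACCESS_KEY}|g"
}

# Demo on a single template line. For a real file (file names are examples):
#   render_config < template.json > test-ice.json
printf '%s\n' '"iceberg.catalog.warehouse": "{account-id}:s3tablescatalog/{table_bucket}"' | render_config
```

Keeping the template free of real credentials and rendering it just before `kcctl apply` makes it easier to avoid committing secrets to version control.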

### Limitations

* Each Iceberg sink connector must have its own control topic.
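
If you keep several connector definitions side by side, a quick scan can catch two of them sharing a control topic. The following is a minimal sketch, assuming each file sets `iceberg.control.topic` on its own line in the same style as the examples on this page. It uses text matching rather than a JSON parser and ignores files that omit the property. The function name and the `connectors/` path in the comment are hypothetical.

```bash
# Sketch: print every control topic name that appears in more than one
# connector config file. No output means no duplicates were found.
duplicate_control_topics() {
  # Extract the value of "iceberg.control.topic" from each file, then keep
  # only the values that occur more than once.
  sed -n 's/.*"iceberg\.control\.topic"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' "$@" \
    | sort | uniq -d
}

# Example usage (assumed directory layout):
#   duplicate_control_topics connectors/*.json
```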

### Configuration

The following *Required* properties are used to configure the connector.

| Parameter               | Description                                                                                                                                       |
| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topics`                | Comma-separated list of the Kafka topics you want to replicate. (You can define either the `topics` or the `topics.regex` setting, but not both.) |
| `topics.regex`          | Java regular expression of topics to replicate. (You can define either the `topics` or the `topics.regex` setting, but not both.)                 |
| `iceberg.control.topic` | The name of the control topic. It cannot be used by other Iceberg connectors.                                                                     |
| `iceberg.catalog.type`  | The type of Iceberg catalog. Allowed options are: `REST`, `HIVE`, `HADOOP`.                                                                       |
| `iceberg.tables`        | Comma-separated list of Iceberg table names, which are specified using the format `{namespace}.{table}`.                                          |
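
The required properties above can be checked before a config file is applied. The following is a minimal sketch, assuming one property per line as in the examples on this page. It matches quoted key names with `grep` and does not parse JSON. The function name `check_required` is hypothetical.

```bash
# Sketch: report required connector properties missing from a config file.
# Returns 0 when all checks pass and 1 otherwise.
check_required() {
  local file="$1" missing=0 key pattern
  local has_topics=0 has_regex=0

  # Exactly one of "topics" and "topics.regex" must be set.
  grep -q '"topics"[[:space:]]*:' "$file" && has_topics=1
  grep -q '"topics\.regex"[[:space:]]*:' "$file" && has_regex=1
  if [ $((has_topics + has_regex)) -ne 1 ]; then
    echo "set exactly one of: topics, topics.regex"
    missing=1
  fi

  for key in iceberg.control.topic iceberg.catalog.type iceberg.tables; do
    # Escape dots so they match literally in the grep pattern.
    pattern=$(printf '%s' "$key" | sed 's/\./\\./g')
    if ! grep -q "\"$pattern\"[[:space:]]*:" "$file"; then
      echo "missing: $key"
      missing=1
    fi
  done
  return "$missing"
}

# Example usage (the file name is an example):
#   check_required test-ice.json && kcctl apply -f test-ice.json
```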

The following *Advanced* properties are used to configure the connector.

| Parameter                           | Description                                                                                                                                      |
| ----------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ |
| `iceberg.control.commit.timeout-ms` | Commit timeout interval in ms. The default is 30000 (30 sec).                                                                                    |
| `iceberg.tables.route-field`        | For multi-table fan-out, the name of the field used to route records to tables. Required when `iceberg.tables.dynamic-enabled` is set to `true`. |
| `iceberg.tables.cdc-field`          | Name of the field containing the CDC operation (`I`, `U`, or `D`). The default is none.                                                          |
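
The dependency between `iceberg.tables.dynamic-enabled` and `iceberg.tables.route-field` can be checked in the same text-matching style. The following is a minimal sketch, assuming boolean values are written as quoted strings (`"true"`) as in the examples on this page. The function name `check_route_field` is hypothetical.

```bash
# Sketch: fail when dynamic routing is enabled but no route field is set.
check_route_field() {
  local file="$1"
  if grep -q '"iceberg\.tables\.dynamic-enabled"[[:space:]]*:[[:space:]]*"true"' "$file" \
     && ! grep -q '"iceberg\.tables\.route-field"[[:space:]]*:' "$file"; then
    echo "iceberg.tables.route-field is required when iceberg.tables.dynamic-enabled is true"
    return 1
  fi
  return 0
}

# Example usage (the file name is an example):
#   check_route_field my-connector.json
```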

For more information about the properties, see the [official documentation](https://github.com/tabular-io/iceberg-kafka-connect/blob/v0.6.19/README.md).
