The Apache Iceberg Kafka Connect Sink connector is a Kafka Connect connector that writes data from Kafka topics to Apache Iceberg tables.
Prerequisites
- Set up the Iceberg catalog.
- Create a control topic for the connector. The control topic cannot be shared with other connectors.
Quick Start
- Set up the kcctl client (see the kcctl documentation).
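- Set up the Iceberg catalog. The following YAML file, saved as iceberg-spark.yaml, creates a local Iceberg catalog in Kubernetes. It deploys a Spark container, an Iceberg REST catalog service, and MinIO as S3-compatible storage: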
```yaml
apiVersion: v1
data:
  spark-defaults.conf: |
    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    # Default system properties included when running spark-submit.
    # This is useful for setting default environmental settings.
    # Example:
    spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.demo.type rest
    spark.sql.catalog.demo.uri http://iceberg-rest.default.svc.cluster.local:8181
    spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
    spark.sql.catalog.demo.warehouse s3://warehouse/
    spark.sql.catalog.demo.s3.endpoint http://minio.default.svc.cluster.local:9000
    spark.sql.defaultCatalog demo
    spark.eventLog.enabled true
    spark.eventLog.dir /home/iceberg/spark-events
    spark.history.fs.logDirectory /home/iceberg/spark-events
    spark.sql.catalogImplementation in-memory
kind: ConfigMap
metadata:
  name: spark-config
  namespace: default
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-iceberg
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: spark-iceberg
  template:
    metadata:
      labels:
        app: spark-iceberg
    spec:
      containers:
        - name: spark-iceberg
          image: tabulario/spark-iceberg
          ports:
            - name: one
              containerPort: 8888
            - name: two
              containerPort: 8080
            - name: three
              containerPort: 10000
            - name: four
              containerPort: 10001
          env:
            - name: AWS_ACCESS_KEY_ID
              value: admin
            - name: AWS_SECRET_ACCESS_KEY
              value: password
            - name: AWS_REGION
              value: us-east-1
          volumeMounts:
            - name: spark-config
              mountPath: /opt/spark/conf/spark-defaults.conf
              subPath: spark-defaults.conf
      volumes:
        - name: spark-config
          configMap:
            name: spark-config
---
apiVersion: v1
kind: Service
metadata:
  name: spark-iceberg
  namespace: default
spec:
  selector:
    app: spark-iceberg
  ports:
    - protocol: TCP
      name: one
      port: 8888
      targetPort: 8888
    - protocol: TCP
      name: two
      port: 8080
      targetPort: 8080
    - protocol: TCP
      name: three
      port: 10000
      targetPort: 10000
    - protocol: TCP
      name: four
      port: 10001
      targetPort: 10001
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iceberg-rest
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: iceberg-rest
  template:
    metadata:
      labels:
        app: iceberg-rest
    spec:
      containers:
        - name: iceberg-rest
          image: tabulario/iceberg-rest
          ports:
            - name: one
              containerPort: 8181
          env:
            - name: AWS_ACCESS_KEY_ID
              value: admin
            - name: AWS_SECRET_ACCESS_KEY
              value: password
            - name: AWS_REGION
              value: us-east-1
            - name: CATALOG_WAREHOUSE
              value: s3://warehouse/
            - name: CATALOG_IO__IMPL
              value: org.apache.iceberg.aws.s3.S3FileIO
            - name: CATALOG_S3_ENDPOINT
              value: http://minio.default.svc.cluster.local:9000
---
apiVersion: v1
kind: Service
metadata:
  name: iceberg-rest
  namespace: default
spec:
  selector:
    app: iceberg-rest
  ports:
    - protocol: TCP
      name: one
      port: 8181
      targetPort: 8181
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      hostname: warehouse
      subdomain: minio
      containers:
        - name: minio
          image: minio/minio
          args:
            - server
            - /data
            - --console-address
            - ":9001"
          ports:
            - name: one
              containerPort: 9000
            - name: two
              containerPort: 9001
          env:
            - name: MINIO_ROOT_USER
              value: admin
            - name: MINIO_ROOT_PASSWORD
              value: password
            - name: MINIO_DOMAIN
              value: minio.default.svc.cluster.local
---
apiVersion: v1
kind: Service
metadata:
  name: minio
  namespace: default
spec:
  selector:
    app: minio
  ports:
    - protocol: TCP
      name: one
      port: 9000
      targetPort: 9000
    - protocol: TCP
      name: two
      port: 9001
      targetPort: 9001
```
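- Deploy the manifest, wait for the pods to become ready, and initialize the warehouse bucket in MinIO. The Iceberg table itself is created later by the connector, because the configuration below sets iceberg.tables.auto-create-enabled to true: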
```shell
# Deploy Spark, the Iceberg REST catalog, and MinIO, then wait for each pod
kubectl apply -f iceberg-spark.yaml
kubectl wait -l app=spark-iceberg --for=condition=Ready pod --timeout=5m
kubectl wait -l app=iceberg-rest --for=condition=Ready pod --timeout=5m
kubectl wait -l app=minio --for=condition=Ready pod --timeout=5m
# Give the services a little extra time to finish starting up
sleep 30
# Initialize the warehouse bucket from inside the MinIO pod
minio_pod_name=$(kubectl get pods -l app=minio -o=jsonpath='{.items[0].metadata.name}')
kubectl exec "$minio_pod_name" -- /usr/bin/mc alias set minio http://minio.default.svc.cluster.local:9000 admin password
kubectl exec "$minio_pod_name" -- /usr/bin/mc rm -r --force minio/warehouse || true
kubectl exec "$minio_pod_name" -- /usr/bin/mc mb minio/warehouse
kubectl exec "$minio_pod_name" -- /usr/bin/mc anonymous set public minio/warehouse
```
- Create a JSON file with the connector configuration, for example:
```json
{
  "name": "iceberg-sink",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "topics": "kafka-iceberg-input",
    "iceberg.tables": "sink.kafka",
    "iceberg.catalog": "demo",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://iceberg-rest.default.svc.cluster.local:8181",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.warehouse": "s3://warehouse",
    "iceberg.catalog.s3.endpoint": "http://minio.default.svc.cluster.local:9000",
    "iceberg.catalog.s3.path-style-access": "true",
    "iceberg.catalog.s3.access-key-id": "admin",
    "iceberg.catalog.s3.secret-access-key": "password",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.control.commit.interval-ms": "1000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1"
  }
}
```
- Create the connector with kcctl:
```shell
kcctl apply -f <filename>.json
```
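kcctl is a client for the Kafka Connect REST API, so the same JSON file can also be submitted directly with a POST /connectors request when kcctl is not available. The following is a minimal sketch using only the Python standard library. The worker address http://localhost:8083 (Kafka Connect's default REST port) and the helper names build_create_request and create_connector are assumptions for illustration, not part of the connector.

```python
import json
import urllib.request

# Assumption: address of a Kafka Connect worker's REST listener; adjust for your cluster.
CONNECT_URL = "http://localhost:8083"


def build_create_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST /connectors request for a {"name": ..., "config": {...}} payload."""
    if "name" not in connector or "config" not in connector:
        raise ValueError("payload must contain 'name' and 'config'")
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(connect_url: str, path: str) -> dict:
    """Load a connector JSON file, submit it, and return the worker's JSON response."""
    with open(path, encoding="utf-8") as f:
        connector = json.load(f)
    request = build_create_request(connect_url, connector)
    # urlopen raises urllib.error.HTTPError for non-2xx responses,
    # for example when a connector with the same name already exists.
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, after saving the configuration as iceberg-sink.json (a hypothetical file name), call create_connector(CONNECT_URL, "iceberg-sink.json"). Keeping request construction separate from sending makes the payload easy to inspect before it reaches the worker.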
Limitations
- Each Iceberg sink connector must have its own control topic.
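Because a control topic cannot be shared, a quick pre-deployment check over all connector configurations can catch accidental reuse. The sketch below is illustrative: the function name shared_control_topics is hypothetical, and connectors that do not set iceberg.control.topic explicitly are skipped, since the sketch does not assume any default value.

```python
from collections import defaultdict


def shared_control_topics(connectors: list[dict]) -> dict[str, list[str]]:
    """Map each control topic used by more than one connector to those connector names.

    Each element is a payload shaped like the Quick Start JSON:
    {"name": ..., "config": {...}}.
    """
    users: dict[str, list[str]] = defaultdict(list)
    for connector in connectors:
        topic = connector["config"].get("iceberg.control.topic")
        if topic is None:
            # Assumption for this sketch: skip connectors without an explicit
            # control topic instead of guessing the connector's default.
            continue
        users[topic].append(connector["name"])
    return {topic: names for topic, names in users.items() if len(names) > 1}
```

An empty result means every connector that names a control topic uses its own.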
Configuration
The following properties are required to configure the connector.

Parameter | Description |
---|---|
topics | Comma-separated list of the Kafka topics to write to Iceberg. Define either topics or topics.regex, but not both. |
topics.regex | Java regular expression matching the Kafka topics to write to Iceberg. Define either topics or topics.regex, but not both. |
iceberg.control.topic | The name of the control topic. It cannot be used by other Iceberg connectors. |
iceberg.catalog.type | The type of Iceberg catalog. Allowed values: REST, HIVE, HADOOP. |
iceberg.tables | Comma-separated list of Iceberg table names, each in the format {namespace}.{table}. |
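The rules in the required-properties table can be checked before a configuration is submitted. This is a minimal sketch, not the connector's own validation: the function name validate_required is hypothetical, catalog types are compared case-insensitively (an assumption, since the table lists REST while the Quick Start JSON uses rest), topics.regex is checked with Python's re module, which only approximates Java regex syntax, and table names are limited to a single namespace level.

```python
import re

# Assumption: compare catalog types case-insensitively.
ALLOWED_CATALOG_TYPES = {"rest", "hive", "hadoop"}
# {namespace}.{table}; this sketch accepts exactly one namespace level.
TABLE_NAME = re.compile(r"^[^.\s]+\.[^.\s]+$")


def validate_required(config: dict) -> list[str]:
    """Return human-readable problems with the required properties (empty if none)."""
    problems = []
    has_topics = bool(config.get("topics"))
    has_regex = bool(config.get("topics.regex"))
    if has_topics == has_regex:
        problems.append("set exactly one of 'topics' or 'topics.regex'")
    if has_regex:
        try:
            # Python regex syntax only approximates Java's.
            re.compile(config["topics.regex"])
        except re.error as exc:
            problems.append(f"invalid topics.regex: {exc}")
    if not config.get("iceberg.control.topic"):
        problems.append("missing 'iceberg.control.topic'")
    catalog_type = config.get("iceberg.catalog.type", "")
    if catalog_type.lower() not in ALLOWED_CATALOG_TYPES:
        problems.append(f"unsupported iceberg.catalog.type: {catalog_type!r}")
    tables = [t.strip() for t in config.get("iceberg.tables", "").split(",") if t.strip()]
    if not tables:
        problems.append("missing 'iceberg.tables'")
    for table in tables:
        if not TABLE_NAME.match(table):
            problems.append(f"table {table!r} is not in {{namespace}}.{{table}} form")
    return problems
```

Returning a list of problems, rather than stopping at the first failure, reports every misconfigured property in one pass.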
The following advanced properties can also be used to configure the connector.

Parameter | Description |
---|---|
iceberg.control.commit.timeout-ms | Commit timeout in milliseconds. Default: 30000 (30 seconds). |
iceberg.tables.route-field | For multi-table fan-out, the name of the record field used to route records to tables. Required when iceberg.tables.dynamic-enabled is set to true. |
iceberg.tables.cdc-field | Name of the field containing the CDC operation: I (insert), U (update), or D (delete). No default. |
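To show how iceberg.tables.route-field and iceberg.tables.cdc-field interact, the sketch below models one record under dynamic routing, where the route field's value names the target table. This is a simplified illustration, not the connector's implementation: the function route_record, the operation names, and the example field names db_table and op are all hypothetical, and records are treated as plain inserts when no CDC field is configured.

```python
from typing import Optional, Tuple

# Illustrative mapping of CDC codes to operation names; not the connector's internal types.
CDC_OPERATIONS = {"I": "insert", "U": "update", "D": "delete"}


def route_record(record: dict, route_field: str, cdc_field: Optional[str] = None) -> Tuple[str, str]:
    """Return (target table, operation) for one record.

    Assumes iceberg.tables.dynamic-enabled=true, so the route field's value
    is used directly as the target table name.
    """
    table = record.get(route_field)
    if not table:
        raise ValueError(f"record has no value for route field {route_field!r}")
    if cdc_field is None:
        # Assumption: without a CDC field, every record is an insert.
        return table, "insert"
    code = record.get(cdc_field)
    if code not in CDC_OPERATIONS:
        raise ValueError(f"unexpected CDC operation {code!r}; expected I, U, or D")
    return table, CDC_OPERATIONS[code]
```

For example, with route-field db_table and cdc-field op, the record {"db_table": "sink.orders", "op": "U"} is routed to sink.orders as an update.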
For more information about the properties, see the official documentation.