Kafka Connect YugabyteDB CDC Source
The official YugabyteDB CDC Kafka Source Connector for capturing change data from YugabyteDB.

Available on: StreamNative Cloud console
Authored by: YugaByte
Support type: Community
License: Apache License 2.0

The YugabyteDB CDC Kafka Source connector is a Kafka Connect connector that reads change data from YugabyteDB and writes it to Kafka topics.

Prerequisites

  • Collect the following information about your YugabyteDB cluster:
    • Master addresses
    • DB Stream ID
    • DB User and password

Quick Start

  1. Set up the kcctl client: doc
  2. Set up a YugabyteDB cluster. You can create one in a Kubernetes cluster with the following commands:
    helm repo add yugabytedb https://charts.yugabyte.com
    helm repo update
    
    helm upgrade --install yb-demo yugabytedb/yugabyte --version 2024.1.0
    kubectl wait -l app=yb-master --for=condition=Ready pod --timeout=10m
    kubectl wait -l app=yb-tserver --for=condition=Ready pod --timeout=10m
    
    # wait for the pod to be ready
    sleep 30
    
    kubectl exec yb-tserver-0 -- /home/yugabyte/bin/ysqlsh -c "CREATE TABLE IF NOT EXISTS test_kafka_connect(id int primary key, message text);"
    
    # create change stream
    stream_id=$(kubectl exec yb-master-0 -- yb-admin --master_addresses yb-masters.default.svc.cluster.local:7100 create_change_data_stream ysql.yugabyte)
    stream_id=$(echo $stream_id | awk '{print $4}')
    
  3. Create a JSON file like the following, replacing ${STREAM_ID} with the stream ID obtained in the previous step:
    {
        "name": "yugabyte-cdc-source",
        "config": {
            "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
            "kafka.topic": "kafka-yugabyte-output",
            "tasks.max": "1",
            "database.hostname": "yb-tservers.default.svc.cluster.local",
            "database.port": "5433",
            "database.master.addresses": "yb-masters.default.svc.cluster.local:7100",
            "database.dbname": "yugabyte",
            "table.include.list": "public.test_kafka_connect",
            "database.streamid": "${STREAM_ID}",
            "snapshot.mode": "never",
            "database.user": "yugabyte",
            "database.password": "yugabyte",
            "database.server.name": "dbserver1",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "key.converter.schemas.enable": "false",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false"
        }
    }
    
  4. Run the following command to create the connector:
    kcctl create -f <filename>.json
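
The JSON in step 3 contains a `${STREAM_ID}` placeholder, while step 2 stores the ID in the lowercase shell variable `stream_id`, so the value has to be filled in before the file is passed to kcctl. The following is a minimal sketch of one way to do that, assuming the JSON was saved as a template file. The function name `render_connector_config` and the file names in the usage comment are hypothetical and are not part of kcctl or YugabyteDB.

```shell
# Hypothetical helper: replace the literal ${STREAM_ID} placeholder in a
# connector JSON template with a real stream ID and print the result.
render_connector_config() {
  template_file=$1
  stream_id_value=$2

  # Refuse to render a config with an empty stream ID.
  if [ -z "$stream_id_value" ]; then
    echo "stream ID is empty; check the create_change_data_stream output" >&2
    return 1
  fi

  # The backslash stops the shell from expanding ${STREAM_ID}, so sed
  # matches the placeholder text itself and swaps in the given value.
  sed "s|\${STREAM_ID}|${stream_id_value}|g" "$template_file"
}

# Example usage (file names are assumptions):
#   render_connector_config yugabyte-cdc-source.template.json "$stream_id" > yugabyte-cdc-source.json
#   kcctl create -f yugabyte-cdc-source.json
```

Rendering to a separate file keeps the template reusable when the stream is recreated and a new ID is issued.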
    

Configuration

The YugabyteDB CDC Kafka Source connector is configured using the following required properties:

Property                   Description
database.master.addresses  The addresses of the YugabyteDB master nodes.
database.server.name       The name of the YugabyteDB server.
database.dbname            The name of the YugabyteDB database.
database.user              The user name for the YugabyteDB database.
database.password          The password for the YugabyteDB database.
database.streamid          The stream ID for the YugabyteDB database.
database.hostname          The hostname for the YugabyteDB database.
database.port              The port for the YugabyteDB database.
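
Because every property listed above is required, it can help to confirm that each one is present before creating the connector. The following is a rough sketch of such a check, assuming the connector configuration is stored in a JSON file. The function name `missing_required_properties` is hypothetical, and the check is a plain text search rather than a real JSON parse, so it only confirms that each key name appears in the file.

```shell
# Hypothetical helper: print every required property whose quoted key name
# does not appear anywhere in the given connector JSON file.
missing_required_properties() {
  config_file=$1
  for key in database.master.addresses database.server.name database.dbname \
             database.user database.password database.streamid \
             database.hostname database.port; do
    # -F treats the key as a fixed string, so the dots are not regex wildcards.
    grep -qF "\"$key\"" "$config_file" || echo "$key"
  done
}

# Example usage (file name is an assumption):
#   missing_required_properties yugabyte-cdc-source.json
```

Empty output means all eight keys were found. Any printed name is a property to add before running `kcctl create`.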

For more information about the properties, see the official YugabyteDB CDC Kafka Source Connector documentation.