The Aiven JDBC sink connector reads data from Kafka topics and writes it to any JDBC-compliant database.

Prerequisites

  • Valid credentials for the target database.
  • The connection.url for your database.
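The exact connection.url format depends on the target database. As a sketch, with placeholder hostnames, ports, and database names:

jdbc:mysql://mysql-host:3306/my_database
jdbc:postgresql://postgres-host:5432/my_database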

Quick Start

  1. Set up the kcctl client (see the kcctl documentation).
  2. Create a MySQL database in your Kubernetes cluster:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install mysql bitnami/mysql \
  --set auth.rootPassword=secretpassword \
  --set auth.database=test
kubectl wait -l app.kubernetes.io/instance=mysql --for=condition=Ready pod --timeout=5m
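Because auto.create defaults to false (see Configuration below), the destination table should exist before the connector starts writing. A minimal sketch that creates it from a throwaway client pod, assuming the hypothetical columns id and name:

kubectl run mysql-client --rm -it --restart=Never --image=mysql:8.0 -- \
  mysql -h mysql.default.svc.cluster.local -uroot -psecretpassword test \
  -e "CREATE TABLE IF NOT EXISTS test_table (id INT, name VARCHAR(255));"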
  3. Create a JSON file like the following:
{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql.default.svc.cluster.local:3306/test",
        "connection.user": "root",
        "connection.password": "secretpassword",
        "insert.mode": "insert",
        "table.name.format": "test_table",
        "topics": "kafka-jdbc-input",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "tasks.max": "1"
    }
}
  4. Run the following command to create the connector:
kcctl apply -f <filename>.json
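A hedged way to verify the pipeline end to end, assuming the Kafka Connect REST API is reachable at localhost:8083, a broker at localhost:9092, and the hypothetical id and name columns from the table created earlier:

# Check that the connector and its task are RUNNING
curl -s http://localhost:8083/connectors/jdbc-sink/status

# Produce a test record to the input topic
echo '{"id": 1, "name": "hello"}' | \
  kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka-jdbc-input

# Confirm the row was written to MySQL
kubectl run mysql-query --rm -it --restart=Never --image=mysql:8.0 -- \
  mysql -h mysql.default.svc.cluster.local -uroot -psecretpassword test \
  -e "SELECT * FROM test_table;"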

Configuration

Configure the Aiven JDBC sink connector with the following properties:
Property | Required | Default | Description
--- | --- | --- | ---
connection.url | true | | JDBC connection URL.
connection.user | true | null | JDBC connection user.
connection.password | true | null | JDBC connection password.
db.timezone | false | UTC | Name of the JDBC timezone that should be used in the connector when querying with time-based criteria.
dialect.name | false | "" | The name of the database dialect that should be used for this connector. By default this is empty, and the connector automatically determines the dialect based upon the JDBC connection URL. Use this if you want to override that behavior and use a specific dialect. All properly-packaged dialects in the JDBC connector plugin can be used.
sql.quote.identifiers | false | true | Whether to delimit (in most databases, quote with double quotes) identifiers such as table and column names in SQL statements.
insert.mode | true | insert | The insertion mode to use. Supported modes are: insert, multi, upsert, update.
batch.size | false | 3000 | How many records to attempt to batch together for insertion into the destination table, when possible.
delete.enabled | false | false | Enable deletion of rows based on tombstone messages.
table.name.format | false | ${topic} | A format string for the destination table name, which may contain ${topic} as a placeholder for the originating topic name.
table.name.normalize | false | false | Whether to normalize destination table names for topics.
topics.to.tables.mapping | false | null | Mapping of Kafka topics to database tables.
pk.mode | true | none | The primary key mode. Supported modes are: none, kafka, record_key, record_value.
pk.fields | false | "" | Comma-separated list of primary key field names.
fields.whitelist | false | "" | Comma-separated list of record value field names.
auto.create | false | false | Whether to automatically create the destination table based on the record schema if it is found to be missing.
auto.evolve | false | false | Whether to automatically add columns to the table schema when they are found to be missing relative to the record schema.
max.retries | false | 10 | The maximum number of times to retry on errors before failing the task.
retry.backoff.ms | false | 3000 | The time in milliseconds to wait following an error before a retry attempt is made.
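For example, a hedged configuration fragment for upserting rows keyed by a field of the record key, with tombstone-driven deletes enabled (the id field name is an assumption; use the key fields of your own records):

{
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "delete.enabled": "true"
}

Deletes are driven by tombstone records (a non-null key with a null value), which is why the primary key is taken from the record key here.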
For full details, see the Aiven JDBC sink connector configuration reference.