The Debezium JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver. This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.

Quick Start

  1. Set up the kcctl client (see the kcctl documentation).
  2. Create a JSON file like the following:
    {
        "name": "debezium-jdbc-sink",
        "config": {
            "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "connection.url": "jdbc:postgresql://hostname:port/db",
            "connection.user": "user",
            "connection.password": "password",
            "insert.mode": "upsert",
            "delete.enabled": "true",
            "primary.key.mode": "record_key",
            "schema.evolution": "basic",
            "use.time.zone": "UTC",
            "topics": "orders"
        }
    }
    
  3. Run the following command to create the connector:
    kcctl create -f <filename>.json
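
If kcctl is not available, you can register the same configuration file directly with the Kafka Connect REST API. The following sketch assumes a Connect worker whose REST interface listens on localhost:8083 and a configuration file named debezium-jdbc-sink.json; adjust the host, port, and file name for your deployment:

    curl -X POST -H "Content-Type: application/json" \
         --data @debezium-jdbc-sink.json \
         http://localhost:8083/connectors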
    

Configuration

The Debezium JDBC Sink connector is configured using the following properties. Each entry notes whether the property is required and its default value.

name (required; no default)
Unique name for the connector. A failure results if you attempt to reuse this name when registering a connector. This property is required by all Kafka Connect connectors.

connector.class (required; no default)
The name of the Java class for the connector. For the Debezium JDBC connector, specify the value io.debezium.connector.jdbc.JdbcSinkConnector.

tasks.max (required; default: 1)
Maximum number of tasks to use for this connector.

topics (optional; no default)
List of topics to consume, separated by commas. Do not use this property in combination with the topics.regex property.

topics.regex (optional; no default)
A regular expression that specifies the topics to consume. Internally, the regular expression is compiled to a java.util.regex.Pattern. Do not use this property in combination with the topics property.

connection.provider (optional; default: org.hibernate.c3p0.internal.C3P0ConnectionProvider)
The connection provider implementation to use.

connection.url (required; no default)
The JDBC connection URL used to connect to the database.

connection.username (required; no default)
The name of the database user account that the connector uses to connect to the database.

connection.password (required; no default)
The password that the connector uses to connect to the database.

connection.pool.min_size (optional; default: 5)
Specifies the minimum number of connections in the pool.

connection.pool.max_size (optional; default: 32)
Specifies the maximum number of concurrent connections that the pool maintains.

connection.pool.acquire_increment (optional; default: 32)
Specifies the number of connections that the connector attempts to acquire if the connection pool exceeds its maximum size.

connection.pool.timeout (optional; default: 1800)
Specifies the number of seconds that an unused connection is kept before it is discarded.

connection.restart.on.errors (optional; default: false)
Specifies whether the connector retries after a transient JDBC connection error.

When enabled (true), the connector treats connection issues (such as socket closures or timeouts) as retriable, allowing it to retry processing instead of failing the task. This reduces downtime and improves resilience against temporary disruptions.

Setting this option to true can reduce downtime. However, in master-replica environments with asynchronous replication, it may lead to data loss if retries occur before all changes are fully replicated.

Use with caution where strong data consistency is required.

use.time.zone (optional; default: UTC)
Specifies the timezone used when inserting JDBC temporal values.

delete.enabled (optional; default: false)
Specifies whether the connector processes DELETE or tombstone events and removes the corresponding row from the database. Use of this option requires that you set the primary.key.mode to record_key.

truncate.enabled (optional; default: false)
Specifies whether the connector processes TRUNCATE events and truncates the corresponding tables from the database.

Although support for TRUNCATE statements has been available in Db2 since version 9.7, currently, the JDBC connector is unable to process standard TRUNCATE events that the Db2 connector emits.

To ensure that the JDBC connector can process TRUNCATE events received from Db2, perform the truncation by using an alternative to the standard TRUNCATE TABLE statement. For example:

ALTER TABLE [table_name] ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE

The user account that submits the preceding query requires ALTER privileges on the table to be truncated.

insert.mode (optional; default: insert)
Specifies the strategy used to insert events into the database. The following options are available:

insert

Specifies that all events should construct INSERT-based SQL statements. Use this option only when no primary key is used, or when you can be certain that no updates can occur to rows with existing primary key values.

update

Specifies that all events should construct UPDATE-based SQL statements. Use this option only when you can be certain that the connector receives only events that apply to existing rows.

upsert

Specifies that the connector adds events to the table using upsert semantics. That is, if the primary key does not exist, the connector performs an INSERT operation, and if the key does exist, the connector performs an UPDATE operation. When idempotent writes are required, the connector should be configured to use this option.
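
When insert.mode is set to upsert and the sink is PostgreSQL, for example, the resulting writes have semantics roughly equivalent to the following illustrative statement. The exact SQL that the connector generates depends on the database dialect, and the table and column names here are hypothetical:

    INSERT INTO orders (id, status)
    VALUES (1001, 'SHIPPED')
    ON CONFLICT (id) DO UPDATE SET status = EXCLUDED.status;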

primary.key.mode (optional; default: none)
Specifies how the connector resolves the primary key columns from the event. The following options are available:

none

Specifies that no primary key columns are created.

kafka

Specifies that the connector uses Kafka coordinates as the primary key columns. The key coordinates are defined from the topic name, partition, and offset of the event, and are mapped to columns with the following names:

__connect_topic

__connect_partition

__connect_offset

record_key

Specifies that the primary key columns are sourced from the event’s record key. If the record key is a primitive type, the primary.key.fields property is required to specify the name of the primary key column. If the record key is a struct type, the primary.key.fields property is optional, and can be used to specify a subset of columns from the event’s key as the table’s primary key.

record_value

Specifies that the primary key columns are sourced from the event’s value. You can set the primary.key.fields property to define the primary key as a subset of fields from the event’s value; otherwise, all fields are used by default.

primary.key.fields (optional; no default)
Either the name of the primary key column or a comma-separated list of fields to derive the primary key from.

When primary.key.mode is set to record_key and the event’s key is a primitive type, it is expected that this property specifies the column name to be used for the key.

When primary.key.mode is set to record_key with a non-primitive key, or to record_value, this property is expected to specify a comma-separated list of field names from the key or the value, respectively. If this property is not specified in either of those modes, the connector derives the primary key from all fields of the record key or the record value, depending on the specified mode.
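
For example, to derive the primary key from two fields of the event key and also enable delete handling (which requires record_key mode), the relevant part of the connector configuration might look like the following sketch; the field names id and tenant_id are hypothetical:

    "primary.key.mode": "record_key",
    "primary.key.fields": "id,tenant_id",
    "delete.enabled": "true"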

quote.identifiers (optional; default: false)
Specifies whether generated SQL statements use quotation marks to delimit table and column names. See the JDBC quoting case-sensitivity section for more details.

schema.evolution (optional; default: none)
Specifies how the connector evolves the destination table schemas. For more information, see Schema evolution. The following options are available:

none

Specifies that the connector does not evolve the destination schema.

basic

Specifies that basic evolution occurs. The connector adds missing columns to the table by comparing the incoming event’s record schema to the database table structure.
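
As an illustration, when schema.evolution is set to basic and an incoming event contains a field with no matching column in the destination table, the connector adds the missing column. On most dialects this amounts to DDL along the lines of the following; the table name, column name, and type mapping shown here are hypothetical and depend on the event schema and the database dialect:

    ALTER TABLE orders ADD COLUMN shipping_notes VARCHAR(255);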

collection.name.format (optional; default: ${topic})
Specifies a string pattern that the connector uses to construct the names of destination tables.
When the property is set to its default value, ${topic}, after the connector reads an event from Kafka, it writes the event record to a destination table with a name that matches the name of the source topic.

You can also configure this property to extract values from specific fields in incoming event records and then use those values to dynamically generate the names of target tables. This ability to generate table names from values in the message source would otherwise require the use of a custom Kafka Connect single message transformation (SMT).

To configure the property to dynamically generate the names of destination tables, set its value to a pattern such as ${source._field_}. When you specify this type of pattern, the connector extracts values from the source block of the Debezium change event, and then uses those values to construct the table name. For example, you might set the value of the property to the pattern ${source.schema}_${source.table}. Based on this pattern, if the connector reads an event in which the schema field in the source block contains the value, user, and the table field contains the value, tab, the connector writes the event record to a table with the name user_tab.
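
The configuration entry for the preceding example would therefore look like this:

    "collection.name.format": "${source.schema}_${source.table}"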

dialect.postgres.postgis.schema (optional; default: public)
Specifies the schema name where the PostgreSQL PostGIS extension is installed. The default is public; however, if the PostGIS extension was installed in another schema, use this property to specify the alternate schema name.

dialect.sqlserver.identity.insert (optional; default: false)
Specifies whether the connector automatically sets IDENTITY_INSERT before an INSERT or UPSERT operation into the identity column of SQL Server tables, and then unsets it immediately after the operation. When the default setting (false) is in effect, an INSERT or UPSERT operation into the IDENTITY column of a table results in a SQL exception.

batch.size (optional; default: 500)
Specifies how many records to attempt to batch together into the destination table.

Note that if you set consumer.max.poll.records in the Connect worker properties to a value lower than batch.size, batch processing is capped by consumer.max.poll.records and the desired batch.size is never reached. You can also configure the connector’s underlying consumer’s max.poll.records by setting consumer.override.max.poll.records in the connector configuration.
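
For example, to request batches of 1,000 records and allow the underlying consumer to fetch that many records per poll, you could combine both settings in the connector configuration; the values shown are illustrative:

    "batch.size": "1000",
    "consumer.override.max.poll.records": "1000"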

use.reduction.buffer (optional; default: false)
Specifies whether to enable the Debezium JDBC connector’s reduction buffer.

Choose one of the following settings:

false

(default) The connector writes each change event that it consumes from Kafka as a separate logical SQL change.

true

The connector uses the reduction buffer to reduce change events before it writes them to the sink database. That is, if multiple events refer to the same primary key, the connector consolidates the SQL queries and writes only a single logical SQL change, based on the row state that is reported in the most recent offset record.
Choose this option to reduce the SQL load on the target database.

To optimize query processing in a PostgreSQL sink database when the reduction buffer is enabled, you must also enable the database to execute the batched queries by adding the reWriteBatchedInserts parameter to the JDBC connection URL.
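
For a PostgreSQL sink with the reduction buffer enabled, the connection URL would therefore include that parameter, as in the following sketch; the hostname, port, and database name are placeholders:

    "use.reduction.buffer": "true",
    "connection.url": "jdbc:postgresql://hostname:port/db?reWriteBatchedInserts=true"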

field.include.list (optional; default: empty string)
An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName.

If you include this property in the configuration, do not set the field.exclude.list property.

field.exclude.list (optional; default: empty string)
An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value. Fully-qualified names for fields are of the form fieldName or topicName:fieldName.

If you include this property in the configuration, do not set the field.include.list property.
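
For example, to exclude one field only from events on the orders topic and another field from events on all topics, you could use a setting such as the following; the topic and field names are hypothetical:

    "field.exclude.list": "orders:internal_notes,last_sync_ts"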

flush.max.retries (optional; default: 5)
Specifies the maximum number of retries that the connector performs after an attempt to flush changes to the target database results in certain database errors. If the number of retries exceeds the retry value, the sink connector enters a FAILED state.

flush.retry.delay.ms (optional; default: 1000)
Specifies the number of milliseconds that the connector waits to retry a flush operation that failed.

When you set both the flush.retry.delay.ms and flush.max.retries properties, it can affect the behavior of the Kafka max.poll.interval.ms property. To prevent the connector from rebalancing, set the total retry time (flush.retry.delay.ms * flush.max.retries) to a value that is less than the value of max.poll.interval.ms (default is 5 minutes).
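
With the defaults, the total retry time is 5 * 1000 ms = 5 seconds, well below the default max.poll.interval.ms of 5 minutes (300,000 ms). If you increase either setting, keep the product under that limit; for example, the following illustrative values allow up to 10 * 10,000 ms = 100 seconds of retrying:

    "flush.max.retries": "10",
    "flush.retry.delay.ms": "10000"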

column.naming.strategy (optional; default: io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy)
Specifies the fully-qualified class name of a ColumnNamingStrategy implementation that the connector uses to resolve column names from incoming event field names.

collection.naming.strategy (optional; default: io.debezium.connector.jdbc.naming.DefaultCollectionNamingStrategy)
Specifies the fully-qualified class name of a CollectionNamingStrategy implementation that the connector uses to resolve table names from incoming event topic names.

For more information about the configuration properties, see the Official Debezium JDBC Sink Connector documentation.