The Debezium MySQL Source connector is a Kafka Connect connector that captures row-level changes in a MySQL database and streams them to Kafka topics.

Prerequisites

  • A running MySQL server
  • Binary logging (binlog) enabled on the MySQL server; example settings are shown below
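
If binary logging is not already enabled, the following is a minimal sketch of the MySQL server settings that Debezium relies on (the server-id value of 223344 is an arbitrary example):

    [mysqld]
    server-id        = 223344
    log_bin          = mysql-bin
    binlog_format    = ROW
    binlog_row_image = FULL

You can confirm the current state from a MySQL client with SHOW VARIABLES LIKE 'log_bin';, which returns ON when binary logging is active.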

Quick Start

  1. Set up the kcctl client by following the kcctl documentation.
  2. Create a JSON file like the following:
    {
        "name": "debezium-mysql-source",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "{host}",
            "database.port": "3306",
            "database.user": "{username}",
            "database.password": "{password}",
            "database.server.id": "1",
            "topic.prefix": "fulfillment",
            "database.include.list": "inventory",
            "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
            "schema.history.internal.kafka.topic": "schemahistory.fulfillment"
        }
    }
    
  3. Run the following command to create the connector:
kcctl create -f <filename>.json
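
To verify that the connector was registered and is running, you can list the connectors known to the Kafka Connect cluster (assuming kcctl is already configured against it):

kcctl get connectors

Once the connector is running, change events appear on topics named <topic.prefix>.<databaseName>.<tableName>; with the configuration above, changes to a hypothetical inventory.customers table would be streamed to the fulfillment.inventory.customers topic.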

Configuration

The Debezium MySQL Source connector is configured using the following properties. Each entry lists the property name, whether it is required, its default value (if any), and a description.

  • name (required; no default): Unique name for the connector. Attempting to register another connector with the same name fails. (This property is required by all Kafka Connect connectors.)
  • connector.class (required; no default): The name of the Java class for the connector. Always use io.debezium.connector.mysql.MySqlConnector for the MySQL connector.
  • database.hostname (required; no default): The address of the MySQL database server.
  • database.port (required; default: 3306): The port number of the MySQL database server.
  • database.user (required; no default): The name of the MySQL database user to use when connecting to the database.
  • database.password (required; no default): The password to use when connecting to the database.
  • database.server.id (required; no default): A numeric ID for the database server. This ID must be unique among all database servers in the same cluster.
  • topic.prefix (required; no default): A logical name for the database server. This name is used as a prefix for all Kafka topics that receive records from this connector.
  • schema.history.internal.kafka.topic (required; no default): The full name of the Kafka topic where the connector stores the database schema history.
  • schema.history.internal.kafka.bootstrap.servers (required; no default): A list of host/port pairs that the connector uses to establish an initial connection to the Kafka cluster.
  • field.name.adjustment.mode (optional; no default): Specifies how field names should be adjusted for compatibility with the message converter used by the connector.
  • schema.name.adjustment.mode (optional; no default): Specifies how the connector adjusts schema names for compatibility with the message converter used by the connector.
  • bigint.unsigned.handling.mode (optional; default: long): Specifies how the connector represents BIGINT UNSIGNED columns in change events.
  • binary.handling.mode (optional; default: bytes): Specifies how the connector represents values for binary columns, such as blob, binary, and varbinary, in change events.
  • column.exclude.list (optional; default: empty string): A comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values. Other columns in the source record are captured as usual. Fully-qualified column names take the form databaseName.tableName.columnName.
  • column.include.list (optional; default: empty string): A comma-separated list of regular expressions that match the fully-qualified names of columns to include in change event record values. Other columns are omitted from the event record. Fully-qualified column names take the form databaseName.tableName.columnName.
  • column.mask.hash.v2.hashAlgorithm.with.salt.salt (optional; no default): A comma-separated list of regular expressions that match the fully-qualified names of character-based columns. In change event records, the values of matching columns are replaced with hashes computed using the hashAlgorithm and salt specified in the property name. Fully-qualified column names take the form databaseName.tableName.columnName.
  • column.mask.with.length.chars (optional; no default): A comma-separated list of regular expressions that match the fully-qualified names of character-based columns. In change event records, the values of matching columns are replaced with the number of asterisk (*) characters specified by the length in the property name.
  • column.propagate.source.type (optional; no default): A comma-separated list of regular expressions that match the fully-qualified names of columns for which you want the connector to emit extra parameters that represent column metadata.
  • column.truncate.to.length.chars (optional; no default): A comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Set this property to truncate the data in a set of columns when it exceeds the number of characters specified by the length in the property name. Set length to a positive integer, for example, column.truncate.to.20.chars.
  • connect.timeout.ms (optional; default: 30 seconds): A positive integer value that specifies the maximum time in milliseconds that the connector waits to establish a connection to the MySQL database server before the connection request times out.
  • database.exclude.list (optional; default: empty string): A comma-separated list of regular expressions that match the names of databases to exclude from monitoring.
  • database.include.list (optional; default: empty string): A comma-separated list of regular expressions that match the names of databases to monitor.
  • decimal.handling.mode (optional; default: precise): Specifies how the connector handles values for DECIMAL and NUMERIC columns in change events.
  • gtid.source.excludes (optional; no default): A comma-separated list of regular expressions that match source domain IDs in the GTID set that the connector uses to find the binlog position on the MySQL server.
  • gtid.source.includes (optional; no default): A comma-separated list of regular expressions that match source domain IDs in the GTID set that the connector uses to find the binlog position on the MySQL server.
  • include.query (optional; default: false): Boolean value that specifies whether the change events that the connector emits include the SQL query that generated the change.
  • include.schema.changes (optional; default: true): Boolean value that specifies whether the connector publishes changes in the database schema to a Kafka topic with the same name as the topic prefix.
  • include.schema.comments (optional; default: false): Boolean value that specifies whether the connector parses and publishes table and column comments on metadata objects.
  • inconsistent.schema.handling.mode (optional; default: fail): Specifies how the connector responds to binlog events that refer to tables that are not present in the internal schema representation, that is, when the internal representation is not consistent with the database.
  • message.key.columns (optional; no default): A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables.
  • skip.messages.without.change (optional; default: false): Specifies whether the connector emits messages for records when it does not detect a change in the included columns.
  • table.exclude.list (optional; default: empty string): A comma-separated list of regular expressions that match fully-qualified table names to exclude from monitoring.
  • table.include.list (optional; default: empty string): A comma-separated list of regular expressions that match fully-qualified table names to monitor.
  • time.precision.mode (optional; default: adaptive_time_microseconds): Specifies the type of precision that the connector uses to represent time, date, and timestamp values.
  • tombstones.on.delete (optional; default: true): Specifies whether a delete event is followed by a tombstone event. After a source record is deleted, the connector can emit a tombstone event (the default behavior) so that Kafka can completely remove all events that pertain to the key of the deleted row when log compaction is enabled for the topic.
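
To illustrate how several of the optional properties combine, the following hypothetical fragment could be added to the "config" object from the Quick Start (the customers table and the customer_id and ssn columns are assumed example names). It narrows capture to a single table, excludes a sensitive column, and defines a custom message key:

    "table.include.list": "inventory.customers",
    "column.exclude.list": "inventory.customers.ssn",
    "message.key.columns": "inventory.customers:customer_id"

For message.key.columns, each expression takes the form <databaseName>.<tableName>:<column>[,<column>...]; the listed columns are used in place of the table's primary key when the connector builds the Kafka record key.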
For more information about the configuration properties, see the Official Debezium MySQL Connector documentation.