Kafka Connect Debezium SQL Server Source connector
Property | Required | Default | Description |
---|---|---|---|
name | true | No default | Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.) |
connector.class | true | No default | The name of the Java class for the connector. Always use a value of io.debezium.connector.sqlserver.SqlServerConnector for the SQL Server connector. |
tasks.max | true | 1 | Specifies the maximum number of tasks that the connector can use to capture data from the database instance. If the database.names list contains more than one element, you can increase the value of this property to a number less than or equal to the number of elements in the list. |
database.hostname | true | No default | IP address or hostname of the SQL Server database server. |
database.port | false | 1433 | Integer port number of the SQL Server database server. If both database.port and database.instance are specified, database.instance is ignored. See the JDBC driver for SQL Server documentation for more details. |
database.user | true | No default | Username to use when connecting to the SQL Server database server. Can be omitted when using Kerberos authentication, which can be configured using pass-through properties. |
database.password | true | No default | Password to use when connecting to the SQL Server database server. |
database.instance | false | No default | Specifies the instance name of the SQL Server named instance. If both database.port and database.instance are specified, database.instance is ignored. See the JDBC driver for SQL Server documentation for more details. |
database.names | true | No default | The comma-separated list of the SQL Server database names from which to stream the changes. |
topic.prefix | true | No default | Topic prefix that provides a namespace for the SQL Server database server that you want Debezium to capture. The prefix should be unique across all other connectors, since it is used as the prefix for all Kafka topic names that receive records from this connector. Use only alphanumeric characters, hyphens, dots, and underscores in the topic prefix. Do not change the value of this property. If you change the value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value. The connector is also unable to recover its database schema history topic. |
schema.include.list | false | No default | An optional, comma-separated list of regular expressions that match names of schemas for which you want to capture changes. Any schema name not included in schema.include.list is excluded from having its changes captured. By default, the connector captures changes for all non-system schemas. To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the schema; it does not match substrings that might be present in a schema name. If you include this property in the configuration, do not also set the schema.exclude.list property. |
schema.exclude.list | false | No default | An optional, comma-separated list of regular expressions that match names of schemas for which you do not want to capture changes. Any schema whose name is not included in schema.exclude.list has its changes captured, with the exception of system schemas. To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the schema; it does not match substrings that might be present in a schema name. If you include this property in the configuration, do not set the schema.include.list property. |
table.include.list | false | No default | An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables that you want Debezium to capture. By default, the connector captures all non-system tables for the designated schemas. When this property is set, the connector captures changes only from the specified tables. Each identifier is of the form schemaName.tableName. To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not also set the table.exclude.list property. |
table.exclude.list | false | No default | An optional comma-separated list of regular expressions that match fully-qualified table identifiers for the tables that you want to exclude from being captured. Debezium captures all tables that are not included in table.exclude.list. Each identifier is of the form schemaName.tableName. To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not also set the table.include.list property. |
column.include.list | false | empty string | An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in the change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Each change event record that Debezium emits for a table includes an event key that contains fields for each column in the table’s primary key or unique key. To ensure that event keys are generated correctly, if you set this property, be sure to explicitly list the primary key columns of any captured tables. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; it does not match substrings that might be present in a column name. If you include this property in the configuration, do not also set the column.exclude.list property. |
column.exclude.list | false | empty string | An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Note that primary key columns are always included in the event’s key, even if they are excluded from the value. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; it does not match substrings that might be present in a column name. If you include this property in the configuration, do not also set the column.include.list property. |
skip.messages.without.change | false | false | Specifies whether the connector skips publishing a message when none of the included columns changed. In effect, a message is filtered out if there is no change in the columns that are included according to the column.include.list or column.exclude.list properties. |
column.mask.hash.hashAlgorithm.with.salt.salt | false | n/a | An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. In the resulting change event record, the values for the specified columns are replaced with pseudonyms. A pseudonym consists of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest section of the Java Cryptography Architecture Standard Algorithm Name Documentation. In the following example, CzQMA0cB5K is a randomly selected salt: column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName. If necessary, the pseudonym is automatically shortened to the length of the column. The connector configuration can include multiple properties that specify different hash algorithms and salts. Depending on the hashAlgorithm used, the salt selected, and the actual data set, the resulting data set might not be completely masked. Hashing strategy version 2 should be used to ensure fidelity if the value is being hashed in different places or systems. |
column.mask.hash.v2.hashAlgorithm.with.salt.salt | false | n/a | An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. In the resulting change event record, the values for the specified columns are replaced with pseudonyms. A pseudonym consists of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest section of the Java Cryptography Architecture Standard Algorithm Name Documentation. In the following example, CzQMA0cB5K is a randomly selected salt: column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName. If necessary, the pseudonym is automatically shortened to the length of the column. The connector configuration can include multiple properties that specify different hash algorithms and salts. Depending on the hashAlgorithm used, the salt selected, and the actual data set, the resulting data set might not be completely masked. Hashing strategy version 2 should be used to ensure fidelity if the value is being hashed in different places or systems. |
time.precision.mode | false | adaptive | Time, date, and timestamps can be represented with different kinds of precision, including: adaptive (the default) captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type; or connect always represents time and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which uses millisecond precision regardless of the database columns’ precision. For more information, see temporal values. |
decimal.handling.mode | false | precise | Specifies how the connector should handle values for DECIMAL and NUMERIC columns: precise (the default) represents them precisely using java.math.BigDecimal values represented in change events in a binary form. double represents them using double values, which may result in a loss of precision but is easier to use. string encodes values as formatted strings, which is easy to consume but semantic information about the real type is lost. |
include.schema.changes | false | true | Boolean value that specifies whether the connector publishes changes in the database schema to a Kafka topic with the same name as the topic prefix. The connector records each schema change with a key that contains the database name, and a value that is a JSON structure that describes the schema update. This mechanism for recording schema changes is independent of the connector’s internal recording of changes to the database schema history. |
tombstones.on.delete | false | true | Controls whether a delete event is followed by a tombstone event. true - a delete operation is represented by a delete event and a subsequent tombstone event. false - only a delete event is emitted. After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row in case log compaction is enabled for the topic. |
column.truncate.to.length.chars | false | n/a | An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Set this property if you want to truncate the data in a set of columns when it exceeds the number of characters specified by the length in the property name. Set length to a positive integer value, for example, column.truncate.to.20.chars. The fully-qualified name of a column observes the following format: <schemaName>.<tableName>.<columnName>. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. You can specify multiple properties with different lengths in a single configuration. |
column.mask.with.length.chars | false | n/a | An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Set this property if you want the connector to mask the values for a set of columns, for example, if they contain sensitive data. Set length to a positive integer to replace data in the specified columns with the number of asterisk (*) characters specified by the length in the property name. Set length to 0 (zero) to replace data in the specified columns with an empty string. The fully-qualified name of a column observes the following format: schemaName.tableName.columnName. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. You can specify multiple properties with different lengths in a single configuration. |
column.propagate.source.type | false | n/a | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns for which you want the connector to emit extra parameters that represent column metadata. When this property is set, the connector adds the following fields to the schema of event records: __debezium.source.column.type __debezium.source.column.length __debezium.source.column.scale These parameters propagate a column’s original type name and length (for variable-width types), respectively. Enabling the connector to emit this extra data can assist in properly sizing specific numeric or character-based columns in sink databases. The fully-qualified name of a column observes the following format: schemaName.tableName.columnName. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. |
datatype.propagate.source.type | false | n/a | An optional, comma-separated list of regular expressions that specify the fully-qualified names of data types that are defined for columns in a database. When this property is set, for columns with matching data types, the connector emits event records that include the following extra fields in their schema: __debezium.source.column.type __debezium.source.column.length __debezium.source.column.scale These parameters propagate a column’s original type name and length (for variable-width types), respectively. Enabling the connector to emit this extra data can assist in properly sizing specific numeric or character-based columns in sink databases. The fully-qualified name of a column observes the following format: schemaName.tableName.typeName. To match the name of a data type, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the data type; the expression does not match substrings that might be present in a type name. For the list of SQL Server-specific data type names, see the SQL Server data type mappings. |
message.key.columns | false | n/a | A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables. By default, Debezium uses the primary key column of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns. To establish a custom message key for a table, list the table, followed by the columns to use as the message key. Each list entry takes the following format: <fully-qualified_tableName>:<keyColumn>,<keyColumn>. To base a table key on multiple column names, insert commas between the column names. Each fully-qualified table name is a regular expression in the following format: <schemaName>.<tableName>. The property can include entries for multiple tables. Use a semicolon to separate table entries in the list. The following example sets the message key for the table inventory.customers and for the purchaseorders tables: inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4. For the table inventory.customers, the columns pk1 and pk2 are specified as the message key. For the purchaseorders tables in any schema, the columns pk3 and pk4 serve as the message key. There is no limit to the number of columns that you use to create custom message keys. However, it’s best to use the minimum number that are required to specify a unique key. |
binary.handling.mode | false | bytes | Specifies how binary (binary, varbinary) columns should be represented in change events. Possible settings: bytes (the default) represents binary data as a byte array. base64 represents binary data as a base64-encoded String. base64-url-safe represents binary data as a base64-url-safe-encoded String. hex represents binary data as a hex-encoded (base16) String. |
schema.name.adjustment.mode | false | none | Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: none does not apply any adjustment. avro replaces the characters that cannot be used in the Avro type name with underscore. avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java. |
field.name.adjustment.mode | false | none | Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings: none does not apply any adjustment. avro replaces the characters that cannot be used in the Avro type name with underscore. avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java. For more information, see Avro naming. |
converters | false | No default | Enumerates a comma-separated list of the symbolic names of the custom converter instances that the connector can use, for example, isbn. You must set the converters property to enable the connector to use a custom converter. For each converter that you configure for a connector, you must also add a .type property, which specifies the fully-qualified name of the class that implements the converter interface. The .type property uses the following format: <converterSymbolicName>.type. For example: isbn.type: io.debezium.test.IsbnConverter. If you want to further control the behavior of a configured converter, you can add one or more configuration parameters to pass values to the converter. To associate any additional configuration parameter with a converter, prefix the parameter names with the symbolic name of the converter. For example: isbn.schema.name: io.debezium.sqlserver.type.Isbn |
snapshot.mode | false | initial | A mode for taking an initial snapshot of the structure and optionally data of captured tables. Once the snapshot is complete, the connector will continue reading change events from the database’s redo logs. The following values are supported: always Perform snapshot on each connector start. After the snapshot completes, the connector begins to stream event records for subsequent database changes. initial The connector performs a database snapshot as described in the default workflow for creating an initial snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes. initial_only The connector performs a database snapshot and stops before streaming any change event records, not allowing any subsequent change events to be captured. schema_only Deprecated, see no_data. no_data The connector captures the structure of all relevant tables, performing all the steps described in the default snapshot workflow, except that it does not create READ events to represent the data set at the point of the connector’s start-up (Step 7.b). recovery Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown. when_needed After the connector starts, it performs a snapshot only if it detects one of the following circumstances: It cannot detect any topic offsets. A previously recorded offset specifies a log position that is not available on the server. configuration_based With this option, you control snapshot behavior through a set of connector properties that have the prefix ‘snapshot.mode.configuration.based’. custom The custom snapshot mode lets you inject your own implementation of the io.debezium.spi.snapshot.Snapshotter interface. Set the snapshot.mode.custom.name configuration property to the name provided by the name() method of your implementation. For more information, see custom snapshotter SPI. |
snapshot.mode.configuration.based.snapshot.data | false | false | If the snapshot.mode is set to configuration_based, set this property to specify whether the connector includes table data when it performs a snapshot. |
snapshot.mode.configuration.based.snapshot.schema | false | false | If the snapshot.mode is set to configuration_based, set this property to specify whether the connector includes the table schema when it performs a snapshot. |
snapshot.mode.configuration.based.start.stream | false | false | If the snapshot.mode is set to configuration_based, set this property to specify whether the connector begins to stream change events after a snapshot completes. |
snapshot.mode.configuration.based.snapshot.on.schema.error | false | false | If the snapshot.mode is set to configuration_based, set this property to specify whether the connector includes table schema in a snapshot if the schema history topic is not available. |
snapshot.mode.configuration.based.snapshot.on.data.error | false | false | If the snapshot.mode is set to configuration_based, this property specifies whether the connector attempts to snapshot table data if it does not find the last committed offset in the transaction log. Set the value to true to instruct the connector to perform a new snapshot. |
snapshot.mode.custom.name | false | No default | If snapshot.mode is set to custom, use this setting to specify the name of the custom implementation that is provided in the name() method that is defined in the ‘io.debezium.spi.snapshot.Snapshotter’ interface. After a connector restart, Debezium calls the specified custom implementation to determine whether to perform a snapshot. For more information, see custom snapshotter SPI. |
snapshot.locking.mode | false | exclusive | Controls whether and for how long the connector holds a table lock. Table locks prevent certain types of table change operations from occurring while the connector performs a snapshot. You can set the following values: exclusive Controls how the connector holds locks on tables while performing the schema snapshot when snapshot.isolation.mode is REPEATABLE_READ or EXCLUSIVE. The connector holds a table lock for exclusive table access for just the initial portion of the snapshot, while the database schemas and other metadata are being read. The remaining work in a snapshot involves selecting all rows from each table, and this is done using a flashback query that requires no locks. However, in some cases it may be desirable to avoid locks entirely, which can be done by specifying none. That mode is only safe to use if no schema changes are happening while the snapshot is taken. none Prevents the connector from acquiring any table locks during the snapshot. Use this setting only if no schema changes might occur during the creation of the snapshot. custom The connector performs a snapshot according to the implementation specified by the snapshot.locking.mode.custom.name property, which is a custom implementation of the io.debezium.spi.snapshot.SnapshotLock interface. |
snapshot.locking.mode.custom.name | false | No default | When snapshot.locking.mode is set as custom, use this setting to specify the name of the custom implementation provided in the name() method that is defined by the ‘io.debezium.spi.snapshot.SnapshotLock’ interface. For more information, see custom snapshotter SPI. |
snapshot.query.mode | false | select_all | Specifies how the connector queries data while performing a snapshot. Set one of the following options: select_all The connector performs a select all query by default, optionally adjusting the columns selected based on the column include and exclude list configurations. custom The connector performs a snapshot query according to the implementation specified by the snapshot.query.mode.custom.name property, which defines a custom implementation of the io.debezium.spi.snapshot.SnapshotQuery interface. This setting enables you to manage snapshot content in a more flexible manner compared to using the snapshot.select.statement.overrides property. |
snapshot.query.mode.custom.name | false | No default | When snapshot.query.mode is set to custom, use this setting to specify the name of the custom implementation provided in the name() method that is defined by the ‘io.debezium.spi.snapshot.SnapshotQuery’ interface. For more information, see custom snapshotter SPI. |
snapshot.include.collection.list | false | All tables specified in table.include.list | An optional, comma-separated list of regular expressions that match the fully-qualified names (<dbName>.<schemaName>.<tableName>) of the tables to include in a snapshot. The specified items must be named in the connector’s table.include.list property. This property takes effect only if the connector’s snapshot.mode property is set to a value other than never. This property does not affect the behavior of incremental snapshots. To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the table; it does not match substrings that might be present in a table name. |
snapshot.isolation.mode | false | repeatable_read | Mode to control which transaction isolation level is used and how long the connector locks tables that are designated for capture. The following values are supported: read_uncommitted read_committed repeatable_read snapshot exclusive (exclusive mode uses repeatable read isolation level, however, it takes the exclusive lock on all tables to be read). The snapshot, read_committed and read_uncommitted modes do not prevent other transactions from updating table rows during initial snapshot. The exclusive and repeatable_read modes do prevent concurrent updates. Mode choice also affects data consistency. Only exclusive and snapshot modes guarantee full consistency, that is, initial snapshot and streaming logs constitute a linear history. In case of repeatable_read and read_committed modes, it might happen that, for instance, a record added appears twice - once in initial snapshot and once in streaming phase. Nonetheless, that consistency level should do for data mirroring. For read_uncommitted there are no data consistency guarantees at all (some data might be lost or corrupted). |
event.processing.failure.handling.mode | false | fail | Specifies how the connector should react to exceptions during processing of events. fail will propagate the exception (indicating the offset of the problematic event), causing the connector to stop. warn will cause the problematic event to be skipped and the offset of the problematic event to be logged. skip will cause the problematic event to be skipped. |
poll.interval.ms | false | 500 (0.5 seconds) | Positive integer value that specifies the number of milliseconds that the connector waits before it checks the database for new change events. The value that you specify influences the behavior of heartbeat.interval.ms. The connector can emit heartbeat messages only during the specified polling cycle. To prevent this setting from delaying heartbeat emissions, set it to a value that is less than or equal to the value of heartbeat.interval.ms. |
max.queue.size | false | 8192 | Positive integer value that specifies the maximum number of records that the blocking queue can hold. When Debezium reads events streamed from the database, it places the events in the blocking queue before it writes them to Kafka. The blocking queue can provide backpressure for reading change events from the database in cases where the connector ingests messages faster than it can write them to Kafka, or when Kafka becomes unavailable. Events that are held in the queue are disregarded when the connector periodically records offsets. Always set the value of max.queue.size to be larger than the value of max.batch.size. |
max.queue.size.in.bytes | false | 0 | A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value. If max.queue.size is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property. For example, if you set max.queue.size=1000, and max.queue.size.in.bytes=5000, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes. |
max.batch.size | false | 2048 | Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. |
heartbeat.interval.ms | false | 0 | Specifies an interval in milliseconds that determines how frequently the connector sends messages to a Kafka heartbeat topic, regardless of whether changes occur in the database. By default, the connector does not send heartbeat messages. Setting this property can help to confirm whether the connector is still receiving change events from the database. This can be especially important in databases where captured tables remain unchanged for long periods. When a database experiences frequent long intervals during which no changes occur in captured tables, the connector continues to read from the transaction log as usual, but it only rarely commits offset values to Kafka. As a result, after a connector restart, because the offset value is stale, the connector must re-send a high number of change events. By contrast, when you configure the connector to send regular heartbeat messages, it can update the offset in Kafka more frequently. Because the offset values in Kafka remain current, fewer change events must be re-sent after a connector restarts. Heartbeats are only emitted during polling cycles. That is, in a Debezium environment, the actual interval between sending heartbeat messages is jointly controlled by the settings of the heartbeat.interval.ms and poll.interval.ms properties. The actual frequency for sending heartbeat messages is based on the lower of the two values. To prevent delays that would reduce the effectiveness of heartbeat messages, set this property to a value that is greater than or equal to the value of poll.interval.ms. For example, if you set poll.interval.ms to 100, set heartbeat.interval.ms to 5000. |
heartbeat.action.query | false | No default | Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. This is useful for keeping offsets from becoming stale when capturing changes from a low-traffic database. Create a heartbeat table in the low-traffic database, and set this property to a statement that inserts records into that table, for example: INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat'). This allows the connector to receive changes from the low-traffic database and acknowledge their LSNs, which prevents offsets from becoming stale. |
snapshot.delay.ms | false | No default | An interval in milliseconds that the connector should wait before taking a snapshot after starting up. Can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which may cause re-balancing of connectors. |
streaming.delay.ms | false | 0 | Specifies the time, in milliseconds, that the connector delays the start of the streaming process after it completes a snapshot. Setting a delay interval helps to prevent the connector from restarting snapshots in the event that a failure occurs immediately after the snapshot completes, but before the streaming process begins. Set a delay value that is higher than the value of the offset.flush.interval.ms property that is set for the Kafka Connect worker. |
snapshot.fetch.size | false | 2000 | Specifies the maximum number of rows that the connector reads in a single batch from each table while taking a snapshot. The connector reads the table contents in multiple batches of this size. |
query.fetch.size | false | No default | Specifies the number of rows that will be fetched for each database round-trip of a given query. Defaults to the JDBC driver’s default fetch size. |
snapshot.lock.timeout.ms | false | 10000 | An integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail (also see snapshots). When set to 0 the connector will fail immediately when it cannot obtain the lock. Value -1 indicates infinite waiting. |
snapshot.select.statement.overrides | false | No default | Specifies the table rows to include in a snapshot. Use the property if you want a snapshot to include only a subset of the rows in a table. This property affects snapshots only. It does not apply to events that the connector reads from the log. The property contains a comma-separated list of fully-qualified table names in the form <schemaName>.<tableName>. For example: "snapshot.select.statement.overrides": "inventory.products,customers.orders". For each table in the list, add a further configuration property that specifies the SELECT statement for the connector to run on the table when it takes a snapshot. The specified SELECT statement determines the subset of table rows to include in the snapshot. Use the following format to specify the name of this SELECT statement property: snapshot.select.statement.overrides.<schemaName>.<tableName>. For example, snapshot.select.statement.overrides.customers.orders. Example: From a customers.orders table that includes the soft-delete column, delete_flag, add the following properties if you want a snapshot to include only those records that are not soft-deleted: "snapshot.select.statement.overrides": "customers.orders", "snapshot.select.statement.overrides.customers.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC". In the resulting snapshot, the connector includes only the records for which delete_flag = 0. |
source.struct.version | false | v2 | Schema version for the source block in CDC events. Debezium 0.10 introduced a few breaking changes to the structure of the source block in order to unify the exposed structure across all the connectors. Setting this option to v1 produces the structure used in earlier versions. Note that this setting is not recommended and is planned for removal in a future Debezium version. |
provide.transaction.metadata | false | false | When set to true, Debezium generates events with transaction boundaries and enriches data event envelopes with transaction metadata. |
retriable.restart.connector.wait.ms | false | 10000 (10 seconds) | The number of milliseconds to wait before restarting a connector after a retriable error occurs. |
skipped.operations | false | t | A comma-separated list of the operation types that you want the connector to skip during streaming. You can configure the connector to skip the following types of operations: c (insert/create), u (update), d (delete), and t (truncate). Set the value to none if you do not want the connector to skip any operations. Because the Debezium SQL Server connector does not support truncate change events, setting the default t value has the same effect as setting the value to none. |
signal.data.collection | false | No default | Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: <databaseName>.<schemaName>.<tableName> |
signal.enabled.channels | false | source | List of the signaling channel names that are enabled for the connector. By default, the following channels are available: source, kafka, file, and jmx. Optionally, you can also implement a custom signaling channel. |
notification.enabled.channels | false | No default | List of notification channel names that are enabled for the connector. By default, the following channels are available: sink, log, and jmx. Optionally, you can also implement a custom notification channel. |
incremental.snapshot.allow.schema.changes | false | false | Allows schema changes during an incremental snapshot. When enabled, the connector detects schema changes during an incremental snapshot and re-selects the current chunk to avoid locking DDLs. Note that changes to a primary key are not supported and can cause incorrect results if performed during an incremental snapshot. Another limitation is that if a schema change affects only columns' default values, the change is not detected until the DDL is processed from the transaction log stream. This does not affect the values of snapshot events, but the schema of snapshot events may have outdated defaults. |
incremental.snapshot.chunk.size | false | 1024 | The maximum number of rows that the connector fetches and reads into memory during an incremental snapshot chunk. Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size. However, larger chunk sizes also require more memory to buffer the snapshot data. Adjust the chunk size to a value that provides the best performance in your environment. |
incremental.snapshot.watermarking.strategy | false | insert_insert | Specifies the watermarking mechanism that the connector uses during an incremental snapshot to deduplicate events that might be captured by an incremental snapshot and then recaptured after streaming resumes. You can specify one of the following options: insert_insert: When you send a signal to initiate an incremental snapshot, for every chunk that Debezium reads during the snapshot, it writes an entry to the signaling data collection to record the signal to open the snapshot window. After the snapshot completes, Debezium inserts a second entry that records the signal to close the window. insert_delete: When you send a signal to initiate an incremental snapshot, for every chunk that Debezium reads, it writes a single entry to the signaling data collection to record the signal to open the snapshot window. After the snapshot completes, this entry is removed. No entry is created for the signal to close the snapshot window. Set this option to prevent rapid growth of the signaling data collection. |
max.iteration.transactions | false | 500 | Specifies the maximum number of transactions per iteration to be used to reduce the memory footprint when streaming changes from multiple tables in a database. When set to 0, the connector uses the current maximum LSN as the range to fetch changes from. When set to a value greater than zero, the connector uses the n-th LSN specified by this setting as the range to fetch changes from. Defaults to 500. |
incremental.snapshot.option.recompile | false | false | Applies the OPTION(RECOMPILE) query option to all SELECT statements used during an incremental snapshot. This can help to solve parameter sniffing issues, but it can increase CPU load on the source database, depending on the frequency of query execution. |
topic.naming.strategy | false | io.debezium.schema.SchemaTopicNamingStrategy | The name of the TopicNamingStrategy class that is used to determine the topic names for data change, schema change, transaction, and heartbeat events, among others. Defaults to SchemaTopicNamingStrategy. |
topic.delimiter | false | . | Specifies the delimiter for the topic name. Defaults to a period (.). |
topic.cache.size | false | 10000 | The size of the bounded concurrent hash map that holds topic names. This cache helps to determine the topic name that corresponds to a given data collection. |
topic.heartbeat.prefix | false | __debezium-heartbeat | Controls the name of the topic to which the connector sends heartbeat messages. The topic name has this pattern: <topic.heartbeat.prefix>.<topic.prefix>. For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment. |
topic.transaction | false | transaction | Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has this pattern: <topic.prefix>.<topic.transaction>. For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction. For more information, see Transaction Metadata. |
snapshot.max.threads | false | 1 | Specifies the number of threads that the connector uses when performing an initial snapshot. To enable parallel initial snapshots, set the property to a value greater than 1. In a parallel initial snapshot, the connector processes multiple tables concurrently. When you enable parallel initial snapshots, the threads that perform each table snapshot can require varying times to complete their work. If a snapshot for one table requires significantly more time to complete than the snapshots for other tables, threads that have completed their work sit idle. In some environments, a network device, such as a load balancer or firewall, terminates connections that remain idle for an extended interval. After the snapshot completes, the connector is unable to close the connection, resulting in an exception and an incomplete snapshot, even in cases where the connector successfully transmitted all snapshot data. If you experience this problem, revert the value of snapshot.max.threads to 1, and retry the snapshot. |
custom.metric.tags | false | No default | Defines tags that customize MBean object names by adding metadata that provides contextual information. Specify a comma-separated list of key-value pairs. Each key represents a tag for the MBean object name, and the corresponding value represents a value for the key, for example, k1=v1,k2=v2. The connector appends the specified tags to the base MBean object name. Tags can help you to organize and categorize metrics data. You can define tags to identify particular application instances, environments, regions, versions, and so forth. For more information, see Customized MBean names. |
errors.max.retries | false | -1 | Specifies how the connector responds after an operation that results in a retriable error, such as a connection error. Set one of the following options: -1: No limit. The connector always restarts automatically, and retries the operation, regardless of the number of previous failures. 0: Disabled. The connector fails immediately, and never retries the operation. User intervention is required to restart the connector. > 0: The connector restarts automatically until it reaches the specified maximum number of retries. After the next failure, the connector stops, and user intervention is required to restart it. |
data.query.mode | false | function | Controls how the connector queries CDC data. The following modes are supported: function: The data is queried by calling the cdc.[fn_cdc_get_all_changes_#] function. This is the default mode. direct: The connector queries the change tables directly. |
database.query.timeout.ms | false | 600000 (10 minutes) | Specifies the time, in milliseconds, that the connector waits for a query to complete. Set the value to 0 (zero) to remove the timeout limit. |
streaming.fetch.size | false | 0 | Specifies the maximum number of rows that the connector reads in a single batch from each table while streaming. The connector reads the table contents in multiple batches of this size. Defaults to 0, which means no limit. |
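
Several of the properties above depend on each other: the snapshot.select.statement.overrides list needs one derived property per table, heartbeat.interval.ms should not be smaller than poll.interval.ms, and the heartbeat and transaction topic names are built from topic.prefix. The following Python sketch illustrates those relationships under stated assumptions. The helper names `build_connector_config` and `derived_topic_names` are hypothetical and not part of Debezium or Kafka Connect, and the use of topic.delimiter when joining the heartbeat and transaction topic names is an assumption based on the patterns described in the table.

```python
import json


def build_connector_config(topic_prefix, overrides,
                           poll_interval_ms=100, heartbeat_interval_ms=5000):
    """Assemble a partial connector configuration as a flat dict of strings.

    `overrides` maps "<schemaName>.<tableName>" to the SELECT statement that
    the snapshot should run for that table. (Hypothetical helper, not a
    Debezium API.)
    """
    # The table advises heartbeat.interval.ms >= poll.interval.ms when
    # heartbeats are enabled (0 means heartbeats are disabled).
    if 0 < heartbeat_interval_ms < poll_interval_ms:
        raise ValueError("heartbeat.interval.ms should be >= poll.interval.ms")

    config = {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "topic.prefix": topic_prefix,
        "poll.interval.ms": str(poll_interval_ms),
        "heartbeat.interval.ms": str(heartbeat_interval_ms),
        # Comma-separated list of the tables that have an override.
        "snapshot.select.statement.overrides": ",".join(overrides),
    }
    # One derived property per table, named after the table.
    for table, select in overrides.items():
        config[f"snapshot.select.statement.overrides.{table}"] = select
    return config


def derived_topic_names(config):
    """Compute heartbeat and transaction topic names from the documented defaults."""
    prefix = config["topic.prefix"]
    # Assumption: the same delimiter joins both topic name patterns.
    delimiter = config.get("topic.delimiter", ".")
    heartbeat_prefix = config.get("topic.heartbeat.prefix", "__debezium-heartbeat")
    transaction = config.get("topic.transaction", "transaction")
    return {
        "heartbeat": f"{heartbeat_prefix}{delimiter}{prefix}",
        "transaction": f"{prefix}{delimiter}{transaction}",
    }


if __name__ == "__main__":
    cfg = build_connector_config(
        "fulfillment",
        {"customers.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0"},
    )
    print(json.dumps(cfg, indent=2))
    print(derived_topic_names(cfg))
```

A real registration request would also need the required properties from the start of the table, such as name, database.hostname, database.user, database.password, and database.names, which this sketch omits.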