The Debezium PostgreSQL Source connector is a Kafka Connect connector that captures row-level changes in a PostgreSQL database and streams them to Kafka topics.

Prerequisites

  • A running PostgreSQL server
  • PostgreSQL version 10 or later

Quick Start

  1. Setup the kcctl client: doc
  2. Create a JSON file like the following:
    {
        "name": "debezium-postgres-source",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "{host}",
            "database.port": "5432",
            "database.user": "{username}",
            "database.password": "{password}",
            "database.dbname" : "postgres",
            "topic.prefix": "fullfillment",
            "schema.include.list": "public",
            "plugin.name": "pgoutput"
        }
    }
    
  3. Run the following command to create the connector:
    kcctl create -f <filename>.json
    

Configuration

The Debezium PostgreSQL Source connector is configured using the following properties:
PropertyRequiredDefaultDescription
nametrueNo defaultUnique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors.
connector.classtrueNo defaultThe name of the Java class for the connector. Always use a value of io.debezium.connector.postgresql.PostgresConnector for the PostgreSQL connector.
plugin.namefalsedecoderbufsThe name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server.

Supported values are decoderbufs, and pgoutput.
slot.namefalsedebeziumThe name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the Debezium connector that you are configuring.

Slot names must conform to PostgreSQL replication slot naming rules, which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
slot.drop.on.stopfalsefalseWhether or not to delete the logical replication slot when the connector stops in a graceful, expected way. The default behavior is that the replication slot remains configured for the connector when the connector stops. When the connector restarts, having the same replication slot enables the connector to start processing where it left off.

Set to true in only testing or development environments. Dropping the slot allows the database to discard WAL segments. When the connector restarts it performs a new snapshot or it can continue from a persistent offset in the Kafka Connect offsets topic.
slot.failoverfalsefalseSpecifies whether the connector creates a failover slot. If you omit this setting, or if the primary server runs PostgreSQL 16 or earlier, the connector does not create a failover slot.

PostgreSQL uses the synchronized_standby_slots parameter to configure replication slot synchronization between primary and standby servers. Set this parameter on the primary server to specify the physical replication slots that it synchronizes with on standby servers.
publication.namefalsedbz_publicationThe name of the PostgreSQL publication created for streaming changes when using pgoutput.

This publication is created at start-up if it does not already exist and it includes all tables. Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to change events for the specific tables of interest. The connector user must have superuser permissions to create this publication, so it is usually preferable to create the publication before starting the connector for the first time.

If the publication already exists, either for all tables or configured with a subset of tables, Debezium uses the publication as it is defined.
database.hostnametrueNo defaultIP address or hostname of the PostgreSQL database server.
database.porttrue5432Integer port number of the PostgreSQL database server.
database.userfalseNo defaultName of the PostgreSQL database user for connecting to the PostgreSQL database server.
database.passwordfalseNo defaultPassword to use when connecting to the PostgreSQL database server.
database.dbnametrueNo defaultThe name of the PostgreSQL database from which to stream the changes.
topic.prefixtrueNo defaultTopic prefix that provides a namespace for the particular PostgreSQL database server or cluster in which Debezium is capturing changes. The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector. Only alphanumeric characters, hyphens, dots and underscores must be used in the database server logical name.



Do not change the value of this property. If you change the name value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.
schema.include.listfalseNo defaultAn optional, comma-separated list of regular expressions that match names of schemas for which you want to capture changes. Any schema name not included in schema.include.list is excluded from having its changes captured. By default, all non-system schemas have their changes captured.


To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the schema; it does not match substrings that might be present in a schema name.
If you include this property in the configuration, do not also set the schema.exclude.list property.
schema.exclude.listfalseNo defaultAn optional, comma-separated list of regular expressions that match names of schemas for which you do not want to capture changes. Any schema whose name is not included in schema.exclude.list has its changes captured, with the exception of system schemas.


To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the schema; it does not match substrings that might be present in a schema name.
If you include this property in the configuration, do not set the schema.include.list property.
table.include.listfalseNo defaultAn optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you want to capture. When this property is set, the connector captures changes only from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes in every non-system table in each schema whose changes are being captured.


To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
If you include this property in the configuration, do not also set the table.exclude.list property.
table.exclude.listfalseNo defaultAn optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture. Each identifier is of the form schemaName.tableName. When this property is set, the connector captures changes from every table that you do not specify.


To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
If you include this property in the configuration, do not set the table.include.list property.
column.include.listfalseNo defaultAn optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.


To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the expression is used to match the entire name string of the column; it does not match substrings that might be present in a column name.
If you include this property in the configuration, do not also set the column.exclude.list property.
column.exclude.listfalseNo defaultAn optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.


To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the expression is used to match the entire name string of the column; it does not match substrings that might be present in a column name.
If you include this property in the configuration, do not set the column.include.list property.
skip.messages.without.changefalsefalseSpecifies whether to skip publishing messages when there is no change in included columns. This would essentially filter messages if there is no change in columns included as per column.include.list or column.exclude.list properties.

This property is applied only when the REPLICA IDENTITY of the table is set to FULL.
time.precision.modefalseadaptiveTime, date, and timestamps can be represented with different kinds of precision:

adaptive captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type.

adaptive_time_microseconds captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type. An exception is TIME type fields, which are always captured as microseconds.

connect always represents time and timestamp values by using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision. For more information, see temporal values.
decimal.handling.modefalsepreciseSpecifies how the connector should handle values for DECIMAL and NUMERIC columns:

precise represents values by using java.math.BigDecimal to represent values in binary form in change events.

double represents values by using double values, which might result in a loss of precision but which is easier to use.

string encodes values as formatted strings, which are easy to consume but semantic information about the real type is lost. For more information, see Decimal types.
hstore.handling.modefalsejsonSpecifies how the connector should handle values for hstore columns:

map represents values by using MAP.

json represents values by using json string. This setting encodes values as formatted strings such as {"key":"val"}. For more information, see PostgreSQL HSTORE type.
interval.handling.modefalsenumericSpecifies how the connector should handle values for interval columns:

numeric represents intervals using approximate number of microseconds.

string represents intervals exactly by using the string pattern representation P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S. For example: P1Y2M3DT4H5M6.78S. For more information, see PostgreSQL basic types.
database.sslmodefalsepreferWhether to use an encrypted connection to the PostgreSQL server. Options include:

disable uses an unencrypted connection.

allow attempts to use an unencrypted connection first and, failing that, a secure (encrypted) connection.

prefer attempts to use a secure (encrypted) connection first and, failing that, an unencrypted connection.

require uses a secure (encrypted) connection, and fails if one cannot be established.

verify-ca behaves like require but also verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, or fails if no valid matching CA certificates are found.

verify-full behaves like verify-ca but also verifies that the server certificate matches the host to which the connector is trying to connect. For more information, see the PostgreSQL documentation.
database.sslcertfalseNo defaultThe path to the file that contains the SSL certificate for the client. For more information, see the PostgreSQL documentation.
database.sslkeyfalseNo defaultThe path to the file that contains the SSL private key of the client. For more information, see the PostgreSQL documentation.
database.sslpasswordfalseNo defaultThe password to access the client private key from the file specified by database.sslkey. For more information, see the PostgreSQL documentation.
database.sslrootcertfalseNo defaultThe path to the file that contains the root certificate(s) against which the server is validated. For more information, see the PostgreSQL documentation.
database.sslfactoryfalseNo defaultA name of the class that creates SSL Sockets. Use org.postgresql.ssl.NonValidatingFactory to disable SSL validation in development environments.
database.tcpKeepAlivefalsetrueEnable TCP keep-alive probe to verify that the database connection is still alive. For more information, see the PostgreSQL documentation.
tombstones.on.deletefalsetrueControls whether a delete event is followed by a tombstone event.

true - a delete operation is represented by a delete event and a subsequent tombstone event.

false - only a delete event is emitted.

After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row in case log compaction is enabled for the topic.
column.truncate.to.length.charsfalsen/aAn optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Set this property if you want to truncate the data in a set of columns when it exceeds the number of characters specified by the length in the property name. Set length to a positive integer value, for example, column.truncate.to.20.chars.

The fully-qualified name of a column observes the following format: <schemaName>.<tableName>.<columnName>. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name.

You can specify multiple properties with different lengths in a single configuration.
column.mask.with.length.charsfalsen/aAn optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Set this property if you want the connector to mask the values for a set of columns, for example, if they contain sensitive data. Set length to a positive integer to replace data in the specified columns with the number of asterisk (*) characters specified by the length in the property name. Set length to 0 (zero) to replace data in the specified columns with an empty string.

The fully-qualified name of a column observes the following format: schemaName.tableName.columnName. To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name.

You can specify multiple properties with different lengths in a single configuration.
column.mask.hash.hashAlgorithm.with.salt.saltfalsen/aAn optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>.
To match the name of a column Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. In the resulting change event record, the values for the specified columns are replaced with pseudonyms.


A pseudonym consists of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest section of the Java Cryptography Architecture Standard Algorithm Name Documentation.

In the following example, CzQMA0cB5K is a randomly selected salt.


column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

If necessary, the pseudonym is automatically shortened to the length of the column. The connector configuration can include multiple properties that specify different hash algorithms and salts.

Depending on the hashAlgorithm used, the salt selected, and the actual data set, the resulting data set might not be completely masked.

Hashing strategy version 2 should be used to ensure fidelity if the value is being hashed in different places or systems.
column.mask.hash.v2.hashAlgorithm.with.salt.saltfalsen/aAn optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>.
To match the name of a column Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name. In the resulting change event record, the values for the specified columns are replaced with pseudonyms.


A pseudonym consists of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest section of the Java Cryptography Architecture Standard Algorithm Name Documentation.

In the following example, CzQMA0cB5K is a randomly selected salt.


column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

If necessary, the pseudonym is automatically shortened to the length of the column. The connector configuration can include multiple properties that specify different hash algorithms and salts.

Depending on the hashAlgorithm used, the salt selected, and the actual data set, the resulting data set might not be completely masked.

Hashing strategy version 2 should be used to ensure fidelity if the value is being hashed in different places or systems.
column.propagate.source.typefalsen/aAn optional, comma-separated list of regular expressions that match the fully-qualified names of columns for which you want the connector to emit extra parameters that represent column metadata. When this property is set, the connector adds the following fields to the schema of event records:

__debezium.source.column.type


__debezium.source.column.length


__debezium.source.column.scale


These parameters propagate a column’s original type name and length (for variable-width types), respectively.
Enabling the connector to emit this extra data can assist in properly sizing specific numeric or character-based columns in sink databases.

The fully-qualified name of a column observes one of the following formats: databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName.
To match the name of a column, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name.
datatype.propagate.source.typefalsen/aAn optional, comma-separated list of regular expressions that specify the fully-qualified names of data types that are defined for columns in a database. When this property is set, for columns with matching data types, the connector emits event records that include the following extra fields in their schema:

__debezium.source.column.type


__debezium.source.column.length


__debezium.source.column.scale


These parameters propagate a column’s original type name and length (for variable-width types), respectively.
Enabling the connector to emit this extra data can assist in properly sizing specific numeric or character-based columns in sink databases.

The fully-qualified name of a column observes one of the following formats: databaseName.tableName.typeName, or databaseName.schemaName.tableName.typeName.
To match the name of a data type, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the data type; the expression does not match substrings that might be present in a type name.

For the list of PostgreSQL-specific data type names, see the PostgreSQL data type mappings.
message.key.columnsfalseempty stringA list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables.

By default, Debezium uses the primary key column of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.

To establish a custom message key for a table, list the table, followed by the columns to use as the message key. Each list entry takes the following format:

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

To base a table key on multiple column names, insert commas between the column names.

Each fully-qualified table name is a regular expression in the following format:

<schemaName>.<tableName>

The property can include entries for multiple tables. Use a semicolon to separate table entries in the list.

The following example sets the message key for the tables inventory.customers and purchase.orders:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

In the example, the columns pk1 and pk2 are specified as the message key for the table inventory.customer. For the purchaseorders tables in any schema, the columns pk3 and pk4 serve as the message key.

There is no limit to the number of columns that you use to create custom message keys. However, it’s best to use the minimum number that are required to specify a unique key.

If the expressions that you specify for this property match columns that are not part of the table’s primary key, set the REPLICA IDENTITY of the table to FULL. If you set REPLICA IDENTITY to another value, such as DEFAULT, after delete operations, the connector fails to generate tombstone events with the expected null values.
publication.autocreate.modefalseall_tablesSpecifies whether and how the connector creates a publication. This setting applies only when the connector streams changes by using the pgoutput plug-in.

To create publications, the connector must access PostgreSQL through a database account that has specific permissions. For more information, see Setting privileges to enable Debezium to create PostgreSQL publications.

Specify one of the following values:

all_tables

If a publication exists, the connector uses it.
If a publication does not exist, the connector creates a publication for all tables in the database from which the connector captures changes. The connector runs the following SQL command to create a publication:

CREATE PUBLICATION <publication_name> FOR ALL TABLES;

disabled

The connector does not attempt to create a publication. A database administrator or the user configured to perform replications must have created the publication before running the connector. If the connector cannot find the publication, the connector throws an exception and stops.

filtered

If a publication does not exist, the connector creates one by running a SQL command in the following format:

CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>
The resulting publication includes tables that match the current filter configuration, as specified by the schema.include.list, schema.exclude.list, table.include.list, and table.exclude.list connector configuration properties.
If the publication exists, the connector updates the publication for tables that match the current filter configuration by running a SQL command in the following format:

ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>.

no_tables

If a publication exists, the connector uses it. If a publication does not exist, the connector creates a publication without specifying any table by running a SQL command in the following format:

CREATE PUBLICATION <publication_name>;

Set the no_tables option if you want the connector to capture only logical decoding messages, and not capture any other change events, such as those caused by INSERT, UPDATE, and DELETE operations on any table.

If you select this option, to prevent the connector from emitting and processing READ events, you can specify names of schemas or tables for which you do not want to capture changes, for example, by using “table.exclude.list”: “public.*” or “schema.exclude.list”: “public”.
replica.identity.autoset.valuesfalseempty stringSet this property to apply specific replica identity settings to a subset of the tables that a connector captures, based on the table name. The replica identity values that the property sets overwrite the replica identity values that are set in the database.

The property accepts a comma-separated list of key-value pairs. Each key is a regular expression that matches fully-qualified table names; the corresponding value specifies a replica identity type. For example:

<fqTableNameA>:<replicaIdentity1>,<fqTableNameB>:<replicaIdentity2>,<fqTableNameC>:<replicaIdentity3>

Use the following format to specify the fully qualified table name:
SchemaName.TableName

Set the replica identity to one of the following values:

DEFAULT

Records the value, if one existed, that was set for the primary key column before the change event. This is the default setting for non-system tables.

INDEX indexName

Records the values that were set for all columns defined for a specified index before the change event. The index must be unique, not partial, not deferrable, and must include only columns marked NOT NULL. If the specified index is dropped, the resulting behavior is the same as if you set the value to NOTHING.

FULL

Records the values that were set for all columns in the row before the change event.

NOTHING

Records no information about the row state before the change event. This is the default value for system tables.

Example:
schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name

The replica.identity.autoset.values property applies only to tables that the connector captures. Other tables are ignored, even if they match the specified expression. Use the following connector properties to designate the tables to capture:

table.include.list

table.exclude.list

schema.include.list

schema.exclude.list
binary.handling.modefalsebytesSpecifies how binary (bytea) columns should be represented in change events. Specify one of the following values:

bytes

Represents binary data as a byte array.

base64

Represents binary data as base64-encoded strings.

base64-url-safe

Represents binary data as base64-url-safe-encoded strings.

hex

Represents binary data as hex-encoded (base16) strings.
schema.name.adjustment.modefalsenoneSpecifies how schema names should be adjusted for compatibility with the message converter used by the connector. Set one of the following values:

none

Does not apply any adjustment.

avro

Replaces the characters that cannot be used in the Avro type name with underscore.

avro_unicode

Replaces the underscore or characters that cannot be used in the Avro type name with corresponding Unicode characters, such as uxxxx.

In the preceding example, the underscore character (
) represents an escape sequence, equivalent to a backslash in Java.
field.name.adjustment.modefalsenoneSpecifies how field names should be adjusted for compatibility with the message converter used by the connector. Specify one of the following values:

none

Do not apply any adjustment.

avro

Replace characters that cannot be used in Avro type names with underscores.

avro_unicode

Replace the underscore or characters that cannot be used in Avro type names with the corresponding Unicode characters, such as uxxxx.

In the preceding example, the underscore character (
) represents an escape sequence, equivalent to a backslash in Java.

For more information, see Avro naming.
money.fraction.digitsfalse2Specifies how many decimal digits should be used when converting Postgres money type to java.math.BigDecimal, which represents the values in change events. Applicable only when decimal.handling.mode is set to precise.
message.prefix.include.listfalseNo defaultAn optional, comma-separated list of regular expressions that match the names of the logical decoding message prefixes that you want the connector to capture. By default, the connector captures all logical decoding messages. When this property is set, the connector captures only logical decoding message with the prefixes specified by the property. All other logical decoding messages are excluded.

To match the name of a message prefix, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire message prefix string; the expression does not match substrings that might be present in a prefix.

If you include this property in the configuration, do not also set the message.prefix.exclude.list property.

For information about the structure of message events and about their ordering semantics, see message events.
message.prefix.exclude.listfalseNo defaultAn optional, comma-separated list of regular expressions that match the names of the logical decoding message prefixes that you do not want the connector to capture. When this property is set, the connector does not capture logical decoding messages that use the specified prefixes. All other messages are captured.
To exclude all logical decoding messages, set the value of this property to .*.

To match the name of a message prefix, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire message prefix string; the expression does not match substrings that might be present in a prefix.

If you include this property in the configuration, do not also set message.prefix.include.list property.


For information about the structure of message events and about their ordering semantics, see message events.
For more information about the configuration properties, see the Official Debezium PostgreSQL Connector documentation.