Debezium PostgreSQL Source
The Debezium source connector pulls messages from PostgreSQL and persists the messages to Pulsar topics.

Available on: StreamNative Cloud console
Authored by: ASF
Support type: StreamNative
License: Apache License 2.0


Quick start

Prerequisites

The prerequisites for connecting a Debezium Postgres source connector to external systems include:

  1. Create a Postgres service: this connector uses Debezium v1.9. Refer to this document to see the compatible PostgreSQL versions.
  2. Prepare the Postgres database: refer to this document to complete the preparation steps on Postgres. A rough sketch of these steps follows the note below.

Note

The subsequent deployment steps in this document use PostgreSQL 11.16 on AWS RDS, which natively supports the pgoutput plugin.
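
As a rough sketch of the preparation covered by the linked document, a self-managed PostgreSQL server typically needs logical decoding enabled and a user with replication privileges. The role name and password below are placeholders; on AWS RDS you would instead enable the rds.logical_replication parameter in a parameter group.

# Enable logical decoding (requires a server restart).
psql -h <host> -U postgres -c "ALTER SYSTEM SET wal_level = 'logical';"

# Create a placeholder user for the connector with replication privileges.
psql -h <host> -U postgres -c "CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'changeme';"

# Verify the setting after the restart.
psql -h <host> -U postgres -c "SHOW wal_level;"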

1. Create a table on Postgres

Run the following SQL commands on your PostgreSQL server. If you don't need the before image of changed rows, you can skip the REPLICA IDENTITY statement.

CREATE TABLE "public"."io-test" (
    "id" integer GENERATED ALWAYS AS IDENTITY,
    "first_name" text,
    "last_name" text,
    "age" integer,
    PRIMARY KEY ("id")
);
ALTER TABLE "public"."io-test" REPLICA IDENTITY FULL;
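
To confirm that the REPLICA IDENTITY change took effect, you can query pg_class; a relreplident value of f means FULL. The psql invocation is only an example:

psql -h <host> -U postgres -d postgres \
  -c "SELECT relname, relreplident FROM pg_class WHERE relname = 'io-test';"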

2. Create a connector

The following command shows how to use pulsarctl to create a builtin connector. If you want to create a non-builtin connector, replace --source-type debezium-postgres with --archive /path/to/pulsar-io-debezium-postgres.nar. You can find the button to download the NAR package at the beginning of this document.

For StreamNative Cloud Users

If you are a StreamNative Cloud user, you need to set up your environment first.

pulsarctl sources create \
  --source-type debezium-postgres \
  --name debezium-postgres-source \
  --tenant public \
  --namespace default \
  --parallelism 1 \
  --source-config \
  '{
    "database.hostname": "Your hostname of Postgres",
    "database.port": "Your port of Postgres",
    "database.user": "Your user of Postgres",
    "database.password": "Your password of Postgres",
    "database.dbname": "Your dbname of Postgres",
    "table.whitelist": "public.io-test",
    "database.server.name": "mydbserver",
    "plugin.name": "pgoutput" 
  }'

The --source-config is the minimum configuration required to start this connector, and it is a JSON string. Substitute the placeholder values with your own. If you want to configure more parameters, see Configuration Properties for reference.
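
After creating the connector, you can check that it is running. The following pulsarctl sketch assumes the name, tenant, and namespace used above:

pulsarctl sources status \
  --tenant public \
  --namespace default \
  --name debezium-postgres-source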

You can set multiple tables for "table.whitelist", and the connector sends the data from each table to a different Pulsar topic. The topic naming rule is "{{database.server.name}}.{{table.name}}". For example: "public/default/mydbserver.public.io-test". A multi-table sketch follows the note below.

Note

You can also use a variety of other tools to create a connector.
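
For instance, a whitelist with two tables (the second table name is hypothetical) produces one topic per table:

"table.whitelist": "public.io-test,public.orders"

This would produce the topics "public/default/mydbserver.public.io-test" and "public/default/mydbserver.public.orders".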

3. Insert and update data in the table

You can insert and update data using the following SQL statements:

INSERT INTO "public"."io-test" (first_name, last_name, age)
VALUES ('pg-io-test', 'streamnative', 4);

UPDATE "public"."io-test"
SET age = 5, last_name = 'sn'
WHERE first_name = 'pg-io-test' AND last_name = 'streamnative';

4. Show data using Pulsar client

Note

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.

bin/pulsar-client \
  --url "Your Pulsar serviceUrl" \
  consume "public/default/mydbserver.public.io-test" -s "test-sub" -n 10 -p Earliest

----- got message -----
key:[eyJpZCI6Mn0=], properties:[], content:{"before":null,"after":{"id":1,"first_name":"pg-io-test","last_name":"streamnative","age":4},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"mydbserver","ts_ms":1698825100079,"snapshot":"false","db":"postgres","sequence":"[null,\"18052284768\"]","schema":"public","table":"io-test","txId":2245,"lsn":18052284768,"xmin":null},"op":"c","ts_ms":1698825103451,"transaction":null}
----- got message -----
key:[eyJpZCI6M30=], properties:[], content:{"before":{"id":1,"first_name":"pg-io-test","last_name":"streamnative","age":4},"after":{"id":1,"first_name":"pg-io-test","last_name":"sn","age":5},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"mydbserver","ts_ms":1698826703631,"snapshot":"false","db":"postgres","sequence":"[\"18387831504\",\"18387832144\"]","schema":"public","table":"io-test","txId":2284,"lsn":18387832144,"xmin":null},"op":"u","ts_ms":1698826704159,"transaction":null}
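
The message key is a Base64-encoded JSON document that identifies the changed row. You can decode it with standard tooling, for example:

echo "eyJpZCI6Mn0=" | base64 -d
# {"id":2}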

Configuration Properties

The configuration of the Debezium source connector has the following properties.

| Name | Required | Sensitive | Default | Description |
|------|----------|-----------|---------|-------------|
| database.hostname | true | false | null | The address of the database server. |
| database.port | true | false | null | The port number of the database server. |
| database.user | true | true | null | The name of a database user that has the required privileges. |
| database.password | true | true | null | The password for a database user that has the required privileges. |
| database.dbname | true | false | null | The name of the database that the connector should connect to. |
| plugin.name | true | false | null | The logical decoding output plugin installed on the PostgreSQL server that the connector should use: decoderbufs, wal2json, or pgoutput. |
| database.server.name | true | false | null | The logical name of a database server/cluster. It forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used. |
| database.server.id | false | false | null | The connector's identifier, which must be unique within a database cluster and is similar to the database's server-id configuration property. |
| database.whitelist | false | false | null | A list of all databases hosted by this server that are monitored by the connector. This is optional; there are other properties for listing databases and tables to include or exclude from monitoring. |
| key.converter | false | false | null | The converter provided by Kafka Connect to convert the record key. |
| value.converter | false | false | null | The converter provided by Kafka Connect to convert the record value. |
| database.history | false | false | null | The name of the database history class. |
| database.history.pulsar.topic | false | false | null | The name of the database history topic where the connector writes and recovers DDL statements. Note: this topic is for internal use only and should not be used by consumers. |
| database.history.pulsar.service.url | false | false | null | The Pulsar cluster service URL for the history topic. |
| pulsar.service.url | false | false | null | The Pulsar cluster service URL. |
| offset.storage.topic | false | false | null | The topic that records the last committed offsets that the connector successfully completed. |
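
As an illustration of how some optional properties fit together, the following source-config sketch pins the offset and history topics explicitly; the two topic names are placeholders you would choose yourself:

'{
    "database.hostname": "Your hostname of Postgres",
    "database.port": "Your port of Postgres",
    "database.user": "Your user of Postgres",
    "database.password": "Your password of Postgres",
    "database.dbname": "Your dbname of Postgres",
    "table.whitelist": "public.io-test",
    "database.server.name": "mydbserver",
    "plugin.name": "pgoutput",
    "offset.storage.topic": "persistent://public/default/debezium-postgres-offsets",
    "database.history.pulsar.topic": "persistent://public/default/debezium-postgres-history"
  }'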

Advanced features

Converter options

  • org.apache.kafka.connect.json.JsonConverter

    The json-with-envelope config is valid only for the JsonConverter, and it defaults to false. When json-with-envelope is set to false, the consumer uses the schema Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED), and the message consists of the payload only. When json-with-envelope is set to true, the consumer uses the schema Schema.KeyValue(Schema.BYTES, Schema.BYTES), and the message consists of the schema and the payload.

  • org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter

    If you select the AvroConverter, the consumer uses the schema Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED), and the message consists of the payload.
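
To make the choice concrete, the converter is selected through the connector configuration. A minimal fragment, assuming the JsonConverter with the envelope enabled, might look like this:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"json-with-envelope": "true"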

Topics used on Pulsar

Currently, the destination topic (specified by the destination-topic-name option) is a required configuration, but the Debezium connector does not use it to save data. The Debezium connector saves data on the following four types of topics:

  • One topic for storing the database metadata messages. It is named with the database server name (database.server.name), like "public/default/mydbserver".
  • One topic (offset.storage.topic) for storing the offset metadata messages. The connector saves the last successfully-committed offsets on this topic.
  • (Optional) One topic (database.history.pulsar.topic) for storing the database history information. The connector writes and recovers DDL statements on this topic.
  • One topic per table. The connector writes change events for all operations that occur in a table to a single Pulsar topic specific to that table, for example "public/default/mydbserver.public.io-test".

If automatic topic creation is disabled on the Pulsar broker, you need to manually create these four types of topics and the destination topic, as sketched below.
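
A sketch of that manual creation, assuming non-partitioned topics and the names used in this quick start (the offset and history topic names are placeholders unless you configured them explicitly):

pulsarctl topics create "persistent://public/default/mydbserver" 0
pulsarctl topics create "persistent://public/default/mydbserver.public.io-test" 0
pulsarctl topics create "persistent://public/default/debezium-postgres-offsets" 0
pulsarctl topics create "persistent://public/default/debezium-postgres-history" 0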