The Debezium Microsoft SQL Server source connector pulls messages from SQL Server and persists them to Pulsar topics.
`offset.storage.topic` and `database.history.pulsar.topic`. Refer to the Used Topic On Pulsar section for more details.

`builtin` connector. If you want to create a non-`builtin` connector, you need to replace `--source-type debezium-mssql` with `--archive /path/to/pulsar-io-debezium-mssql.nar`. You can find the button to download the `nar` package at the beginning of the document.

`--parallelism` must be set to 1. Debezium connectors do not support parallel consumption within a single instance. To process tables in parallel, deploy multiple connector instances, each configured for different database schemas or tables.

`{{database.server.name}}.{{table.name}}`. For example: `public/default/mydbserver.public.io-test`.

`--source-config` is the minimum necessary configuration for starting this connector, and it is a JSON string. Substitute the relevant parameters with your own.
If you want to configure more parameters, see Configuration Properties for reference.
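
As a rough illustration of the `--source-config` value, the following Python sketch assembles the six required properties into a JSON string. The helper name `build_source_config` is hypothetical, the host, credentials, database name, and server name are placeholders, and serializing the port as a string is an assumption rather than a documented requirement.

```python
import json


def build_source_config(hostname, port, user, password, dbname, server_name, extra=None):
    """Return a JSON string containing the required connector properties.

    `extra` is an optional dict of additional properties (for example
    "table.whitelist") that is merged on top of the required ones.
    """
    config = {
        "database.hostname": hostname,
        "database.port": str(port),  # assumption: port passed as a string
        "database.user": user,
        "database.password": password,
        "database.dbname": dbname,
        "database.server.name": server_name,
    }
    config.update(extra or {})
    return json.dumps(config)


if __name__ == "__main__":
    # Placeholder values only; replace them with your own.
    print(build_source_config("localhost", 1433, "sa", "change-me", "MyTestDB", "mssql"))
```

The printed string can then be passed as the value of `--source-config` when creating the connector.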

`pulsar-admin` are similar to those of `pulsarctl`. You can find an example in the StreamNative Cloud Doc.

Name | Required | Sensitive | Default | Description |
---|---|---|---|---|
database.hostname | true | false | null | The address of a database server. |
database.port | true | false | null | The port number of a database server. |
database.user | true | true | null | The name of a database user that has the required privileges. |
database.password | true | true | null | The password for a database user that has the required privileges. |
database.dbname | true | false | null | The name of the database that the connector connects to. |
database.server.name | true | false | null | The logical name of a database server/cluster, which forms a namespace and is used in the names of all Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. |
database.server.id | false | false | null | The connector’s identifier, which must be unique within a database cluster and is similar to the database’s server-id configuration property. |
database.whitelist | false | false | null | A list of all databases hosted by this server that are monitored by the connector. This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. |
table.whitelist | false | false | null | A list of all tables hosted by this server that are monitored by the connector. This is optional, and there are other properties for listing tables to include or exclude from monitoring. |
key.converter | false | false | null | The converter provided by Kafka Connect to convert record key. |
value.converter | false | false | null | The converter provided by Kafka Connect to convert record value. |
database.history | false | false | null | The name of the database history class. |
database.history.pulsar.topic | false | false | null | The name of the database history topic where the connector writes and recovers DDL statements. Note: this topic is for internal use only and should not be used by consumers. |
database.history.pulsar.service.url | false | false | null | Pulsar cluster service URL for history topic. |
pulsar.service.url | false | false | null | Pulsar cluster service URL. |
offset.storage.topic | false | false | null | The topic that records the last offsets the connector successfully committed. |
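
The Required and Sensitive columns in the table above can be turned into a small pre-flight check. The sketch below is only an illustration: the function names `missing_required` and `masked` are hypothetical, and the key lists are copied from the table rather than read from the connector itself.

```python
# Properties marked Required = true in the table above.
REQUIRED = (
    "database.hostname",
    "database.port",
    "database.user",
    "database.password",
    "database.dbname",
    "database.server.name",
)

# Properties marked Sensitive = true in the table above.
SENSITIVE = ("database.user", "database.password")


def missing_required(config):
    """Return the required property names that are absent or empty."""
    return [key for key in REQUIRED if config.get(key) in (None, "")]


def masked(config):
    """Return a copy of the config that is safe to log."""
    return {key: ("***" if key in SENSITIVE else value) for key, value in config.items()}
```

Running `missing_required` before submitting the connector gives an early list of properties still to fill in, and `masked` avoids writing credentials to logs.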

The `json-with-envelope` config is valid only for the JsonConverter. By default, the value is set to false. When the `json-with-envelope` value is set to false, the consumer uses the schema `Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)`, and the message consists of the payload only.

When the `json-with-envelope` value is set to true, the consumer uses the schema `Schema.KeyValue(Schema.BYTES, Schema.BYTES)`, and the message consists of the schema and the payload.

`Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)`, and the message consists of the payload.

`destination-topic-name` option) is a required configuration, but the Debezium connector does not use it to save data. The Debezium connector saves data on the following 4 types of topics:

- `database.server.name`), like `public/default/database.server.name`.
- `offset.storage.topic`) for storing the offset metadata messages. The connector saves the last successfully committed offsets on this topic.
- `database.history.pulsar.topic`) for storing the database history information. The connector writes and recovers DDL statements on this topic.
- `{{database.server.name}}.{{table.name}}`. For example: `public/default/mydbserver.public.io-test`.

`offset.storage.topic` and `database.history.pulsar.topic`, if they are not specified in your connector’s configuration, they are created automatically using the following default naming convention:

- `database.history.pulsar.topic`: `"{tenant}/{namespace}/{connector-name}-debezium-history-topic"`
- `offset.storage.topic`: `"{tenant}/{namespace}/{connector-name}-offset-storage-topic"`
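
To make the naming conventions above concrete, here is a minimal Python sketch that derives the default internal topic names and a per-table topic name. Both function names are hypothetical, and the sketch assumes that the table name is passed in already qualified (as in the `public.io-test` example), which the text does not state explicitly.

```python
def default_internal_topics(tenant, namespace, connector_name):
    """Return the default names used when the two internal topics are not configured."""
    prefix = f"{tenant}/{namespace}/{connector_name}"
    return {
        "database.history.pulsar.topic": f"{prefix}-debezium-history-topic",
        "offset.storage.topic": f"{prefix}-offset-storage-topic",
    }


def table_topic(tenant, namespace, server_name, table_name):
    """Return the per-table topic: {{database.server.name}}.{{table.name}}."""
    return f"{tenant}/{namespace}/{server_name}.{table_name}"


if __name__ == "__main__":
    # "my-connector" is a placeholder connector name.
    print(default_internal_topics("public", "default", "my-connector"))
    print(table_topic("public", "default", "mydbserver", "public.io-test"))
```

Computing the names up front is mainly useful when the topics have to be created or granted permissions before the connector starts.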