Debezium Microsoft SQL Server Source
The Debezium Microsoft SQL Server source connector pulls messages from SQL Server and persists the messages to Pulsar topics.

Available on: StreamNative Cloud console
Authored by: ASF
Support type: Apache community
License: Apache License 2.0

Configuration

Debezium Microsoft SQL Server source connector

The Debezium Microsoft SQL Server source connector supports the following configuration properties.

Name | Required | Default | Description
task.class | true | null | A source task class that is implemented in Debezium.
database.hostname | true | null | The address of a database server.
database.port | true | null | The port number of a database server.
database.user | true | null | The name of a database user that has the required privileges.
database.password | true | null | The password for a database user that has the required privileges.
database.dbname | true | null | The name of the SQL Server database.
database.server.name | true | null | The logical name of a database server/cluster. It forms a namespace that is used in the names of all Kafka topics to which the connector writes, in the Kafka Connect schema names, and in the namespaces of the corresponding Avro schema when the Avro converter is used.
snapshot.mode | true | null | Specifies the criteria for running a snapshot when the connector starts. Supported values: initial, initial_only, schema_only.
key.converter | false | org.apache.kafka.connect.json.JsonConverter | The converter provided by Kafka Connect to convert the record key.
value.converter | false | org.apache.kafka.connect.json.JsonConverter | The converter provided by Kafka Connect to convert the record value.
database.history | false | org.apache.pulsar.io.debezium.PulsarDatabaseHistory | The name of the database history class.
database.history.pulsar.topic | false | debezium-history-topic | The name of the database history topic where the connector writes and recovers DDL statements. Note: this topic is for internal use only and should not be used by consumers.
database.history.pulsar.service.url | false | null | The service URL of your Pulsar cluster for the history topic. If it is not set, the database history Pulsar client uses the same client settings as the connector, such as client_auth_plugin and client_auth_params.
offset.storage.topic | false | debezium-offset-topic | The topic that records the last offsets the connector has successfully committed.
table.include.list | false | null | An optional comma-separated list of regular expressions that match the fully-qualified identifiers of tables to be monitored.
table.exclude.list | false | null | An optional comma-separated list of regular expressions that match the fully-qualified identifiers of tables to be excluded.
column.include.list | false | null | An optional comma-separated list of regular expressions that match the fully-qualified names of columns to be monitored.
column.exclude.list | false | null | An optional comma-separated list of regular expressions that match the fully-qualified names of columns to be excluded.
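The table above can be turned into a quick pre-deployment check. The following Python sketch is an illustration only: the helper names check_config and is_captured are hypothetical, and two behaviours are assumptions taken from the Debezium documentation rather than from this page: each regular expression must match the whole fully-qualified identifier (for SQL Server, schemaName.tableName), and an include list should not be combined with the matching exclude list.

```python
import re

# Required properties and supported snapshot modes, copied from the table above.
REQUIRED = {
    "task.class", "database.hostname", "database.port", "database.user",
    "database.password", "database.dbname", "database.server.name",
    "snapshot.mode",
}
SNAPSHOT_MODES = {"initial", "initial_only", "schema_only"}


def check_config(config: dict) -> list:
    """Return a list of problems found in `config`; an empty list means it passed."""
    problems = [f"missing required property: {key}"
                for key in sorted(REQUIRED) if not config.get(key)]
    mode = config.get("snapshot.mode")
    if mode and mode not in SNAPSHOT_MODES:
        problems.append(f"unsupported snapshot.mode: {mode}")
    # Assumption: an include list and the matching exclude list are mutually exclusive.
    for kind in ("table", "column"):
        if config.get(f"{kind}.include.list") and config.get(f"{kind}.exclude.list"):
            problems.append(f"set only one of {kind}.include.list and {kind}.exclude.list")
    return problems


def _patterns(config: dict, key: str) -> list:
    """Split a comma-separated property into individual regular expressions."""
    return [p.strip() for p in (config.get(key) or "").split(",") if p.strip()]


def is_captured(table_id: str, config: dict) -> bool:
    """Decide whether a fully-qualified table identifier such as 'dbo.orders' is monitored."""
    include = _patterns(config, "table.include.list")
    exclude = _patterns(config, "table.exclude.list")
    # Assumption: each expression must match the entire identifier, not a substring.
    if include:
        return any(re.fullmatch(p, table_id) for p in include)
    return not any(re.fullmatch(p, table_id) for p in exclude)
```

For example, with table.include.list set to dbo\.orders, the table dbo.orders_archive is not captured, because the expression has to match the whole identifier.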

Converter options

  • org.apache.kafka.connect.json.JsonConverter

    The json-with-envelope configuration is valid only for the JsonConverter. It defaults to false. When json-with-envelope is false, the consumer uses the Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED) schema, and the message consists only of the payload. When json-with-envelope is true, the consumer uses the Schema.KeyValue(Schema.BYTES, Schema.BYTES) schema, and the message consists of both the schema and the payload.

  • org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter

    If you select the AvroConverter, the consumer uses the Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED) schema, and the message consists of the payload.
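To make the two JSON layouts concrete, here is a minimal Python sketch that accepts a message value either as a bare payload or wrapped in a schema/payload envelope. It assumes you already hold the value as raw JSON bytes (how you obtain them depends on your client and schema settings), and the helper names are hypothetical. The op codes c, u, d and r are Debezium's standard operation types for create, update, delete and snapshot read.

```python
import json


def extract_payload(raw: bytes):
    """Return the Debezium payload from a JsonConverter-encoded value.

    Handles both layouts: a bare payload (json-with-envelope=false) and a
    {"schema": ..., "payload": ...} envelope (json-with-envelope=true).
    """
    doc = json.loads(raw)
    if isinstance(doc, dict) and set(doc) == {"schema", "payload"}:
        return doc["payload"]
    return doc


def describe_change(payload: dict) -> str:
    """Summarise a change event from its op, before and after fields."""
    names = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}
    op = payload.get("op")
    # Deletes carry the old row in "before"; other operations carry the new row in "after".
    row = payload.get("before") if op == "d" else payload.get("after")
    return f"{names.get(op, 'unknown')}: {row}"
```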

Examples

Debezium requires SQL Server with change data capture (CDC) enabled, both on the database and on each table you want to capture. This setup is outlined in the documentation and used in the connector's integration test. For more information, see Enable and disable change data capture in Microsoft SQL Server.
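CDC is enabled with the SQL Server system stored procedures sys.sp_cdc_enable_db (database level, requires the sysadmin role) and sys.sp_cdc_enable_table (per table, requires db_owner), and SQL Server Agent must be running for the capture jobs to process changes. The Python sketch below only generates that T-SQL so you can review it before running it; the helper name and the dbo.customers table are hypothetical. Passing @role_name = NULL means no gating role restricts access to the change data.

```python
def cdc_enable_statements(database: str, tables: list) -> str:
    """Build T-SQL that enables CDC on `database` and on each (schema, table) pair."""
    def ident(name: str) -> str:
        # Bracket-quote an identifier, escaping any closing bracket.
        return "[" + name.replace("]", "]]") + "]"

    def nstr(value: str) -> str:
        # Unicode string literal N'...', escaping single quotes.
        return "N'" + value.replace("'", "''") + "'"

    lines = [f"USE {ident(database)};", "EXEC sys.sp_cdc_enable_db;"]
    for schema, table in tables:
        lines.append(
            "EXEC sys.sp_cdc_enable_table "
            f"@source_schema = {nstr(schema)}, "
            f"@source_name = {nstr(table)}, "
            "@role_name = NULL;"
        )
    return "\n".join(lines)


# Hypothetical example: the database from the examples below and one table.
print(cdc_enable_statements("MyTestDB", [("dbo", "customers")]))
```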

You can use one of the following methods to create a configuration file.

  • JSON
{
  "database.hostname": "localhost",
  "database.port": "1433",
  "database.user": "sa",
  "database.password": "MyP@ssw0rd!",
  "database.dbname": "MyTestDB",
  "database.server.name": "mssql",
  "snapshot.mode": "schema_only",
  "topic.namespace": "public/default",
  "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "typeClassName": "org.apache.pulsar.common.schema.KeyValue",
  "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
  "database.tcpKeepAlive": "true",
  "decimal.handling.mode": "double",
  "database.history.pulsar.topic": "debezium-mssql-source-history-topic",
  "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"
}
  • YAML
tenant: "public"
namespace: "default"
name: "debezium-mssql-source"
topicName: "debezium-mssql-topic"
parallelism: 1

className: "org.apache.pulsar.io.debezium.mssql.DebeziumMsSqlSource"

configs:
    database.hostname: "localhost"
    database.port: "1433"
    database.user: "sa"
    database.password: "MyP@ssw0rd!"
    database.dbname: "MyTestDB"
    database.server.name: "mssql"
    snapshot.mode: "schema_only"
    topic.namespace: "public/default"
    task.class: "io.debezium.connector.sqlserver.SqlServerConnectorTask"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    typeClassName: "org.apache.pulsar.common.schema.KeyValue"
    database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
    database.tcpKeepAlive: "true"
    decimal.handling.mode: "double"
    database.history.pulsar.topic: "debezium-mssql-source-history-topic"
    database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"

For the full list of configuration properties supported by Debezium, see Debezium Connector for MS SQL.
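Once the connector is running, you need to know which topics to consume. The sketch below assumes the Debezium 1.x naming convention for SQL Server change topics (database.server.name, then schema name, then table name, joined by dots) published under the configured topic.namespace. This convention is not stated on this page, so confirm the actual names with pulsar-admin topics list public/default. The helper and the dbo.customers table are hypothetical.

```python
def expected_topics(config: dict, tables: list) -> list:
    """Return the Pulsar topic names that change events are expected to use.

    Assumption: Debezium 1.x names each change topic
    <database.server.name>.<schema>.<table>, and the Pulsar adaptor
    publishes it under persistent://<topic.namespace>/.
    """
    namespace = config["topic.namespace"]
    server = config["database.server.name"]
    return [f"persistent://{namespace}/{server}.{schema}.{table}"
            for schema, table in tables]


# Values from the JSON example above; dbo.customers is a hypothetical table.
example = {"topic.namespace": "public/default", "database.server.name": "mssql"}
print(expected_topics(example, [("dbo", "customers")]))
```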

Performance

The Debezium Microsoft SQL Server source connector supports a maximum publish throughput of 17,000 msg/s or 50 Mbit/s.
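Reading the two figures as independent ceilings (an interpretation, not something this page states), the binding limit depends on your average message size. This Python sketch assumes 1 Mbit = 10^6 bits and computes the crossover size, roughly 368 bytes, below which the message rate is the limit and above which bandwidth is. The constants and helper names are illustrative.

```python
MAX_MSGS_PER_SEC = 17_000      # message-rate ceiling quoted above
MAX_BITS_PER_SEC = 50 * 10**6  # 50 Mbit/s, assuming 1 Mbit = 10**6 bits


def crossover_bytes() -> float:
    """Average message size (bytes) at which both ceilings are reached at once."""
    return MAX_BITS_PER_SEC / MAX_MSGS_PER_SEC / 8


def max_msgs_per_sec(avg_bytes: float) -> float:
    """Estimated ceiling for a given average message size: whichever limit binds first."""
    return min(MAX_MSGS_PER_SEC, MAX_BITS_PER_SEC / (avg_bytes * 8))


print(f"crossover ~ {crossover_bytes():.0f} bytes")
```

For example, at an average of 1,000 bytes per message this estimate gives about 6,250 msg/s, well below the 17,000 msg/s ceiling.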