The Debezium MongoDB source connector pulls messages from MongoDB and persists the messages to Pulsar topics.
Setting `--source-type debezium-mongodb` creates a builtin connector. If you want to create a non-builtin connector, replace `--source-type debezium-mongodb` with `--archive /path/to/pulsar-io-debezium-mongodb.nar`. You can find the button to download the NAR package at the beginning of the document.
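As a minimal sketch, the Python snippet below assembles a `pulsar-admin sources create` argument list for both cases. The helper `build_create_args`, the source name, the destination topic name, and the config value are hypothetical placeholders, not part of any Pulsar tooling.

```python
import shlex


def build_create_args(name, destination_topic, source_config_json, nar_path=None):
    """Return a hypothetical `pulsar-admin sources create` argument list.

    A builtin connector is selected with --source-type; a non-builtin
    connector is loaded from a NAR file with --archive instead.
    """
    args = ["pulsar-admin", "sources", "create",
            "--name", name,
            "--destination-topic-name", destination_topic]
    if nar_path is None:
        # Builtin connector shipped with Pulsar.
        args += ["--source-type", "debezium-mongodb"]
    else:
        # Non-builtin connector: point at the downloaded NAR package.
        args += ["--archive", nar_path]
    args += ["--source-config", source_config_json]
    return args


# Placeholder values for illustration only.
builtin = build_create_args("debezium-mongodb-source", "debezium-mongodb-topic",
                            '{"mongodb.hosts": "rs0/localhost:27017"}')
print(shlex.join(builtin))  # shell-quoted command line, ready to paste
```

Passing `nar_path="/path/to/pulsar-io-debezium-mongodb.nar"` swaps the `--source-type` pair for `--archive`, which is the only difference between the two cases.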
`--source-config` is the minimum configuration necessary to start this connector, and it is a JSON string. Substitute the relevant parameters with your own.
If you want to configure more parameters, see Configuration Properties for reference.
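Because `--source-config` is a single JSON string, it can be easier to build it from a dictionary than to hand-write the escaping. The sketch below assumes placeholder values (`rs0/mongodb:27017`, `dbserver1`, `debezium`/`dbz`, task ID `1`); replace them with your own. The credentials are only needed when MongoDB authentication is enabled.

```python
import json

# Placeholder values: replace them with your own MongoDB settings.
source_config = {
    "mongodb.hosts": "rs0/mongodb:27017",  # replica set name + host:port
    "mongodb.name": "dbserver1",           # unique logical name for this connector
    "mongodb.user": "debezium",            # only needed with authentication
    "mongodb.password": "dbz",             # only needed with authentication
    "mongodb.task.id": "1",
}

# --source-config expects one JSON string.
source_config_json = json.dumps(source_config)
print(source_config_json)
```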
The parameters of `pulsar-admin` are similar to those of `pulsarctl`. You can find an example in the StreamNative Cloud Doc.

Name | Required | Sensitive | Default | Description |
---|---|---|---|---|
mongodb.hosts | true | false | null | The comma-separated list of hostname and port pairs (in the form ‘host’ or ‘host:port’) of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If `mongodb.members.auto.discover` is set to `false`, the host and port pair is prefixed with the replica set name (for example, `rs0/localhost:27017`). |
mongodb.name | true | false | null | A unique name that identifies the connector and/or the MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted topics emanating from the MongoDB replica set or cluster. |
mongodb.user | false | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
mongodb.password | false | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
mongodb.task.id | true | false | null | The task ID of the MongoDB connector, which attempts to use a separate task for each replica set. |
database.whitelist | false | false | null | A list of all databases hosted by this server that the connector monitors. By default, all databases are monitored. |
key.converter | false | false | null | The converter provided by Kafka Connect to convert the record key. |
value.converter | false | false | null | The converter provided by Kafka Connect to convert the record value. |
database.history.pulsar.topic | false | false | null | The name of the database history topic where the connector writes and recovers DDL statements. Note: this topic is for internal use only and should not be used by consumers. |
database.history.pulsar.service.url | false | false | null | Pulsar cluster service URL for history topic. |
offset.storage.topic | false | false | null | The topic that records the last committed offsets that the connector successfully completes. By default, it is `topicNamespace + "/" + sourceName + "-debezium-offset-topic"`, for example, `persistent://public/default/debezium-mongodb-source-debezium-offset-topic`. |
json-with-envelope | false | false | false | The `json-with-envelope` config is valid only for the JsonConverter. By default, the value is set to `false`. When the value is `false`, the consumer uses the schema `Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)`, and the message consists only of the payload. When the value is `true`, the consumer uses the schema `Schema.KeyValue(Schema.BYTES, Schema.BYTES)`, and the message consists of the schema and the payload. |
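The Required column and the `mongodb.hosts` format can be checked before the connector is submitted. Below is a minimal Python sketch; `missing_required` and `parse_hosts` are hypothetical helpers, and treating a bare `host` entry as port 27017 (MongoDB's default port) is an assumption of this sketch rather than documented connector behavior. IPv6 addresses are not handled.

```python
# Required properties according to the Configuration Properties table.
REQUIRED = ("mongodb.hosts", "mongodb.name", "mongodb.task.id")


def missing_required(config):
    """Return the required keys that are absent or empty in `config`."""
    return [key for key in REQUIRED if not config.get(key)]


def parse_hosts(hosts):
    """Split a mongodb.hosts value into (replica_set, [(host, port), ...]).

    "rs0/localhost:27017" carries a replica set prefix; a bare "host"
    entry is assumed to use MongoDB's default port 27017.
    """
    replica_set = None
    if "/" in hosts:
        replica_set, hosts = hosts.split("/", 1)
    pairs = []
    for entry in hosts.split(","):
        host, sep, port = entry.strip().partition(":")
        pairs.append((host, int(port) if sep else 27017))
    return replica_set, pairs


config = {"mongodb.hosts": "rs0/localhost:27017", "mongodb.name": "dbserver1"}
print(missing_required(config))              # ['mongodb.task.id']
print(parse_hosts(config["mongodb.hosts"]))  # ('rs0', [('localhost', 27017)])
```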
If you select the AvroConverter, the consumer uses the schema `Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)`, and the message consists of the payload.
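When `json-with-envelope` is `true`, the consumer receives raw bytes that carry both the schema and the payload, so it has to unwrap the payload itself. The sketch below assumes the envelope follows the layout of Kafka Connect's JsonConverter with schemas enabled (a JSON object with top-level `schema` and `payload` fields); `extract_payload` is a hypothetical helper and the sample message is illustrative only.

```python
import json


def extract_payload(raw_value: bytes, json_with_envelope: bool):
    """Decode a JSON message key or value and return only its payload.

    Assumes the JsonConverter envelope layout: {"schema": ..., "payload": ...}.
    """
    decoded = json.loads(raw_value.decode("utf-8"))
    if json_with_envelope:
        # Envelope present: drop the schema part.
        return decoded["payload"]
    # No envelope: the message already is the payload.
    return decoded


# Illustrative enveloped value; the schema content here is made up.
enveloped = json.dumps({"schema": {"type": "struct"}, "payload": {"id": 1}}).encode("utf-8")
print(extract_payload(enveloped, True))  # {'id': 1}
```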
The destination topic (specified by the `destination-topic-name` option) is a required configuration, but the Debezium connector does not use it to save data. The Debezium connector saves data on the following 4 types of topics:

- One topic named with the database server name (`database.server.name`), like `public/default/database.server.name`.
- One topic (`offset.storage.topic`) for storing the offset metadata messages. The connector saves the last successfully-committed offsets on this topic.
- One topic (`database.history.pulsar.topic`) for storing the database history information. The connector writes and recovers DDL statements on this topic.
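To see which topic names a given configuration produces, the sketch below applies the documented offset-topic default (`topicNamespace + "/" + sourceName + "-debezium-offset-topic"`). The helpers `default_offset_topic` and `connector_topics`, the source name, and the server name `dbserver1` are hypothetical; the sketch also assumes no default for the history topic and reports `None` when it is not set.

```python
def default_offset_topic(topic_namespace: str, source_name: str) -> str:
    # Documented default: topicNamespace + "/" + sourceName + "-debezium-offset-topic"
    return topic_namespace + "/" + source_name + "-debezium-offset-topic"


def connector_topics(config: dict, topic_namespace: str, source_name: str,
                     server_name: str) -> dict:
    """Return the connector's topics, keyed by what they store."""
    return {
        # Named after the database server name, e.g. public/default/<server name>.
        "metadata": topic_namespace + "/" + server_name,
        # An explicit offset.storage.topic takes precedence over the default.
        "offsets": config.get("offset.storage.topic")
        or default_offset_topic(topic_namespace, source_name),
        # None means database.history.pulsar.topic was not configured.
        "history": config.get("database.history.pulsar.topic"),
    }


topics = connector_topics({}, "persistent://public/default",
                          "debezium-mongodb-source", "dbserver1")
print(topics["offsets"])  # persistent://public/default/debezium-mongodb-source-debezium-offset-topic
```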