The Debezium MongoDB Source connector is a Kafka Connect connector that captures document-level changes in a MongoDB database and streams them to Kafka topics.

Prerequisites

  • A running MongoDB replica set or sharded cluster

Quick Start

  1. Set up the kcctl client (see the kcctl documentation).
  2. Create a JSON file like the following:
    {
        "name": "debezium-mongodb-source",
        "config": {
            "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
            "tasks.max": "1",
            "mongodb.connection.string": "mongodb://{host}:27017/?replicaSet=rs0",
            "mongodb.user": "{username}",
            "mongodb.password": "{password}",
            "database.include.list": "db1,db2",
            "topic.prefix": "my_prefix"
        }
    }
    
  3. Run the following command to create the connector:
    kcctl create -f <filename>.json
    

Configuration

The Debezium MongoDB Source connector is configured using the following properties:
Each property below is listed with its name, whether it is required, its default value, and a description.

name (required; no default)
Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)

connector.class (required; no default)
The name of the Java class for the connector. Always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.

mongodb.connection.string (required; no default)
Specifies a connection string that the connector uses to connect to a MongoDB replica set. This property replaces the mongodb.hosts property that was available in previous versions of the MongoDB connector.

topic.prefix (required; no default)
A unique name that identifies the connector and/or the MongoDB replica set or sharded cluster that this connector monitors. Because this name is used as the prefix for all Kafka topics that receive records from the connector, it must be unique across all connectors, and each server should be monitored by at most one Debezium connector. Use only alphanumeric characters, hyphens, dots, and underscores to form the name.



Do not change the value of this property. If you change the name value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.

internal.mongodb.allow.offset.invalidation (optional; default: false)
Set this property to true to enable the connector to invalidate and consolidate shard-specific offsets that were recorded by earlier connector versions.

This property permits you to modify the current default behavior. The property is subject to removal in a future release if the default behavior changes to permit the connector to automatically invalidate and consolidate offsets that are recorded by earlier connector versions.

mongodb.authentication.class (optional; default: DefaultMongoDbAuthProvider)
A full Java class name that is an implementation of the io.debezium.connector.mongodb.connection.MongoDbAuthProvider interface. This class handles setting the credentials on the MongoDB connection and is called on each application start. The default behavior uses the mongodb.user, mongodb.password, and mongodb.authsource properties as documented below, but other implementations may use them differently or ignore them altogether. Note that any setting in mongodb.connection.string overrides settings set by this class.

mongodb.user (optional; no default)
When using the default mongodb.authentication.class: name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.password (optional; no default)
When using the default mongodb.authentication.class: password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.authsource (optional; default: admin)
When using the default mongodb.authentication.class: database (authentication source) that contains the MongoDB credentials. This is required only when MongoDB is configured to use authentication with an authentication database other than admin.
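
For example, a minimal sketch of an authentication setup that relies on the default provider might look like the following config fragment; the username, password, and authentication database are placeholders:

    "mongodb.user": "{username}",
    "mongodb.password": "{password}",
    "mongodb.authsource": "admin"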

mongodb.ssl.enabled (optional; default: false)
When set to true, the connector uses SSL to connect to MongoDB instances.

mongodb.ssl.invalid.hostname.allowed (optional; default: false)
When SSL is enabled, this setting controls whether strict hostname checking is disabled during the connection phase. If set to true, the connection does not prevent man-in-the-middle attacks.

filters.match.mode (optional; default: regex)
The mode used to match events based on included/excluded database and collection names. Set the property to one of the following values:

regex: Database and collection includes/excludes are evaluated as a comma-separated list of regular expressions.

literal: Database and collection includes/excludes are evaluated as a comma-separated list of string literals. Whitespace characters surrounding these literals are stripped.
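
For example, a hypothetical configuration that matches database names as exact strings rather than regular expressions might combine this property with the include list described below; the database names are placeholders:

    "filters.match.mode": "literal",
    "database.include.list": "inventory,billing"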

database.include.list (optional; default: empty string)
An optional comma-separated list of regular expressions or literals that match database names to be monitored. By default, all databases are monitored. When database.include.list is set, the connector monitors only the databases that the property specifies; other databases are excluded from monitoring.

To match the name of a database, Debezium performs one of the following actions, based on the value of the filters.match.mode property:

regex: applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the database; it does not match substrings that might be present in a database name.

literal: compares the literals that you specify with the entire name string of the database.

If you include this property in the configuration, do not also set the database.exclude.list property.

database.exclude.list (optional; default: empty string)
An optional comma-separated list of regular expressions or literals that match database names to be excluded from monitoring. When database.exclude.list is set, the connector monitors every database except the ones that the property specifies.

To match the name of a database, Debezium performs one of the following actions, based on the value of the filters.match.mode property:

regex: applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the database; it does not match substrings that might be present in a database name.

literal: compares the literals that you specify with the entire name string of the database.

If you include this property in the configuration, do not also set the database.include.list property.

collection.include.list (optional; default: empty string)
An optional comma-separated list of regular expressions or literals that match fully-qualified namespaces for MongoDB collections to be monitored. By default, the connector monitors all collections except those in the local and admin databases. When collection.include.list is set, the connector monitors only the collections that the property specifies; other collections are excluded from monitoring. Collection identifiers are of the form databaseName.collectionName.

To match the name of a namespace, Debezium performs one of the following actions, based on the value of the filters.match.mode property:

regex: applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the namespace; it does not match substrings in the name.

literal: compares the literals that you specify with the entire name string of the namespace.

If you include this property in the configuration, do not also set the collection.exclude.list property.

collection.exclude.list (optional; default: empty string)
An optional comma-separated list of regular expressions or literals that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring. When collection.exclude.list is set, the connector monitors every collection except the ones that the property specifies. Collection identifiers are of the form databaseName.collectionName.

To match the name of a namespace, Debezium performs one of the following actions, based on the value of the filters.match.mode property:

regex: applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the namespace; it does not match substrings that might be present in a namespace.

literal: compares the literals that you specify with the entire name string of the namespace.

If you include this property in the configuration, do not also set the collection.include.list property.
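
For example, a hypothetical regex-based filter that captures every collection named orders in any database, plus all collections in an inventory database, might look like this (note the doubled backslashes that JSON escaping requires):

    "filters.match.mode": "regex",
    "collection.include.list": ".*\\.orders,inventory\\..*"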

capture.mode (optional; default: change_streams_update_full)
Specifies the method that the connector uses to capture update event changes from a MongoDB server. Set this property to one of the following values:

change_streams: Update event messages do not include the full document. Messages do not include a field that represents the state of the document before the change.

change_streams_update_full: Update event messages include the full document. Messages do not include a before field that represents the state of the document before the update; the event message returns the full state of the document in the after field. Set capture.mode.full.update.type to specify how the connector fetches full documents from the database.

In some situations, when capture.mode is configured to return full documents, the updateDescription and after fields of the update event message might report inconsistent values. Such discrepancies can result after multiple updates are applied to a document in rapid succession. The connector requests the full document from the MongoDB database only after it receives the update described in the event's updateDescription field. If a later update modifies the source document before the connector can retrieve it from the database, the connector receives the document that is modified by this later update.

change_streams_update_full_with_pre_image: Update event messages include the full document, and include a field that represents the state of the document before the change. Set capture.mode.full.update.type to specify how the connector fetches full documents from the database.

change_streams_with_pre_image: Update events do not include the full document, but include a field that represents the state of the document before the change.
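
For example, to emit both the before and after state of updated documents, a configuration might set the following value; this assumes that pre-images are enabled on the captured collections, as noted under capture.mode.full.update.type:

    "capture.mode": "change_streams_update_full_with_pre_image"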

capture.scope (optional; default: deployment)
Specifies the scope of the change streams that the connector opens. Set this property to one of the following values:

deployment: Opens a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases, except for admin, local, and config.

database: Opens a change stream cursor for a single database to watch for changes to all of its non-system collections.

To support Debezium signaling, if you set capture.scope to database, the signaling data collection must reside in the database that is specified by the capture.target property.

collection: Opens a change stream cursor for a single collection to watch for changes to that collection.

This feature is currently in an incubating state. The exact semantics, configuration options, and so forth are subject to change, based on the feedback that we receive.

Setting the value of the capture.scope property to collection prevents the connector from using the default source signaling channel. Because the source channel must be enabled for connectors to process incremental snapshot signals (even for signals that are sent over the Kafka, JMX, or file channels), the connector cannot perform incremental snapshots when capture.scope is set to collection.

capture.target (optional; no default)
Specifies the database that the connector monitors for changes. This property applies only if capture.scope is set to database.
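
For example, a hypothetical configuration that scopes the change stream to a single database named inventory might look like this:

    "capture.scope": "database",
    "capture.target": "inventory"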

field.exclude.list (optional; default: empty string)
An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form databaseName.collectionName.fieldName.nestedFieldName, where databaseName and collectionName may contain the wildcard (*), which matches any characters.

field.renames (optional; default: empty string)
An optional comma-separated list of fully-qualified field replacements that are used to rename fields in change event message values. Fully-qualified replacements for fields are of the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*), which matches any characters, and the colon (:) separates the original field from its new name. Each field replacement is applied to the result of the previous replacement in the list, so keep this in mind when renaming multiple fields that are in the same path.
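
For example, a hypothetical configuration that drops a sensitive field and renames another in an inventory.customers collection might look like this:

    "field.exclude.list": "inventory.customers.ssn",
    "field.renames": "inventory.customers.name:full_name"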

tombstones.on.delete (optional; default: true)
Controls whether a delete event is followed by a tombstone event.

true - a delete operation is represented by a delete event and a subsequent tombstone event.

false - only a delete event is emitted.

After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted document if log compaction is enabled for the topic.

schema.name.adjustment.mode (optional; default: none)
Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings:

none: does not apply any adjustment.

avro: replaces the characters that cannot be used in the Avro type name with underscore.

avro_unicode: replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence, like backslash in Java.

field.name.adjustment.mode (optional; default: none)
Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings:

none: does not apply any adjustment.

avro: replaces the characters that cannot be used in the Avro type name with underscore.

avro_unicode: replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence, like backslash in Java.

See Avro naming for more details.

capture.mode.full.update.type (optional; default: lookup)
Specifies how the connector looks up the full value of an updated document when capture.mode is set to retrieve full documents, that is, when capture.mode is set to one of the following options:

change_streams_update_full

change_streams_update_full_with_pre_image

To use this option with a MongoDB change streams collection, you must configure the collection to return document pre- and post-images. Pre- and post-images for an operation are available only if the required configuration is in place before the operation occurs.

Set this property to one of the following values:

lookup: The connector uses a separate lookup to fetch the updated full MongoDB document.

If the lookup process fails to retrieve a document, it cannot populate the full document into the after state in the event payload. In such a situation, the connector emits an event message that contains a null value in the after field.

Failed lookups can occur because a delete operation removed the document immediately after it was created, or because a change to the sharding key resulted in the document being moved to a different location. Sharding key changes can result when you modify any of the properties that make up the key.

post_image: The connector uses MongoDB post-images to populate events with the full MongoDB document. The database must be running MongoDB 6.0 or later to use this option.
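
For example, on MongoDB 6.0 or later, a sketch of a configuration that populates full documents from post-images instead of separate lookups might look like this; it assumes that changeStreamPreAndPostImages is enabled for the captured collections:

    "capture.mode": "change_streams_update_full",
    "capture.mode.full.update.type": "post_image"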

max.batch.size (optional; default: 2048)
Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.

max.queue.size (optional; default: 8192)
Positive integer value that specifies the maximum number of records that the blocking queue can hold. When Debezium reads events streamed from the database, it places the events in the blocking queue before it writes them to Kafka. The blocking queue can provide backpressure for reading change events from the database in cases where the connector ingests messages faster than it can write them to Kafka, or when Kafka becomes unavailable. Events that are held in the queue are disregarded when the connector periodically records offsets. Always set the value of max.queue.size to be larger than the value of max.batch.size.

max.queue.size.in.bytes (optional; default: 0)
A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value.
If max.queue.size is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property. For example, if you set max.queue.size=1000, and max.queue.size.in.bytes=5000, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.
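
For example, a hypothetical tuning for a high-throughput workload might scale the queue and batch sizes together while capping memory, keeping max.queue.size larger than max.batch.size:

    "max.batch.size": "4096",
    "max.queue.size": "16384",
    "max.queue.size.in.bytes": "104857600"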

connect.max.attempts (optional; default: 16)
Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and the task is aborted. The default of 16, combined with the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.ms, results in just over 20 minutes of attempts before failing.

mongodb.ssl.keystore (optional; no default)
An optional setting that specifies the location of the key store file. A key store file can be used for two-way authentication between the client and the MongoDB server.

mongodb.ssl.keystore.password (optional; no default)
The password for the key store file. Specify a password only if mongodb.ssl.keystore is configured.

mongodb.ssl.keystore.type (optional; no default)
The type of key store file. Specify a type only if mongodb.ssl.keystore is configured.

mongodb.ssl.truststore (optional; no default)
The location of the trust store file for server certificate verification.

mongodb.ssl.truststore.password (optional; no default)
The password for the trust store file. Used to check the integrity of the trust store and to unlock it. Specify a password only if mongodb.ssl.truststore is configured.

mongodb.ssl.truststore.type (optional; no default)
The type of trust store file. Specify a type only if mongodb.ssl.truststore is configured.
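
For example, a sketch of a TLS setup that verifies the server certificate against a trust store might look like the following fragment; the path, password, and store type are placeholders:

    "mongodb.ssl.enabled": "true",
    "mongodb.ssl.truststore": "/etc/kafka/secrets/mongodb-truststore.jks",
    "mongodb.ssl.truststore.password": "{truststore-password}",
    "mongodb.ssl.truststore.type": "JKS"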

source.struct.version (optional; default: v2)
Schema version for the source block in CDC events. Debezium 0.10 introduced a few breaking changes to the structure of the source block in order to unify the exposed structure across all the connectors. Set this option to v1 to produce the structure used in earlier versions. Note that this setting is not recommended and is planned for removal in a future Debezium version.

heartbeat.interval.ms (optional; default: 0)
Controls how frequently heartbeat messages are sent. The property specifies an interval, in milliseconds, that defines how frequently the connector sends messages to a heartbeat topic. Heartbeat messages can be used to monitor whether the connector is still receiving change events from the database. They are also useful when only records in non-captured collections change for a long period of time: in that situation the connector keeps reading the oplog/change stream from the database but never emits change messages to Kafka, so no offset updates are committed to Kafka. The oplog files can then be rotated out without the connector noticing, so that after a restart some events are no longer available, which forces a re-execution of the initial snapshot.

Set this parameter to 0 to not send heartbeat messages at all. Heartbeats are disabled by default.

skipped.operations (optional; default: t)
A comma-separated list of the operation types that you want the connector to skip during streaming. You can configure the connector to skip the following types of operations:

c (insert/create)

u (update)

d (delete)

t (truncate)

Set the value to none if you do not want the connector to skip any operations. Because MongoDB does not support truncate change events, setting the default value of t has the same effect as setting the value to none.
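
For example, to stream only inserts and skip updates and deletes, a configuration might set:

    "skipped.operations": "u,d"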

snapshot.collection.filter.overrides (optional; no default)
Controls which collection items are included in a snapshot. This property affects snapshots only. Specify a comma-separated list of collection names in the form databaseName.collectionName.

For each collection that you specify, also specify another configuration property: snapshot.collection.filter.overrides.databaseName.collectionName. For example, the name of the other configuration property might be snapshot.collection.filter.overrides.customers.orders. Set this property to a valid filter expression that retrieves only the items that you want in the snapshot. When the connector performs a snapshot, it retrieves only the items that match the filter expression.
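
For example, a hypothetical override that limits the snapshot of a customers.orders collection might pair the two properties as follows; the filter expression shown is an assumed MongoDB-style match document, so check the Debezium documentation for the exact expression format that your connector version expects:

    "snapshot.collection.filter.overrides": "customers.orders",
    "snapshot.collection.filter.overrides.customers.orders": "{ \"status\": \"active\" }"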

snapshot.delay.ms (optional; no default)
An interval, in milliseconds, that the connector waits after it starts before taking a snapshot. This can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which may cause re-balancing of connectors.

streaming.delay.ms (optional; default: 0)
Specifies the time, in milliseconds, that the connector delays the start of the streaming process after it completes a snapshot. Setting a delay interval helps to prevent the connector from restarting snapshots in the event that a failure occurs immediately after the snapshot completes, but before the streaming process begins. Set a delay value that is higher than the value of the offset.flush.interval.ms property that is set for the Kafka Connect worker.

snapshot.fetch.size (optional; default: 0)
Specifies the maximum number of documents that are read in one go from each collection while taking a snapshot. The connector reads the collection contents in multiple batches of this size. The default of 0 indicates that the server chooses an appropriate fetch size.

snapshot.include.collection.list (optional; default: all collections specified in collection.include.list)
An optional, comma-separated list of regular expressions that match the fully-qualified names (databaseName.collectionName) of the collections that you want to include in a snapshot. The specified items must be named in the connector's collection.include.list property. This property takes effect only if the connector's snapshot.mode property is set to a value other than never. This property does not affect the behavior of incremental snapshots.

To match the name of a collection, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the collection; it does not match substrings that might be present in a collection name.

snapshot.max.threads (optional; default: 1)
Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set.

snapshot.mode (optional; default: initial)
Specifies the criteria for performing a snapshot when the connector starts. Set the property to one of the following values:

always: The connector performs a snapshot every time that it starts. The snapshot includes the structure and data of the captured collections. Specify this value to populate topics with a complete representation of the data from the captured collections every time that the connector starts. After the snapshot completes, the connector begins to stream event records for subsequent database changes.

initial: When the connector starts, it performs an initial database snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.

initial_only: The connector performs a database snapshot only when no offsets have been recorded for the logical server name. After the snapshot completes, the connector stops. It does not transition to streaming event records for subsequent database changes.

never: Deprecated; see no_data.

no_data: The connector runs a snapshot that captures the structure of all relevant collections, but it does not create READ events to represent the data set at the point of the connector's start-up.

when_needed: After the connector starts, it performs a snapshot only if it detects one of the following circumstances:

It cannot detect any topic offsets.

A previously recorded offset specifies a log position that is not available on the server.

configuration_based: With this option, you control snapshot behavior through the set of connector properties that have the prefix snapshot.mode.configuration.based.

custom: The custom snapshot mode lets you inject your own implementation of the io.debezium.spi.snapshot.Snapshotter interface. Set the snapshot.mode.custom.name configuration property to the name provided by the name() method of your implementation. For more information, see custom snapshotter SPI.

snapshot.mode.configuration.based.snapshot.data (optional; default: false)
If snapshot.mode is set to configuration_based, set this property to specify whether the connector includes collection data when it performs a snapshot.

snapshot.mode.configuration.based.snapshot.schema (optional; default: false)
If snapshot.mode is set to configuration_based, set this property to specify whether the connector includes the collection schema when it performs a snapshot.

snapshot.mode.configuration.based.start.stream (optional; default: false)
If snapshot.mode is set to configuration_based, set this property to specify whether the connector begins to stream change events after a snapshot completes.

snapshot.mode.configuration.based.snapshot.on.schema.error (optional; default: false)
If snapshot.mode is set to configuration_based, set this property to specify whether the connector includes the collection schema in a snapshot if the schema history topic is not available.

snapshot.mode.configuration.based.snapshot.on.data.error (optional; default: false)
If snapshot.mode is set to configuration_based, this property specifies whether the connector attempts to snapshot collection data if it does not find the last committed offset in the transaction log. Set the value to true to instruct the connector to perform a new snapshot.
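
For example, a sketch of a configuration_based setup that skips the initial data snapshot but still captures structure and then starts streaming might look like this:

    "snapshot.mode": "configuration_based",
    "snapshot.mode.configuration.based.snapshot.data": "false",
    "snapshot.mode.configuration.based.snapshot.schema": "true",
    "snapshot.mode.configuration.based.start.stream": "true"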

snapshot.mode.custom.name (optional; no default)
If snapshot.mode is set to custom, use this setting to specify the name of the custom implementation that is provided in the name() method that is defined in the io.debezium.spi.snapshot.Snapshotter interface. After a connector restart, Debezium calls the specified custom implementation to determine whether to perform a snapshot. For more information, see custom snapshotter SPI.

provide.transaction.metadata (optional; default: false)
When set to true, Debezium generates events with transaction boundaries and enriches data event envelopes with transaction metadata.

See Transaction Metadata for additional details.

retriable.restart.connector.wait.ms (optional; default: 10000, i.e. 10 seconds)
The number of milliseconds to wait before restarting a connector after a retriable error occurs.

mongodb.poll.interval.ms (optional; default: 30000)
The interval, in milliseconds, at which the connector polls for new, removed, or changed replica sets.

mongodb.connect.timeout.ms (optional; default: 10000, i.e. 10 seconds)
The number of milliseconds the driver waits before a new connection attempt is aborted.

mongodb.heartbeat.frequency.ms (optional; default: 10000, i.e. 10 seconds)
The frequency at which the cluster monitor attempts to reach each server.

mongodb.socket.timeout.ms (optional; default: 0)
The number of milliseconds that a send/receive on the socket can take before a timeout occurs. A value of 0 disables this behavior.

mongodb.server.selection.timeout.ms (optional; default: 30000, i.e. 30 seconds)
The number of milliseconds the driver waits to select a server before it times out and throws an error.

cursor.pipeline (optional; no default)
When streaming changes, this setting applies processing to change stream events as part of the standard MongoDB aggregation stream pipeline. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data. This can be used to customize the data that the connector consumes. The value of this property must be an array of permitted aggregation pipeline stages in JSON format. Note that this pipeline is appended after the internal pipeline used to support the connector (for example, filtering operation types, database names, collection names, and so on).
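
For example, a hypothetical pipeline that passes through only insert and delete events could be expressed as a JSON-encoded aggregation stage; note the escaping that the JSON config value requires:

    "cursor.pipeline": "[{\"$match\": {\"operationType\": {\"$in\": [\"insert\", \"delete\"]}}}]"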

cursor.pipeline.order (optional; default: internal_first)
The order used to construct the effective MongoDB aggregation stream pipeline. Set the property to one of the following values:

internal_first: Internal stages defined by the connector are applied first. This means that only the events that ought to be captured by the connector are fed to the user-defined stages (configured by setting cursor.pipeline).

user_first: Stages defined by the cursor.pipeline property are applied first. In this mode all events, including those not captured by the connector, are fed to the user-defined pipeline stages. This mode can have a negative performance impact if the value of cursor.pipeline contains complex operations.

user_only: Stages defined by the cursor.pipeline property replace the internal stages defined by the connector. This mode is intended only for expert users, because all events are processed solely by the user-defined pipeline stages. It can have a negative impact on the performance and overall functionality of the connector!

cursor.oversize.handling.mode (optional; default: fail)
The strategy used to handle change events for documents that exceed the specified BSON size. Set the property to one of the following values:

fail: The connector fails if the total size of a change event exceeds the maximum BSON size.

skip: Any change events for documents that exceed the maximum size (specified by the cursor.oversize.skip.threshold property) are ignored.

split: Change events that exceed the maximum BSON size are split using the $changeStreamSplitLargeEvent aggregation. This option requires MongoDB 6.0.9 or newer.

cursor.oversize.skip.threshold (optional; default: 0)
The maximum allowed size, in bytes, of the stored document for which change events are processed. This applies to the document size both before and after the database operation; more specifically, it limits the size of the fullDocument and fullDocumentBeforeChange fields of MongoDB change events.

cursor.max.await.time.ms (optional; default: 0)
Specifies the maximum number of milliseconds the oplog/change stream cursor waits for the server to produce a result before causing an execution timeout exception. A value of 0 indicates using the server/driver default wait timeout.

signal.data.collection (optional; no default)
Fully-qualified name of the data collection that is used to send signals to the connector. Specify the collection name in the format databaseName.collectionName.

signal.enabled.channels (optional; default: source)
List of the signaling channel names that are enabled for the connector. By default, the following channels are available:

source

kafka

file

jmx

Optionally, you can also implement a custom signaling channel.
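
For example, a sketch of a signaling setup that enables the source channel and points it at a dedicated collection might look like this; the collection name is a placeholder:

    "signal.enabled.channels": "source",
    "signal.data.collection": "debezium.signals"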

notification.enabled.channels (optional; no default)
List of the notification channel names that are enabled for the connector. By default, the following channels are available:

sink

log

jmx

Optionally, you can also implement a custom notification channel.

incremental.snapshot.chunk.size (optional; default: 1024)
The maximum number of documents that the connector fetches and reads into memory during an incremental snapshot chunk. Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size. However, larger chunk sizes also require more memory to buffer the snapshot data. Adjust the chunk size to a value that provides the best performance in your environment.

incremental.snapshot.watermarking.strategy (optional; default: insert_insert)
Specifies the watermarking mechanism that the connector uses during an incremental snapshot to deduplicate events that might be captured by an incremental snapshot and then recaptured after streaming resumes. You can specify one of the following options:

insert_insert: When you send a signal to initiate an incremental snapshot, for every chunk that Debezium reads during the snapshot, it writes an entry to the signaling data collection to record the signal to open the snapshot window. After the snapshot completes, Debezium inserts a second entry that records the signal to close the window.

insert_delete: When you send a signal to initiate an incremental snapshot, for every chunk that Debezium reads, it writes a single entry to the signaling data collection to record the signal to open the snapshot window. After the snapshot completes, this entry is removed. No entry is created for the signal to close the snapshot window. Set this option to prevent rapid growth of the signaling data collection.

topic.naming.strategy (optional; default: io.debezium.schema.DefaultTopicNamingStrategy)
The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, and heartbeat events.

topic.delimiter (optional; default: ".")
Specifies the delimiter to use in topic names.

topic.cache.size (optional; default: 10000)
The size of the bounded concurrent hash map that is used to hold topic names. The cache helps to determine the topic name that corresponds to a given data collection.

topic.heartbeat.prefix (optional; default: __debezium-heartbeat)
Controls the name of the topic to which the connector sends heartbeat messages. The topic name has this pattern:

<topic.heartbeat.prefix>.<topic.prefix>

For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment.

topic.transaction (optional; default: transaction)
Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has this pattern:

<topic.prefix>.<topic.transaction>

For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction.

custom.metric.tags (optional; no default)
Defines tags that customize MBean object names by adding metadata that provides contextual information. Specify a comma-separated list of key-value pairs. Each key represents a tag for the MBean object name, and the corresponding value represents a value for the key, for example:
k1=v1,k2=v2

The connector appends the specified tags to the base MBean object name. Tags can help you to organize and categorize metrics data. You can define tags to identify particular application instances, environments, regions, versions, and so forth. For more information, see Customized MBean names.

errors.max.retries (optional; default: -1)
Specifies how the connector responds after an operation that results in a retriable error, such as a connection error. Set one of the following options:

-1: No limit. The connector always restarts automatically, and retries the operation, regardless of the number of previous failures.

0: Disabled. The connector fails immediately, and never retries the operation. User intervention is required to restart the connector.

> 0: The connector restarts automatically until it reaches the specified maximum number of retries. After the next failure, the connector stops, and user intervention is required to restart it.

For more information about the configuration properties, see the Official Debezium MongoDB Connector documentation.