The MongoDB Kafka sink connector is a Kafka Connect connector that reads data from Kafka topics and writes data to MongoDB.
Prerequisites
- Valid credentials with the
readWrite
role on the database. For more granular access control, you can specify a custom role that allowsinsert
,remove
, andupdate
actions on the databases or collections. - The
connection.uri
is in form ofmongodb+srv://username:[email protected]
Quick Start
Setup the kcctl client: doc
Create a MongoDB Cluster, you can create one in k8s cluster with below yaml file:
apiVersion: v1 kind: Service metadata: name: mongo labels: name: mongo spec: ports: - port: 27017 clusterIP: None selector: role: mongo --- apiVersion: apps/v1 kind: StatefulSet metadata: name: mongo-dbz spec: selector: matchLabels: role: mongo serviceName: "mongo" replicas: 1 template: metadata: labels: role: mongo spec: terminationGracePeriodSeconds: 10 containers: - name: mongo image: debezium/example-mongodb:2.6 env: - name: MONGODB_USER value: "debezium" - name: MONGODB_PASSWORD value: "dbz" command: - mongod - "--replSet" - rs0 - "--bind_ip" # bind mongo to all ip address to allow others to access - "0.0.0.0" ports: - containerPort: 27017 - name: mongo-sidecar image: cvallance/mongo-k8s-sidecar env: - name: MONGO_SIDECAR_POD_LABELS value: "role=mongo" - name: KUBE_NAMESPACE value: default - name: KUBERNETES_MONGO_SERVICE_NAME value: "mongo"
Initialize the local MongoDB cluster:
kubectl apply -f .ci/clusters/mongodb.yaml kubectl wait -l role=mongo --for=condition=Ready pod --timeout=5m # initialize the data kubectl exec mongo-dbz-0 -c mongo -- bash ./usr/local/bin/init-inventory.sh
Create a JSON file like the following:
{ "name": "mongo-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri": "mongodb://mongo.default.svc.cluster.local:27017/?authSource=admin", "database": "kafka-mongo-sink", "topics": "kafka-mongo-input", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "tasks.max": "1" } }
Run the following command to create the connector:
kcctl create -f <filename>.json
Limitations
If you want to use the MongoDB CDC handler for data sourced from MongoDB instances by MongoDB source connector, you will need to select STRING
or BYTES
as the value converter for both MongoDB source and MongoDB sink connectors. Details can be found here.
Configuration
The MongoDB Kafka sink connector is configured using the following Required properties:
Parameter | Description |
---|---|
connection.uri | The connection URI for the MongoDB server. |
database | The MongoDB database name. |
topics | A list of Kafka topics that the sink connector watches. (You can define either the topics or the topics.regex setting, but not both.) |
topics.regex | A regular expression that matches the Kafka topics that the sink connector watches. (You can define either the topics or the topics.regex setting, but not both.) |
The full properties are also available from the offical MongoDB Kafka Sink Connector documentation.