The MongoDB Kafka source connector is a Kafka Connect connector that reads data from MongoDB and writes data to Kafka topics.
Prerequisites
- The connection.uri is in the form mongodb+srv://username:password@<host>
- Valid credentials with the read role on the database. For more granular access control, you can specify a custom role that allows the find and changeStream actions on the databases or collections.
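Usernames and passwords that contain reserved characters such as `@`, `:`, or `/` need to be percent-encoded before they go into connection.uri. The following is a minimal sketch of one way to build such a URI; the helper name `build_srv_uri`, the credentials, and the host `cluster0.example.mongodb.net` are hypothetical and only serve as an illustration.

```python
from urllib.parse import quote_plus


def build_srv_uri(username: str, password: str, host: str) -> str:
    """Return a mongodb+srv:// URI with percent-encoded credentials."""
    # quote_plus escapes reserved characters such as '@', ':' and '/'
    # so they cannot be mistaken for URI delimiters.
    return f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{host}"


# Hypothetical values, for illustration only.
uri = build_srv_uri("app_user", "p@ss:word/1", "cluster0.example.mongodb.net")
print(uri)
```

The resulting string can be used as the value of connection.uri in the connector configuration.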
Quick Start
Set up the kcctl client by following the kcctl documentation.
Create a MongoDB cluster. You can create one in a Kubernetes cluster with the following YAML file:
apiVersion: v1
kind: Service
metadata:
  name: mongo
  labels:
    name: mongo
spec:
  ports:
    - port: 27017
  clusterIP: None
  selector:
    role: mongo
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mongo-dbz
spec:
  selector:
    matchLabels:
      role: mongo
  serviceName: "mongo"
  replicas: 1
  template:
    metadata:
      labels:
        role: mongo
    spec:
      terminationGracePeriodSeconds: 10
      containers:
        - name: mongo
          image: debezium/example-mongodb:2.6
          env:
            - name: MONGODB_USER
              value: "debezium"
            - name: MONGODB_PASSWORD
              value: "dbz"
          command:
            - mongod
            - "--replSet"
            - rs0
            - "--bind_ip"
            # bind mongo to all IP addresses so that other pods can reach it
            - "0.0.0.0"
          ports:
            - containerPort: 27017
        - name: mongo-sidecar
          image: cvallance/mongo-k8s-sidecar
          env:
            - name: MONGO_SIDECAR_POD_LABELS
              value: "role=mongo"
            - name: KUBE_NAMESPACE
              value: default
            - name: KUBERNETES_MONGO_SERVICE_NAME
              value: "mongo"
Initialize the local MongoDB cluster:
kubectl apply -f .ci/clusters/mongodb.yaml
kubectl wait -l role=mongo --for=condition=Ready pod --timeout=5m
# initialize the data
kubectl exec mongo-dbz-0 -c mongo -- bash ./usr/local/bin/init-inventory.sh
Create a JSON file like the following:
{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo.default.svc.cluster.local:27017/?authSource=admin",
    "database": "kafka-mongo",
    "collection": "source",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "tasks.max": "1"
  }
}
Run the following command to create the connector:
kcctl create -f <filename>.json
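If you manage several connectors, it can be convenient to generate the JSON file instead of writing it by hand. The sketch below rebuilds the Quick Start configuration with Python's standard `json` module; the helper name `make_source_config` is hypothetical, and the property values are the ones from the example above.

```python
import json


def make_source_config(name: str, connection_uri: str, database: str, collection: str) -> dict:
    """Build a connector definition in the shape kcctl expects (name + config)."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": connection_uri,
            "database": database,
            "collection": collection,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            # Python's False is serialized as the JSON literal false.
            "value.converter.schemas.enable": False,
            "tasks.max": "1",
        },
    }


config = make_source_config(
    name="mongo-source",
    connection_uri="mongodb://mongo.default.svc.cluster.local:27017/?authSource=admin",
    database="kafka-mongo",
    collection="source",
)
rendered = json.dumps(config, indent=2)
print(rendered)
```

Redirect the printed output to a file and pass that file to `kcctl create -f` as in the step above.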
Configuration
The MongoDB Kafka source connector is configured using the following properties:
Parameter | Description |
---|---|
connection.uri | The connection URI for the MongoDB server. |
database | The MongoDB database from which the connector imports data into Kafka topics. The connector monitors changes in this database. Leave the field empty to watch all databases. |
collection | The collection in the MongoDB database to watch. If not set, then all collections are watched. |
topic.prefix | The prefix for the Kafka topics that the connector creates. The connector appends a database name and collection name to this prefix to create the topic name. |
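To make the topic naming concrete, here is a small sketch of how the resulting topic name can be predicted from topic.prefix, database, and collection. It assumes the parts are joined with a `.` separator and that no other topic-related properties override the name; the function `topic_name` and the prefix `mongo` are hypothetical and not part of the connector.

```python
def topic_name(prefix: str, database: str, collection: str, separator: str = ".") -> str:
    """Join the non-empty parts into the expected topic name."""
    parts = [prefix, database, collection]
    # Skip empty parts so that an unset prefix does not leave a leading separator.
    return separator.join(part for part in parts if part)


# With a prefix: mongo.kafka-mongo.source
print(topic_name("mongo", "kafka-mongo", "source"))
# Without a prefix: kafka-mongo.source
print(topic_name("", "kafka-mongo", "source"))
```

A consumer of the Quick Start connector, which sets no prefix, would under these assumptions subscribe to `kafka-mongo.source`.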
The full list of properties is available in the official MongoDB Kafka Source Connector documentation.