The MongoDB Kafka source connector is a Kafka Connect connector that reads data from MongoDB and writes data to Kafka topics.
This connector is available as a built-in connector on StreamNative Cloud.
 
Prerequisites
- The connection.uri is in the form mongodb+srv://username:password@cluster0.xxx.mongodb.net
- Valid credentials with the read role on the database. For more granular access control, you can specify a custom role that allows the find and changeStream actions on the databases or collections.
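
The custom-role option above can be sketched as follows. This is a minimal example, not an official procedure: it writes a small mongosh script that defines a role limited to the find and changeStream actions on one collection. The role name kafkaSourceReader and the file name create-source-role.js are hypothetical, and the database and collection (kafka-mongo, source) are borrowed from the Quick Start config in this document; adjust them to your deployment.

```shell
# Sketch: generate a mongosh script that creates a least-privilege role.
# Assumptions (not from the connector docs): role name, file name, target namespace.
cat > create-source-role.js <<'EOF'
// Create the role in the admin database so it can reference other databases.
db.getSiblingDB("admin").createRole({
  role: "kafkaSourceReader",            // hypothetical role name
  privileges: [
    {
      // Scope the privilege to a single collection.
      resource: { db: "kafka-mongo", collection: "source" },
      actions: ["find", "changeStream"]
    }
  ],
  roles: []
});
EOF
echo "wrote create-source-role.js"
```

You could then run the generated file with mongosh as a user that is allowed to create roles, and grant the new role to the user named in connection.uri.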
Quick Start
- Set up the kcctl client: doc
- Create a MongoDB cluster. You can create one in a Kubernetes cluster with the following YAML file:
apiVersion: v1
kind: Service
metadata:
  name: mongo
  labels:
    name: mongo
spec:
  ports:
    - port: 27017
  clusterIP: None
  selector:
    role: mongo
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mongo-dbz
spec:
  selector:
    matchLabels:
      role: mongo
  serviceName: "mongo"
  replicas: 1
  template:
    metadata:
      labels:
        role: mongo
    spec:
      terminationGracePeriodSeconds: 10
      containers:
        - name: mongo
          image: debezium/example-mongodb:2.6
          env:
            - name: MONGODB_USER
              value: "debezium"
            - name: MONGODB_PASSWORD
              value: "dbz"
          command:
            - mongod
            - "--replSet"
            - rs0
            - "--bind_ip" # bind to all IP addresses so that other pods can connect
            - "0.0.0.0"
          ports:
            - containerPort: 27017
        - name: mongo-sidecar
          image: cvallance/mongo-k8s-sidecar
          env:
            - name: MONGO_SIDECAR_POD_LABELS
              value: "role=mongo"
            - name: KUBE_NAMESPACE
              value: default
            - name: KUBERNETES_MONGO_SERVICE_NAME
              value: "mongo"
 
 
- Initialize the local MongoDB cluster:
kubectl apply -f .ci/clusters/mongodb.yaml
kubectl wait -l role=mongo --for=condition=Ready pod --timeout=5m
# initialize the data
kubectl exec mongo-dbz-0 -c mongo -- bash /usr/local/bin/init-inventory.sh
 
 
- Create a JSON file like the following:
{
    "name": "mongo-source",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://mongo.default.svc.cluster.local:27017/?authSource=admin",
        "database": "kafka-mongo",
        "collection": "source",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "tasks.max": "1"
    }
}
 
 
- Run the following command to create the connector:
kcctl create -f <filename>.json
 
 
Configuration
The MongoDB Kafka source connector is configured using the following required properties:

| Parameter | Description |
|---|---|
| connection.uri | The connection URI for the MongoDB server. |
| database | The MongoDB database from which the connector imports data into Kafka topics. The connector monitors changes in this database. Leave the field empty to watch all databases. |
| collection | The collection in the MongoDB database to watch. If not set, all collections are watched. |
| topic.prefix | The prefix for the Kafka topics that the connector creates. The connector appends the database name and collection name to this prefix to create the topic name. |
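
To make the topic.prefix behavior concrete, the sketch below derives the topic name you would expect for a watched collection. It assumes the parts are joined with "." (the connector's default separator, as far as we know) and uses a hypothetical prefix, demo; the topic_name helper is illustrative only and is not part of the connector or kcctl.

```shell
# Sketch: derive the expected Kafka topic name for a watched collection.
# Assumption: parts are joined with "." unless another separator is passed.
topic_name() {
  prefix="$1"
  database="$2"
  collection="$3"
  separator="${4:-.}"
  # ${prefix:+...} adds the prefix and separator only when the prefix is non-empty.
  printf '%s\n' "${prefix:+${prefix}${separator}}${database}${separator}${collection}"
}

topic_name "demo" "kafka-mongo" "source"   # prints: demo.kafka-mongo.source
topic_name "" "kafka-mongo" "source"       # prints: kafka-mongo.source
```

Under these assumptions, the Quick Start config, which sets no topic.prefix, would publish to a topic named kafka-mongo.source.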
 
The full list of properties is also available in the official MongoDB Kafka Source Connector documentation.