Kafka Connect MongoDB Source
The official MongoDB Kafka Connect Source connector.

Available on: StreamNative Cloud console
Authored by: MongoDB
Support type: Community
License: Apache License 2.0

The MongoDB Kafka source connector is a Kafka Connect connector that reads data from MongoDB and writes data to Kafka topics.

Prerequisites

  • The connection.uri is in the form mongodb+srv://<username>:<password>@<cluster-address>.
  • Valid credentials with the read role on the database. For more granular access control, you can specify a custom role that allows the find and changeStream actions on the relevant databases or collections (see the sketch after this list).
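
For example, a custom role with only the find and changeStream actions could be created with mongosh, as sketched below. The role, user, and database names are illustrative, and the command assumes an existing admin user:

    mongosh "mongodb+srv://admin:<password>@<cluster-address>/admin" --eval '
      db.getSiblingDB("admin").createRole({
        role: "kafkaConnect",
        privileges: [
          { resource: { db: "kafka-mongo", collection: "" }, actions: ["find", "changeStream"] }
        ],
        roles: []
      });
      db.getSiblingDB("admin").createUser({
        user: "connectUser",
        pwd: "<password>",
        roles: [{ role: "kafkaConnect", db: "admin" }]
      });
    '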

Quick Start

  1. Set up the kcctl client (see the kcctl setup documentation).
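
     For example, you can point kcctl at your Kafka Connect REST endpoint (a sketch; the URL below is illustrative and should be replaced with your cluster's Connect endpoint):

    kcctl config set-context default --cluster http://localhost:8083
    kcctl info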

  2. Create a MongoDB cluster. For example, you can create one in a Kubernetes cluster with the following YAML file:

    apiVersion: v1
    kind: Service
    metadata:
      name: mongo
      labels:
        name: mongo
    spec:
      ports:
        - port: 27017
      clusterIP: None
      selector:
        role: mongo
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: mongo-dbz
    spec:
      selector:
        matchLabels:
          role: mongo
      serviceName: "mongo"
      replicas: 1
      template:
        metadata:
          labels:
            role: mongo
        spec:
          terminationGracePeriodSeconds: 10
          containers:
            - name: mongo
              image: debezium/example-mongodb:2.6
              env:
                - name: MONGODB_USER
                  value: "debezium"
                - name: MONGODB_PASSWORD
                  value: "dbz"
              command:
                - mongod
                - "--replSet"
                - rs0
                - "--bind_ip" # bind mongo to all ip address to allow others to access
                - "0.0.0.0"
              ports:
                - containerPort: 27017
            - name: mongo-sidecar
              image: cvallance/mongo-k8s-sidecar
              env:
                - name: MONGO_SIDECAR_POD_LABELS
                  value: "role=mongo"
                - name: KUBE_NAMESPACE
                  value: default
                - name: KUBERNETES_MONGO_SERVICE_NAME
                  value: "mongo"
    
  3. Deploy and initialize the local MongoDB cluster (adjust the manifest path below to wherever you saved the YAML from the previous step):

    kubectl apply -f .ci/clusters/mongodb.yaml
    kubectl wait -l role=mongo --for=condition=Ready pod --timeout=5m
    # initialize the data
    kubectl exec mongo-dbz-0 -c mongo -- bash ./usr/local/bin/init-inventory.sh
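
     To confirm the replica set is up, you can run a quick sanity check (a sketch; depending on the image version the shell binary may be mongosh or the legacy mongo):

    kubectl exec mongo-dbz-0 -c mongo -- mongosh --quiet --eval 'rs.status().ok'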
    
  4. Create a JSON file like the following:

    {
        "name": "mongo-source",
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": "mongodb://mongo.default.svc.cluster.local:27017/?authSource=admin",
            "database": "kafka-mongo",
            "collection": "source",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false,
            "tasks.max": "1"
        }
    }
    
  5. Run the following command to create the connector:

    kcctl create -f <filename>.json
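
     After the connector is created, you can verify it end to end (a sketch; the shell binary inside the pod may be mongosh or the legacy mongo, and with no topic.prefix configured the change events should land on a topic named kafka-mongo.source):

    # list connectors and check that mongo-source and its task are running
    kcctl get connectors
    # insert a test document into the watched collection (the example mongod runs without --auth)
    kubectl exec mongo-dbz-0 -c mongo -- mongosh --quiet --eval 'db.getSiblingDB("kafka-mongo").source.insertOne({ hello: "world" })'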
    

Configuration

The MongoDB Kafka source connector is configured with the following required properties:

  • connection.uri: The connection URI for the MongoDB server.
  • database: The MongoDB database from which the connector imports data into Kafka topics. The connector monitors changes in this database. Leave the field empty to watch all databases.
  • collection: The collection in the MongoDB database to watch. If not set, all collections are watched.
  • topic.prefix: The prefix for the Kafka topics that the connector creates. The connector appends the database name and collection name to this prefix to create the topic name.
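
For example, extending the quick-start config with a topic prefix (a sketch; with these settings, change events from the kafka-mongo.source collection should land on a topic named mongo.kafka-mongo.source):

    {
        "name": "mongo-source",
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": "mongodb://mongo.default.svc.cluster.local:27017/?authSource=admin",
            "database": "kafka-mongo",
            "collection": "source",
            "topic.prefix": "mongo",
            "tasks.max": "1"
        }
    }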

The full list of properties is available in the official MongoDB Kafka Source Connector documentation.