The Azure Cosmos DB source connector reads data from the Azure Cosmos DB (SQL API) change feed and writes it to Kafka topics. The connector supports at-least-once delivery with multiple tasks and exactly-once delivery with a single task.
This connector is available as a built-in connector on StreamNative Cloud.

Prerequisites

  • An Azure Cosmos DB account with a database and container.
  • The Cosmos DB endpoint URI (connect.cosmos.connection.endpoint).
  • The Cosmos DB primary key (connect.cosmos.master.key).

Quick Start

  1. Set up the kcctl client (see its setup documentation).
  2. Create a JSON file like the following:
    {
        "name": "cosmosdb-source",
        "config": {
            "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
            "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
            "connect.cosmos.master.key": "<cosmosdbprimarykey>",
            "connect.cosmos.databasename": "kafkaconnect",
            "connect.cosmos.containers.topicmap": "apparels#kafka",
            "connect.cosmos.task.poll.interval": "100",
            "connect.cosmos.offset.useLatest": false,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "tasks.max": "1"
        }
    }
    
  3. Run the following command to create the connector:
    kcctl create -f <filename>.json
    
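Before submitting the file, a quick local sanity check can catch missing required properties. The sketch below is illustrative only; `missing_required` is a hypothetical helper, not part of kcctl or the connector:

```python
# Minimal sanity check for the "config" section of the connector JSON.
# REQUIRED lists a subset of the properties the configuration reference
# marks as required (assumption: no defaults exist for these).
REQUIRED = [
    "connector.class",
    "connect.cosmos.connection.endpoint",
    "connect.cosmos.master.key",
    "connect.cosmos.databasename",
    "connect.cosmos.containers.topicmap",
]

def missing_required(config):
    """Return the required properties absent from the config dict."""
    return [key for key in REQUIRED if key not in config]
```

Running this against the `config` object from step 2 should return an empty list before you call `kcctl create`.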

Configuration

Configure the Azure Cosmos DB source connector with the following properties:
| Property | Required | Default | Description |
|---|---|---|---|
| `connector.class` | true | | Class name of the Cosmos DB source connector. Should be set to `com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector`. |
| `connect.cosmos.connection.endpoint` | true | | Cosmos DB endpoint URI string. |
| `connect.cosmos.master.key` | true | | The Cosmos DB primary key that the source connects with. |
| `connect.cosmos.databasename` | true | | The name of the Cosmos DB database to read from. |
| `connect.cosmos.containers.topicmap` | true | | Mapping between Kafka topics and Cosmos DB containers, formatted as CSV: `topic#container,topic2#container2`. |
| `connect.cosmos.task.poll.interval` | true | | Interval (in milliseconds) at which to poll the change feed container for changes. |
| `connect.cosmos.connection.gateway.enabled` | false | false | Flag indicating whether to use gateway mode. |
| `connect.cosmos.messagekey.enabled` | true | true | Whether the Kafka message key should be set. |
| `connect.cosmos.messagekey.field` | true | id | The document field whose value is used as the message key. |
| `connect.cosmos.offset.useLatest` | true | false | Set to `true` to start from the latest (most recent) source offset, or `false` to start from the earliest recorded offset. |
| `key.converter` | true | | Serialization format for the key data written to the Kafka topic. |
| `value.converter` | true | | Serialization format for the value data written to the Kafka topic. |
| `key.converter.schemas.enable` | false | true | Set to `true` if the key data has an embedded schema. |
| `value.converter.schemas.enable` | false | true | Set to `true` if the value data has an embedded schema. |
| `tasks.max` | false | 1 | Maximum number of connector source tasks. Should be equal to or greater than the number of containers specified in `connect.cosmos.containers.topicmap`. |
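The `connect.cosmos.containers.topicmap` format can be illustrated with a small sketch. The parsing below is for illustration only and is not the connector's actual code:

```python
def parse_topicmap(topicmap):
    """Parse a 'topic#container,topic2#container2' string into a dict
    mapping each Kafka topic to its Cosmos DB container."""
    pairs = (entry.split("#", 1) for entry in topicmap.split(","))
    return {topic: container for topic, container in pairs}

# The quick-start value maps the 'apparels' topic to the 'kafka' container:
mapping = parse_topicmap("apparels#kafka")  # {'apparels': 'kafka'}
```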

Supported Data Formats

The source connector supports the following data formats:
| Format Name | Description |
|---|---|
| JSON (Plain) | JSON record structure without any attached schema. |
| JSON with Schema | JSON record structure with explicit schema information to ensure the data matches the expected format. |
| AVRO | A row-oriented remote procedure call and data serialization framework. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. |
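For the "JSON with Schema" format, Kafka Connect's `JsonConverter` (with `schemas.enable=true`) wraps each record in a `schema`/`payload` envelope. The sketch below builds such an envelope by hand to show the shape; the field names `id` and `quantity` and the document values are hypothetical:

```python
import json

# Illustrative envelope as produced by org.apache.kafka.connect.json.JsonConverter
# when value.converter.schemas.enable=true: the schema travels with every record.
record = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "id", "type": "string", "optional": False},       # hypothetical field
            {"field": "quantity", "type": "int32", "optional": True},   # hypothetical field
        ],
    },
    "payload": {"id": "h-123", "quantity": 42},  # hypothetical document values
}

encoded = json.dumps(record)
```

With `schemas.enable=false` (as in the quick-start config), only the bare `payload` object is written to the topic.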

Supported Data Types

The Azure Cosmos DB source connector converts each JSON document to a schema that supports the following data types:

| JSON Data Type | Schema Type |
|---|---|
| Array | Array |
| Boolean | Boolean |
| Number | Float32, Float64, Int8, Int16, Int32, Int64 |
| Null | String |
| Object (JSON) | Struct |
| String | String |
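The mapping above can be sketched as a simple type-inference function. This is a simplified illustration, not the connector's implementation; in particular, the choice among the integer and float widths is assumed here:

```python
def infer_schema_type(value):
    """Map a decoded JSON value to the schema type from the table above
    (simplified sketch; numeric widths are assumptions)."""
    if value is None:
        return "String"          # JSON null maps to String
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return "Boolean"
    if isinstance(value, int):
        return "Int64"           # the connector may pick a narrower int type
    if isinstance(value, float):
        return "Float64"         # or Float32, depending on the value
    if isinstance(value, str):
        return "String"
    if isinstance(value, list):
        return "Array"
    if isinstance(value, dict):
        return "Struct"
    raise TypeError(f"unsupported JSON value: {value!r}")
```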
For full details, see the Azure Cosmos DB source connector documentation.