The Azure Cosmos DB sink connector reads data from Kafka topics and writes data to Azure Cosmos DB (SQL API). The sink connector fully supports exactly-once semantics.
This connector is available as a built-in connector on StreamNative Cloud.

Prerequisites

  • An Azure Cosmos DB account with a database and container.
  • The Cosmos DB endpoint URI (connect.cosmos.connection.endpoint).
  • The Cosmos DB primary key (connect.cosmos.master.key).

Quick Start

  1. Set up the kcctl client (see the kcctl documentation).
  2. Create a JSON file like the following:
    {
        "name": "cosmosdb-sink",
        "config": {
            "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
            "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
            "connect.cosmos.master.key": "<cosmosdbprimarykey>",
            "connect.cosmos.databasename": "kafkaconnect",
            "connect.cosmos.containers.topicmap": "hotels#kafka",
            "topics": "hotels",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "tasks.max": "1"
        }
    }
    
  3. Run the following command to create the connector:
    kcctl create -f <filename>.json
    

Configuration

Configure the Azure Cosmos DB sink connector with the following properties:
| Property | Required | Default | Description |
| --- | --- | --- | --- |
| connector.class | true | | Class name of the Cosmos DB sink connector. Should be set to com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector. |
| connect.cosmos.connection.endpoint | true | | Cosmos DB endpoint URI string. |
| connect.cosmos.master.key | true | | The Cosmos DB primary key that the sink connects with. |
| connect.cosmos.databasename | true | | The name of the Cosmos DB database the sink writes to. |
| connect.cosmos.containers.topicmap | true | | Mapping between Kafka topics and Cosmos DB containers, formatted as CSV: topic#container,topic2#container2. |
| topics | true | | A list of Kafka topics to watch. |
| connect.cosmos.connection.gateway.enabled | false | false | Flag to indicate whether to use gateway mode. |
| connect.cosmos.sink.bulk.enabled | false | true | Flag to indicate whether bulk mode is enabled. |
| connect.cosmos.sink.maxRetryCount | false | 10 | Maximum retry attempts on transient write failures. Note: this is different from the number of throttling retry attempts, which is unlimited. |
| connect.cosmos.connection.sharing.enabled | false | false | Flag to enable connection sharing between Cosmos DB client instances on the same JVM. Note: if gateway mode is enabled, this setting has no effect. |
| key.converter | true | | Serialization format for the key data written into the Kafka topic. |
| value.converter | true | | Serialization format for the value data written into the Kafka topic. |
| key.converter.schemas.enable | false | true | Set to "true" if the key data has an embedded schema. |
| value.converter.schemas.enable | false | true | Set to "true" if the value data has an embedded schema. |
| tasks.max | false | 1 | Maximum number of connector sink tasks. |
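The topic map accepts multiple comma-separated topic#container pairs, so one connector instance can write several topics to several containers. A hypothetical fragment (topic and container names are illustrative) routing two topics:

```json
{
    "connect.cosmos.containers.topicmap": "hotels#hotel-container,reviews#review-container",
    "topics": "hotels,reviews"
}
```

Every topic listed in topics must have a corresponding entry in the topic map.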

Supported Data Formats

The sink connector supports the following data formats:
| Format Name | Description |
| --- | --- |
| JSON (Plain) | JSON record structure without any attached schema. |
| JSON with Schema | JSON record structure with explicit schema information to ensure the data matches the expected format. |
| AVRO | A row-oriented remote procedure call and data serialization framework. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. |
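When value.converter.schemas.enable is set to "true", the Kafka Connect JsonConverter expects each record value to carry a schema/payload envelope. A sketch with illustrative field names:

```json
{
    "schema": {
        "type": "struct",
        "fields": [
            { "field": "id", "type": "string" },
            { "field": "name", "type": "string" },
            { "field": "rating", "type": "int32" }
        ]
    },
    "payload": {
        "id": "h-100",
        "name": "Example Hotel",
        "rating": 4
    }
}
```

With schemas.enable set to "false" (as in the Quick Start), only the plain payload object is sent on the topic.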

Supported Data Types

The Azure Cosmos DB sink connector converts each SinkRecord into a JSON document, supporting the following schema types:

| Schema Type | JSON Data Type |
| --- | --- |
| Array | Array |
| Boolean | Boolean |
| Float32 | Number |
| Float64 | Number |
| Int8 | Number |
| Int16 | Number |
| Int32 | Number |
| Int64 | Number |
| Map | Object (JSON) |
| String | String |
| Struct | Object (JSON) |
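As an example of these mappings, a record whose value is a Struct containing an Array and a nested Map (an illustrative hotel record) would be written to the Cosmos DB container as a JSON document like:

```json
{
    "id": "h-100",
    "name": "Example Hotel",
    "available": true,
    "rating": 4.5,
    "tags": ["pool", "wifi"],
    "address": {
        "city": "Seattle",
        "zip": "98101"
    }
}
```

Here the Struct becomes the top-level JSON object, the Boolean and Float64 fields become JSON Boolean and Number values, the Array becomes a JSON array, and the nested Map becomes an embedded JSON object.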
For full details, see the Azure Cosmos DB sink connector documentation.