The Azure Cosmos DB sink connector reads data from Kafka topics and writes it to Azure Cosmos DB (SQL API). The sink connector fully supports exactly-once semantics.
This connector is available as a built-in connector on StreamNative Cloud.
## Prerequisites
- An Azure Cosmos DB account with a database and container.
- The Cosmos DB endpoint URI (`connect.cosmos.connection.endpoint`).
- The Cosmos DB primary key (`connect.cosmos.master.key`).
## Quick Start

1. Set up the kcctl client: doc

2. Create a JSON file like the following:

   ```json
   {
     "name": "cosmosdb-sink",
     "config": {
       "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
       "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
       "connect.cosmos.master.key": "<cosmosdbprimarykey>",
       "connect.cosmos.databasename": "kafkaconnect",
       "connect.cosmos.containers.topicmap": "hotels#kafka",
       "topics": "hotels",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "value.converter.schemas.enable": "false",
       "key.converter": "org.apache.kafka.connect.json.JsonConverter",
       "key.converter.schemas.enable": "false",
       "tasks.max": "1"
     }
   }
   ```

3. Run the following command to create the connector:

   ```shell
   kcctl create -f <filename>.json
   ```
## Configuration
Configure the Azure Cosmos DB sink connector with the following properties:
| Property | Required | Default | Description |
|---|---|---|---|
| `connector.class` | true | | Class name of the Cosmos DB sink. Should be set to `com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector`. |
| `connect.cosmos.connection.endpoint` | true | | Cosmos DB endpoint URI string. |
| `connect.cosmos.master.key` | true | | The Cosmos DB primary key that the sink connects with. |
| `connect.cosmos.databasename` | true | | The name of the Cosmos DB database the sink writes to. |
| `connect.cosmos.containers.topicmap` | true | | Mapping between Kafka topics and Cosmos DB containers, formatted as CSV: `topic#container,topic2#container2`. |
| `topics` | true | | A list of Kafka topics to watch. |
| `connect.cosmos.connection.gateway.enabled` | false | false | Flag to indicate whether to use gateway mode. |
| `connect.cosmos.sink.bulk.enabled` | false | true | Flag to indicate whether bulk mode is enabled. |
| `connect.cosmos.sink.maxRetryCount` | false | 10 | Maximum retry attempts on transient write failures. NOTE: This is different from max throttling retry attempts, which are infinite. |
| `connect.cosmos.connection.sharing.enabled` | false | false | Flag to enable connection sharing between instances of Cosmos DB clients on the same JVM. NOTE: If gateway mode is enabled, this configuration has no effect. |
| `key.converter` | true | | Serialization format for the key data written into the Kafka topic. |
| `value.converter` | true | | Serialization format for the value data written into the Kafka topic. |
| `key.converter.schemas.enable` | false | true | Set to `true` if the key data has an embedded schema. |
| `value.converter.schemas.enable` | false | true | Set to `true` if the value data has an embedded schema. |
| `tasks.max` | false | 1 | Maximum number of connector sink tasks. |
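The `connect.cosmos.containers.topicmap` property uses a `topic#container` CSV format to route each topic to a container. A minimal sketch of how such a value resolves to a topic-to-container mapping (illustrative only, not the connector's actual implementation):

```python
def parse_topic_map(topic_map: str) -> dict:
    """Parse a 'topic#container,topic2#container2' CSV string into a dict."""
    mapping = {}
    for pair in topic_map.split(","):
        topic, container = pair.strip().split("#")
        mapping[topic] = container
    return mapping

# Records from the "hotels" topic would be written to the "kafka" container.
print(parse_topic_map("hotels#kafka,bookings#reservations"))
# -> {'hotels': 'kafka', 'bookings': 'reservations'}
```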
The sink connector supports the following data formats:
| Format Name | Description |
|---|---|
| JSON (Plain) | JSON record structure without any attached schema. |
| JSON with Schema | JSON record structure with explicit schema information to ensure the data matches the expected format. |
| AVRO | A row-oriented remote procedure call and data serialization framework. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. |
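The difference between plain JSON and JSON with Schema corresponds to the `value.converter.schemas.enable` setting: with schemas enabled, Kafka Connect's JsonConverter expects each message to wrap the payload in an envelope with explicit schema information. A sketch of the two shapes, using a hypothetical hotel record:

```python
import json

# Plain JSON (schemas.enable=false): the message value is the document itself.
plain = json.dumps({"id": "h1", "name": "Example Hotel", "rating": 4.5})

# JSON with Schema (schemas.enable=true): the same payload inside a
# schema/payload envelope, as expected by Kafka Connect's JsonConverter.
with_schema = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "string"},
            {"field": "name", "type": "string"},
            {"field": "rating", "type": "double"},
        ],
    },
    "payload": {"id": "h1", "name": "Example Hotel", "rating": 4.5},
})

# The payload carried by the envelope matches the plain record.
print(json.loads(with_schema)["payload"] == json.loads(plain))  # True
```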
## Supported Data Types
The Azure Cosmos DB sink connector converts each SinkRecord into a JSON document, supporting the following schema types:
| Schema Type | JSON Data Type |
|---|---|
| Array | Array |
| Boolean | Boolean |
| Float32 | Number |
| Float64 | Number |
| Int8 | Number |
| Int16 | Number |
| Int32 | Number |
| Int64 | Number |
| Map | Object (JSON) |
| String | String |
| Struct | Object (JSON) |
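The mapping above can be illustrated with a hypothetical sink record value and the JSON document the connector would write (field names are invented for the example):

```python
import json

# Hypothetical record value annotated with the schema-type mapping above.
record_value = {
    "id": "h1",                       # String  -> String
    "stars": 5,                       # Int64   -> Number
    "price": 199.99,                  # Float64 -> Number
    "available": True,                # Boolean -> Boolean
    "tags": ["pool", "wifi"],         # Array   -> Array
    "address": {"city": "Seattle"},   # Struct  -> Object (JSON)
}

# The converted value is written to Cosmos DB as a JSON document.
document = json.dumps(record_value)
print(document)
```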
For full details, see the Azure Cosmos DB sink connector documentation.