The Azure Cosmos DB source connector reads data from the Azure Cosmos DB (SQL API) change feed and writes it to Kafka topics. The connector supports at-least-once delivery when running multiple tasks and exactly-once delivery when running a single task.
This connector is available as a built-in connector on StreamNative Cloud.
Prerequisites
- An Azure Cosmos DB account with a database and container.
- The Cosmos DB endpoint URI (connect.cosmos.connection.endpoint).
- The Cosmos DB primary key (connect.cosmos.master.key).
Quick Start
- Set up the kcctl client: doc
- Create a JSON file like the following:
{
  "name": "cosmosdb-source",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "apparels#kafka",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.offset.useLatest": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "tasks.max": "1"
  }
}
- Run the following command to create the connector:
kcctl create -f <filename>.json
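The JSON file from the steps above can also be generated by a script, which keeps secrets and instance names out of hand-edited files. The following is a minimal sketch under stated assumptions, not part of the connector or of kcctl: the helper names `build_definition` and `render_definition` and the placeholder endpoint, key, and database values are hypothetical. The property names and fixed values are copied from the Quick Start example.

```python
import json

CONNECTOR_CLASS = "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector"
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def build_definition(endpoint, master_key, database, topicmap):
    """Return a connector definition shaped like the Quick Start JSON.

    Hypothetical helper: only the four arguments vary, everything else
    mirrors the values shown in the Quick Start example.
    """
    return {
        "name": "cosmosdb-source",
        "config": {
            "connector.class": CONNECTOR_CLASS,
            "connect.cosmos.connection.endpoint": endpoint,
            "connect.cosmos.master.key": master_key,
            "connect.cosmos.databasename": database,
            "connect.cosmos.containers.topicmap": topicmap,
            "connect.cosmos.task.poll.interval": "100",
            "connect.cosmos.offset.useLatest": False,
            "value.converter": JSON_CONVERTER,
            "value.converter.schemas.enable": "false",
            "key.converter": JSON_CONVERTER,
            "key.converter.schemas.enable": "false",
            "tasks.max": "1",
        },
    }


def render_definition(definition):
    """Serialize the definition to the JSON text expected in the file."""
    return json.dumps(definition, indent=2)


if __name__ == "__main__":
    # Placeholder values; replace them with your own account details.
    definition = build_definition(
        endpoint="https://<cosmosinstance-name>.documents.azure.com:443/",
        master_key="<cosmosdbprimarykey>",
        database="kafkaconnect",
        topicmap="apparels#kafka",
    )
    print(render_definition(definition))
```

Redirecting the printed output to a file gives a document that can be passed to kcctl create -f as in the last step.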
Configuration
Configure the Azure Cosmos DB source connector with the following properties:
| Property | Required | Default | Description |
|---|---|---|---|
| connector.class | true | | Class name of the Cosmos DB source. Should be set to com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector. |
| connect.cosmos.connection.endpoint | true | | Cosmos endpoint URI string. |
| connect.cosmos.master.key | true | | The Cosmos primary key that the source connects with. |
| connect.cosmos.databasename | true | | The name of the Cosmos database to read from. |
| connect.cosmos.containers.topicmap | true | | Mapping between Kafka topics and Cosmos containers, formatted as comma-separated pairs: topic#container,topic2#container2. |
| connect.cosmos.task.poll.interval | true | | Interval (in milliseconds) to poll the change feed container for changes. |
| connect.cosmos.connection.gateway.enabled | false | false | Flag to indicate whether to use gateway mode. |
| connect.cosmos.messagekey.enabled | true | true | Set if the Kafka message key should be set. |
| connect.cosmos.messagekey.field | true | id | Use the field’s value from the document as the message key. |
| connect.cosmos.offset.useLatest | true | false | Set to "true" to use the latest (most recent) source offset, "false" to use the earliest recorded offset. |
| key.converter | true | | Serialization format for the key data written into the Kafka topic. |
| value.converter | true | | Serialization format for the value data written into the Kafka topic. |
| key.converter.schemas.enable | false | true | Set to "true" if the key data has an embedded schema. |
| value.converter.schemas.enable | false | true | Set to "true" if the value data has an embedded schema. |
| tasks.max | false | 1 | Maximum number of connector source tasks. Set this to a value equal to or greater than the number of containers specified in the topicmap property. |
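Two rules from the table can be checked before a configuration is submitted: the topicmap value must be a comma-separated list of topic#container pairs, and tasks.max should not be smaller than the number of mapped containers. The sketch below illustrates such a pre-flight check. The function names `parse_topicmap` and `check_tasks_max` are hypothetical, and the check is an illustration, not the connector's own validation logic.

```python
from typing import Dict, List, Tuple


def parse_topicmap(topicmap: str) -> List[Tuple[str, str]]:
    """Split 'topic#container,topic2#container2' into (topic, container) pairs."""
    pairs = []
    for entry in topicmap.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma or an empty value
        topic, sep, container = entry.partition("#")
        if not sep or not topic.strip() or not container.strip():
            raise ValueError(f"expected topic#container, got {entry!r}")
        pairs.append((topic.strip(), container.strip()))
    return pairs


def check_tasks_max(config: Dict[str, str]) -> List[str]:
    """Return warnings when tasks.max is below the number of mapped containers."""
    warnings = []
    pairs = parse_topicmap(config.get("connect.cosmos.containers.topicmap", ""))
    tasks_max = int(config.get("tasks.max", "1"))  # table default is 1
    if tasks_max < len(pairs):
        warnings.append(
            f"tasks.max is {tasks_max} but the topicmap lists {len(pairs)} containers"
        )
    return warnings


if __name__ == "__main__":
    sample = {
        "connect.cosmos.containers.topicmap": "topic#container,topic2#container2",
        "tasks.max": "1",
    }
    for warning in check_tasks_max(sample):
        print(warning)
```

The check returns warnings instead of raising because the table words the tasks.max rule as a recommendation; a malformed topicmap entry, by contrast, raises a ValueError.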
The source connector supports the following data formats:
| Format Name | Description |
|---|---|
| JSON (Plain) | JSON record structure without any attached schema. |
| JSON with Schema | JSON record structure with explicit schema information to ensure the data matches the expected format. |
| AVRO | A row-oriented remote procedure call and data serialization framework. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. |
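The two JSON formats differ only in the schemas.enable flags of the JSON converter. The sketch below shows the converter settings for each and what a record value looks like in both cases. It is illustrative only: `converter_settings` and the sample document are hypothetical, and the schema envelope follows the general schema/payload layout of the Kafka Connect JsonConverter, not output captured from this connector. AVRO is left out because its converter class is not named in this document.

```python
import json

JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def converter_settings(with_schema: bool) -> dict:
    """Return key/value converter properties for plain JSON or JSON with schema."""
    flag = "true" if with_schema else "false"
    return {
        "key.converter": JSON_CONVERTER,
        "key.converter.schemas.enable": flag,
        "value.converter": JSON_CONVERTER,
        "value.converter.schemas.enable": flag,
    }


# Hypothetical Cosmos DB document used only for illustration.
document = {"id": "1", "name": "shirt"}

# JSON (Plain): the record value is the document itself.
plain_value = document

# JSON with Schema: the document is wrapped in a schema/payload envelope.
# The field list and optional flags here are assumptions for the sample.
with_schema_value = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "id", "type": "string", "optional": False},
            {"field": "name", "type": "string", "optional": False},
        ],
    },
    "payload": document,
}

if __name__ == "__main__":
    print(json.dumps(converter_settings(with_schema=False), indent=2))
    print(json.dumps(plain_value))
    print(json.dumps(with_schema_value))
```

Consumers of the topic need to know which variant is in use, since a reader expecting a plain document would otherwise see the envelope fields instead of the document fields.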
Supported Data Types
The Azure Cosmos DB source connector converts each JSON document to a schema, supporting the following data types:
| JSON Data Type | Schema Type |
|---|---|
| Array | Array |
| Boolean | Boolean |
| Number | Float32, Float64, Int8, Int16, Int32, Int64 |
| Null | String |
| Object (JSON) | Struct |
| String | String |
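The mapping in the table can be expressed as a small lookup over parsed JSON values. The sketch below is an illustration of the table only, not the connector's implementation: `candidate_schema_types` is a hypothetical helper, and for numbers it returns every schema type the table lists because the table does not say how the connector chooses among them.

```python
import json
from typing import Any, Tuple

NUMBER_TYPES = ("Float32", "Float64", "Int8", "Int16", "Int32", "Int64")


def candidate_schema_types(value: Any) -> Tuple[str, ...]:
    """Return the schema types the table lists for a parsed JSON value."""
    if value is None:
        return ("String",)  # the table maps Null to String
    if isinstance(value, bool):
        # Checked before numbers because bool is a subclass of int in Python.
        return ("Boolean",)
    if isinstance(value, (int, float)):
        return NUMBER_TYPES
    if isinstance(value, str):
        return ("String",)
    if isinstance(value, list):
        return ("Array",)
    if isinstance(value, dict):
        return ("Struct",)
    raise TypeError(f"not a JSON value: {type(value).__name__}")


if __name__ == "__main__":
    # Hypothetical document used only for illustration.
    doc = json.loads(
        '{"id": "1", "price": 9.5, "tags": ["a"], "inStock": true, "note": null, "dims": {"w": 1}}'
    )
    for field, value in doc.items():
        print(field, "->", ", ".join(candidate_schema_types(value)))
```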
For full details, see the Azure Cosmos DB source connector documentation.