The Azure Cosmos DB source connector reads data from the Azure Cosmos DB (SQL API) change feed and writes it to Kafka topics. The source connector supports at-least-once delivery with multiple tasks and exactly-once delivery with a single task.
This connector is available as a built-in connector on StreamNative Cloud.
Prerequisites
- An Azure Cosmos DB account with a database and a container.
- The Cosmos DB endpoint URI (connect.cosmos.connection.endpoint).
- The Cosmos DB primary key (connect.cosmos.master.key).
Quick Start
- Set up the kcctl client by following the kcctl documentation.
- Create a JSON file like the following:
- Run the following command to create the connector:
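As a hedged sketch of the JSON file mentioned in the steps above, the following Python assembles a connector definition from the properties listed under Configuration and writes it to disk. Every value is a hypothetical placeholder (connector name, endpoint, key, database, the orders#orders topic map, the poll interval), and the top-level {"name": ..., "config": {...}} layout is an assumption based on the Kafka Connect REST API payload shape, not a confirmed StreamNative format.

```python
import json
from typing import Any, Dict

# Hypothetical placeholder values: replace with your own account, database,
# topic, and container names before use.
CONNECTOR_NAME = "cosmosdb-source"

CONFIG: Dict[str, str] = {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "connect.cosmos.connection.endpoint": "https://<your-account>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<your-primary-key>",
    "connect.cosmos.databasename": "<your-database>",
    # Comma-separated topic#container pairs (here: one topic, one container).
    "connect.cosmos.containers.topicmap": "orders#orders",
    # Poll the change feed every second (value in milliseconds).
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.messagekey.enabled": "true",
    "connect.cosmos.messagekey.field": "id",
    "connect.cosmos.offset.useLatest": "false",
    # Kafka Connect's built-in JSON converter, without embedded schemas.
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    # One container in the topic map, so one task is enough.
    "tasks.max": "1",
}


def build_payload() -> Dict[str, Any]:
    """Wrap the properties in a {"name": ..., "config": {...}} object (assumed layout)."""
    return {"name": CONNECTOR_NAME, "config": dict(CONFIG)}


def write_connector_file(path: str) -> None:
    """Write the connector definition to `path` as indented JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(build_payload(), f, indent=2)
```

Calling write_connector_file("cosmosdb-source.json") produces a file that, assuming kcctl's apply command accepts this layout, could then be submitted with kcctl apply -f cosmosdb-source.json.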
Configuration
Configure the Azure Cosmos DB source connector with the following properties:

| Property | Required | Default | Description |
|---|---|---|---|
| connector.class | true | | Class name of the Cosmos DB source connector. Set to com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector. |
| connect.cosmos.connection.endpoint | true | | Cosmos DB endpoint URI string. |
| connect.cosmos.master.key | true | | The Cosmos DB primary key that the source connector uses to connect. |
| connect.cosmos.databasename | true | | The name of the Cosmos DB database to read from. |
| connect.cosmos.containers.topicmap | true | | Mapping between Kafka topics and Cosmos DB containers, written as comma-separated topic#container pairs, for example topic#container,topic2#container2. |
| connect.cosmos.task.poll.interval | true | | Interval, in milliseconds, at which to poll the change feed container for changes. |
| connect.cosmos.connection.gateway.enabled | false | false | Whether to use gateway mode. |
| connect.cosmos.messagekey.enabled | true | true | Whether to set the Kafka message key. |
| connect.cosmos.messagekey.field | true | id | The document field whose value is used as the message key. |
| connect.cosmos.offset.useLatest | true | false | Set to "true" to use the latest (most recent) source offset, or "false" to use the earliest recorded offset. |
| key.converter | true | | Serialization format for the key data written to the Kafka topic. |
| value.converter | true | | Serialization format for the value data written to the Kafka topic. |
| key.converter.schemas.enable | false | true | Set to "true" if the key data has an embedded schema. |
| value.converter.schemas.enable | false | true | Set to "true" if the value data has an embedded schema. |
| tasks.max | false | 1 | Maximum number of connector source tasks. Set this to a value equal to or greater than the number of containers specified in connect.cosmos.containers.topicmap. |
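The topic map format and the tasks.max guidance in the table above can be checked before submitting a configuration. The following is a minimal sketch, not the connector's own parser: parse_topic_map and check_tasks_max are hypothetical helper names, and trimming whitespace and ignoring empty entries are assumptions about how lenient a check should be, not documented connector behavior.

```python
from typing import Dict, List, Tuple


def parse_topic_map(topicmap: str) -> List[Tuple[str, str]]:
    """Split a value like 'topic#container,topic2#container2' into (topic, container) pairs."""
    pairs: List[Tuple[str, str]] = []
    for entry in topicmap.split(","):
        entry = entry.strip()  # assumption: tolerate spaces around entries
        if not entry:
            continue  # assumption: ignore empty entries such as a trailing comma
        topic, sep, container = entry.partition("#")
        if not sep or not topic or not container:
            raise ValueError(f"invalid topicmap entry: {entry!r}")
        pairs.append((topic, container))
    return pairs


def check_tasks_max(config: Dict[str, str]) -> int:
    """Raise ValueError if tasks.max is smaller than the number of distinct
    containers in connect.cosmos.containers.topicmap; return that count."""
    pairs = parse_topic_map(config["connect.cosmos.containers.topicmap"])
    containers = {container for _, container in pairs}
    tasks_max = int(config.get("tasks.max", "1"))  # documented default is 1
    if tasks_max < len(containers):
        raise ValueError(
            f"tasks.max={tasks_max} is less than the {len(containers)} "
            "containers in the topicmap"
        )
    return len(containers)
```

For example, a topic map of a#c1,b#c2 with tasks.max left at its default of 1 is rejected, while the same map with tasks.max set to 2 passes.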
Supported Data Formats
The source connector supports the following data formats:

| Format Name | Description |
|---|---|
| JSON (Plain) | JSON record structure without any attached schema. |
| JSON with Schema | JSON record structure with explicit schema information to ensure the data matches the expected format. |
| AVRO | A row-oriented remote procedure call and data serialization framework. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. |
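To make the difference between the two JSON formats concrete, the following sketch serializes one record both ways. The "JSON with Schema" branch uses the envelope that Kafka Connect's JsonConverter produces when schemas.enable is true (an object with "schema" and "payload" keys), which is what key.converter.schemas.enable and value.converter.schemas.enable control. The document and its hand-written schema are hypothetical, and Avro is not shown.

```python
import json
from typing import Any, Dict, Optional


def serialize_value(doc: Dict[str, Any], schema: Optional[Dict[str, Any]] = None) -> str:
    """JSON (Plain): the record alone.
    JSON with Schema: {"schema": ..., "payload": ...}, as JsonConverter emits
    when schemas.enable is true."""
    if schema is None:
        return json.dumps(doc, sort_keys=True)
    return json.dumps({"schema": schema, "payload": doc}, sort_keys=True)


# Hypothetical record and matching struct schema, for illustration only.
doc = {"id": "order-1", "qty": 3}
schema = {
    "type": "struct",
    "optional": False,
    "fields": [
        {"field": "id", "type": "string", "optional": False},
        {"field": "qty", "type": "int64", "optional": False},
    ],
}
```

The plain form is smaller on the wire, while the envelope lets a downstream consumer validate each record against its schema without a separate registry.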
Supported Data Types
The Azure Cosmos DB source connector converts JSON documents to a schema, mapping JSON data types as follows:

| JSON Data Type | Schema Type |
|---|---|
| Array | Array |
| Boolean | Boolean |
| Number | Float32, Float64, Int8, Int16, Int32, Int64 |
| Null | String |
| Object (JSON) | Struct |
| String | String |
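The mapping in the table above can be expressed as a small function over decoded JSON values. This is a hedged sketch, not the connector's implementation: json_to_schema_type is a hypothetical name, and because the table lists several numeric targets (Float32 through Int64) without saying how one is chosen, the sketch assumes integral numbers map to INT64 and all other numbers to FLOAT64.

```python
import json
from typing import Any


def json_to_schema_type(value: Any) -> str:
    """Return a schema type name for a decoded JSON value, following the table.
    ASSUMPTION: integral numbers -> INT64, other numbers -> FLOAT64."""
    if value is None:
        return "STRING"  # Null maps to String
    if isinstance(value, bool):  # check bool first: bool is a subclass of int in Python
        return "BOOLEAN"
    if isinstance(value, int):
        return "INT64"
    if isinstance(value, float):
        return "FLOAT64"
    if isinstance(value, str):
        return "STRING"
    if isinstance(value, list):
        return "ARRAY"
    if isinstance(value, dict):
        return "STRUCT"  # JSON object maps to Struct
    raise TypeError(f"not a JSON value: {type(value).__name__}")


# Hypothetical document covering each JSON data type in the table.
sample = json.loads(
    '{"id": "a1", "qty": 3, "price": 9.5, "tags": ["x"],'
    ' "meta": {"k": 1}, "note": null, "ok": true}'
)
field_types = {name: json_to_schema_type(v) for name, v in sample.items()}
```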