> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Azure Cosmos DB Kafka Connect Source

> The Azure Cosmos DB Kafka Connect Source connector.

The Azure Cosmos DB source connector reads data from Azure Cosmos DB (SQL API) change feed and writes data to Kafka topics. The source connector supports at-least once with multiple tasks and exactly-once for single tasks.

<Note title="✅ Available on StreamNative Cloud">
  This connector is available as a built-in connector on StreamNative Cloud.
</Note>

### Prerequisites

* An Azure Cosmos DB account with a database and container.
* The Cosmos DB endpoint URI (`connect.cosmos.connection.endpoint`).
* The Cosmos DB primary key (`connect.cosmos.master.key`).

### Quick Start

1. Setup the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)

2. Create a JSON file like the following:
   ```json theme={null}
   {
       "name": "cosmosdb-source",
       "config": {
           "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
           "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
           "connect.cosmos.master.key": "<cosmosdbprimarykey>",
           "connect.cosmos.databasename": "kafkaconnect",
           "connect.cosmos.containers.topicmap": "apparels#kafka",
           "connect.cosmos.task.poll.interval": "100",
           "connect.cosmos.offset.useLatest": false,
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "value.converter.schemas.enable": "false",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter.schemas.enable": "false",
           "tasks.max": "1"
       }
   }
   ```

3. Run the following command to create the connector:
   ```shell theme={null}
   kcctl apply -f <filename>.json
   ```

### Configuration

Configure the Azure Cosmos DB source connector with the following properties:

| Property                                    | Required | Default | Description                                                                                                                                           |
| ------------------------------------------- | -------- | ------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
| `connector.class`                           | true     |         | Classname of the Cosmos DB source. Should be set to `com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector`.                                  |
| `connect.cosmos.connection.endpoint`        | true     |         | Cosmos endpoint URI string.                                                                                                                           |
| `connect.cosmos.master.key`                 | true     |         | The Cosmos primary key that the source connects with.                                                                                                 |
| `connect.cosmos.databasename`               | true     |         | The name of the Cosmos database to read from.                                                                                                         |
| `connect.cosmos.containers.topicmap`        | true     |         | Mapping between Kafka Topics and Cosmos Containers, formatted using CSV as shown: `topic#container,topic2#container2`.                                |
| `connect.cosmos.task.poll.interval`         | true     |         | Interval (in milliseconds) to poll the change feed container for changes.                                                                             |
| `connect.cosmos.connection.gateway.enabled` | false    | false   | Flag to indicate whether to use gateway mode.                                                                                                         |
| `connect.cosmos.messagekey.enabled`         | true     | true    | Set if the Kafka message key should be set.                                                                                                           |
| `connect.cosmos.messagekey.field`           | true     | id      | Use the field's value from the document as the message key.                                                                                           |
| `connect.cosmos.offset.useLatest`           | true     | false   | Set to `"true"` to use the latest (most recent) source offset, `"false"` to use the earliest recorded offset.                                         |
| `key.converter`                             | true     |         | Serialization format for the key data written into Kafka topic.                                                                                       |
| `value.converter`                           | true     |         | Serialization format for the value data written into the Kafka topic.                                                                                 |
| `key.converter.schemas.enable`              | false    | true    | Set to `"true"` if the key data has embedded schema.                                                                                                  |
| `value.converter.schemas.enable`            | false    | true    | Set to `"true"` if the value data has embedded schema.                                                                                                |
| `tasks.max`                                 | false    | 1       | Maximum number of connector source tasks. This should be set to equal to or greater than the number of containers specified in the topicmap property. |

### Supported Data Formats

The source connector supports the following data formats:

| Format Name      | Description                                                                                                                                                                |
| ---------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| JSON (Plain)     | JSON record structure without any attached schema.                                                                                                                         |
| JSON with Schema | JSON record structure with explicit schema information to ensure the data matches the expected format.                                                                     |
| AVRO             | A row-oriented remote procedure call and data serialization framework. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. |

### Supported Data Types

Azure Cosmos DB source connector converts JSON Document to Schema supporting the following data types:

| JSON Data Type | Schema Type                                 |
| -------------- | ------------------------------------------- |
| Array          | Array                                       |
| Boolean        | Boolean                                     |
| Number         | Float32, Float64, Int8, Int16, Int32, Int64 |
| Null           | String                                      |
| Object (JSON)  | Struct                                      |
| String         | String                                      |

For full details, see the [Azure Cosmos DB source connector documentation](https://github.com/microsoft/kafka-connect-cosmosdb/blob/main/doc/README_Source.md)
