# Azure Cosmos DB Kafka Connect Sink

> The Azure Cosmos DB Kafka Connect Sink connector.

The Azure Cosmos DB sink connector reads data from Kafka topics and writes data to Azure Cosmos DB (SQL API). The sink connector fully supports exactly-once semantics.

<Note title="✅ Available on StreamNative Cloud">
  This connector is available as a built-in connector on StreamNative Cloud.
</Note>

### Prerequisites

* An Azure Cosmos DB account with a database and container.
* The Cosmos DB endpoint URI (`connect.cosmos.connection.endpoint`).
* The Cosmos DB primary key (`connect.cosmos.master.key`).

### Quick Start

1. Set up the kcctl client by following the [Kafka Connect setup guide](https://docs.streamnative.io/docs/kafka-connect-setup).

2. Create a JSON file that defines the connector, replacing the placeholders in angle brackets with your own values:
   ```json theme={null}
   {
       "name": "cosmosdb-sink",
       "config": {
           "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
           "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
           "connect.cosmos.master.key": "<cosmosdbprimarykey>",
           "connect.cosmos.databasename": "kafkaconnect",
           "connect.cosmos.containers.topicmap": "hotels#kafka",
           "topics": "hotels",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "value.converter.schemas.enable": "false",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter.schemas.enable": "false",
           "tasks.max": "1"
       }
   }
   ```

3. Run the following command to create the connector:
   ```shell theme={null}
   kcctl apply -f <filename>.json
   ```

### Configuration

Configure the Azure Cosmos DB sink connector with the following properties:

| Property                                    | Required | Default | Description                                                                                                                                                           |
| ------------------------------------------- | -------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `connector.class`                           | true     |         | Class name of the Cosmos DB sink. Must be set to `com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector`. |
| `connect.cosmos.connection.endpoint`        | true     |         | Cosmos endpoint URI string.                                                                                                                                           |
| `connect.cosmos.master.key`                 | true     |         | The Cosmos primary key that the sink connects with.                                                                                                                   |
| `connect.cosmos.databasename`               | true     |         | The name of the Cosmos database the sink writes to.                                                                                                                   |
| `connect.cosmos.containers.topicmap`        | true     |         | Mapping between Kafka topics and Cosmos containers, as a comma-separated list of `topic#container` pairs, for example `topic#container,topic2#container2`. |
| `topics`                                    | true     |         | A comma-separated list of Kafka topics to consume from. |
| `connect.cosmos.connection.gateway.enabled` | false    | false   | Flag to indicate whether to use gateway mode.                                                                                                                         |
| `connect.cosmos.sink.bulk.enabled`          | false    | true    | Flag to indicate whether bulk mode is enabled.                                                                                                                        |
| `connect.cosmos.sink.maxRetryCount`         | false    | 10      | Max retry attempts on transient write failures. NOTE: This is different from max throttling retry attempts, which are infinite.                                       |
| `connect.cosmos.connection.sharing.enabled` | false    | false   | Flag to enable connection sharing between Cosmos client instances on the same JVM. NOTE: If gateway mode is enabled, this setting has no effect. |
| `key.converter`                             | true     |         | Serialization format for the key data written into the Kafka topic. |
| `value.converter`                           | true     |         | Serialization format for the value data written into the Kafka topic.                                                                                                 |
| `key.converter.schemas.enable`              | false    | true    | Set to `"true"` if the key data has an embedded schema. |
| `value.converter.schemas.enable`            | false    | true    | Set to `"true"` if the value data has an embedded schema. |
| `tasks.max`                                 | false    | 1       | Maximum number of connector sink tasks.                                                                                                                               |
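
As an illustration of how `topics` and `connect.cosmos.containers.topicmap` fit together, the following `config` fragment is a minimal sketch that routes two topics to two containers and lowers the retry count. The topic names (`hotels`, `flights`) and container names (`hotels-container`, `flights-container`) are hypothetical, and the remaining required properties from the Quick Start still need to be supplied.

```json theme={null}
{
    "topics": "hotels,flights",
    "connect.cosmos.containers.topicmap": "hotels#hotels-container,flights#flights-container",
    "connect.cosmos.sink.bulk.enabled": "true",
    "connect.cosmos.sink.maxRetryCount": "5"
}
```

Every topic listed in `topics` should have a matching `topic#container` entry in the map.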

### Supported Data Formats

The sink connector supports the following data formats:

| Format Name      | Description                                                                                                                                                                |
| ---------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| JSON (Plain)     | JSON record structure without any attached schema.                                                                                                                         |
| JSON with Schema | JSON record structure with explicit schema information to ensure the data matches the expected format.                                                                     |
| AVRO             | A row-oriented remote procedure call and data serialization framework. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. |
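
The format is selected through the converter properties. As a sketch, the fragment below switches the value converter to JSON with Schema by enabling embedded schemas on the `JsonConverter` already used in the Quick Start:

```json theme={null}
{
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
}
```

For Avro, a typical setup uses the Confluent Avro converter together with a schema registry. The fragment below assumes that converter is available in your Connect environment, and the registry URL is a placeholder:

```json theme={null}
{
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schema-registry-host>:8081"
}
```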

### Supported Data Types

The Azure Cosmos DB sink connector converts each `SinkRecord` into a JSON document and supports the following schema types:

| Schema Type | JSON Data Type |
| ----------- | -------------- |
| Array       | Array          |
| Boolean     | Boolean        |
| Float32     | Number         |
| Float64     | Number         |
| Int8        | Number         |
| Int16       | Number         |
| Int32       | Number         |
| Int64       | Number         |
| Map         | Object (JSON)  |
| String      | String         |
| Struct      | Object (JSON)  |
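
To make the mapping concrete, here is a hypothetical record value in the JSON with Schema format (`value.converter.schemas.enable` set to `"true"`). The field names and values are invented for illustration; the `schema`/`payload` envelope is the layout that `JsonConverter` expects when schemas are enabled.

```json theme={null}
{
    "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
            { "field": "id", "type": "string", "optional": false },
            { "field": "rating", "type": "int32", "optional": false },
            { "field": "open", "type": "boolean", "optional": false }
        ]
    },
    "payload": {
        "id": "hotel-1",
        "rating": 4,
        "open": true
    }
}
```

Under the table above, the `Struct` would be written as a JSON object, with `id` as a String, `rating` as a Number, and `open` as a Boolean.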

For full details, see the [Azure Cosmos DB sink connector documentation](https://github.com/microsoft/kafka-connect-cosmosdb/blob/main/doc/README_Sink.md).
