> The official Google Cloud Bigtable Sink connector.

# Kafka Connect Google Bigtable Sink

The Kafka Connect Bigtable sink is a dedicated connector designed to stream data into Bigtable in real time with as little latency as possible.

### Prerequisites

You must have a GCP project in order to use Cloud Bigtable.

Follow the Bigtable [setup steps](https://docs.cloud.google.com/bigtable/docs/overview)
before starting the [Quick Start](#quick-start).

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Create a Bigtable instance in your GCP project.
3. (Optional) Create a Bigtable table with column families to receive the data. Alternatively, set `auto.create.tables` and `auto.create.column.families` so that the connector creates tables and column families that do not exist.
4. Create a JSON file like the following:

```json theme={null}
{
    "name": "bigtable-sink",
    "config": {
        "connector.class": "com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector",
        "gcp.bigtable.project.id": "${GCP_PROJECT_ID}",
        "gcp.bigtable.instance.id": "${BIGTABLE_INSTANCE_ID}",
        "gcp.bigtable.credentials.json": "${GCP_CREDENTIALS_JSON}",
        "topics": "${KAFKA_TOPIC_NAME}",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
```

5. Run the following command to create the connector:

```shell theme={null}
kcctl apply -f <filename>.json
```
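
If you skip the optional step 3, a configuration along the following lines asks the connector to create the destination table and its column families when they are missing. This is a sketch, not a verified setup: it reuses the quick-start placeholders and adds only the `auto.create.tables` and `auto.create.column.families` properties from the configuration table. The connector name `bigtable-sink-autocreate` is a hypothetical label, and the credentials are assumed to carry permission to create tables.

```json
{
    "name": "bigtable-sink-autocreate",
    "config": {
        "connector.class": "com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector",
        "gcp.bigtable.project.id": "${GCP_PROJECT_ID}",
        "gcp.bigtable.instance.id": "${BIGTABLE_INSTANCE_ID}",
        "gcp.bigtable.credentials.json": "${GCP_CREDENTIALS_JSON}",
        "topics": "${KAFKA_TOPIC_NAME}",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "auto.create.tables": "true",
        "auto.create.column.families": "true"
    }
}
```

Both properties default to `false`, so leaving them out keeps the behavior of the original quick-start file, where the table and column families must already exist.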

### Configuration

The Google Bigtable sink connector is configured using the following properties:

| Config                        | Type    | Required              | Description                                                                                                                                 | Default      |
| ----------------------------- | ------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
| gcp.bigtable.project.id       | String  | REQUIRED (No default) | The ID of the GCP project.                                                                                                                  |              |
| gcp.bigtable.instance.id      | String  | REQUIRED (No default) | The ID of the Cloud Bigtable instance.                                                                                                      |              |
| gcp.bigtable.app.profile.id   | String  | Optional              | The application profile that the connector should use.                                                                                      | default      |
| gcp.bigtable.credentials.path | String  | Optional              | The path to the JSON service key file.                                                                                                      |              |
| gcp.bigtable.credentials.json | String  | Optional              | GCP credentials JSON blob.                                                                                                                  |              |
| insert.mode                   | String  | Optional              | Defines the insertion mode to use. Supported modes are: INSERT, UPSERT                                                                      | INSERT       |
| max.batch.size                | Int     | Optional              | The maximum number of records that can be grouped into a batch of upserts.                                                                  | 1            |
| value.null.mode               | String  | Optional              | Defines what to do with `null`s within Kafka values. Supported modes are: WRITE, IGNORE, DELETE.                                            | WRITE        |
| error.mode                    | String  | Optional              | Specifies how to handle errors that result from writes, after retries. Supported modes are: FAIL, WARN, IGNORE                              | FAIL         |
| table.name.format             | String  | Optional              | Name of the destination table. Use `${topic}` within the table name to specify the originating topic name.                                  | `${topic}`   |
| row\.key.definition           | String  | Optional              | A comma-separated list of Kafka Record key field names that specifies the order of Kafka key fields to be concatenated to form the row key. |              |
| row\.key.delimiter            | String  | Optional              | The delimiter used in concatenating Kafka key fields in the row key.                                                                        |              |
| auto.create.tables            | Boolean | Optional              | Whether to automatically create the destination table if it is found to be missing.                                                         | false        |
| auto.create.column.families   | Boolean | Optional              | Whether to automatically create missing column families in the table relative to the record schema.                                         | false        |
| default.column.family         | String  | Optional              | Any root-level fields on the SinkRecord that aren't objects will be added to this column family.                                            | `${topic}`   |
| default.column.qualifier      | String  | Optional              | Any root-level values on the SinkRecord that aren't objects will be added to this column within default column family.                      | KAFKA\_VALUE |
| retry.timeout.ms              | Long    | Optional              | Maximum time in milliseconds allocated for retrying database operations before trying other error handling mechanisms.                      | 90000        |

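As an illustration of how the optional properties combine, the sketch below extends the quick-start file with a few write-behavior settings from the table above. The specific values (`100`, `30000`, the `events_` table prefix) and the connector name `bigtable-sink-tuned` are assumptions chosen for the example, not recommendations; only the property names and the supported modes come from the table.

```json
{
    "name": "bigtable-sink-tuned",
    "config": {
        "connector.class": "com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector",
        "gcp.bigtable.project.id": "${GCP_PROJECT_ID}",
        "gcp.bigtable.instance.id": "${BIGTABLE_INSTANCE_ID}",
        "gcp.bigtable.credentials.json": "${GCP_CREDENTIALS_JSON}",
        "topics": "${KAFKA_TOPIC_NAME}",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "insert.mode": "UPSERT",
        "max.batch.size": "100",
        "value.null.mode": "IGNORE",
        "error.mode": "WARN",
        "retry.timeout.ms": "30000",
        "table.name.format": "events_${topic}"
    }
}
```

With these settings, a record from a topic named `orders` would be written to a table named `events_orders`. Note that `${topic}` is meant to reach the connector literally, unlike the other `${...}` entries, which are placeholders to fill in before applying the file. If you substitute placeholders with a tool, make sure it leaves `${topic}` untouched.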