The Kafka Connect Bigtable sink is a dedicated connector that streams data from Kafka topics into Cloud Bigtable with minimal latency.

Prerequisites

You must have a GCP project in order to use Cloud Bigtable. Complete the Bigtable setup steps in the Google Cloud documentation before starting the quick start.

Quick Start

  1. Set up the `kcctl` client (see the kcctl documentation).
  2. Create a Bigtable instance in your GCP project.
  3. (Optional) Create a Bigtable table with column families to receive the data (you can configure the connector to create tables and column families automatically if they do not exist).
  4. Create a JSON file like the following:
{
    "name": "bigtable-sink",
    "config": {
        "connector.class": "com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector",
        "gcp.bigtable.project.id": "${GCP_PROJECT_ID}",
        "gcp.bigtable.instance.id": "${BIGTABLE_INSTANCE_ID}",
        "gcp.bigtable.credentials.json": "${GCP_CREDENTIALS_JSON}",
        "topics": "${KAFKA_TOPIC_NAME}",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
  5. Run the following command to create the connector:
kcctl create -f <filename>.json

Configuration

The Google Bigtable sink connector is configured using the following properties:
| Config | Type | Required | Description | Default |
|---|---|---|---|---|
| `gcp.bigtable.project.id` | String | Required | The ID of the GCP project. | (none) |
| `gcp.bigtable.instance.id` | String | Required | The ID of the Cloud Bigtable instance. | (none) |
| `gcp.bigtable.app.profile.id` | String | Optional | The application profile that the connector should use. | `default` |
| `gcp.bigtable.credentials.path` | String | Optional | The path to the JSON service key file. | (none) |
| `gcp.bigtable.credentials.json` | String | Optional | GCP credentials JSON blob. | (none) |
| `insert.mode` | String | Optional | The insertion mode to use. Supported modes: `INSERT`, `UPSERT`. | `INSERT` |
| `max.batch.size` | Int | Optional | The maximum number of records to combine into a single batch of upserts. | `1` |
| `value.null.mode` | String | Optional | What to do with nulls within Kafka values. Supported modes: `WRITE`, `IGNORE`, `DELETE`. | `WRITE` |
| `error.mode` | String | Optional | How to handle errors that result from writes, after retries are exhausted. Supported modes: `FAIL`, `WARN`, `IGNORE`. | `FAIL` |
| `table.name.format` | String | Optional | Name of the destination table. Use `${topic}` within the table name to refer to the originating topic name. | `${topic}` |
| `row.key.definition` | String | Optional | A comma-separated list of Kafka record key field names, specifying the order in which the key fields are concatenated to form the row key. | (none) |
| `row.key.delimiter` | String | Optional | The delimiter used when concatenating Kafka key fields into the row key. | (none) |
| `auto.create.tables` | Boolean | Optional | Whether to automatically create the destination table if it is found to be missing. | `false` |
| `auto.create.column.families` | Boolean | Optional | Whether to automatically create column families missing from the table relative to the record schema. | `false` |
| `default.column.family` | String | Optional | Root-level fields on the SinkRecord that aren't objects are added to this column family. | `${topic}` |
| `default.column.qualifier` | String | Optional | Root-level values on the SinkRecord that aren't objects are added to this column qualifier within the default column family. | `KAFKA_VALUE` |
| `retry.timeout.ms` | Long | Optional | Maximum time in milliseconds allocated for retrying database operations before other error handling mechanisms are applied. | `90000` |
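As an illustrative sketch (the topic and key field names `user_id` and `event_time` are assumptions, not part of the connector), the following fragment would be added under `"config"` to build row keys from two Kafka key fields and let the connector create missing tables and column families:

```json
{
    "config": {
        "row.key.definition": "user_id,event_time",
        "row.key.delimiter": "#",
        "auto.create.tables": "true",
        "auto.create.column.families": "true",
        "insert.mode": "UPSERT"
    }
}
```

With this configuration, a record whose key contains `user_id` `"42"` and `event_time` `"2024-01-01T00:00:00Z"` would be written under the row key `42#2024-01-01T00:00:00Z`, upserting into a table that the connector creates on first write if it does not already exist.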