This is an Apache Kafka Connect sink connector that stores Kafka messages in a Google Cloud Storage (GCS) bucket.

Prerequisites

You must have a GCP project in order to use GCS.

Quick Start

  1. Set up the kcctl client (see the kcctl documentation).
  2. Create a GCS bucket in your GCP project.
  3. Create a JSON file like the following:
    {
        "name": "gcs-sink",
        "config": {
            "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
            "tasks.max": "1",
            "topics": "kafka-gcs-input",
            "format.output.type": "json",
            "gcs.bucket.name": "${GCS_BUCKET_NAME}"
        }
    }
    
  4. Run the following command to create the connector:
    kcctl create -f <filename>.json
    

Configuration

The GCS Kafka Connect Sink connector is configured using the following properties:
| Parameter | Required | Description | Default |
|---|---|---|---|
| `connector.class` | Yes | The Java class for the GCS Sink connector. | |
| `tasks.max` | Yes | The maximum number of tasks that should be created for this connector. | |
| `topics` | No | A comma-separated list of Kafka topics to consume from. Only one of `topics` or `topics.regex` should be specified. | |
| `topics.regex` | No | Regular expression giving topics to consume. Under the hood, the regex is compiled to a `java.util.regex.Pattern`. Only one of `topics` or `topics.regex` should be specified. | |
| `gcs.bucket.name` | Yes | The name of the GCS bucket where the data will be stored. | |
| `gcs.credentials.json` | No | The GCP credentials in JSON format. If not provided, the connector will use the default application credentials. | |
| `gcs.credentials.path` | No | The path to a GCP credentials file. Cannot be set together with `gcs.credentials.json` or `gcs.credentials.default`. | |
| `gcs.credentials.default` | No | Whether to connect using the GCP SDK's default credential discovery. When set to `null` (the default) or `false`, the connector falls back to connecting with no credentials. Cannot be set together with `gcs.credentials.json` or `gcs.credentials.path`. | |
| `gcs.object.content.encoding` | No | The GCS object metadata value of `Content-Encoding`. | |
| `gcs.endpoint` | No | Explicit GCS endpoint address, mainly for testing. | |
| `gcs.retry.backoff.initial.delay.ms` | No | Initial retry delay in milliseconds. | `1000` |
| `gcs.retry.backoff.max.delay.ms` | No | Maximum retry delay in milliseconds. | `32000` |
| `gcs.retry.backoff.delay.multiplier` | No | Retry delay multiplier. | `2.0` |
| `gcs.retry.backoff.max.attempts` | No | Maximum number of retry attempts. | `6` |
| `gcs.retry.backoff.total.timeout.ms` | No | Total retry timeout in milliseconds. | `50000` |
| `gcs.user.agent` | No | A custom user agent used while contacting Google. | `Google GCS Sink/3.4.1 (GPN: Aiven;)` |
| `file.name.prefix` | No | The prefix to be added to the name of each file put on GCS. | |
| `file.name.template` | No | The template for file names on GCS. Supports `{{ variable }}` placeholders for substituting variables. Currently supported variables are `topic`, `partition`, and `start_offset` (the offset of the first record in the file). | `{{topic}}-{{partition:padding=false}}-{{start_offset:padding=false}}` |
| `file.compression.type` | No | The compression type used for files put on GCS. The supported values are: `none`, `gzip`, `snappy`, `zstd`. | `none` |
| `file.max.records` | No | The maximum number of records to put in a single file. Must be a non-negative integer; `0` is interpreted as "unlimited". | `0` |
| `file.name.timestamp.timezone` | No | The timezone in which the dates and times for the `timestamp` variable are treated. Use standard short and long names. | `UTC` |
| `file.name.timestamp.source` | No | The source of the `timestamp` variable. | `WALLCLOCK` |
| `format.output.type` | No | The format type of the output content. The supported values are: `avro`, `csv`, `json`, `jsonl`, `parquet`. | `csv` |
| `format.output.fields` | No | Fields to put into output files. The supported values are: `key`, `value`, `offset`, `timestamp`, `headers`. | `value` |
| `format.output.fields.value.encoding` | No | The type of encoding for the value field. The supported values are: `none`, `base64`. | `base64` |
| `format.output.envelope` | No | Whether to enable the envelope for entries with a single field. | `true` |
| `errors.deadletterqueue.topic.name` | No | The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, or by its transformations or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ. | |
| `errors.deadletterqueue.topic.replication.factor` | No | Replication factor used to create the dead letter queue topic when it doesn't already exist. | `3` |
| `errors.deadletterqueue.context.headers.enable` | No | If `true`, headers containing error context are added to the messages written to the dead letter queue. To avoid clashing with headers from the original record, all error context header keys start with `__connect.errors.` | `false` |
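As an illustration of how several of these options combine, a fuller connector config might look like the following. The bucket name, topic names, and credentials path are placeholders; this sketch writes gzip-compressed JSONL files of at most 1000 records, keyed by topic/partition/offset, and routes failed records to a dead letter queue topic.

```json
{
    "name": "gcs-sink",
    "config": {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": "1",
        "topics": "kafka-gcs-input",
        "gcs.bucket.name": "my-gcs-sink-bucket",
        "gcs.credentials.path": "/etc/kafka-connect/gcs-credentials.json",
        "file.name.prefix": "connect/",
        "file.compression.type": "gzip",
        "file.max.records": "1000",
        "format.output.type": "jsonl",
        "format.output.fields": "key,value,timestamp",
        "errors.deadletterqueue.topic.name": "gcs-sink-dlq",
        "errors.deadletterqueue.topic.replication.factor": "1",
        "errors.deadletterqueue.context.headers.enable": "true"
    }
}
```

Note that `gcs.credentials.path` is mutually exclusive with `gcs.credentials.json` and `gcs.credentials.default`, per the table above.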