Kafka Connect BigQuery Sink

Available on: StreamNative Cloud console
Authored by: WePay, Confluent
Support type: Community
License: Apache License 2.0

kafka-connect-bigquery is a Kafka Connect sink connector for Google BigQuery. It is available on StreamNative Cloud.

Prerequisites

Complete the following prerequisites before setting up the BigQuery sink connector.

  1. A valid Google Cloud account authorized for resource creation.
  2. A BigQuery project, which can be set up via the Google Cloud Console.
  3. A dataset within the BigQuery project.
  4. A service account with access to the BigQuery project that hosts the dataset; this account can be created in the Google Cloud Console.
  5. When setting up the service account, grant it access to the BigQuery project containing the dataset, and create and download a key in JSON format (a sketch of the key file's structure appears after this list).
  6. According to the GCP access control documentation <https://cloud.google.com/bigquery/docs/access-control>, the service account needs either the BigQueryEditor primitive IAM role or the bigquery.dataEditor predefined IAM role. The minimum permissions are as follows:
   bigquery.datasets.get
   bigquery.tables.create
   bigquery.tables.get
   bigquery.tables.getData
   bigquery.tables.list
   bigquery.tables.update
   bigquery.tables.updateData
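
For reference, the service account key downloaded in step 5 is a JSON document along the lines of the abridged sketch below. Every value shown is a placeholder and the real file contains additional fields; this is the file whose contents (or path) are later supplied through the connector's keyfile property described under Configuration.

    {
      "type": "service_account",
      "project_id": "my-gcp-project",
      "private_key_id": "PLACEHOLDER_KEY_ID",
      "private_key": "-----BEGIN PRIVATE KEY-----\nPLACEHOLDER\n-----END PRIVATE KEY-----\n",
      "client_email": "bigquery-sink@my-gcp-project.iam.gserviceaccount.com",
      "client_id": "PLACEHOLDER_CLIENT_ID",
      "token_uri": "https://oauth2.googleapis.com/token"
    }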

Configuration

The kafka-connect-bigquery connector is configured using the following properties:

Parameter | Description | Default
project | The Google Cloud project ID. |
defaultDataset | The BigQuery dataset name. |
topics | The Kafka topics to sink to BigQuery. |
autoCreateTables | Create BigQuery tables if they don't already exist. This property should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table creation is not supported for JSON input. | true
keyfile | Either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation is supported in BigQuery sink connector version 1.3 (and later). For StreamNative Cloud, the keyfile is saved as a StreamNative Cloud secret in JSON format. |
keySource | The source of the keyfile. The keyfile can be provided as a string or a file path. For StreamNative Cloud, only JSON is supported. | JSON
gcsBucketName | The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. Applicable only if enableBatchLoad is configured. |
queueSize | The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size. | -1
bigQueryRetry | The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error. | 0
bigQueryRetryWait | The time, in milliseconds, to wait between retry attempts. | 1000
bigQueryMessageTimePartitioning | Whether to use the Kafka message time for time partitioning of BigQuery tables. | false
bigQueryPartitionDecorator | Whether or not to append a partition decorator to the BigQuery table name when inserting records. Setting this to true appends the partition decorator to the table name (for example, table$yyyyMMdd, depending on the partitioning configuration). Setting this to false bypasses the decorator logic and uses the raw table name for inserts. | true
timestampPartitionFieldName | The name of the record field that contains the timestamp to use for partitioning. | null
clusteringPartitionFieldNames | The names of the record fields to use for clustering. | null
timePartitioningType | The type of time partitioning to use. Supported values are MONTH, YEAR, HOUR, and DAY. | DAY
sanitizeTopics | Whether to sanitize topic names to be compatible with BigQuery table names. | false
schemaRetriever | A class that can be used for automatically creating tables and/or updating schemas. Note that in version 2.0.0 the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which helps support multiple schemas per topic. SchemaRegistrySchemaRetriever has been removed, as it retrieves the schema based on the topic. | com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize | The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery. | 10
allBQFieldsNullable | If true, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays). | false
avroDataCacheSize | The size of the Avro data cache. | 100
batchLoadIntervalSec | The interval, in seconds, at which to attempt to run GCS to BigQuery load jobs. Only relevant if enableBatchLoad is configured. | 120
convertDoubleSpecialValues | Whether +Infinity is converted to Double.MAX_VALUE and -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery. | false
enableBatchLoad | [Beta feature; use with caution.] The sublist of topics to be batch loaded through GCS. | ""
includeKafkaData | Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows. | false
upsertEnabled | Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row matching is performed based on the contents of record keys. This feature does not work with SMTs that change the name of the topic and does not support JSON input. | false
deleteEnabled | Enable delete functionality on the connector through the use of record keys and intermediate tables. Row matching is performed based on the contents of record keys. This feature does not work with SMTs that change the name of the topic and does not support JSON input. | false
intermediateTableSuffix | A suffix appended to the names of destination tables to create the names of the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table, but their names always start with the name of the destination table, followed by this suffix, and possibly an additional suffix. | "tmp"
mergeIntervalMs | The interval, in milliseconds, at which to attempt to merge intermediate tables into destination tables. | 60000
mergeRecordsThreshold | The number of records that must be in an intermediate table before it is eligible for a merge operation. | -1
autoCreateBucket | Whether to create the GCS bucket if it doesn't exist. | true
allowNewBigQueryFields | Whether to allow new fields to be added to BigQuery tables. | false
allowBigQueryRequiredFieldRelaxation | If true, fields in the BigQuery schema can be changed from REQUIRED to NULLABLE. Note that allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation replace the autoUpdateSchemas parameter of older versions of this connector. | false
allowSchemaUnionization | If true, the existing table schema (if one is present) is unionized with new record schemas during schema updates. If false, the record of the last schema in a batch is used for any necessary table creation and schema update attempts. | false
kafkaDataFieldName | The Kafka data field name. The default value is null, which means the Kafka data field is not included. | null
kafkaKeyFieldName | The Kafka key field name. The default value is null, which means the Kafka key field is not included. | null
topic2TableMap | Map of topics to tables (optional). Format: comma-separated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,.. Note that the topic name should not be modified using a regex SMT while using this option. Also note that the sanitizeTopics setting (SANITIZE_TOPICS_CONFIG) is ignored if this config is set. Lastly, if the map does not contain the topic for a record, a table with the same name as the topic is created. | ""
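
Putting these properties together, the snippet below is a minimal, illustrative sink configuration in the JSON form accepted by a self-managed Kafka Connect REST API (for example, POSTed to the /connectors endpoint). The connector name, topic, project, and dataset are placeholders, and the connector class shown is the one published by the kafka-connect-bigquery project; on StreamNative Cloud the same properties are entered through the console, with the keyfile stored as a secret in JSON format (keySource JSON), so treat this only as a sketch of which properties go together.

    {
      "name": "bigquery-sink",
      "config": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "tasks.max": "1",
        "topics": "orders",
        "project": "my-gcp-project",
        "defaultDataset": "my_dataset",
        "keySource": "JSON",
        "keyfile": "<contents of the service account key JSON>",
        "autoCreateTables": "true",
        "sanitizeTopics": "true"
      }
    }

To experiment with the upsert or delete behavior described in the table, you would additionally set upsertEnabled or deleteEnabled to true and make record keys available to the connector (typically together with kafkaKeyFieldName), then tune mergeIntervalMs and mergeRecordsThreshold to control how often the intermediate tables are merged into the destination tables.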