sink
Google Cloud BigQuery Sink Connector
BigQuery Connector integrates Apache Pulsar with Google BigQuery.

Available on
StreamNative Cloud console

Authored by
StreamNative
Support type
streamnative
License
StreamNative, Inc. All Rights Reserved

The Google Cloud BigQuery sink connector pulls data from Pulsar topics and persists data to Google Cloud BigQuery tables.

Quick start

Data can only be synchronized to standard BigQuery tables. External tables and views are not supported.

Prerequisites

The prerequisites for connecting a Google BigQuery sink connector to external systems include:

  1. Create a Google BigQuery dataset in Google Cloud.
  2. Create a Google Cloud service account and create a public key certificate for it.
  3. Create a Google Cloud role and ensure that it has the following permissions for the Google BigQuery API:
     - bigquery.tables.create
     - bigquery.tables.get
     - bigquery.tables.getData
     - bigquery.tables.list
     - bigquery.tables.update
     - bigquery.tables.updateData
  4. Grant the role above to the service account.

1. Create a connector

The following command shows how to use pulsarctl to create a built-in connector. To create a non-built-in connector, replace --sink-type bigquery with --archive /path/to/pulsar-io-bigquery.nar. You can find the button to download the NAR package at the beginning of the document.

For StreamNative Cloud users

If you are a StreamNative Cloud user, you need to set up your environment first.

pulsarctl sinks create \
  --sink-type bigquery \
  --name bigquery-sink \
  --tenant public \
  --namespace default \
  --inputs "Your topic name" \
  --parallelism 1 \
  --sink-config \
  '{
    "projectId": "Your BigQuery project ID",
    "datasetName": "Your BigQuery dataset name",
    "tableName": "Your BigQuery table name (the table is created automatically by default)",
    "credentialJsonString": "Public key certificate you created above"
  }'

The --sink-config is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own. If you want to configure more parameters, see Configuration Properties for reference.
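Because credentialJsonString carries the service account key, which is itself a JSON document, it has to be embedded in --sink-config as an escaped JSON string. The following is a minimal sketch of assembling that string with only the Java standard library. The class SinkConfigSketch, its methods, and the sample values are hypothetical and are not part of the connector or pulsarctl.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of the connector): builds the minimal --sink-config JSON.
final class SinkConfigSketch {

    // Escapes a value so that it can be placed inside a JSON string literal.
    static String escapeJson(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the generic 4-digit hex escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Assembles the four properties shown in the pulsarctl command above.
    static String buildSinkConfig(String projectId, String datasetName,
                                  String tableName, String credentialJson) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("projectId", projectId);
        fields.put("datasetName", datasetName);
        fields.put("tableName", tableName);
        fields.put("credentialJsonString", credentialJson);

        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            json.add("\"" + e.getKey() + "\":\"" + escapeJson(e.getValue()) + "\"");
        }
        return json.toString();
    }

    public static void main(String[] args) {
        // Placeholder key content; a real service account key file has more fields.
        String key = "{\"type\":\"service_account\"}";
        System.out.println(buildSinkConfig("my-project", "my_dataset", "my_table", key));
    }
}
```

The printed string can be passed as the value of --sink-config. Building it in code avoids hand-escaping the quotes and line breaks inside the key.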

Note

You can also use a variety of other tools to create a connector.

2. Send messages to the topic

Note

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.

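import lombok.Data;
import lombok.ToString;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

@Data
@ToString
public class TestMessage {
    private String testString;
    private int testInt;

    public static void main(String[] args) throws Exception {
        // Connect to the Pulsar cluster.
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("{{Your Pulsar URL}}")
            .build();

        // Create a producer that uses an AVRO schema derived from TestMessage.
        Producer<TestMessage> producer = client.newProducer(Schema.AVRO(TestMessage.class))
            .topic("{{Your topic name}}")
            .create();

        // Build and publish one message.
        TestMessage testMessage = new TestMessage();
        testMessage.setTestString("test string");
        testMessage.setTestInt(123);
        MessageId msgID = producer.send(testMessage);
        System.out.println("Publish " + testMessage + " and message ID " + msgID);

        // Release resources.
        producer.flush();
        producer.close();
        client.close();
    }
}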

3. Show data on Google BigQuery

This connector automatically creates the table structure according to the schema. You can use SQL to query the data in the console.

SELECT * FROM `{{Your project id}}.{{Your dataset name}}.{{Your table name}}`

+----------------+-----------------+--------------------------------+----------------------------+-------------+---------+
| __message_id__ | __sequence_id__ |         __event_time__         |     __producer_name__      | testString  | testInt |
+----------------+-----------------+--------------------------------+----------------------------+-------------+---------+
| 9:20:-1        |               0 | 2023-09-14 14:05:29.657000 UTC | test-bigquery-produce-name | test string |     123 |
+----------------+-----------------+--------------------------------+----------------------------+-------------+---------+

Configuration Properties

Before using the Google Cloud BigQuery sink connector, you need to configure it. This table outlines the properties and the descriptions.

| Name | Type | Required | Sensitive | Default | Description |
|------|------|----------|-----------|---------|-------------|
| projectId | String | Yes | false | "" (empty string) | The Google BigQuery project ID. |
| datasetName | String | Yes | false | "" (empty string) | The Google BigQuery dataset name. |
| tableName | String | Yes | false | "" (empty string) | The Google BigQuery table name. |
| credentialJsonString | String | Yes | true | "" (empty string) | The authentication JSON key. When credentialJsonString is set to an empty string, set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. For details, see the Google documentation. |
| visibleModel | String | No | false | "Committed" | The mode that controls when data written to the stream becomes visible in BigQuery for reading. Available options are Committed and Pending. For details, see the Google documentation. |
| pendingMaxSize | int | No | false | 10000 | The maximum number of messages waiting to be committed in Pending mode. |
| batchMaxSize | int | No | false | 20 | The maximum number of messages in a batch. The actual batch size cannot exceed 10 MB. If it would, the batch is flushed first. See https://cloud.google.com/bigquery/quotas |
| batchMaxTime | long | No | false | 5000 | The maximum batch waiting time (in milliseconds). |
| batchFlushIntervalTime | long | No | false | 2000 | The batch flush interval (in milliseconds). |
| failedMaxRetryNum | int | No | false | 20 | The maximum number of retries when appending fails. By default, each retry takes 2 seconds. |
| autoCreateTable | boolean | No | false | true | Automatically create a table if no table is available. |
| autoUpdateTable | boolean | No | false | true | Automatically update the table schema if the BigQuery table schema is incompatible with the Pulsar schema. |
| partitionedTables | boolean | No | false | true | Create a partitioned table when the table is automatically created. The __event_time__ field is used as the partition key. |
| partitionedTableIntervalDay | int | No | false | 7 | The number of days between partitioning of the partitioned table. |
| clusteredTables | boolean | No | false | true | Create a clustered table when the table is automatically created. The __message_id__ field is used as the cluster key. |
| defaultSystemField | String | No | false | "" (empty string) | The system fields to create when the table is automatically created. Use commas to separate multiple fields. The supported system fields are: __schema_version__, __partition__, __event_time__, __publish_time__, __message_id__, __sequence_id__, __producer_name__, and __properties__. The __properties__ field is stored as a repeated struct in BigQuery, with key and value both of string type. |
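As an illustration of how the table can be read, the following sketch checks a configuration map before it is submitted. It only covers the three name properties marked as required and the two documented values of visibleModel. The class ConfigCheckSketch and its validate method are hypothetical, and the exact-match comparison of visibleModel is an assumption. The connector performs its own validation, which may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check (not part of the connector).
final class ConfigCheckSketch {
    // credentialJsonString is left out on purpose: the table allows an empty value
    // when GOOGLE_APPLICATION_CREDENTIALS is set instead.
    static final List<String> REQUIRED = Arrays.asList("projectId", "datasetName", "tableName");
    static final List<String> VISIBLE_MODELS = Arrays.asList("Committed", "Pending");

    // Returns a list of human-readable problems; an empty list means the checks passed.
    static List<String> validate(Map<String, Object> config) {
        List<String> problems = new ArrayList<>();
        for (String name : REQUIRED) {
            Object value = config.get(name);
            if (value == null || value.toString().trim().isEmpty()) {
                problems.add(name + " is required");
            }
        }
        // visibleModel falls back to its documented default when it is not set.
        Object mode = config.getOrDefault("visibleModel", "Committed");
        if (!VISIBLE_MODELS.contains(String.valueOf(mode))) {
            problems.add("visibleModel must be Committed or Pending");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("projectId", "my-project");
        config.put("datasetName", "my_dataset");
        // tableName is missing, so one problem is reported.
        System.out.println(validate(config));
    }
}
```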

Advanced features

Delivery guarantees

The Pulsar IO connector framework provides three delivery guarantees: at-most-once, at-least-once, and effectively-once.

Currently, the Google Cloud BigQuery sink connector only provides the at-least-once delivery guarantee.

Table schema

The Google Cloud BigQuery sink connector supports automatically creating and updating a table’s schema based on the Pulsar topic schema. You can configure the following options:

autoCreateTable = true
autoUpdateTable = true

If the Pulsar topic schema and BigQuery schema are different, the Google Cloud BigQuery sink connector updates schemas by merging them together. The Google Cloud BigQuery sink connector supports mapping schema structures to the BigQuery RECORD TYPE.

In addition, the Google Cloud BigQuery sink connector supports writing some Pulsar-specific fields, as shown below:

#
# optional: __schema_version__ , __partition__   , __event_time__    , __publish_time__  
#           __message_id__     , __sequence_id__ , __producer_name__ , __key__ , __properties__ 
#
defaultSystemField = __event_time__,__message_id__
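The value of defaultSystemField is a comma-separated list. The following sketch shows one way to split and check such a value against the field names listed in the comment above. The class SystemFieldSketch is hypothetical, and trimming whitespace and dropping duplicates are assumptions of this sketch, not documented connector behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical parser (not part of the connector) for a defaultSystemField value.
final class SystemFieldSketch {
    // Field names taken from the configuration comment above.
    static final Set<String> SUPPORTED = new LinkedHashSet<>(Arrays.asList(
        "__schema_version__", "__partition__", "__event_time__", "__publish_time__",
        "__message_id__", "__sequence_id__", "__producer_name__", "__key__", "__properties__"));

    // Splits on commas, trims each entry, and rejects unknown names.
    static List<String> parse(String defaultSystemField) {
        List<String> fields = new ArrayList<>();
        if (defaultSystemField == null || defaultSystemField.trim().isEmpty()) {
            return fields; // the default: no system fields
        }
        for (String raw : defaultSystemField.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue;
            }
            if (!SUPPORTED.contains(name)) {
                throw new IllegalArgumentException("Unsupported system field: " + name);
            }
            if (!fields.contains(name)) {
                fields.add(name);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("__event_time__,__message_id__"));
    }
}
```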

Note

The Google Cloud BigQuery sink connector does not delete any fields. If you change a field name in a Pulsar topic, the Google Cloud BigQuery sink connector will preserve both fields.
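The merge-without-delete behavior can be pictured as a union of field sets. The following sketch models a schema as an ordered map from field name to type name and merges the topic schema into the table schema. The class SchemaMergeSketch is hypothetical, and keeping the existing column type when both sides define the same field is an assumption of this sketch. The connector's actual conflict handling is not described here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model (not the connector's implementation) of merging two schemas.
final class SchemaMergeSketch {

    // Keeps every existing table field and appends topic fields that the table lacks.
    static Map<String, String> merge(Map<String, String> tableFields,
                                     Map<String, String> topicFields) {
        Map<String, String> merged = new LinkedHashMap<>(tableFields);
        for (Map.Entry<String, String> e : topicFields.entrySet()) {
            // Assumption: an existing column keeps its current type.
            merged.putIfAbsent(e.getKey(), e.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> table = new LinkedHashMap<>();
        table.put("testString", "STRING");
        table.put("testInt", "INTEGER");

        // The topic schema renamed testInt to testNumber.
        Map<String, String> topic = new LinkedHashMap<>();
        topic.put("testString", "STRING");
        topic.put("testNumber", "INTEGER");

        // Both testInt and testNumber are present after the merge.
        System.out.println(merge(table, topic).keySet());
    }
}
```

In this model a renamed field shows up as a new column while the old column stays in place, which matches the note above.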

The following table lists the schema types that the connector can currently convert.

| Schema | Supported |
|--------|-----------|
| AVRO | Yes |
| PRIMITIVE | Yes |
| PROTOBUF_NATIVE | Yes |
| PROTOBUF | No |
| JSON | No |
| KEY_VALUE | No |

Partitioned tables

Note

This feature is only available when autoCreateTable is set to true. If you create a table manually, you need to manually specify the partition key.

BigQuery supports partitioned tables. Partitioned tables can improve query performance and control costs by reducing the amount of data read from the table. The Google Cloud BigQuery sink connector provides an option to create a partitioned table. Partitioned tables use __event_time__ as the partition key.

partitionedTables = true

Clustered tables

Note

This feature is only available when autoCreateTable is set to true. If you create a table manually, you need to manually specify the cluster key.

Clustered tables can improve the performance of certain queries, such as queries that use filter clauses and queries that aggregate data. The Google Cloud BigQuery sink connector provides an option to create a clustered table. Clustered tables use __message_id__ as the cluster key.

clusteredTables = true

Multiple tasks

You can leverage the Pulsar Functions scheduling mechanism to configure parallelism of the Google Cloud BigQuery sink connector. You can schedule multiple sink instances to run on different Function worker nodes. These sink instances consume messages according to the configured subscription mode.

parallelism = 4

Note

Increasing parallelism is an effective way to relieve write bottlenecks. Also check whether the resulting write rate exceeds the BigQuery rate limits.

Batch progress

To increase write throughput, the Google Cloud BigQuery sink connector supports configuring the batch size. You can set the batch size and latency using the following options.

batchMaxSize = 100
batchMaxTime = 4000
batchFlushIntervalTime = 2000
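To make the interaction of these options concrete, the following sketch models a batcher that flushes when the message count reaches batchMaxSize, when the oldest buffered message has waited batchMaxTime, or before a message would push the batch past 10 MB. The class BatchSketch is hypothetical and is not the connector's implementation. Treating batchFlushIntervalTime as the period at which tick is called, and counting 10 MB as 10 * 1024 * 1024 bytes, are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batcher (not the connector's code) illustrating the batch options.
final class BatchSketch {
    static final long MAX_BATCH_BYTES = 10L * 1024 * 1024; // assumed reading of the 10 MB limit

    private final int batchMaxSize;
    private final long batchMaxTimeMs;
    private final List<byte[]> pending = new ArrayList<>();
    private final List<Integer> flushedSizes = new ArrayList<>();
    private long pendingBytes = 0;
    private long firstMessageAtMs = -1;

    BatchSketch(int batchMaxSize, long batchMaxTimeMs) {
        this.batchMaxSize = batchMaxSize;
        this.batchMaxTimeMs = batchMaxTimeMs;
    }

    // Adds one message. The current time is passed in so that the logic is easy to test.
    void add(byte[] message, long nowMs) {
        // Flush first if this message would push the batch over the byte ceiling.
        if (!pending.isEmpty() && pendingBytes + message.length > MAX_BATCH_BYTES) {
            flush();
        }
        if (pending.isEmpty()) {
            firstMessageAtMs = nowMs;
        }
        pending.add(message);
        pendingBytes += message.length;
        if (pending.size() >= batchMaxSize) {
            flush();
        }
    }

    // Assumed to be called once per batchFlushIntervalTime.
    void tick(long nowMs) {
        if (!pending.isEmpty() && nowMs - firstMessageAtMs >= batchMaxTimeMs) {
            flush();
        }
    }

    private void flush() {
        // A real sink would append the buffered rows to BigQuery here.
        flushedSizes.add(pending.size());
        pending.clear();
        pendingBytes = 0;
        firstMessageAtMs = -1;
    }

    // Sizes of the batches flushed so far, in order.
    List<Integer> flushedSizes() {
        return flushedSizes;
    }
}
```

In this model, a larger batchMaxSize means fewer, bigger appends, while batchMaxTime bounds how long a message can wait in a partially filled batch.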