sink
Google Cloud BigQuery Sink Connector
BigQuery Connector integrates Apache Pulsar with Google BigQuery.

Available on
StreamNative Cloud console

Authored by
shibd,danpi,codelipenghui,streamnativebot
Support type
streamnative
License
Business License

The Google Cloud BigQuery sink connector pulls data from Pulsar topics and persists data to Google Cloud BigQuery tables.

Features

This section describes features of the Google Cloud BigQuery sink connector. For details about how to configure these features, see how to configure.

Delivery guarantees

The Pulsar IO connector framework provides three delivery guarantees: at-most-once, at-least-once, and effectively-once.

Currently, the Google Cloud BigQuery sink connector only provides the at-least-once delivery guarantee.

Tables schema

The Google Cloud BigQuery sink connector supports automatically creating and updating a table’s schema based on the Pulsar topic schema. You can configure the following options:

autoCreateTable = true
autoUpdateTable = true

If the Pulsar topic schema and the BigQuery table schema differ, the Google Cloud BigQuery sink connector updates the table schema by merging the two. The connector also supports mapping schema structures to the BigQuery RECORD type.

In addition, the Google Cloud BigQuery sink connector supports writing some Pulsar-specific fields, as shown below:

#
# optional: __schema_version__ , __partition__   , __event_time__    , __publish_time__  
#           __message_id__     , __sequence_id__ , __producer_name__ , __key__           , __properties__ 
#
defaultSystemField = __event_time__,__message_id__
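
As an illustration of the format, the sketch below parses a comma-separated defaultSystemField value and checks each name. The list of names is copied from the comment in the snippet above. The helper parse_system_fields is hypothetical and not part of the connector.

```python
# Optional system fields, copied from the comment in the configuration snippet.
SUPPORTED_SYSTEM_FIELDS = {
    "__schema_version__", "__partition__", "__event_time__",
    "__publish_time__", "__message_id__", "__sequence_id__",
    "__producer_name__", "__key__", "__properties__",
}


def parse_system_fields(value: str) -> list[str]:
    """Split a comma-separated defaultSystemField value and validate each name.

    Hypothetical helper for illustration; the connector does its own parsing.
    """
    fields = [name.strip() for name in value.split(",") if name.strip()]
    unknown = [name for name in fields if name not in SUPPORTED_SYSTEM_FIELDS]
    if unknown:
        raise ValueError(f"unsupported system fields: {unknown}")
    return fields


print(parse_system_fields("__event_time__,__message_id__"))
# ['__event_time__', '__message_id__']
```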

Note

The Google Cloud BigQuery sink connector does not delete any fields. If you change a field name in a Pulsar topic, the Google Cloud BigQuery sink connector will preserve both fields.
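
The effect of this merge behavior can be modeled in a few lines. This is a simplified sketch that assumes a schema is a flat mapping of field name to type; the connector's actual merge logic is not shown in this document, and merge_fields is a hypothetical name.

```python
def merge_fields(table_fields: dict[str, str], topic_fields: dict[str, str]) -> dict[str, str]:
    """Return the union of both schemas; fields already in the table are never removed."""
    merged = dict(table_fields)              # keep every field the table already has
    for name, field_type in topic_fields.items():
        merged.setdefault(name, field_type)  # add fields that exist only in the topic
    return merged


table = {"id": "INTEGER", "user_name": "STRING"}
topic = {"id": "INTEGER", "username": "STRING"}  # field renamed in the topic schema
print(merge_fields(table, topic))
# {'id': 'INTEGER', 'user_name': 'STRING', 'username': 'STRING'}
```

After the rename, the table holds both user_name and username, which matches the note above.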

The following table lists the schema types that can currently be converted.

Schema | Supported
--- | ---
AVRO | Yes
PRIMITIVE | Yes
JSON | No
KEY_VALUE | No
PROTOBUF | No
PROTOBUF_NATIVE | No

Partitioned tables

Note

This feature is only available when autoCreateTable is set to true. If you create a table manually, you need to manually specify the partition key.

BigQuery supports partitioned tables. Partitioned tables can improve query performance and control costs by reducing the amount of data read from the table. The Google Cloud BigQuery sink connector provides an option to create a partitioned table. The partitioned table uses __event_time__ as the partition key.

partitionedTables = true

Clustered tables

Note

This feature is only available when autoCreateTable is set to true. If you create a table manually, you need to manually specify the cluster key.

Clustered tables can improve the performance of certain queries, such as queries that use filter clauses and queries that aggregate data. The Google Cloud BigQuery sink connector provides an option to create a clustered table. The clustered table uses __message_id__ as the cluster key.

clusteredTables = true

Multiple tasks

You can leverage the Pulsar Functions scheduling mechanism to configure parallelism of the Google Cloud BigQuery sink connector. You can schedule multiple sink instances to run on different Function worker nodes. These sink instances consume messages according to the configured subscription mode.

parallelism = 4

Note

Increasing parallelism is an effective way to relieve write bottlenecks. When you do so, make sure that the total write rate does not exceed the BigQuery rate limits.
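
A rough way to reason about this is to multiply the per-instance write rate by the parallelism and compare the result with your quota. The sketch below assumes every instance writes at the same rate. The numbers are placeholders, not real BigQuery limits, and both function names are hypothetical.

```python
def total_write_rate(parallelism: int, per_instance_rate: float) -> float:
    """Aggregate write rate when every sink instance writes at the same rate."""
    return parallelism * per_instance_rate


def within_limit(parallelism: int, per_instance_rate: float, limit: float) -> bool:
    """True if the aggregate rate stays at or below the given limit."""
    return total_write_rate(parallelism, per_instance_rate) <= limit


# Placeholder numbers in MB/s; replace them with measured rates and your project's quota.
print(within_limit(parallelism=4, per_instance_rate=2.5, limit=12.0))  # True  (10.0 <= 12.0)
print(within_limit(parallelism=8, per_instance_rate=2.5, limit=12.0))  # False (20.0 > 12.0)
```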

Batch progress

To increase write throughput, the Google Cloud BigQuery sink connector supports configuring the batch size. You can set the batch size and latency using the following options.

batchMaxSize = 100
batchMaxTime = 4000
batchFlushIntervalTime = 2000
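
To show how a size threshold and a time threshold interact, the following sketch groups message arrival times into batches. It assumes a batch is flushed when it reaches batchMaxSize messages or when its oldest message has waited batchMaxTime milliseconds, whichever comes first. It checks the time threshold only when a new message arrives and does not model batchFlushIntervalTime, so treat it as a simplified model and not the connector's implementation.

```python
def simulate_batches(arrival_times_ms, batch_max_size=100, batch_max_time_ms=4000):
    """Group message arrival times (in ms) into batches by size and age thresholds."""
    batches, current = [], []
    for t in arrival_times_ms:
        # Flush first if the oldest buffered message has waited long enough.
        if current and t - current[0] >= batch_max_time_ms:
            batches.append(current)
            current = []
        current.append(t)
        # Flush as soon as the batch is full.
        if len(current) >= batch_max_size:
            batches.append(current)
            current = []
    if current:
        batches.append(current)  # final partial batch
    return batches


arrivals = [0, 100, 4500, 4600, 4700]
print(simulate_batches(arrivals, batch_max_size=3, batch_max_time_ms=4000))
# [[0, 100], [4500, 4600, 4700]]
```

In this run the first batch is closed by the time threshold and the second by the size threshold.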

How to get

This section describes how to build the Google Cloud BigQuery sink connector.

Work with Function Worker

If you use Pulsar Function Worker to run the connector in a cluster, build the Google Cloud BigQuery sink connector from the source code by following these steps.

  1. Clone the source code to your machine.

    git clone https://github.com/streamnative/pulsar-io-bigquery
    
  2. Build the connector in the pulsar-io-bigquery directory.

    mvn clean install -DskipTests
    

    After the connector is successfully built, a JAR package is generated under the target directory.

    ls target
    pulsar-io-bigquery-2.8.4.2.jar
    

Work with Function Mesh

You can pull the Google Cloud BigQuery sink connector Docker image from the Docker Hub if you use Function Mesh to run the connector.

How to configure

Before using the Google Cloud BigQuery sink connector, you need to configure it. This table outlines the properties and the descriptions.

Name | Type | Required | Default | Description
--- | --- | --- | --- | ---
projectId | String | Yes | "" (empty string) | The Google BigQuery project ID.
datasetName | String | Yes | "" (empty string) | The Google BigQuery dataset name.
tableName | String | Yes | "" (empty string) | The Google BigQuery table name.
credentialJsonString | String | No | "" (empty string) | The authentication JSON key. When credentialJsonString is set to an empty string, set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. For details, see the Google documentation.
visibleModel | String | No | "Committed" | The mode that controls when data written to the stream becomes visible in BigQuery for reading. Available options are Committed and Pending. For details, see the Google documentation.
pendingMaxSize | int | No | 10000 | The maximum number of messages waiting to be committed in Pending mode.
batchMaxSize | int | No | 20 | The maximum number of messages in a batch.
batchMaxTime | long | No | 5000 | The maximum batch waiting time (in milliseconds).
batchFlushIntervalTime | long | No | 2000 | The batch flush interval (in milliseconds).
failedMaxRetryNum | int | No | 20 | The maximum number of retries when appending fails. By default, each retry waits 2 seconds.
autoCreateTable | boolean | No | true | Automatically create a table if no table is available.
autoUpdateTable | boolean | No | true | Automatically update the table schema if the BigQuery table schema is incompatible with the Pulsar schema.
partitionedTables | boolean | No | true | Create a partitioned table when the table is automatically created. It uses __event_time__ as the partition key.
partitionedTableIntervalDay | int | No | 7 | The number of days between partitioning of the partitioned table.
clusteredTables | boolean | No | true | Create a clustered table when the table is automatically created. It uses __message_id__ as the cluster key.
defaultSystemField | String | No | "" (empty string) | Create the system fields when the table is automatically created. Use commas to separate multiple fields. The supported system fields are: __schema_version__, __partition__, __event_time__, __publish_time__, __message_id__, __sequence_id__, and __producer_name__.
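
The required properties and defaults in the table above can be captured in a small pre-flight check that runs before you submit a sink configuration. This is a sketch only: resolve_config is a hypothetical helper, and the connector performs its own validation.

```python
REQUIRED = ("projectId", "datasetName", "tableName")

# Defaults copied from the property table above.
DEFAULTS = {
    "credentialJsonString": "",
    "visibleModel": "Committed",
    "pendingMaxSize": 10000,
    "batchMaxSize": 20,
    "batchMaxTime": 5000,
    "batchFlushIntervalTime": 2000,
    "failedMaxRetryNum": 20,
    "autoCreateTable": True,
    "autoUpdateTable": True,
    "partitionedTables": True,
    "partitionedTableIntervalDay": 7,
    "clusteredTables": True,
    "defaultSystemField": "",
}


def resolve_config(user_config: dict) -> dict:
    """Check required and known keys, then fill in the documented defaults."""
    missing = [key for key in REQUIRED if not user_config.get(key)]
    if missing:
        raise ValueError(f"missing required properties: {missing}")
    unknown = [key for key in user_config if key not in REQUIRED and key not in DEFAULTS]
    if unknown:
        raise ValueError(f"unknown properties: {unknown}")
    resolved = {**DEFAULTS, **user_config}  # user values override defaults
    if resolved["visibleModel"] not in ("Committed", "Pending"):
        raise ValueError("visibleModel must be Committed or Pending")
    return resolved


config = resolve_config({
    "projectId": "my-project",       # placeholder values
    "datasetName": "my_dataset",
    "tableName": "my_table",
    "batchMaxSize": 100,
})
print(config["batchMaxSize"], config["batchMaxTime"])  # 100 5000
```

Rejecting unknown keys catches misspelled property names early, for example a hyphenated name where the table uses camelCase.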

Note

The provided Google Cloud credentials must have permission to access Google Cloud resources. To use the Google Cloud BigQuery sink connector, ensure the Google Cloud credentials have the following permissions to the Google BigQuery API:

  • bigquery.jobs.create
  • bigquery.tables.create
  • bigquery.tables.get
  • bigquery.tables.getData
  • bigquery.tables.list
  • bigquery.tables.update
  • bigquery.tables.updateData

For more information about Google BigQuery API permissions, see Google Cloud BigQuery API permissions: Access control.
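
If you already have a list of the permissions granted to a service account, a set difference shows which of the permissions listed above are still missing. The sketch assumes you obtain the granted list yourself, for example from your IAM tooling. The missing_permissions helper is hypothetical.

```python
# Permissions required by the connector, as listed above.
REQUIRED_PERMISSIONS = {
    "bigquery.jobs.create",
    "bigquery.tables.create",
    "bigquery.tables.get",
    "bigquery.tables.getData",
    "bigquery.tables.list",
    "bigquery.tables.update",
    "bigquery.tables.updateData",
}


def missing_permissions(granted) -> list[str]:
    """Return the required permissions that are not in the granted collection."""
    return sorted(REQUIRED_PERMISSIONS - set(granted))


# Example input: a read-only style grant that cannot create or update tables.
granted = [
    "bigquery.jobs.create",
    "bigquery.tables.get",
    "bigquery.tables.getData",
    "bigquery.tables.list",
]
print(missing_permissions(granted))
# ['bigquery.tables.create', 'bigquery.tables.update', 'bigquery.tables.updateData']
```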

Work with Function Worker

You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.

Example

  • JSON

     {
         "name": "google-bigquery-sink",
         "archive": "connectors/pulsar-io-bigquery-2.8.4.2.jar",
         "className": "org.apache.pulsar.ecosystem.io.bigquery.BigQuerySink",
         "tenant": "public",
         "namespace": "default",
         "inputs": [
           "test-google-bigquery-pulsar"
         ],
         "parallelism": 1,
         "configs": {
           "projectId": "SECRETS",
           "datasetName": "pulsar-io-google-bigquery",
           "tableName": "test-google-bigquery-sink",
           "credentialJsonString": "SECRETS"
         }
     }
    
  • YAML

     name: google-bigquery-sink
     archive: 'connectors/pulsar-io-bigquery-2.8.4.2.jar'
     className: org.apache.pulsar.ecosystem.io.bigquery.BigQuerySink
     tenant: public
     namespace: default
     inputs:
       - test-google-bigquery-pulsar
     parallelism: 1
     configs:
       projectId: SECRETS
       datasetName: pulsar-io-google-bigquery
       tableName: test-google-bigquery-sink
       credentialJsonString: SECRETS
    

Work with Function Mesh

You can use a CustomResourceDefinition (CRD) to create a Google Cloud BigQuery sink connector. Using a CRD lets Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar sink CRD configurations, see sink CRD configurations.

You can define a CRD file (YAML) to set the properties as below.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: google-bigquery-sink-sample
spec:
  image: streamnative/pulsar-io-bigquery:2.8.4.2
  className: org.apache.pulsar.ecosystem.io.bigquery.BigQuerySink
  replicas: 1
  maxReplicas: 1
  input:
    topics: 
      - persistent://public/default/test-google-bigquery-pulsar
  sinkConfig:
     projectId: SECRETS
     datasetName: pulsar-io-google-bigquery
     tableName: test-google-bigquery-sink
     credentialJsonString: SECRETS
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-bigquery-2.8.4.2.jar
  clusterName: test-pulsar
  autoAck: false

How to use

You can use the Google Cloud BigQuery sink connector with Function Worker or Function Mesh.

Work with Function Worker

Note

Currently, the Google Cloud BigQuery sink connector cannot run as a built-in connector as it uses the JAR package.

  1. Start a Pulsar cluster in standalone mode.

    PULSAR_HOME/bin/pulsar standalone
    
  2. Run the Google Cloud BigQuery sink connector.

    PULSAR_HOME/bin/pulsar-admin sinks localrun \
    --sink-config-file <google-bigquery-sink-config.yaml> \
    --archive <pulsar-io-bigquery-2.8.4.2.jar>
    

    Or, you can create a connector for the Pulsar cluster.

    PULSAR_HOME/bin/pulsar-admin sinks create \
    --sink-config-file <google-bigquery-sink-config.yaml> \
    --archive <pulsar-io-bigquery-2.8.4.2.jar>
    
  3. Send messages to a Pulsar topic.

    This example sends ten “hello” messages to the test-google-bigquery-pulsar topic in the default namespace of the public tenant.

    PULSAR_HOME/bin/pulsar-client produce public/default/test-google-bigquery-pulsar --messages hello -n 10
    
  4. Query data using Google BigQuery.

For details, see Query a public dataset with the Google Cloud console.

Work with Function Mesh

This example describes how to create a Google Cloud BigQuery sink connector for a Kubernetes cluster using Function Mesh.

Prerequisites

Step

  1. Define the Google Cloud BigQuery sink connector with a YAML file and save it as sink-sample.yaml.

    This example shows how to publish the Google Cloud BigQuery sink connector to Function Mesh with a Docker image.

    apiVersion: compute.functionmesh.io/v1alpha1
    kind: Sink
    metadata:
      name: google-bigquery-sink-sample
    spec:
      image: streamnative/pulsar-io-bigquery:2.8.4.2
      className: org.apache.pulsar.ecosystem.io.bigquery.BigQuerySink
      replicas: 1
      maxReplicas: 1
      input:
        topics: 
          - persistent://public/default/test-google-bigquery-pulsar
      sinkConfig:
         projectId: SECRETS
         datasetName: pulsar-io-google-bigquery
         tableName: test-google-bigquery-sink
         credentialJsonString: SECRETS
      pulsar:
        pulsarConfig: "test-pulsar-sink-config"
      resources:
        limits:
          cpu: "0.2"
          memory: 1.1G
        requests:
          cpu: "0.1"
          memory: 1G
      java:
        jar: connectors/pulsar-io-bigquery-2.8.4.2.jar
      clusterName: test-pulsar
      autoAck: false
    
  2. Apply the YAML file to create the Google Cloud BigQuery sink connector.

    Input

    kubectl apply -f <path-to-sink-sample.yaml>
    

    Output

    sink.compute.functionmesh.io/google-bigquery-sink-sample created
    
  3. Check whether the Google Cloud BigQuery sink connector is created successfully.

    Input

    kubectl get all
    

    Output

    NAME                                READY   STATUS    RESTARTS   AGE
    pod/google-bigquery-sink-sample-0   1/1     Running   0          77s
    

    After that, you can produce and consume messages using the Google Cloud BigQuery sink connector between Pulsar and Google BigQuery.