Google Cloud BigQuery Source Connector
The Google Cloud BigQuery Source Connector integrates Apache Pulsar with Google BigQuery.

Available on
StreamNative Cloud console

Authored by
shibd, danpi, codelipenghui, Huanli-Meng
Support type
streamnative
License
Apache License 2.0

The Google Cloud BigQuery source connector reads data from Google Cloud BigQuery tables and writes it to Pulsar topics.

How to get

This section describes how to build the Google Cloud BigQuery source connector.

Work with Function Worker

You can get the Google Cloud BigQuery source connector by building it from the source code as follows:

  1. Clone the source code to your machine.

    git clone https://github.com/streamnative/pulsar-io-bigquery
    
  2. Build the connector in the pulsar-io-bigquery directory.

    mvn clean install -DskipTests
    

    After the connector is successfully built, a JAR package is generated under the target directory.

    ls target
    pulsar-io-bigquery-2.10.5.4.jar
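
    If you plan to run the connector with Function Worker, you can then copy the JAR into your Pulsar installation's connectors directory so that it can be referenced as connectors/pulsar-io-bigquery-source.jar, as in the examples below. A minimal sketch, assuming PULSAR_HOME points to your Pulsar installation and the JAR version matches this build:

    # copy and rename the built JAR into the Pulsar connectors directory
    cp target/pulsar-io-bigquery-2.10.5.4.jar PULSAR_HOME/connectors/pulsar-io-bigquery-source.jar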
    

Work with Function Mesh

If you use Function Mesh to run the connector, you can pull the Google Cloud BigQuery source connector Docker image from Docker Hub.
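
For example, assuming the image tag matches the connector version built above:

docker pull streamnative/pulsar-io-bigquery:2.10.5.4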

How to configure

Before using the Google Cloud BigQuery source connector, you need to configure it. The following table lists its properties and their descriptions.

Name | Type | Required | Default | Description
--- | --- | --- | --- | ---
projectId | String | Yes | "" (empty string) | The Google BigQuery project ID.
datasetName | String | Yes | "" (empty string) | The Google BigQuery dataset name.
tableName | String | Yes | "" (empty string) | The Google BigQuery table name.
credentialJsonString | String | No | "" (empty string) | The authentication JSON key. When credentialJsonString is set to an empty string, set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. For details, see the Google documentation.
queueSize | int | No | 10000 | The buffer queue size of the source. It is used to store records before they are sent to Pulsar topics.
snapshotTime | long | No | -1 | The snapshot time of the table. If it is not set, it is interpreted as now.
sql | String | No | "" (empty string) | The SQL query to run on BigQuery. The result is saved in a temporary table that has a configurable expiration time; the connector deletes the temporary table automatically once the data has been transferred completely. The projectId and datasetName are taken from the configuration file, and the tableName is generated from a UUID.
expirationTimeInMinutes | int | No | 1440 | The time (in minutes) after which the temporary table expires and is deleted automatically.
maxParallelism | int | No | 1 | The maximum parallelism for reading. The actual parallelism may be lower if the connector determines that the data volume is small enough.
selectedFields | String | No | "" (empty string) | The names of the table fields that should be read.
filters | String | No | "" (empty string) | A list of clauses that filter the rows read from the table.
checkpointIntervalSeconds | int | No | 60 | The checkpoint interval, in seconds.
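
The optional properties let you scope what the connector reads from a table. Below is a sketch of a configs block that narrows the read to selected columns and rows; the values are illustrative only, and the property names come from the table above:

configs:
  projectId: bigquery-dev-001
  datasetName: babynames
  tableName: names2021
  # optional: read only these fields
  selectedFields: "name,number"
  # optional: clauses that filter the rows being read
  filters: "number > 100"
  # optional: buffering and checkpointing (the values below are the defaults)
  queueSize: 10000
  checkpointIntervalSeconds: 60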

Note

The provided Google Cloud credentials must have permission to access Google Cloud resources. To use the Google Cloud BigQuery source connector, ensure that the Google Cloud credentials have the following Google BigQuery API permissions:

  • bigquery.jobs.create
  • bigquery.tables.create
  • bigquery.tables.get
  • bigquery.tables.getData
  • bigquery.tables.list
  • bigquery.tables.update
  • bigquery.tables.updateData

For more information about Google BigQuery API permissions, see Google Cloud BigQuery API permissions: Access control.
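
One way to satisfy these requirements is to bind a predefined role that bundles all of the permissions above, such as roles/bigquery.admin; a production setup would typically use narrower roles. A sketch, assuming a hypothetical project ID and service account name:

gcloud projects add-iam-policy-binding my-gcp-project \
  --member="serviceAccount:pulsar-bq-source@my-gcp-project.iam.gserviceaccount.com" \
  --role="roles/bigquery.admin"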

Work with Function Worker

You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.

Example

  • YAML

      name: google-bigquery-source
      className: org.apache.pulsar.ecosystem.io.bigquery.BigQuerySource
      namespace: default
      topicName: google-bigquery-source-test
      parallelism: 1
      archive: connectors/pulsar-io-bigquery-source.jar
      batchSourceConfig: 
        discoveryTriggererClassName: org.apache.pulsar.ecosystem.io.bigquery.source.BigQueryOnceTrigger
      configs:
        # projectId is BigQuery project id.
        #
        # This field is *required*.
        #
        projectId: bigquery-dev-001
        # datasetName is BigQuery dataset name.
        #
        # This field is *required*.
        #
        datasetName: babynames
        # tableName is BigQuery table name.
        #
        # This field is *required*.
        #
        tableName: names2021
        credentialJsonString: SECRETS
    
  • JSON

    {
        "name": "source-test-source",
        "className": "org.apache.pulsar.ecosystem.io.bigquery.BigQuerySource",
        "namespace": "default",
        "topicName": "google-bigquery-source-test",
        "parallelism": 1,
        "archive": "connectors/pulsar-io-bigquery-source.jar",
        "batchSourceConfig": {
        "discoveryTriggererClassName": "org.apache.pulsar.ecosystem.io.bigquery.source.BigQueryOnceTrigger"
        },
        "configs": {
            "projectId": "bigquery-dev-001",
            "datasetName": "babynames",
            "tableName": "names2021",
            "credentialJsonString": "SECRETS"
        }
    }
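
Before deploying such a configuration to a cluster, you can try it locally. A sketch using pulsar-admin's localrun subcommand (the file name is illustrative):

PULSAR_HOME/bin/pulsar-admin sources localrun \
--source-config-file google-bigquery-source-config.yaml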
    

Work with Function Mesh

You can create a CustomResourceDefinition (CRD) to create a Google Cloud BigQuery source connector. Using CRDs makes Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar IO source CRD configurations, see source CRD configurations.

You can define a CRD file (YAML) to set the properties, as shown below.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Source
metadata:
   name: google-bigquery-source-sample
spec:
   image: streamnative/pulsar-io-bigquery:2.10.5.4
   className: org.apache.pulsar.functions.source.batch.BatchSourceExecutor
   replicas: 1
   maxReplicas: 1
   output:
      producerConf:
         maxPendingMessages: 1000
         maxPendingMessagesAcrossPartitions: 50000
         useThreadLocalProducers: true
      topic: persistent://public/default/google-bigquery-pulsar-source
   sourceConfig:
      __BATCHSOURCECLASSNAME__: org.apache.pulsar.ecosystem.io.bigquery.BigQuerySource
      __BATCHSOURCECONFIGS__: '{"discoveryTriggererClassName":"org.apache.pulsar.ecosystem.io.bigquery.source.BigQueryOnceTrigger"}'
      projectId: SECRETS
      datasetName: pulsar-io-google-bigquery
      tableName: test-google-bigquery-source
      credentialJsonString: SECRETS
   pulsar:
      pulsarConfig: "test-pulsar-source-config"
   resources:
      limits:
         cpu: "0.2"
         memory: 1.1G
      requests:
         cpu: "0.1"
         memory: 1G
   java:
      jar: connectors/pulsar-io-bigquery-2.10.5.4.jar
   clusterName: pulsar
---
apiVersion: v1
kind: ConfigMap
metadata:
   name: test-pulsar-source-config
data:
   webServiceURL: http://pulsar-broker.default.svc.cluster.local:8080
   brokerServiceURL: pulsar://pulsar-broker.default.svc.cluster.local:6650
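
After defining the CRD file, submit it to the Kubernetes cluster where Function Mesh is running. A minimal sketch, assuming the manifest above is saved as google-bigquery-source.yaml:

kubectl apply -f google-bigquery-source.yaml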

How to use

You can use the Google Cloud BigQuery source connector with Function Worker or Function Mesh.

Work with Function Worker

You can use the Google Cloud BigQuery source connector as a non-built-in connector or as a built-in connector, as described below.

If you already have a Pulsar cluster, you can use the Google Cloud BigQuery source connector as a non-built-in connector directly.

This example shows how to create a Google Cloud BigQuery source connector on a Pulsar cluster using the pulsar-admin sources create command.

PULSAR_HOME/bin/pulsar-admin sources create \
--source-config-file <google-bigquery-source-config.yaml>
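
After the source is created, you can check that it is running. A sketch using standard pulsar-admin subcommands; the source name here comes from the YAML configuration example above:

PULSAR_HOME/bin/pulsar-admin sources status \
--tenant public \
--namespace default \
--name google-bigquery-source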