Google Cloud Pub/Sub Sink Connector
Type: sink
Authored by: nodece, Huanli-Meng, shibd, nicoloboschi
Support type: streamnative
License: Business License

The Google Cloud Pub/Sub sink connector pulls data from Pulsar topics and persists data to Google Cloud Pub/Sub topics.

How to get

This section describes how to build the Google Cloud Pub/Sub sink connector.

Work with Function Worker

You can get the Google Cloud Pub/Sub sink connector using one of the following methods if you use Pulsar Function Worker to run connectors in a cluster.

To build the Google Cloud Pub/Sub sink connector from the source code, follow these steps.

  1. Clone the source code to your machine.

    git clone https://github.com/streamnative/pulsar-io-google-pubsub
    
  2. Build the connector in the pulsar-io-google-pubsub directory.

    mvn clean install -DskipTests
    

    After the connector is successfully built, a NAR package is generated under the target directory.

    ls target
    pulsar-io-google-pubsub-2.11.3.2.nar
    

Work with Function Mesh

You can pull the Google Cloud Pub/Sub sink connector Docker image (streamnative/pulsar-io-google-pubsub) from Docker Hub if you use Function Mesh to run the connector.

How to configure

Before using the Google Cloud Pub/Sub sink connector, you need to configure it. The following table lists the properties and their descriptions.

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| pubsubEndpoint | String | false | "" (empty string) | The Google Cloud Pub/Sub endpoint URL. |
| pubsubCredential | String | false | "" (empty string) | The credential (JSON string) for accessing Google Cloud. |
| pubsubProjectId | String | true | "" (empty string) | The Google Cloud project ID. |
| pubsubTopicId | String | true | "" (empty string) | The topic ID. It is used to read messages from or write messages to Google Cloud Pub/Sub topics. |
| pubsubSchemaId | String | false | "" (empty string) | The schema ID. You must set the schema ID when creating a schema for Google Cloud Pub/Sub topics. |
| pubsubSchemaType | String | false | "" (empty string) | The schema type. You must set the schema type when creating a schema for Google Cloud Pub/Sub topics. Currently, only the AVRO format is supported. |
| pubsubSchemaEncoding | String | false | "" (empty string) | The encoding of the schema. You must set the schema encoding when creating a schema for Google Cloud Pub/Sub topics. Currently, only the JSON encoding is supported. |
| pubsubSchemaDefinition | String | false | "" (empty string) | The definition of the schema. It is used to create a schema for Google Cloud Pub/Sub topics or to parse messages from them. |
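You can check the constraints in this table before you deploy the connector. The following Python function is a hypothetical pre-flight helper and is not part of the connector. validate_sink_config only mirrors the rules stated above:

- two required properties;
- a credential that must be a JSON string;
- AVRO as the only schema type;
- JSON as the only schema encoding.

It also treats setting pubsubSchemaDefinition as creating a schema. That is an assumption about how the schema properties fit together.

```python
import json

# Property names taken from the configuration table.
REQUIRED_KEYS = ("pubsubProjectId", "pubsubTopicId")
OPTIONAL_KEYS = (
    "pubsubEndpoint",
    "pubsubCredential",
    "pubsubSchemaId",
    "pubsubSchemaType",
    "pubsubSchemaEncoding",
    "pubsubSchemaDefinition",
)


def validate_sink_config(configs: dict) -> list:
    """Return a list of problems found in a sink `configs` map (empty if none).

    Hypothetical helper: it is not part of the connector and only mirrors
    the constraints stated in the configuration table.
    """
    problems = []
    for key in REQUIRED_KEYS:
        if not str(configs.get(key, "")).strip():
            problems.append(f"{key} is required")
    for key in configs:
        if key not in REQUIRED_KEYS and key not in OPTIONAL_KEYS:
            problems.append(f"unknown property: {key}")
    credential = configs.get("pubsubCredential", "")
    if credential:
        try:
            json.loads(credential)  # the credential must be a JSON string
        except (TypeError, ValueError):
            problems.append("pubsubCredential must be a JSON string")
    schema_type = configs.get("pubsubSchemaType", "")
    if schema_type and schema_type != "AVRO":
        problems.append("pubsubSchemaType: only AVRO is supported")
    encoding = configs.get("pubsubSchemaEncoding", "")
    if encoding and encoding != "JSON":
        problems.append("pubsubSchemaEncoding: only JSON is supported")
    if configs.get("pubsubSchemaDefinition"):
        # Assumption: a definition means a schema is being created, so the
        # "must set when creating a schema" properties apply.
        for key in ("pubsubSchemaId", "pubsubSchemaType", "pubsubSchemaEncoding"):
            if not configs.get(key):
                problems.append(f"{key} is required when creating a schema")
    return problems
```

For example, validate_sink_config({"pubsubProjectId": "pulsar-io-google-pubsub"}) reports that pubsubTopicId is required. The helper also reports the 'SECRETS' placeholder used in the examples below, because that placeholder stands in for a real JSON key.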

Note

The provided Google Cloud credentials must have permission to access Google Cloud resources. To use the Google Cloud Pub/Sub sink connector, ensure that the credentials have the following permissions for the Google Cloud Pub/Sub API:

Work with Function Worker

You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.

Example

  • JSON

    {
      "tenant": "public",
      "namespace": "default",
      "name": "google-pubsub-sink",
      "inputs": [
        "test-google-pubsub-pulsar"
      ],
      "archive": "connectors/pulsar-io-google-pubsub-2.11.3.2.nar",
      "parallelism": 1,
      "configs": {
        "pubsubCredential": "SECRETS",
        "pubsubProjectId": "pulsar-io-google-pubsub",
        "pubsubTopicId": "test-google-pubsub-sink"
      }
    }
    
  • YAML

    tenant: public
    namespace: default
    name: google-pubsub-sink
    inputs:
      - test-google-pubsub-pulsar
    archive: connectors/pulsar-io-google-pubsub-2.11.3.2.nar
    parallelism: 1
    configs:
      pubsubCredential: 'SECRETS'
      pubsubProjectId: pulsar-io-google-pubsub
      pubsubTopicId: test-google-pubsub-sink
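pubsubCredential takes the credential as a JSON string. A service-account key file therefore has to be embedded as an escaped string rather than pasted in as a nested object. The sketch below builds the JSON configuration shown above with Python's standard json module. The key file path and the function names build_worker_config and write_worker_config are hypothetical.

```python
import json
from pathlib import Path


def build_worker_config(key_file: str, project_id: str, topic_id: str,
                        input_topic: str, archive: str) -> dict:
    """Build a Function Worker sink config shaped like the JSON example.

    `key_file` is assumed to be a Google Cloud service-account key (JSON).
    Its contents are embedded verbatim as a string, because
    pubsubCredential expects a JSON string rather than a nested object.
    """
    credential = Path(key_file).read_text(encoding="utf-8")
    json.loads(credential)  # fail early if the key file is not valid JSON
    return {
        # tenant, namespace, name and parallelism copied from the example
        "tenant": "public",
        "namespace": "default",
        "name": "google-pubsub-sink",
        "inputs": [input_topic],
        "archive": archive,
        "parallelism": 1,
        "configs": {
            "pubsubCredential": credential,
            "pubsubProjectId": project_id,
            "pubsubTopicId": topic_id,
        },
    }


def write_worker_config(config: dict, out_file: str) -> None:
    # json.dump escapes the quotes and newlines of the embedded credential.
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)
```

For example, you could call build_worker_config("service-account.json", "pulsar-io-google-pubsub", "test-google-pubsub-sink", "test-google-pubsub-pulsar", "connectors/pulsar-io-google-pubsub-2.11.3.2.nar") and then write_worker_config(config, "google-pubsub-sink-config.json"). This produces a file you can pass to --sink-config-file. Treat the generated file as a secret, because it contains the key.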
    

Work with Function Mesh

You can create a Google Cloud Pub/Sub sink connector by defining a Sink custom resource, whose schema is provided by a Function Mesh CustomResourceDefinition (CRD). Using CRDs lets Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar sink CRD configurations, see sink CRD configurations.

You can define the custom resource in a YAML file to set the properties, as shown below.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: google-pubsub-sink-sample
spec:
  image: streamnative/pulsar-io-google-pubsub:2.11.3.2
  className: org.apache.pulsar.ecosystem.io.pubsub.PubsubSink
  replicas: 1
  maxReplicas: 1
  input:
    topics: 
      - persistent://public/default/destination
  sinkConfig:
    pubsubCredential: 'SECRETS'
    pubsubProjectId: pulsar-io-google-pubsub
    pubsubTopicId: test-google-pubsub-sink
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-google-pubsub-2.11.3.2.nar
  clusterName: test-pulsar
  autoAck: true
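The Function Worker configuration and the Function Mesh resource carry the same connector properties in different places. The following Python sketch translates one into the other, following the two examples in this section. Some caveats:

- The function name to_function_mesh_sink is hypothetical.
- Resource requests and limits are left out.
- Short topic names are expanded to persistent://public/default/, which reflects Pulsar's default for short topic names.

```python
def to_function_mesh_sink(worker_config: dict, image: str, cluster_name: str,
                          pulsar_config: str) -> dict:
    """Translate a Function Worker sink config into a Function Mesh Sink resource.

    Mapping follows the examples in this section:
    inputs -> spec.input.topics, configs -> spec.sinkConfig,
    parallelism -> spec.replicas and spec.maxReplicas, archive -> spec.java.jar.
    Resource requests and limits are omitted for brevity.
    """
    # Pulsar resolves a short topic name to persistent://public/default/<name>.
    topics = [
        t if "://" in t else f"persistent://public/default/{t}"
        for t in worker_config["inputs"]
    ]
    parallelism = worker_config.get("parallelism", 1)
    return {
        "apiVersion": "compute.functionmesh.io/v1alpha1",
        "kind": "Sink",
        "metadata": {"name": worker_config["name"]},
        "spec": {
            "image": image,
            "className": "org.apache.pulsar.ecosystem.io.pubsub.PubsubSink",
            "replicas": parallelism,
            "maxReplicas": parallelism,
            "input": {"topics": topics},
            "sinkConfig": dict(worker_config["configs"]),
            "pulsar": {"pulsarConfig": pulsar_config},
            "java": {"jar": worker_config["archive"]},
            "clusterName": cluster_name,
            "autoAck": True,
        },
    }
```

You can serialize the result with json.dump and apply it with kubectl apply -f, which accepts JSON as well as YAML manifests.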

How to use

You can use the Google Cloud Pub/Sub sink connector with Function Worker or Function Mesh.

Work with Function Worker

You can use the Google Cloud Pub/Sub sink connector as a non-built-in connector or a built-in connector.

If you already have a Pulsar cluster, you can use the Google Cloud Pub/Sub sink connector as a non-built-in connector directly.

This example shows how to create a Google Cloud Pub/Sub sink connector on a Pulsar cluster using the pulsar-admin sinks create command.

$PULSAR_HOME/bin/pulsar-admin sinks create \
--sink-config-file <google-pubsub-sink-config.yaml>
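To confirm that the sink started, you can query it with the pulsar-admin sinks status command. The Python sketch below wraps that command with subprocess. It assumes the command prints its status as JSON with numInstances and numRunning fields. The helper names are hypothetical.

```python
import json
import subprocess


def sink_status(pulsar_admin: str, tenant: str, namespace: str, name: str) -> dict:
    """Run `pulsar-admin sinks status` and parse its output as JSON."""
    result = subprocess.run(
        [pulsar_admin, "sinks", "status",
         "--tenant", tenant, "--namespace", namespace, "--name", name],
        capture_output=True, text=True, check=True,
    )
    return json.loads(result.stdout)


def all_instances_running(status: dict) -> bool:
    """True when at least one instance exists and every instance is running.

    Assumes the status JSON carries `numInstances` and `numRunning` fields.
    """
    total = status.get("numInstances", 0)
    return total > 0 and status.get("numRunning", 0) == total
```

For example, consider all_instances_running(sink_status(os.path.expandvars("$PULSAR_HOME/bin/pulsar-admin"), "public", "default", "google-pubsub-sink")). It should return True once every instance of the sink from the example configuration is running.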

Work with Function Mesh

This example describes how to create a Google Cloud Pub/Sub sink connector for a Kubernetes cluster using Function Mesh.

Prerequisites

Steps

  1. Define the Google Cloud Pub/Sub sink connector with a YAML file and save it as sink-sample.yaml.

    This example shows how to publish the Google Cloud Pub/Sub sink connector to Function Mesh with a Docker image.

    apiVersion: compute.functionmesh.io/v1alpha1
    kind: Sink
    metadata:
      name: google-pubsub-sink-sample
    spec:
      image: streamnative/pulsar-io-google-pubsub:2.11.3.2
      className: org.apache.pulsar.ecosystem.io.pubsub.PubsubSink
      replicas: 1
      maxReplicas: 1
      input:
        topics: 
          - persistent://public/default/destination
        typeClassName: "[B"
      sinkConfig:
        pubsubCredential: 'SECRETS'
        pubsubProjectId: pulsar-io-google-pubsub
        pubsubTopicId: test-google-pubsub-sink
      pulsar:
        pulsarConfig: "test-pulsar-sink-config"
      resources:
        limits:
          cpu: "0.2"
          memory: 1.1G
        requests:
          cpu: "0.1"
          memory: 1G
      java:
        jar: connectors/pulsar-io-google-pubsub-2.11.3.2.nar
      clusterName: test-pulsar
      autoAck: true
    
  2. Apply the YAML file to create the Google Cloud Pub/Sub sink connector.

    Input

    kubectl apply -f <path-to-sink-sample.yaml>
    

    Output

    sink.compute.functionmesh.io/google-pubsub-sink-sample created
    
  3. Check whether the Google Cloud Pub/Sub sink connector is created successfully.

    Input

    kubectl get all
    

    Output

    NAME                              READY   STATUS    RESTARTS   AGE
    pod/google-pubsub-sink-sample-0   1/1     Running   0          77s
    

    After that, you can produce messages to the Pulsar input topic. The sink connector writes them to the Google Cloud Pub/Sub topic, where you can consume them.
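To verify the data flow end to end, you can produce a few records to the sink's Pulsar input topic and then pull them from a subscription on the Google Cloud Pub/Sub topic. The following Python sketch uses the pulsar-client and google-cloud-pubsub libraries. It assumes that:

- the sink forwards each Pulsar message payload as the Pub/Sub message data;
- a subscription already exists on test-google-pubsub-sink (for example, the hypothetical test-google-pubsub-sink-sub);
- Application Default Credentials can read from that subscription.

```python
import json


def encode_record(record: dict) -> bytes:
    """Serialize a test record as UTF-8 JSON bytes for the Pulsar input topic."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def produce(service_url: str, topic: str, records: list) -> None:
    """Send records to the sink's Pulsar input topic (needs pulsar-client)."""
    import pulsar  # third-party: pip install pulsar-client

    client = pulsar.Client(service_url)
    try:
        producer = client.create_producer(topic)
        for record in records:
            producer.send(encode_record(record))
    finally:
        client.close()


def pull(project_id: str, subscription_id: str, max_messages: int = 10) -> list:
    """Pull and acknowledge messages from a Pub/Sub subscription on the sink topic.

    Needs google-cloud-pubsub and Application Default Credentials.
    Returns the raw message data of the pulled messages.
    """
    from google.cloud import pubsub_v1  # third-party: pip install google-cloud-pubsub

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project_id, subscription_id)
    with subscriber:  # closes the underlying channel on exit
        response = subscriber.pull(
            request={"subscription": path, "max_messages": max_messages}
        )
        ack_ids = [m.ack_id for m in response.received_messages]
        if ack_ids:
            subscriber.acknowledge(request={"subscription": path, "ack_ids": ack_ids})
    return [m.message.data for m in response.received_messages]
```

For example, call produce("pulsar://localhost:6650", "persistent://public/default/destination", [{"id": 1}]) and then call pull("pulsar-io-google-pubsub", "test-google-pubsub-sink-sub"). The service URL is an assumption about your cluster. Create the subscription before producing, because Pub/Sub delivers only messages published after a subscription exists. A single pull can also return fewer messages than are available, so you may need to call it more than once.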