The Google Cloud Pub/Sub Group Kafka Connector library provides Google Cloud Platform (GCP) first-party connectors that integrate Pub/Sub products with Kafka Connect. You can use the library to transmit data from Apache Kafka to Cloud Pub/Sub or Pub/Sub Lite and vice versa.

Prerequisites

You must have a GCP project in order to use Cloud Pub/Sub or Pub/Sub Lite. Before starting the quickstart, follow these setup steps for Pub/Sub, or these setup steps for Pub/Sub Lite. For general information on how to authenticate with GCP when using the Google Cloud Pub/Sub Group Kafka Connector library, see Provide credentials for Application Default Credentials.
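For local development, a common way to set up Application Default Credentials is with the gcloud CLI; a minimal sketch (the service-account key path below is a placeholder, not a value from this document):

```shell
# Authenticate as your user account and write Application Default Credentials
# to the well-known location that GCP client libraries check automatically.
gcloud auth application-default login

# Alternatively, point GOOGLE_APPLICATION_CREDENTIALS at a service-account
# key file (placeholder path shown).
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
```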

Quick Start

  1. Set up the kcctl client: doc
  2. Create a Pub/Sub topic in your GCP project.
  3. Create a JSON file like the following:
{
    "name": "pubsub-sink",
    "config": {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "cps.project": "${GCP_PROJECT_ID}",
        "cps.topic": "${PUBSUB_TOPIC_NAME}",
        "gcp.credentials.json": "${GCP_CREDENTIALS_JSON}",
        "topics": "${KAFKA_TOPIC_NAME}"
    }
}
  4. Run the following command to create the connector:
kcctl create -f <filename>.json
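After creating the connector, you can sanity-check the pipeline end to end. The following is a sketch, assuming a Kafka broker on localhost:9092, a Pub/Sub subscription named my-subscription attached to the target topic, and a Kafka topic named my-kafka-topic; all of these names are placeholder assumptions, not values from this document:

```shell
# List connectors to confirm the sink was registered
# (subcommand names may vary by kcctl version; see `kcctl help`).
kcctl get connectors

# Produce a test record to the Kafka topic the connector reads from.
echo 'hello pubsub' | kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic my-kafka-topic

# Pull the forwarded message from a subscription on the Pub/Sub topic.
gcloud pubsub subscriptions pull my-subscription --auto-ack --limit=1
```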

Configuration

The Google Pub/Sub sink connector is configured using the following properties:
| Config | Type | Default | Description |
|---|---|---|---|
| cps.topic | String | REQUIRED (no default) | The Pub/Sub topic ID, e.g. "foo" for the topic "/projects/bar/topics/foo". |
| cps.project | String | REQUIRED (no default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
| cps.endpoint | String | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be buffered for a topic partition before they are published to Pub/Sub. |
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be buffered for a topic partition before the messages are published to Pub/Sub. |
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher blocks further publishing. |
| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher blocks further publishing. |
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait for maxBufferSize or maxBufferBytes to be reached before publishing outstanding messages to Pub/Sub. |
| maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
| maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a publish call (including retries) to Pub/Sub. |
| maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shut down when stopping a task in Kafka Connect. |
| gcp.credentials.file.path | String | Optional | The path to a file containing GCP credentials. If not set, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
| gcp.credentials.json | String | Optional | A GCP credentials JSON blob. If specified, these explicitly provided credentials are used. Consider using the externalized secrets feature in Kafka Connect to pass the value. |
| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
| orderingKeySource | String (none, key, partition) | none | When set to "none", no ordering key is set. When set to "key", the message's key is used as the ordering key. When set to "partition", the partition number is converted to a String and used as the ordering key. Note that "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
| messageBodyName | String | "cps_message_body" | When using a struct or map value schema, the field or key with this name holds the value that becomes the Pub/Sub message body. |
| enableCompression | Boolean | false | When true, enable publish-side compression to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size (in bytes) of a publish request to compress. |
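As an illustration of how these properties combine, a sink configuration that batches more aggressively, publishes Kafka metadata as message attributes, and orders by message key might look like the following sketch (the project, topic, and connector names are placeholders, not values from this document):

```json
{
    "name": "pubsub-sink-tuned",
    "config": {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "cps.project": "my-gcp-project",
        "cps.topic": "my-pubsub-topic",
        "topics": "my-kafka-topic",
        "maxBufferSize": "1000",
        "maxDelayThresholdMs": "500",
        "metadata.publish": "true",
        "orderingKeySource": "key"
    }
}
```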
The full list of properties is also available in the official Google Pub/Sub Kafka Sink Connector documentation.