The Google Cloud Pub/Sub Group Kafka Connector library provides Google Cloud Platform (GCP) first-party connectors for Pub/Sub products with Kafka Connect. You can use the library to transmit data from Apache Kafka to Cloud Pub/Sub or Pub/Sub Lite and vice versa.
Prerequisites
You must have a GCP project in order to use Cloud Pub/Sub or Pub/Sub Lite. Follow the setup steps for Pub/Sub or for Pub/Sub Lite, whichever you use, before doing the quickstart. For general information on how to authenticate with GCP when using the Google Cloud Pub/Sub Group Kafka Connector library, see Provide credentials for Application Default Credentials.
Quick Start
- Set up the kcctl client: doc
- Create a Pub/Sub topic in your GCP project.
- Create a JSON file like the following:
- Run the following command to create the connector:
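The JSON file from the steps above can be sketched as follows. This is a minimal illustration, not the exact file from the original quickstart: the connector name `pubsub-sink`, the Kafka topic `kafka-topic`, the file name, and the `tasks.max` value are hypothetical placeholders, and the connector class name is assumed from the upstream Google Cloud Pub/Sub Group Kafka Connector project. Only `cps.project` and `cps.topic` are taken from the configuration table on this page.

```python
import json


def build_sink_config(name, kafka_topics, project, topic):
    """Build a connector definition with a name and a config map.

    The "name"/"config" layout and the connector class are assumptions;
    cps.project and cps.topic are the two required properties listed
    in the Configuration section.
    """
    return {
        "name": name,
        "config": {
            # Assumed class name of the Pub/Sub sink connector.
            "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
            "tasks.max": "1",
            # Standard Kafka Connect sink property: topics to read from.
            "topics": ",".join(kafka_topics),
            "cps.project": project,
            "cps.topic": topic,
        },
    }


if __name__ == "__main__":
    # Hypothetical values: project "bar" and topic "foo" mirror the
    # examples used in the configuration table.
    config = build_sink_config("pubsub-sink", ["kafka-topic"], "bar", "foo")
    print(json.dumps(config, indent=2))
```

Saving the printed JSON to a file such as `pubsub-sink.json` and running `kcctl apply -f pubsub-sink.json` should then create the connector, assuming kcctl is already pointed at your Kafka Connect cluster.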
Configuration
The Google Pub/Sub sink connector is configured using the following properties:

| Config | Type | Default | Description |
|---|---|---|---|
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. “foo” for topic “/projects/bar/topics/foo”. |
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. “bar” from above. |
| cps.endpoint | String | “pubsub.googleapis.com:443” | The Pub/Sub endpoint to use. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be buffered for a topic partition before they are published to Pub/Sub. |
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be buffered for the messages on a topic partition before they are published to Pub/Sub. |
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
| maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
| maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a call to publish (including retries) to Pub/Sub. |
| maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shut down when stopping a task in Kafka Connect. |
| gcp.credentials.file.path | String | Optional | The path to the file that stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, the connector uses these explicitly provided credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
| orderingKeySource | String (none, key, partition) | none | When set to “none”, the ordering key is not set. When set to “key”, the message’s key is used as the ordering key. When set to “partition”, the partition number is converted to a String and used as the ordering key. Note that “partition” should only be used for low-throughput topics or topics with thousands of partitions. |
| messageBodyName | String | “cps_message_body” | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
| enableCompression | Boolean | false | When true, enable publish-side compression in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of a publish request (in bytes) to compress. |
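
To make the interaction between the buffering properties and `orderingKeySource` more concrete, here is a rough sketch of the rules as the table describes them. It is not the connector's implementation: the names `BufferLimits`, `should_publish`, and `ordering_key` are hypothetical, and the use of `>=` for the thresholds is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BufferLimits:
    # Defaults copied from the configuration table.
    max_buffer_size: int = 100             # maxBufferSize (messages)
    max_buffer_bytes: int = 10_000_000     # maxBufferBytes (bytes)
    max_delay_threshold_ms: int = 100      # maxDelayThresholdMs (ms)


def should_publish(buffered_messages: int, buffered_bytes: int,
                   elapsed_ms: int, limits: BufferLimits = BufferLimits()) -> bool:
    """Return True once any one of the three thresholds is reached.

    Assumption: reaching a threshold exactly (>=) triggers a publish.
    """
    return (buffered_messages >= limits.max_buffer_size
            or buffered_bytes >= limits.max_buffer_bytes
            or elapsed_ms >= limits.max_delay_threshold_ms)


def ordering_key(source: str, key: Optional[str], partition: int) -> Optional[str]:
    """Derive the ordering key according to orderingKeySource."""
    if source == "none":
        return None            # no ordering key is set
    if source == "key":
        return key             # the Kafka message key
    if source == "partition":
        return str(partition)  # partition number converted to a string
    raise ValueError(f"unsupported orderingKeySource: {source!r}")
```

With the default limits, a partition holding a single small message is published once 100 ms have elapsed, while a burst of 100 messages is published without waiting for the delay threshold.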