> The official Google Cloud Pub/Sub Sink connector.

# Kafka Connect Google Pub/Sub Sink

The Google Cloud Pub/Sub Group Kafka Connector library provides Google Cloud
Platform (GCP) first-party connectors for Pub/Sub products with
[Kafka Connect](http://kafka.apache.org/documentation.html#connect).
You can use the library to transmit data from
[Apache Kafka](http://kafka.apache.org)
to [Cloud Pub/Sub](https://cloud.google.com/pubsub/docs/) or
[Pub/Sub Lite](https://cloud.google.com/pubsub/lite/docs) and vice versa.

### Prerequisites

You must have a GCP project in order to use Cloud Pub/Sub or Pub/Sub Lite.

Follow these [setup steps](https://cloud.google.com/pubsub/docs/publish-receive-messages-client-library#before-you-begin)
for Pub/Sub before starting the [Quick Start](#quick-start).

Follow these [setup steps](https://cloud.google.com/pubsub/lite/docs/publish-receive-messages-console#before-you-begin)
for Pub/Sub Lite before starting the [Quick Start](#quick-start).

For general information on authenticating with GCP when using the Google
Cloud Pub/Sub Group Kafka Connector library, see [Provide credentials
for Application Default Credentials](https://cloud.google.com/docs/authentication/provide-credentials-adc).

### Quick Start

1. Set up the kcctl client by following the [setup guide](https://docs.streamnative.io/docs/kafka-connect-setup).
2. Create a Pub/Sub topic in your GCP project.
3. Create a JSON file like the following:

```json
{
    "name": "pubsub-sink",
    "config": {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "cps.project": "${GCP_PROJECT_ID}",
        "cps.topic": "${PUBSUB_TOPIC_NAME}",
        "gcp.credentials.json": "${GCP_CREDENTIALS_JSON}",
        "topics": "${KAFKA_TOPIC_NAME}"
    }
}
```

4. Run the following command to create the connector:

```shell
kcctl apply -f <filename>.json
```
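
The `${...}` tokens in the JSON file above read as placeholders to be filled in with your own values. As a minimal sketch, assuming they map to environment variables of the same names (an assumption; this page does not say how they are substituted), the file could be generated with Python's standard library so that the credentials JSON blob is escaped correctly when embedded as a string. The function names `build_pubsub_sink_config` and `write_config` are hypothetical helpers, not part of kcctl or the connector.

```python
import json
import os
from pathlib import Path

# Placeholder names taken from the JSON template above; treating them as
# environment variables is an assumption made for this sketch.
REQUIRED_VARS = (
    "GCP_PROJECT_ID",
    "PUBSUB_TOPIC_NAME",
    "GCP_CREDENTIALS_JSON",
    "KAFKA_TOPIC_NAME",
)


def build_pubsub_sink_config(env, name="pubsub-sink"):
    """Return the connector definition as a dict, failing fast on missing values."""
    missing = [var for var in REQUIRED_VARS if not env.get(var)]
    if missing:
        raise ValueError(f"missing values for: {', '.join(missing)}")
    # Parse the credentials only to check that they are valid JSON; the
    # connector property itself still carries them as a string.
    json.loads(env["GCP_CREDENTIALS_JSON"])
    return {
        "name": name,
        "config": {
            "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
            "cps.project": env["GCP_PROJECT_ID"],
            "cps.topic": env["PUBSUB_TOPIC_NAME"],
            "gcp.credentials.json": env["GCP_CREDENTIALS_JSON"],
            "topics": env["KAFKA_TOPIC_NAME"],
        },
    }


def write_config(path, env=None):
    """Write the connector definition to `path` and return it as a Path."""
    config = build_pubsub_sink_config(os.environ if env is None else env)
    # json.dumps escapes the quotes inside the embedded credentials blob.
    Path(path).write_text(json.dumps(config, indent=4) + "\n", encoding="utf-8")
    return Path(path)
```

With the four variables exported, calling `write_config("pubsub-sink.json")` would produce a file that can be passed to `kcctl apply -f pubsub-sink.json`. Building the document with `json.dumps` rather than string substitution avoids hand-escaping the quotes inside the credentials blob.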

### Configuration

The Google Pub/Sub sink connector is configured using the following properties:

| Config                     | Type                          | Default                     | Description                                                                                                                                                                                                                                                                                                                         |
| -------------------------- | ----------------------------- | --------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| cps.topic                  | String                        | REQUIRED (No default)       | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo".                                                                                                                                                                                                                                                              |
| cps.project                | String                        | REQUIRED (No default)       | The project containing the Pub/Sub topic, e.g. "bar" from above.                                                                                                                                                                                                                                                                    |
| cps.endpoint               | String                        | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use.                                                                                                                                                                                                                                                                                                        |
| maxBufferSize              | Integer                       | 100                         | The maximum number of messages that can be buffered for a topic partition before they are published to Pub/Sub. |
| maxBufferBytes             | Long                          | 10,000,000                  | The maximum number of bytes that can be buffered for a topic partition before they are published to Pub/Sub. |
| maxOutstandingRequestBytes | Long                          | Long.MAX\_VALUE             | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing.                                                                                                                                                                            |
| maxOutstandingMessages     | Long                          | Long.MAX\_VALUE             | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing.                                                                                                                                                                               |
| maxDelayThresholdMs        | Integer                       | 100                         | The maximum amount of time (in milliseconds) to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
| maxRequestTimeoutMs        | Integer                       | 10,000                      | The timeout (in milliseconds) for individual publish requests to Pub/Sub. |
| maxTotalTimeoutMs          | Integer                       | 60,000                      | The total timeout (in milliseconds) for a call to publish (including retries) to Pub/Sub. |
| maxShutdownTimeoutMs       | Integer                       | 60,000                      | The maximum amount of time (in milliseconds) to wait for a publisher to shut down when stopping a task in Kafka Connect. |
| gcp.credentials.file.path  | String                        | Optional                    | The path to the file that stores the GCP credentials. If not defined, the GOOGLE\_APPLICATION\_CREDENTIALS environment variable is used. |
| gcp.credentials.json       | String                        | Optional                    | The GCP credentials as a JSON blob. If specified, these explicitly provided credentials are used. Consider using the externalized secrets feature in Kafka Connect to pass the value. |
| metadata.publish           | Boolean                       | false                       | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub.                                                                                                                                                                                                  |
| headers.publish            | Boolean                       | false                       | When true, include any headers as attributes when a message is published to Pub/Sub.                                                                                                                                                                                                                                                |
| orderingKeySource          | String (none, key, partition) | none                        | When set to "none", the ordering key is not set. When set to "key", the message's key is used as the ordering key. When set to "partition", the partition number is converted to a String and used as the ordering key. Note that "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
| messageBodyName            | String                        | "cps\_message\_body"        | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body.                                                                                                                                                                                       |
| enableCompression          | Boolean                       | false                       | When true, enable publish-side compression in order to save on networking costs between Kafka Connect and Cloud Pub/Sub.                                                                                                                                                                                                            |
| compressionBytesThreshold  | Long                          | 240                         | When enableCompression is true, the minimum size (in bytes) of a publish request to compress. |

The full list of properties is also available in the [official Google Pub/Sub Kafka Sink Connector documentation](https://github.com/googleapis/java-pubsub-group-kafka-connector?tab=readme-ov-file#sink-connector).
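
To make the interaction between the batching properties concrete, here is a rough model in Python; it is not the connector's actual code. It assumes, based only on the descriptions in the table, that buffered messages for a topic partition are published as soon as any one of `maxBufferSize`, `maxBufferBytes`, or `maxDelayThresholdMs` is reached, and it mirrors the three documented `orderingKeySource` values. The names `BatchLimits`, `should_publish`, and `ordering_key` are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BatchLimits:
    """Defaults copied from the configuration table."""

    max_buffer_size: int = 100           # maxBufferSize (messages)
    max_buffer_bytes: int = 10_000_000   # maxBufferBytes (bytes)
    max_delay_threshold_ms: int = 100    # maxDelayThresholdMs (milliseconds)


def should_publish(
    buffered_messages: int,
    buffered_bytes: int,
    waited_ms: int,
    limits: BatchLimits = BatchLimits(),
) -> bool:
    """Assumed rule: publish once any single threshold is reached."""
    if buffered_messages == 0:
        return False  # nothing buffered, so nothing to publish
    return (
        buffered_messages >= limits.max_buffer_size
        or buffered_bytes >= limits.max_buffer_bytes
        or waited_ms >= limits.max_delay_threshold_ms
    )


def ordering_key(source: str, key: Optional[str], partition: int) -> Optional[str]:
    """Model of orderingKeySource: "none", "key", or "partition"."""
    if source == "none":
        return None            # no ordering key is set
    if source == "key":
        return key             # the Kafka message key is reused as-is
    if source == "partition":
        return str(partition)  # partition number converted to a string
    raise ValueError(f"unsupported orderingKeySource: {source!r}")
```

Under this model and the default limits, a partition that receives only a handful of small messages is published when the 100 ms delay elapses rather than when the message count is reached, so `maxDelayThresholdMs` is the property that most affects batch size on low-volume topics.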
