The Google Cloud Pub/Sub Group Kafka Connector library provides Google Cloud Platform (GCP) first-party connectors for Pub/Sub products with Kafka Connect. You can use the library to transmit data from Apache Kafka to Cloud Pub/Sub or Pub/Sub Lite and vice versa.

Prerequisites

You must have a GCP project in order to use Cloud Pub/Sub or Pub/Sub Lite. Before doing the quickstart, follow these setup steps for Pub/Sub, or these setup steps for Pub/Sub Lite, as appropriate. For general information on how to authenticate with GCP when using the Google Cloud Pub/Sub Group Kafka Connector library, see Provide credentials for Application Default Credentials.

Quick Start

  1. Set up the kcctl client: doc
  2. Create a Pub/Sub topic and a subscription to that topic in your GCP project.
  3. Create a JSON file like the following:
{
    "name": "pubsub-source",
    "config": {
        "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
        "cps.project": "${GCP_PROJECT_ID}",
        "cps.topic": "${PUBSUB_TOPIC_NAME}",
        "cps.subscription": "${PUBSUB_SUBSCRIPTION_NAME}",
        "gcp.credentials.json": "${GCP_CREDENTIALS_JSON}",
        "kafka.topic": "${KAFKA_TOPIC_NAME}"
    }
}
  4. Run the following command to create the connector:
kcctl create -f <filename>.json
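If you prefer to generate the connector configuration programmatically rather than editing the JSON template by hand, a minimal Python sketch is shown below. The `build_connector_config` helper is hypothetical (not part of the connector library); it simply fills the quickstart template from the same environment variables used above.

```python
import json
import os

def build_connector_config(name: str = "pubsub-source") -> dict:
    """Build the quickstart source-connector config from environment
    variables. Raises KeyError if a required variable is unset."""
    env = os.environ
    return {
        "name": name,
        "config": {
            "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
            "cps.project": env["GCP_PROJECT_ID"],
            "cps.topic": env["PUBSUB_TOPIC_NAME"],
            "cps.subscription": env["PUBSUB_SUBSCRIPTION_NAME"],
            "gcp.credentials.json": env["GCP_CREDENTIALS_JSON"],
            "kafka.topic": env["KAFKA_TOPIC_NAME"],
        },
    }

def write_connector_config(path: str) -> None:
    """Write the config to a file that kcctl can consume."""
    with open(path, "w") as f:
        json.dump(build_connector_config(), f, indent=4)
```

You would then pass the written file to `kcctl create -f` as in the step above.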

Configuration

The Google Pub/Sub source connector is configured using the following properties:
| Config | Value Range | Default | Description |
|---|---|---|---|
| cps.subscription | String | REQUIRED (no default) | The Pub/Sub subscription ID, e.g. "baz" for subscription "/projects/bar/subscriptions/baz". |
| cps.project | String | REQUIRED (no default) | The project containing the Pub/Sub subscription, e.g. "bar" from above. |
| cps.endpoint | String | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use. |
| kafka.topic | String | REQUIRED (no default) | The Kafka topic that will receive messages from the Pub/Sub subscription. |
| cps.maxBatchSize | Integer | 100 | The maximum number of messages per batch in a pull request to Pub/Sub. |
| cps.makeOrderingKeyAttribute | Boolean | false | When true, copy the ordering key into the set of attributes set on the Kafka message. |
| kafka.key.attribute | String | null | The Pub/Sub message attribute to use as a key for messages published to Kafka. If set to "orderingKey", use the message's ordering key. |
| kafka.partition.count | Integer | 1 | The number of Kafka partitions for the Kafka topic to which messages will be published. NOTE: this parameter is ignored if the partition scheme is "kafka_partitioner". |
| kafka.partition.scheme | round_robin, hash_key, hash_value, kafka_partitioner, ordering_key | round_robin | The scheme for assigning a message to a partition in Kafka. "round_robin" assigns partitions in a round-robin fashion, while "hash_key" and "hash_value" find the partition by hashing the message key and message value, respectively. "kafka_partitioner" delegates partitioning logic to the Kafka producer, which by default detects the number of partitions automatically and performs either murmur-hash-based partition mapping or round robin, depending on whether a message key is provided. "ordering_key" uses the hash code of a message's ordering key; if no ordering key is present, it falls back to "round_robin". |
| gcp.credentials.file.path | String | Optional | The file path of a file storing GCP service account credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. If specified, the explicitly provided credentials are used. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| gcp.credentials.json | String | Optional | A GCP service account credentials JSON blob. If specified, the explicitly provided credentials are used. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| kafka.record.headers | Boolean | false | Use Kafka record headers to store Pub/Sub message attributes. |
| cps.streamingPull.enabled | Boolean | false | Whether the connector uses streaming pull to connect to Pub/Sub. If enabled, cps.maxBatchSize is ignored. |
| cps.streamingPull.flowControlMessages | Long | 1,000 | The maximum number of outstanding messages per task when using streaming pull. |
| cps.streamingPull.flowControlBytes | Long | 100 MiB (100L * 1024 * 1024) | The maximum number of outstanding message bytes per task when using streaming pull. |
| cps.streamingPull.parallelStreams | Integer | 1 | The number of streams used to pull messages from the subscription when using streaming pull. |
| cps.streamingPull.maxAckExtensionMs | Long | 0 | The maximum time, in milliseconds, to which the subscribe deadline will be extended when using streaming pull. A value of 0 implies the java-pubsub library default. |
| cps.streamingPull.maxMsPerAckExtension | Long | 0 | The maximum number of milliseconds by which to extend the subscribe deadline at a time when using streaming pull. A value of 0 implies the java-pubsub library default. |
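As a rough illustration of how kafka.partition.scheme selects a partition, here is a simplified Python model. This is a sketch only: the real connector runs in Java (and "kafka_partitioner" delegates to the Kafka producer entirely), so the exact partition numbers it produces will differ.

```python
import itertools

def choose_partition(scheme, partition_count, key=None, value=None,
                     ordering_key=None, _counter=itertools.count()):
    """Simplified model of the connector's partition assignment.
    The shared _counter models the connector's round-robin state."""
    if scheme == "round_robin":
        return next(_counter) % partition_count
    if scheme == "hash_key":
        return hash(key) % partition_count
    if scheme == "hash_value":
        return hash(value) % partition_count
    if scheme == "ordering_key":
        if ordering_key:
            return hash(ordering_key) % partition_count
        # No ordering key present: fall back to round robin.
        return next(_counter) % partition_count
    if scheme == "kafka_partitioner":
        raise NotImplementedError("delegated to the Kafka producer")
    raise ValueError(f"unknown scheme: {scheme}")
```

Note that with "hash_key" or "hash_value", all messages sharing a key (or value) always land in the same partition, while "round_robin" spreads messages evenly regardless of content.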
The full set of properties is also available in the official Google Pub/Sub Kafka Source Connector documentation.
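For higher-throughput subscriptions, the streaming pull options above can be combined in the connector config. The following fragment is illustrative only; the parallelism and flow-control values are placeholders you should tune for your workload:

{
    "name": "pubsub-source-streaming",
    "config": {
        "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
        "cps.project": "${GCP_PROJECT_ID}",
        "cps.subscription": "${PUBSUB_SUBSCRIPTION_NAME}",
        "kafka.topic": "${KAFKA_TOPIC_NAME}",
        "cps.streamingPull.enabled": "true",
        "cps.streamingPull.parallelStreams": "2",
        "cps.streamingPull.flowControlMessages": "10000"
    }
}

Because cps.streamingPull.enabled is true here, cps.maxBatchSize would be ignored even if set.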