The Google Cloud Pub/Sub source connector feeds data from Google Cloud Pub/Sub topics and writes data to Pulsar topics.
Quick start
Prerequisites
The prerequisites for connecting an Google PubSub source connector to external systems include:
- Create Google PubSub Topic in Google Cloud.
- Create the Gcloud ServiceAccount and create a public key certificate.
- Create the Gcloud Role, ensure the Google Cloud role have the following permissions:
- pubsub.subscriptions.consume
- pubsub.subscriptions.create
- pubsub.subscriptions.get
- pubsub.subscriptions.update
- pubsub.topics.attachSubscription
- Grant the service account the above role permissions.
1. Create a connector
The following command shows how to use pulsarctl to create a builtin
connector. If you want to create a non-builtin
connector, you need to replace --source-type google-pubsub
with --archive /path/to/pulsar-io-google-pubsub.nar
. You can find the button to download the nar
package at the beginning of the document.
For StreamNative Cloud User
If you are a StreamNative Cloud user, you need set up your environment first.
pulsarctl sources create \
--source-type google-pubsub \
--name pubsub-source \
--tenant public \
--namespace default \
--destination-topic-name "Your topic name" \
--parallelism 1 \
--source-config \
'{
"pubsubProjectId": "Your google pubsub project Id",
"pubsubTopicId": "Your google pubsub Topic name",
"pubsubCredential": "The escaped and compressed public key certificate you created above"
}'
The --source-config
is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own. If you want to configure more parameters, see Configuration Properties for reference.
Note
You can also choose to use a variety of other tools to create a connector:
- pulsar-admin: The command arguments for
pulsar-admin
are similar to those ofpulsarctl
. You can find an example for StreamNative Cloud Doc. - RestAPI: You can find an example for StreamNative Cloud Doc.
- Terraform: You can find an example for StreamNative Cloud Doc.
- Function Mesh: The docker image can be found at the beginning of the document.
2. Write data to Google PubSub topic
Send some messages to the Google Cloud PubSub using the gcloud CLI tool
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-0"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-1"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-2"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-3"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-4"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-5"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-6"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-7"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-8"
gcloud pubsub topics publish {{Your PubSub Topic Name}} --message="my-message-9"
3. Show data by Pulsar Consumer
Note
If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
public static void main(String[] args) {
PulsarClient client = PulsarClient.builder()
.serviceUrl("{{Your Pulsar URL}}")
.build();
Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
.topic("{{The topic name that you specified when you created the connector}}")
.subscriptionName(subscription)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Consumer<byte[]> consumer = client.newConsumer()
.topic("{{The topic name that you specified when you created the connector}}")
.subscriptionName("test-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
System.out.println("Receive message " + new String(msg.getData()));
}
client.close();
}
// output
// Receive message my-message-0
// Receive message my-message-1
// Receive message my-message-2
// Receive message my-message-3
// Receive message my-message-4
// Receive message my-message-5
// Receive message my-message-6
// Receive message my-message-7
// Receive message my-message-8
// Receive message my-message-9
Configuration Properties
Before using the Google PubSub source connector, you need to configure it. This table outlines the properties and the descriptions.
Name | Type | Required | Sensitive | Default | Description |
---|---|---|---|---|---|
pubsubCredential | String | true | true | "" (empty string) | The credential (JSON string) for accessing the Google Cloud. It needs to be compressed and escaping before use. |
pubsubProjectId | String | true | false | "" (empty string) | The Google Cloud project ID. |
pubsubTopicId | String | true | false | " " (empty string) | The topic ID. It is used to read messages from or write messages to Google Cloud Pub/Sub topics. |