This document provides examples about how to use the Pulsar C++ client to connect to a Pulsar cluster through a token or an OAuth2 credential file.
This document assumes that you have created a service account, and have granted the service account produce
and consume
permissions to the namespace for the target topic.
Connect to a Pulsar cluster using a token
This section describes how to connect to you Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
-
Connect to the Pulsar cluster.
#include <iostream>
#include <pulsar/Client.h>
using namespace pulsar;
int main() {
ClientConfiguration config;
config.setAuth(AuthToken::create("AUTH_PARAMS"));
Client client("SERVICE_URL", config);
client.close();
}
Set the SERVICE_URL
and AUTH_PARAMS
parameters based on the descriptions in the prepare to connect to a Pulsar cluster user guide.
-
Create a C++ consumer and use the C++ consumer to consume messages.
int main(int argc, char *argv[]) {
ClientConfiguration config;
std::string oauthParams = argv[2];
config.setAuth(AuthToken::create("AUTH_PARAMS"));
Client client(argv[1], config);
Consumer consumer;
Result result = client.subscribe("your-topic", "your-sub-name", consumer);
if (result != ResultOk) {
std::cout << "Failed to subscribe: " << result << "\n";
return -1;
}
Message msg;
while (true) {
consumer.receive(msg);
std::cout << "Received: " << msg << " with payload '" << msg.getDataAsString() << "'" << "\n";
consumer.acknowledge(msg);
}
}
-
Create a C++ producer and use the C++ producer to produce messages.
int main(int argc, char *argv[]) {
ClientConfiguration config;
std::string oauthParams = argv[2];
config.setAuth(AuthToken::create("AUTH_PARAMS"));
Client client(argv[1], config);
Producer producer;
Result result = client.createProducer("your-topic", producer);
if (result != ResultOk) {
std::cout << "Error creating producer: " << result << "\n";
return -1;
}
// Send synchronously
Message msg = MessageBuilder().setContent("content").build();
Result res = producer.send(msg);
std::cout << "Message sent: " << res << "\n";
client.close();
}
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
-
Generate the App credentials by following similar instructions in configure OAuth2 authentication.
-
Save the App credentials into an OAuth2 credential file.
-
Connect to your Pulsar cluster through the OAuth2 credential file.
#include <iostream>
#include <pulsar/Authentication.h>
#include <pulsar/Client.h>
using namespace pulsar;
int main() {
ClientConfiguration config;
std::string oauthParams = R"({
"issuer_url": "your-issuer-url",
"private_key": "file:///path/to/private-key-file.json",
"audience": "your-audience"})";
config.setAuth(pulsar::AuthOauth2::create(oauthParams));
Client client("pulsar-service-url", config);
client.close();
}
issuerUrl
: the URL of your OAuth2 identity provider.
private_key
: the path to your OAuth2 credential file.
audience
: the audience
of your Pulsar cluster.
client
: the URL of your Pulsar cluster.
-
Create a C++ consumer and use the C++ consumer to consume messages.
int main(int argc, char *argv[]) {
ClientConfiguration config;
std::string oauthParams = argv[2];
config.setAuth(pulsar::AuthOauth2::create(oauthParams));
Client client(argv[1], config);
Consumer consumer;
Result result = client.subscribe("your-topic", "your-sub-name", consumer);
if (result != ResultOk) {
std::cout << "Failed to subscribe: " << result << "\n";
return -1;
}
Message msg;
while (true) {
consumer.receive(msg);
std::cout << "Received: " << msg << " with payload '" << msg.getDataAsString() << "'" << "\n";
consumer.acknowledge(msg);
}
}
-
Create a C++ producer and use the C++ producer to produce messages.
int main(int argc, char *argv[]) {
ClientConfiguration config;
std::string oauthParams = argv[2];
config.setAuth(pulsar::AuthOauth2::create(oauthParams));
Client client(argv[1], config);
Producer producer;
Result result = client.createProducer("your-topic", producer);
if (result != ResultOk) {
std::cout << "Error creating producer: " << result << "\n";
return -1;
}
// Send synchronously
Message msg = MessageBuilder().setContent("content").build();
Result res = producer.send(msg);
std::cout << "Message sent: " << res << "\n";
client.close();
}