This document provides examples about how to use the Pulsar Java client to connect to a Pulsar cluster through a token or an OAuth2 credential file.
This document assumes that you have created a service account, and have granted the service account produce
and consume
permissions to the namespace for the target topic.
Connect to a Pulsar cluster using a token
This section describes how to connect to you Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
-
Connect to the Pulsar cluster.
package io.streamnative.examples.oauth2;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
public class ConnectByToken {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("SERVICE_URL")
.authentication(AuthenticationFactory.token("AUTH_PARAMS"))
.build();
client.close();
}
}
Set the SERVICE_URL
and AUTH_PARAMS
parameters based on the descriptions in the prepare to connect to a Pulsar cluster user guide.
-
Create a Java consumer and use the Java consumer to consume messages.
public class SampleConsumer {
public static void main(String[] args) throws Exception {
String serviceUrl = "SERVICE_URL";
String topic = "persistent://public/default/topic-1";
String token = "YOUR_TOKEN";
PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(token))
.build();
Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
System.out.println("Receive message " + new String(msg.getData()));
}
consumer.close();
client.close();
}
}
-
Create a Java producer and use the Java producer to produce messages.
public class SampleProducer {
public static void main(String[] args) throws Exception {
String serviceUrl = "SERVICE_URL";
String topic = "persistent://public/default/topic-1";
String token = "YOUR_TOKEN";
PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(token))
.build();
Producer<byte[]> producer = client.newProducer().topic(token).create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
MessageId msgID = producer.send(message.getBytes());
System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID);
}
producer.close();
client.close();
}
}
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
-
Generate the App credentials by following similar instructions in configure OAuth2 authentication.
-
Save the App credentials into an OAuth2 credential file.
-
Connect to your Pulsar cluster through the OAuth2 credential file.
package io.streamnative.examples.oauth2;
import java.net.URL;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
public class ConnectByOauth2 {
public static void main(String[] args) throws Exception {
String issuerUrl = "your-issuer-url";
String credentialsUrl = "file:///path/to/private-key-file.json";
String audience = "your-audience";
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar-service-url")
.authentication(
AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), new URL(credentialsUrl), audience))
.build();
client.close();
}
}
issuerUrl
: the URL of your OAuth2 identity provider.
credentialsUrl
: the path to your OAuth2 credential file.
audience
: the audience
of your Pulsar cluster.
serviceUrl
: the URL of your Pulsar cluster.
-
Create a Java consumer and use the Java consumer to consume messages.
You can create and configure the Java consumer to consume messages using the OAuth2 credential file. Or, you can define a Base64 String when creating the Java consumer. Then, the consumer can extract the client ID and client Secret from the OAuth2 credential file and use them to consume messages.
-
Using the OAuth2 credential file
public class SampleConsumer {
public static void main(String[] args) throws Exception {
JCommanderPulsar jct = new JCommanderPulsar();
JCommander jCommander = new JCommander(jct, args);
if (jct.help) {
jCommander.usage();
return;
}
String topic = "your-topic";
PulsarClient client = PulsarClient.builder()
.serviceUrl(jct.serviceUrl)
.authentication(
AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience))
.build();
Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("your-sub-name")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
System.out.println("Receive message " + new String(msg.getData()));
}
consumer.close();
client.close();
}
}
-
Using the OAuth2 client ID and client Secret
package io.streamnative.examples.oauth2;
import com.beust.jcommander.JCommander;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
public class SampleConsumerNoCredentialFile {
public static void main(String[] args) throws Exception {
JCommanderPulsar jct = new JCommanderPulsar();
JCommander jCommander = new JCommander(jct, args);
if (jct.help) {
jCommander.usage();
return;
}
String topic = "your-topic";
AuthenticationOAuth2 authenticationOAuth2 = new AuthenticationOAuth2();
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> params = Maps.newHashMap();
Map<String, String> data = Maps.newHashMap();
data.put("client_id", jct.clientId);
data.put("client_secret", jct.clientSecret);
params.put("privateKey", "data:application/json;base64,"
+ new String(Base64.getEncoder().encode(
objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8))));
params.put("issuerUrl", jct.issuerUrl);
params.put("audience", jct.audience);
params.put("scope", jct.scope);
authenticationOAuth2.configure(objectMapper.writeValueAsString(params));
PulsarClient client = PulsarClient.builder()
.serviceUrl(jct.serviceUrl)
.authentication(authenticationOAuth2)
.build();
Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("your-sub-name")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
System.out.println("Receive message " + new String(msg.getData()));
}
consumer.close();
client.close();
}
}
-
Create a Java producer and use the Java producer to produce messages.
You can create and configure the Java producer to produce messages using the OAuth2 credential file. Or, you can define a Base64 String when creating the Java producer. Then, the producer can extract the client ID and client Secret from the OAuth2 credential file and use them to produce messages.
-
Using the OAuth2 credential file
public class SampleProducer {
public static void main(String[] args) throws Exception {
JCommanderPulsar jct = new JCommanderPulsar();
JCommander jCommander = new JCommander(jct, args);
if (jct.help) {
jCommander.usage();
return;
}
String topic = "your-topic";
PulsarClient client = PulsarClient.builder()
.serviceUrl(jct.serviceUrl)
.authentication(
AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience))
.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic)
.producerName("your-producer-name");
Producer<byte[]> producer = producerBuilder.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
MessageId msgID = producer.send(message.getBytes());
System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID);
}
producer.close();
client.close();
}
}
-
Using the OAuth2 client ID and client Secret
package io.streamnative.examples.oauth2;
import com.beust.jcommander.JCommander;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
public class SampleProducerNoCredentialFile {
public static void main(String[] args) throws Exception {
JCommanderPulsar jct = new JCommanderPulsar();
JCommander jCommander = new JCommander(jct, args);
if (jct.help) {
jCommander.usage();
return;
}
String topic = "your-topic";
AuthenticationOAuth2 authenticationOAuth2 = new AuthenticationOAuth2();
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> params = Maps.newHashMap();
Map<String, String> data = Maps.newHashMap();
data.put("client_id", jct.clientId);
data.put("client_secret", jct.clientSecret);
params.put("privateKey", "data:application/json;base64,"
+ new String(Base64.getEncoder().encode(
objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8))));
params.put("issuerUrl", jct.issuerUrl);
params.put("audience", jct.audience);
params.put("scope", jct.scope);
authenticationOAuth2.configure(objectMapper.writeValueAsString(params));
PulsarClient client = PulsarClient.builder()
.serviceUrl(jct.serviceUrl)
.authentication(authenticationOAuth2)
.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic)
.producerName("your-producer-name");
Producer<byte[]> producer = producerBuilder.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
MessageId msgID = producer.send(message.getBytes());
System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID);
}
producer.close();
client.close();
}
}