Connect to Pulsar cluster using Java client
This document provides examples of how to use the Pulsar Java client to connect to a Pulsar cluster through a token or an OAuth2 credential file.
Note
This document assumes that you have created a service account and have granted the service account produce and consume permissions to the namespace for the target topic.
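To check which namespace needs the produce and consume permissions, the namespace can be read off the fully qualified topic name, which has the form persistent://tenant/namespace/topic. The following is a minimal sketch in plain Java; the class name TopicNamespace and its method are hypothetical helpers and are not part of the Pulsar client.

```java
/** Hypothetical helper: extracts "tenant/namespace" from a full topic name. */
final class TopicNamespace {

    /** Expects a name such as persistent://tenant/namespace/topic. */
    static String of(String topic) {
        int schemeEnd = topic.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing scheme in topic name: " + topic);
        }
        // Split the remainder into tenant, namespace, and local topic name.
        String[] parts = topic.substring(schemeEnd + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic in: " + topic);
        }
        return parts[0] + "/" + parts[1];
    }

    public static void main(String[] args) {
        // Prints "public/default" for the topic used in the examples below.
        System.out.println(of("persistent://public/default/topic-1"));
    }
}
```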
Connect to a Pulsar cluster using a token
This section describes how to connect to your Pulsar cluster using a token.
Prerequisites
Steps
To connect to your Pulsar cluster using a token, follow these steps.
Connect to the Pulsar cluster.
package io.streamnative.examples.oauth2;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;

public class ConnectByToken {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("SERVICE_URL")
                .authentication(AuthenticationFactory.token("AUTH_PARAMS"))
                .build();

        client.close();
    }
}
Set the SERVICE_URL and AUTH_PARAMS parameters based on the descriptions in the prepare to connect to a Pulsar cluster user guide.
Create a Java consumer and use the Java consumer to consume messages.
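import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

public class SampleConsumer {
    public static void main(String[] args) throws Exception {
        String serviceUrl = "SERVICE_URL";
        String topic = "persistent://public/default/topic-1";
        String token = "YOUR_TOKEN";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .authentication(AuthenticationFactory.token(token))
                .build();

        // Start from the earliest available message on a new subscription.
        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // Receive and acknowledge ten messages, then shut down.
        for (int i = 0; i < 10; i++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            System.out.println("Receive message " + new String(msg.getData()));
        }

        consumer.close();
        client.close();
    }
}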
Create a Java producer and use the Java producer to produce messages.
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class SampleProducer {
    public static void main(String[] args) throws Exception {
        String serviceUrl = "SERVICE_URL";
        String topic = "persistent://public/default/topic-1";
        String token = "YOUR_TOKEN";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .authentication(AuthenticationFactory.token(token))
                .build();

        // Publish to the topic (not the token value).
        Producer<byte[]> producer = client.newProducer().topic(topic).create();

        // Send ten messages and print the ID that the broker assigns to each.
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            MessageId msgID = producer.send(message.getBytes());
            System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID);
        }

        producer.close();
        client.close();
    }
}
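The token examples above hard-code the service URL and the token. One possible alternative is to read both values from environment variables so that the token stays out of source control. The sketch below uses only the Java standard library; the class name PulsarSettings and the variable names PULSAR_SERVICE_URL and PULSAR_TOKEN are assumptions made for illustration and are not defined by Pulsar.

```java
import java.util.Map;

/** Hypothetical holder for connection settings read from the environment. */
final class PulsarSettings {
    final String serviceUrl;
    final String token;

    private PulsarSettings(String serviceUrl, String token) {
        this.serviceUrl = serviceUrl;
        this.token = token;
    }

    /**
     * Builds the settings from a map of environment variables.
     * PULSAR_SERVICE_URL and PULSAR_TOKEN are assumed names, not Pulsar conventions.
     */
    static PulsarSettings fromEnv(Map<String, String> env) {
        return new PulsarSettings(
                require(env, "PULSAR_SERVICE_URL"),
                require(env, "PULSAR_TOKEN"));
    }

    // Fails fast when a variable is unset or blank; trims surrounding whitespace.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        PulsarSettings settings = fromEnv(System.getenv());
        // settings.serviceUrl would go to serviceUrl(...), and settings.token to
        // AuthenticationFactory.token(...), when building the PulsarClient.
        System.out.println("Service URL: " + settings.serviceUrl);
    }
}
```

Taking the environment as a Map rather than calling System.getenv() inside the helper keeps the lookup easy to exercise without changing the real environment.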
Connect to a Pulsar cluster using an OAuth2 credential file
To connect to your Pulsar cluster using an OAuth2 credential file, follow these steps.
Generate the App credentials by following similar instructions in configure OAuth2 authentication.
Save the App credentials into an OAuth2 credential file.
Connect to your Pulsar cluster through the OAuth2 credential file.
package io.streamnative.examples.oauth2;

import java.net.URL;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;

public class ConnectByOauth2 {
    public static void main(String[] args) throws Exception {
        String issuerUrl = "your-issuer-url";
        String credentialsUrl = "file:///path/to/private-key-file.json";
        String audience = "your-audience";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar-service-url")
                .authentication(
                        AuthenticationFactoryOAuth2.clientCredentials(
                                new URL(issuerUrl), new URL(credentialsUrl), audience))
                .build();

        client.close();
    }
}
issuerUrl: the URL of your OAuth2 identity provider.
credentialsUrl: the path to your OAuth2 credential file.
audience: the audience of your Pulsar cluster.
serviceUrl: the URL of your Pulsar cluster.
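Because the connection example passes credentialsUrl to new URL(...), a plain filesystem path has to be turned into a file: URL first. The following is a minimal sketch of one way to do that with the Java standard library; the class name CredentialFileUrl and the file name private-key-file.json are placeholders, and the resulting URL is assumed (not verified here) to be accepted by the Pulsar client in the same way as the file:/// literal shown above.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: turns a filesystem path into an absolute file: URL. */
final class CredentialFileUrl {

    static URL fromPath(String path) {
        // Resolve relative paths against the working directory and drop "." / ".." segments.
        Path absolute = Paths.get(path).toAbsolutePath().normalize();
        try {
            return absolute.toUri().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Cannot convert to a URL: " + path, e);
        }
    }

    public static void main(String[] args) {
        // "private-key-file.json" is a placeholder file name.
        URL credentialsUrl = fromPath("private-key-file.json");
        System.out.println(credentialsUrl);
    }
}
```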
Create a Java consumer and use the Java consumer to consume messages.
You can create and configure the Java consumer to consume messages using the OAuth2 credential file. Alternatively, you can supply the client ID and client secret directly when creating the Java consumer: the credentials are encoded as a Base64 string and passed in place of the credential file, and the consumer uses them to consume messages.
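The Base64 string mentioned above is a data: URL that wraps a small JSON object holding the client ID and client secret. The sketch below shows that encoding step in isolation, using only the Java standard library in place of the Jackson ObjectMapper used in the full example. The class name CredentialDataUrl is hypothetical, and the hand-written JSON escaping handles only backslashes and double quotes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper: packs a client ID and secret into a Base64 data: URL. */
final class CredentialDataUrl {
    private static final String PREFIX = "data:application/json;base64,";

    /** Builds a data: URL carrying {"client_id": ..., "client_secret": ...}. */
    static String encode(String clientId, String clientSecret) {
        String json = "{\"client_id\":\"" + escape(clientId)
                + "\",\"client_secret\":\"" + escape(clientSecret) + "\"}";
        return PREFIX + Base64.getEncoder()
                .encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes the JSON payload back out of a data: URL built by encode. */
    static String decode(String dataUrl) {
        byte[] raw = Base64.getDecoder().decode(dataUrl.substring(PREFIX.length()));
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Minimal JSON escaping: backslashes and double quotes only.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // Placeholder credentials for illustration.
        String url = encode("my-client-id", "my-client-secret");
        System.out.println(url);
        System.out.println(decode(url));
    }
}
```

The value returned by encode corresponds to what the full example stores under the privateKey parameter.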
Using the OAuth2 credential file
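package io.streamnative.examples.oauth2;

import com.beust.jcommander.JCommander;
import java.net.URL;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;

public class SampleConsumer {
    public static void main(String[] args) throws Exception {
        // JCommanderPulsar (not shown) holds the command-line options used below.
        JCommanderPulsar jct = new JCommanderPulsar();
        JCommander jCommander = new JCommander(jct, args);
        if (jct.help) {
            jCommander.usage();
            return;
        }

        String topic = "your-topic";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(jct.serviceUrl)
                .authentication(
                        AuthenticationFactoryOAuth2.clientCredentials(
                                new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience))
                .build();

        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("your-sub-name")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // Receive and acknowledge ten messages, then shut down.
        for (int i = 0; i < 10; i++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            System.out.println("Receive message " + new String(msg.getData()));
        }

        consumer.close();
        client.close();
    }
}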
Using the OAuth2 client ID and client secret
package io.streamnative.examples.oauth2;

import com.beust.jcommander.JCommander;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;

public class SampleConsumerNoCredentialFile {
    public static void main(String[] args) throws Exception {
        JCommanderPulsar jct = new JCommanderPulsar();
        JCommander jCommander = new JCommander(jct, args);
        if (jct.help) {
            jCommander.usage();
            return;
        }

        String topic = "your-topic";

        AuthenticationOAuth2 authenticationOAuth2 = new AuthenticationOAuth2();
        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, String> params = Maps.newHashMap();
        Map<String, String> data = Maps.newHashMap();
        data.put("client_id", jct.clientId);
        data.put("client_secret", jct.clientSecret);
        params.put("privateKey", "data:application/json;base64,"
                + new String(Base64.getEncoder().encode(
                        objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8))));
        params.put("issuerUrl", jct.issuerUrl);
        params.put("audience", jct.audience);
        params.put("scope", jct.scope);
        authenticationOAuth2.configure(objectMapper.writeValueAsString(params));

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(jct.serviceUrl)
                .authentication(authenticationOAuth2)
                .build();

        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("your-sub-name")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        for (int i = 0; i < 10; i++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            System.out.println("Receive message " + new String(msg.getData()));
        }

        consumer.close();
        client.close();
    }
}
Create a Java producer and use the Java producer to produce messages.
You can create and configure the Java producer to produce messages using the OAuth2 credential file. Alternatively, you can supply the client ID and client secret directly when creating the Java producer: the credentials are encoded as a Base64 string and passed in place of the credential file, and the producer uses them to produce messages.
Using the OAuth2 credential file
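package io.streamnative.examples.oauth2;

import com.beust.jcommander.JCommander;
import java.net.URL;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;

public class SampleProducer {
    public static void main(String[] args) throws Exception {
        // JCommanderPulsar (not shown) holds the command-line options used below.
        JCommanderPulsar jct = new JCommanderPulsar();
        JCommander jCommander = new JCommander(jct, args);
        if (jct.help) {
            jCommander.usage();
            return;
        }

        String topic = "your-topic";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(jct.serviceUrl)
                .authentication(
                        AuthenticationFactoryOAuth2.clientCredentials(
                                new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience))
                .build();

        ProducerBuilder<byte[]> producerBuilder = client.newProducer()
                .topic(topic)
                .producerName("your-producer-name");
        Producer<byte[]> producer = producerBuilder.create();

        // Send ten messages and print the ID that the broker assigns to each.
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            MessageId msgID = producer.send(message.getBytes());
            System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID);
        }

        producer.close();
        client.close();
    }
}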
Using the OAuth2 client ID and client secret
package io.streamnative.examples.oauth2;

import com.beust.jcommander.JCommander;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;

public class SampleProducerNoCredentialFile {
    public static void main(String[] args) throws Exception {
        JCommanderPulsar jct = new JCommanderPulsar();
        JCommander jCommander = new JCommander(jct, args);
        if (jct.help) {
            jCommander.usage();
            return;
        }

        String topic = "your-topic";

        AuthenticationOAuth2 authenticationOAuth2 = new AuthenticationOAuth2();
        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, String> params = Maps.newHashMap();
        Map<String, String> data = Maps.newHashMap();
        data.put("client_id", jct.clientId);
        data.put("client_secret", jct.clientSecret);
        params.put("privateKey", "data:application/json;base64,"
                + new String(Base64.getEncoder().encode(
                        objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8))));
        params.put("issuerUrl", jct.issuerUrl);
        params.put("audience", jct.audience);
        params.put("scope", jct.scope);
        authenticationOAuth2.configure(objectMapper.writeValueAsString(params));

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(jct.serviceUrl)
                .authentication(authenticationOAuth2)
                .build();

        ProducerBuilder<byte[]> producerBuilder = client.newProducer()
                .topic(topic)
                .producerName("your-producer-name");
        Producer<byte[]> producer = producerBuilder.create();

        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            MessageId msgID = producer.send(message.getBytes());
            System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID);
        }

        producer.close();
        client.close();
    }
}