- StreamNative Cloud
- Connect
Connect to cluster through Java client
This document describes how to connect to a cluster through a Java client, and use the Java producer and consumer to produce and consume messages to and from a topic. The Java client supports to connect to a Pulsar cluster either through the OAuth2 authentication plugin or Token authentication plugin.
This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce
and consume
permissions to the namespace for the target topic.
Prerequisites
- Java 1.8 or higher version
- Pulsar client 2.6.1 or higher version
Connect to cluster through OAuth2 authentication plugin
To connect a cluster through the OAuth2 authentication plugin, follow these steps.
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the OAuth2 credential file of your service account. For details, see get an OAuth2 credential file.
Connect to a Pulsar cluster through the OAuth2 authentication plugin.
PulsarClient client = PulsarClient.builder() .serviceUrl("broker_service_url") .authentication( AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), new URL(credentialsUrl), audience)) .build();
serviceUrl
: the broker service URL of your Pulsar cluster.issuerUrl
: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.credentialsUrl
: the path to your downloaded OAuth2 credential file. TheprivateKey
parameter supports the following three pattern formats:file:///path/to/file
file:/path/to/file
data:application/json;base64,<base64-encoded value>
audience
: theaudience
parameter is the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar
, the organization name, and the Pulsar instance name, in this formaturn:sn:pulsar:<org_name>:<instance_name>
.
Create a Java consumer and use the Java consumer to consume messages.
You can create and configure the Java consumer to consume messages using the OAuth2 credential file. Or, you can define a Base64 String when creating the Java consumer. Then, the consumer can extract the client ID and client Secret from the OAuth2 credential file and use them to consume messages.
Using the OAuth2 credential file
public class SampleConsumer { public static void main(String[] args) throws Exception { JCommanderPulsar jct = new JCommanderPulsar(); JCommander jCommander = new JCommander(jct, args); if (jct.help) { jCommander.usage(); return; } String topic = "your-topic"; PulsarClient client = PulsarClient.builder() .serviceUrl(jct.serviceUrl) .authentication( AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience)) .build(); Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("your-sub-name") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); for (int i = 0; i < 10; i++) { Message<byte[]> msg = consumer.receive(); consumer.acknowledge(msg); System.out.println("Receive message " + new String(msg.getData())); } consumer.close(); client.close(); } }
Using the OAuth2 client ID and client Secret
package io.streamnative.examples.oauth2; import com.beust.jcommander.JCommander; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Map; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2; public class SampleConsumerNoCredentialFile { public static void main(String[] args) throws Exception { JCommanderPulsar jct = new JCommanderPulsar(); JCommander jCommander = new JCommander(jct, args); if (jct.help) { jCommander.usage(); return; } String topic = "your-topic"; AuthenticationOAuth2 authenticationOAuth2 = new AuthenticationOAuth2(); ObjectMapper objectMapper = new ObjectMapper(); Map<String, String> params = Maps.newHashMap(); Map<String, String> data = Maps.newHashMap(); data.put("client_id", jct.clientId); data.put("client_secret", jct.clientSecret); params.put("privateKey", "data:application/json;base64," + new String(Base64.getEncoder().encode( objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8)))); params.put("issuerUrl", jct.issuerUrl); params.put("audience", jct.audience); params.put("scope", jct.scope); authenticationOAuth2.configure(objectMapper.writeValueAsString(params)); PulsarClient client = PulsarClient.builder() .serviceUrl(jct.serviceUrl) .authentication(authenticationOAuth2) .build(); Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("your-sub-name") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); for (int i = 0; i < 10; i++) { Message<byte[]> msg = consumer.receive(); consumer.acknowledge(msg); System.out.println("Receive message " + new String(msg.getData())); } consumer.close(); client.close(); } }
Create a Java producer and use the Java producer to produce messages.
You can create and configure the Java producer to produce messages using the OAuth2 credential file. Or, you can define a Base64 String when creating the Java producer. Then, the producer can extract the client ID and client Secret from the OAuth2 credential file and use them to produce messages.
Using the OAuth2 credential file
public class SampleProducer { public static void main(String[] args) throws Exception { JCommanderPulsar jct = new JCommanderPulsar(); JCommander jCommander = new JCommander(jct, args); if (jct.help) { jCommander.usage(); return; } String topic = "your-topic"; PulsarClient client = PulsarClient.builder() .serviceUrl(jct.serviceUrl) .authentication( AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience)) .build(); ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic) .producerName("your-producer-name"); Producer<byte[]> producer = producerBuilder.create(); for (int i = 0; i < 10; i++) { String message = "my-message-" + i; MessageId msgID = producer.send(message.getBytes()); System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID); } producer.close(); client.close(); } }
Using the OAuth2 client ID and client Secret
package io.streamnative.examples.oauth2; import com.beust.jcommander.JCommander; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Map; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2; public class SampleProducerNoCredentialFile { public static void main(String[] args) throws Exception { JCommanderPulsar jct = new JCommanderPulsar(); JCommander jCommander = new JCommander(jct, args); if (jct.help) { jCommander.usage(); return; } String topic = "your-topic"; AuthenticationOAuth2 authenticationOAuth2 = new AuthenticationOAuth2(); ObjectMapper objectMapper = new ObjectMapper(); Map<String, String> params = Maps.newHashMap(); Map<String, String> data = Maps.newHashMap(); data.put("client_id", jct.clientId); data.put("client_secret", jct.clientSecret); params.put("privateKey", "data:application/json;base64," + new String(Base64.getEncoder().encode( objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8)))); params.put("issuerUrl", jct.issuerUrl); params.put("audience", jct.audience); params.put("scope", jct.scope); authenticationOAuth2.configure(objectMapper.writeValueAsString(params)); PulsarClient client = PulsarClient.builder() .serviceUrl(jct.serviceUrl) .authentication(authenticationOAuth2) .build(); ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic) .producerName("your-producer-name"); Producer<byte[]> producer = producerBuilder.create(); for (int i = 0; i < 10; i++) { String message = "my-message-" + i; MessageId msgID = producer.send(message.getBytes()); System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID); } producer.close(); client.close(); } }
Connect to cluster through Token authentication plugin
To connect a cluster through the Token authentication plugin, follow these steps.
Get the service URL of your Pulsar cluster. For details, see get a service URL.
Get the token of your service account. For details, see get a token.
Connect to a Pulsar cluster through the Token authentication plugin.
PulsarClient client = PulsarClient.builder() .serviceUrl(SERVICE_URL) .authentication(AuthenticationFactory.token(AUTH_PARAMS)) .build();
SERVICE_URL
: the broker service URL of your Pulsar cluster.AUTH_PARAMS
: the token of your service account.
For a complete example about how to connect to a cluster through the Pulsar Java client, see Java client examples.