1. StreamNative Cloud
  2. Connect

Connect to cluster through Java client

This document describes how to connect to a cluster through a Java client, and use the Java producer and consumer to produce and consume messages to and from a topic. The Java client supports to connect to a Pulsar cluster either through the OAuth2 authentication plugin or Token authentication plugin.

This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Prerequisites

  • Java 1.8 or higher version
  • Pulsar client 2.6.1 or higher version

Connect to cluster through OAuth2 authentication plugin

To connect a cluster through the OAuth2 authentication plugin, follow these steps.

  1. Get the service URL of your Pulsar cluster. For details, see get a service URL.

  2. Get the OAuth2 credential file of your service account. For details, see get an OAuth2 credential file.

  3. Connect to a Pulsar cluster through the OAuth2 authentication plugin.

    PulsarClient client = PulsarClient.builder()
      .serviceUrl("broker_service_url")
      .authentication(
        AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), new URL(credentialsUrl), audience))
      .build();
    
    • serviceUrl: the broker service URL of your Pulsar cluster.
    • issuerUrl: the URL of your OAuth2 authentication provider. You can get the value from your downloaded OAuth2 credential file.
    • credentialsUrl: the path to your downloaded OAuth2 credential file. The privateKey parameter supports the following three pattern formats:
      • file:///path/to/file
      • file:/path/to/file
      • data:application/json;base64,<base64-encoded value>
    • audience: the audience parameter is the Uniform Resource Name (URN), which is a combination of the urn:sn:pulsar, the organization name, and the Pulsar instance name, in this format urn:sn:pulsar:<org_name>:<instance_name>.
  4. Create a Java consumer and use the Java consumer to consume messages.

    You can create and configure the Java consumer to consume messages using the OAuth2 credential file. Or, you can define a Base64 String when creating the Java consumer. Then, the consumer can extract the client ID and client Secret from the OAuth2 credential file and use them to consume messages.

    • Using the OAuth2 credential file

      public class SampleConsumer {
        public static void main(String[] args) throws Exception {
          JCommanderPulsar jct = new JCommanderPulsar();
          JCommander jCommander = new JCommander(jct, args);
          if (jct.help) {
            jCommander.usage();
            return;
          }
          String topic = "your-topic";
          PulsarClient client = PulsarClient.builder()
            .serviceUrl(jct.serviceUrl)
            .authentication(
              AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience))
            .build();
          Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName("your-sub-name")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();
          for (int i = 0; i < 10; i++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            System.out.println("Receive message " + new String(msg.getData()));
          }
          consumer.close();
          client.close();
        }
      }
      
    • Using the OAuth2 client ID and client Secret

      package io.streamnative.examples.oauth2;
      import com.beust.jcommander.JCommander;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import com.google.common.collect.Maps;
      import java.nio.charset.StandardCharsets;
      import java.util.Base64;
      import java.util.Map;
      import org.apache.pulsar.client.api.Consumer;
      import org.apache.pulsar.client.api.Message;
      import org.apache.pulsar.client.api.PulsarClient;
      import org.apache.pulsar.client.api.Schema;
      import org.apache.pulsar.client.api.SubscriptionInitialPosition;
      import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
      public class SampleConsumerNoCredentialFile {
        public static void main(String[] args) throws Exception {
          JCommanderPulsar jct = new JCommanderPulsar();
          JCommander jCommander = new JCommander(jct, args);
          if (jct.help) {
            jCommander.usage();
            return;
          }
          String topic = "your-topic";
          AuthenticationOAuth2 authenticationOAuth2 = new AuthenticationOAuth2();
          ObjectMapper objectMapper = new ObjectMapper();
          Map<String, String> params = Maps.newHashMap();
          Map<String, String> data = Maps.newHashMap();
          data.put("client_id", jct.clientId);
          data.put("client_secret", jct.clientSecret);
          params.put("privateKey", "data:application/json;base64,"
            + new String(Base64.getEncoder().encode(
              objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8))));
          params.put("issuerUrl", jct.issuerUrl);
          params.put("audience", jct.audience);
          params.put("scope", jct.scope);
          authenticationOAuth2.configure(objectMapper.writeValueAsString(params));
          PulsarClient client = PulsarClient.builder()
            .serviceUrl(jct.serviceUrl)
            .authentication(authenticationOAuth2)
            .build();
          Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName("your-sub-name")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();
          for (int i = 0; i < 10; i++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            System.out.println("Receive message " + new String(msg.getData()));
          }
          consumer.close();
          client.close();
        }
      }
      
  5. Create a Java producer and use the Java producer to produce messages.

    You can create and configure the Java producer to produce messages using the OAuth2 credential file. Or, you can define a Base64 String when creating the Java producer. Then, the producer can extract the client ID and client Secret from the OAuth2 credential file and use them to produce messages.

    • Using the OAuth2 credential file

      public class SampleProducer {
        public static void main(String[] args) throws Exception {
          JCommanderPulsar jct = new JCommanderPulsar();
          JCommander jCommander = new JCommander(jct, args);
          if (jct.help) {
            jCommander.usage();
            return;
          }
          String topic = "your-topic";
          PulsarClient client = PulsarClient.builder()
            .serviceUrl(jct.serviceUrl)
            .authentication(
              AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience))
            .build();
          ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic)
            .producerName("your-producer-name");
          Producer<byte[]> producer = producerBuilder.create();
          for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            MessageId msgID = producer.send(message.getBytes());
            System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID);
          }
          producer.close();
          client.close();
        }
      }
      
    • Using the OAuth2 client ID and client Secret

      package io.streamnative.examples.oauth2;
      import com.beust.jcommander.JCommander;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import com.google.common.collect.Maps;
      import java.nio.charset.StandardCharsets;
      import java.util.Base64;
      import java.util.Map;
      import org.apache.pulsar.client.api.MessageId;
      import org.apache.pulsar.client.api.Producer;
      import org.apache.pulsar.client.api.ProducerBuilder;
      import org.apache.pulsar.client.api.PulsarClient;
      import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
      public class SampleProducerNoCredentialFile {
        public static void main(String[] args) throws Exception {
          JCommanderPulsar jct = new JCommanderPulsar();
          JCommander jCommander = new JCommander(jct, args);
          if (jct.help) {
            jCommander.usage();
            return;
          }
          String topic = "your-topic";
          AuthenticationOAuth2 authenticationOAuth2 = new AuthenticationOAuth2();
          ObjectMapper objectMapper = new ObjectMapper();
          Map<String, String> params = Maps.newHashMap();
          Map<String, String> data = Maps.newHashMap();
          data.put("client_id", jct.clientId);
          data.put("client_secret", jct.clientSecret);
          params.put("privateKey", "data:application/json;base64,"
            + new String(Base64.getEncoder().encode(
              objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8))));
          params.put("issuerUrl", jct.issuerUrl);
          params.put("audience", jct.audience);
          params.put("scope", jct.scope);
          authenticationOAuth2.configure(objectMapper.writeValueAsString(params));
          PulsarClient client = PulsarClient.builder()
            .serviceUrl(jct.serviceUrl)
            .authentication(authenticationOAuth2)
            .build();
          ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic)
            .producerName("your-producer-name");
          Producer<byte[]> producer = producerBuilder.create();
          for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            MessageId msgID = producer.send(message.getBytes());
            System.out.println("Publish " + "my-message-" + i + " and message ID " + msgID);
          }
          producer.close();
          client.close();
        }
      }
      

Connect to cluster through Token authentication plugin

To connect a cluster through the Token authentication plugin, follow these steps.

  1. Get the service URL of your Pulsar cluster. For details, see get a service URL.

  2. Get the token of your service account. For details, see get a token.

  3. Connect to a Pulsar cluster through the Token authentication plugin.

    PulsarClient client = PulsarClient.builder()
      .serviceUrl(SERVICE_URL)
      .authentication(AuthenticationFactory.token(AUTH_PARAMS))
      .build();
    
    • SERVICE_URL: the broker service URL of your Pulsar cluster.
    • AUTH_PARAMS: the token of your service account.

For a complete example about how to connect to a cluster through the Pulsar Java client, see Java client examples.

Previous
Overview