# Connect to your cluster using the Pulsar Java client

This document describes how to connect to a StreamNative cluster using the Pulsar Java client, and how to use a Java producer and consumer to produce messages to and consume messages from a topic. The Java client can authenticate to a StreamNative cluster using either [OAuth2](#use-oauth2) or [API keys](#use-apikeys).

<Note title="Note">
  This document assumes that you have created a StreamNative cluster and a service account, and that you have granted the service account `produce` and `consume` permissions on the namespace that contains the target topic.
</Note>

## Prerequisites

* Java 1.8 or later
* Pulsar client 2.6.1 or later

## Connect to your cluster using API keys

<span id="use-apikeys" />

To connect to a StreamNative cluster using [API keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview), follow these steps.

### Step 1: Get the broker service URL of your cluster

To get the service URL(s) of a StreamNative cluster, follow these steps.

<Tabs>
  <Tab title="StreamNative Console">
    1. Navigate to the **Cluster Dashboard** page by [switching to the cluster workspace](/cloud/get-started/cloud-console#switch-a-cluster).

    2. On the **Cluster Dashboard** page, click the **Details** tab.

    3. You will see the available service URLs in the **Access Points** area.

    4. You can click **Copy** at the end of the row of the service URL that you want to use.
  </Tab>
</Tabs>

### Step 2: Create an API key for your service account

<Note title="Note">
  Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
</Note>

Follow the instructions to [create an API key](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#using-api-keys-to-connect-to-your-cluster) for the service account that you want to use.

### Step 3: Connect to your cluster

For a complete example of how to connect to a cluster using the Pulsar Java client, see [Java client examples](https://github.com/streamnative/cloud-manager/tree/master/ui/src/data/code/clients/java).

#### Create a Java consumer to consume messages

You can create and configure a Java consumer to consume messages using API keys as follows. For more information about the placeholders in the code sample, see [parameters for Token authentication](/cloud/build/pulsar-clients/cloud-connect-java#parameters-for-token-authentication).

```java theme={null}
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.*;

public class SNConsumer {

    public static void main(String[] args) throws Exception
    {

        // Authenticate to the cluster with the API key of your service account
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("${brokerServiceURL}")
            .authentication(
                AuthenticationFactory.token("${apikey}")
            )
            .build();

        // Start from the earliest available message when the subscription is first created
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://${tenant}/${namespace}/${topic}")
                .subscriptionName("${subscription}")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // Receive and acknowledge 10 messages
        for (int i = 0; i < 10; i++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            System.out.println("Receive message " + new String(msg.getData(), StandardCharsets.UTF_8));
        }

        consumer.close();
        client.close();

    }

}
```

#### Create a Java producer to produce messages

You can create and configure a Java producer to produce messages using API keys as follows. For more information about the placeholders in the code sample, see [parameters for Token authentication](/cloud/build/pulsar-clients/cloud-connect-java#parameters-for-token-authentication).

```java theme={null}
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.*;

public class SNProducer {

    public static void main(String[] args) throws Exception
    {

        // Authenticate to the cluster with the API key of your service account
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("${brokerServiceURL}")
            .authentication(
                AuthenticationFactory.token("${apikey}")
            )
            .build();

        Producer<byte[]> producer = client.newProducer()
            .topic("persistent://${tenant}/${namespace}/${topic}")
            .create();

        // Publish 10 messages and print the ID assigned to each one
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            MessageId msgID = producer.send(message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Publish " + message
                               + " and message ID " + msgID);
        }

        producer.close();
        client.close();

    }

}
```

#### Parameters for Token authentication

* `${brokerServiceURL}`: the broker service URL of your StreamNative cluster.
* `${apikey}`: an API key of your service account.
* `${tenant}/${namespace}/${topic}`: the full name of the topic to produce messages to and consume messages from. It is a combination of the tenant name, the namespace name, and the topic name.
* `${subscription}`: the name of the subscription that will determine how messages are delivered.
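
The topic placeholders above combine into a single topic name string. As a minimal sketch, the helper below assembles the `persistent://${tenant}/${namespace}/${topic}` form used in the code samples. `TopicNames` and its `persistent` method are hypothetical names introduced for illustration and are not part of the Pulsar client; `my-tenant`, `my-namespace`, and `my-topic` are placeholder values.

```java
// Hypothetical helper for illustration only; not part of the Pulsar client API.
final class TopicNames {

    private TopicNames() {}

    // Builds "persistent://<tenant>/<namespace>/<topic>" from its three parts.
    static String persistent(String tenant, String namespace, String topic) {
        requireNonBlank(tenant, "tenant");
        requireNonBlank(namespace, "namespace");
        requireNonBlank(topic, "topic");
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    // Rejects null, empty, and whitespace-only values (Java 8 compatible).
    private static void requireNonBlank(String value, String name) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be blank");
        }
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own tenant, namespace, and topic.
        // Prints: persistent://my-tenant/my-namespace/my-topic
        System.out.println(persistent("my-tenant", "my-namespace", "my-topic"));
    }
}
```

The returned string can be passed wherever the samples above use `.topic(...)`.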

## Connect to your cluster using OAuth2 authentication

<span id="use-oauth2" />

To connect to a StreamNative cluster using OAuth2 authentication, follow these steps.

### Step 1: Get the broker service URL of your cluster

To get the service URL(s) of a StreamNative cluster, follow these steps.

<Tabs>
  <Tab title="StreamNative Console">
    1. Navigate to the **Cluster Dashboard** page by [switching to the cluster workspace](/cloud/get-started/cloud-console#switch-a-cluster).

    2. On the **Cluster Dashboard** page, click the **Details** tab.

    3. You will see the available service URLs in the **Access Points** area.

    4. You can click **Copy** at the end of the row of the service URL that you want to use.
  </Tab>
</Tabs>

### Step 2: Get the OAuth2 credential file of your service account

To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.

1. On the left navigation pane, click **Service Accounts**.

2. In the row of the service account you want to use, in the **Key File** column, click the **Download** icon to download the OAuth2 credential file to your local directory.

   The OAuth2 credential file looks similar to the following:

   ```json theme={null}
   {
     "type": "SN_SERVICE_ACCOUNT",
     "client_id": "CLIENT_ID",
     "client_secret": "CLIENT_SECRET",
     "client_email": "test@auth.streamnative.cloud",
     "issuer_url": "https://auth.streamnative.cloud"
   }
   ```

### Step 3: Connect to your cluster

For a complete example of how to connect to a cluster using the Pulsar Java client, see [Java client examples](https://github.com/streamnative/cloud-manager/tree/master/ui/src/data/code/clients/java).

#### Create a Java consumer to consume messages

You can create and configure a Java consumer to consume messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see [parameters for OAuth2 authentication](/cloud/build/pulsar-clients/cloud-connect-java#parameters-for-oauth2-authentication).

```java theme={null}
import java.net.URL;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;

public class SNConsumer {

  public static void main(String[] args) throws Exception
  {
      String issuerUrl = "https://auth.streamnative.cloud/";
      String credentialsUrl = "file:///YOUR-KEY-FILE-PATH"; // Absolute path of your downloaded key file
      String audience = "urn:sn:pulsar:${orgName}:${instanceName}";

      // Authenticate to the cluster with the OAuth2 credential file of your service account
      PulsarClient client = PulsarClient.builder()
          .serviceUrl("${brokerServiceURL}")
          .authentication(
                AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl),
                                                              new URL(credentialsUrl),
                                                              audience))
          .build();

      // Start from the earliest available message when the subscription is first created
      Consumer<byte[]> consumer = client.newConsumer()
              .topic("persistent://${tenant}/${namespace}/${topic}")
              .subscriptionName("${subscription}")
              .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
              .subscribe();

      // Receive and acknowledge 10 messages
      for (int i = 0; i < 10; i++) {
          Message<byte[]> msg = consumer.receive();
          consumer.acknowledge(msg);
          System.out.println("Receive message " + new String(msg.getData(), StandardCharsets.UTF_8));
      }

      consumer.close();
      client.close();

  }
}
```

#### Create a Java producer to produce messages

You can create and configure a Java producer to produce messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see [parameters for OAuth2 authentication](/cloud/build/pulsar-clients/cloud-connect-java#parameters-for-oauth2-authentication).

```java theme={null}
import java.net.URL;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;

public class SNProducer {

  public static void main(String[] args) throws Exception
  {

      String issuerUrl = "https://auth.streamnative.cloud/";
      String credentialsUrl = "file:///YOUR-KEY-FILE-PATH"; // Absolute path of your downloaded key file
      String audience = "urn:sn:pulsar:${orgName}:${instanceName}";

      // Authenticate to the cluster with the OAuth2 credential file of your service account
      PulsarClient client = PulsarClient.builder()
          .serviceUrl("${brokerServiceURL}")
          .authentication(
                AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl),
                                                              new URL(credentialsUrl),
                                                              audience))
          .build();

      Producer<byte[]> producer = client.newProducer()
          .topic("persistent://${tenant}/${namespace}/${topic}")
          .create();

      // Publish 10 messages and print the ID assigned to each one
      for (int i = 0; i < 10; i++) {
          String message = "my-message-" + i;
          MessageId msgID = producer.send(message.getBytes(StandardCharsets.UTF_8));
          System.out.println("Publish " + message
                             + " and message ID " + msgID);
      }

      producer.close();
      client.close();
  }
}
```

#### Parameters for OAuth2 authentication

* `credentialsUrl`: the location or content of your downloaded OAuth2 credential. This parameter supports the following two formats:
  * `file:///path/to/file`: the path to your downloaded OAuth2 credential file.
  * `data:application/json;base64,<base64-encoded value>`: the content of the credential file, encoded in Base64.
* `audience`: the [Uniform Resource Name (URN)](/cloud/references/glossary#urn), which is a combination of the prefix `urn:sn:pulsar`, your organization name, and your Pulsar instance name.
  * `${orgName}`: the name of your [organization](/cloud/references/glossary#organization).
  * `${instanceName}`: the name of your [instance](/cloud/references/glossary#instance).
* `${brokerServiceURL}`: the broker service URL of your StreamNative cluster.
* `${tenant}/${namespace}/${topic}`: the full name of the topic to produce messages to and consume messages from. It is a combination of the tenant name, the namespace name, and the topic name.
* `${subscription}`: the name of the subscription that will determine how messages are delivered.
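
The `audience` and `data:` formats described above can be sketched with the Java standard library alone. The class `OAuth2Params` and its methods are hypothetical names introduced for illustration, not part of the Pulsar client, and `my-org` and `my-instance` are placeholder values. The sketch only builds the strings; how the resulting `data:` value is passed to the client is outside its scope.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration only; not part of the Pulsar client API.
final class OAuth2Params {

    private OAuth2Params() {}

    // Builds the audience URN: "urn:sn:pulsar:<orgName>:<instanceName>".
    static String audience(String orgName, String instanceName) {
        return "urn:sn:pulsar:" + orgName + ":" + instanceName;
    }

    // Encodes the credential file content (JSON text) into the
    // "data:application/json;base64,<value>" format.
    static String credentialsDataUrl(String credentialJson) {
        String encoded = Base64.getEncoder()
            .encodeToString(credentialJson.getBytes(StandardCharsets.UTF_8));
        return "data:application/json;base64," + encoded;
    }

    public static void main(String[] args) {
        // Placeholder organization and instance names.
        // Prints: urn:sn:pulsar:my-org:my-instance
        System.out.println(audience("my-org", "my-instance"));

        // An empty JSON object stands in for the real credential file content.
        // Prints: data:application/json;base64,e30=
        System.out.println(credentialsDataUrl("{}"));
    }
}
```

The Base64 form can be convenient when the credential is supplied through configuration rather than as a file on disk.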
