- Build Applications
- Pulsar Clients
Connect to your cluster using the Pulsar Java client
This document describes how to connect to a cluster using a Java client, and use the Java producer and consumer to produce and consume messages to and from a topic. The Java client supports connecting to a StreamNative cluster using either OAuth2 or API Keys authentication.
Note
This document assumes that you have created a StreamNative cluster and a service account, and have granted the service account produce
and consume
permissions to the namespace for the target topic.
Prerequisites
- Java 1.8 or higher version
- Pulsar client 2.6.1 or higher version
Connect to your cluster using API keys
To connect a StreamNative cluster using API keys, follow these steps.
Step 1: Get the broker service URL of your cluster
To get the service URL(s) of a StreamNative cluster, follow these steps.
Navigate to the Cluster Dashboard page by switching to the cluster workspace.
On the Cluster Dashboard page, click Details tab.
You will see the available service URLs in the Access Points area.
You can click Copy at the end of the row of the service URL that you want to use.
Step 2: Create an API key of your service account
Note
Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
You can follow the instructions to create an API key for the service account you choose to use.
Step 3: Connect to your cluster
For a complete example of how to connect to a cluster using the Pulsar Java client, see Java client examples.
Create a Java consumer to consume messages
You can create and configure a Java consumer to consume messages using API keys as follows. For more information about the placeholders in the code sample, see parameters for Token authentication.
import java.net.URL;
import org.apache.pulsar.client.api.*;
public class SNConsumer {
public static void main(String[] args) throws Exception
{
PulsarClient client = PulsarClient.builder()
.serviceUrl("${brokerServiceURL}")
.authentication(
AuthenticationFactory.token("${apikey}")
)
.build();
Consumer consumer = client.newConsumer()
.topic("persistent://${tenant}/${namespace}/${topic}")
.subscriptionName("${subscription}")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
System.out.println("Receive message " + new String(msg.getData()));
}
consumer.close();
client.close();
}
}
Create a Java producer to produce messages
You can create and configure a Java consumer to consume messages using API keys as follows. For more information about the placeholders in the code sample, see parameters for Token authentication.
import java.net.URL;
import org.apache.pulsar.client.api.*;
public class SNProducer {
public static void main(String[] args) throws Exception
{
PulsarClient client = PulsarClient.builder()
.serviceUrl("${brokerServiceURL}")
.authentication(
AuthenticationFactory.token("${apikey}")
)
.build();
Producer<byte[]> producer = client.newProducer()
.topic("persistent://${tenant}/${namespace}/${topic}")
.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
MessageId msgID = producer.send(message.getBytes());
System.out.println("Publish " + "my-message-" + i
+ " and message ID " + msgID);
}
producer.close();
client.close();
}
}
Parameters for Token authentication
${brokerServiceURL}
: the broker service URL of your StreamNative cluster.${apikey}
: an API key of your service account.${tenant}/${namespace}/${topic}
: the full name of the topic for message production & consumption. It is a combination of the tenant name, the namespace name and the topic name.${subscription}
: the name of the subscription that will determine how messages are delivered.
Connect to your cluster using OAuth2 authentication
To connect a StreamNative cluster using OAuth2 authentication, follow these steps.
Step 1: Get the broker service URL of your cluster
To get the service URL(s) of a StreamNative cluster, follow these steps.
Navigate to the Cluster Dashboard page by switching to the cluster workspace.
On the Cluster Dashboard page, click Details tab.
You will see the available service URLs in the Access Points area.
You can click Copy at the end of the row of the service URL that you want to use.
Step 2: Get the OAuth2 credential file of your service account
To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.
On the left navigation pane, click Service Accounts.
In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
The OAuth2 credential file should be something like this:
{ "type": "SN_SERVICE_ACCOUNT", "client_id": "CLIENT_ID", "client_secret": "CLIENT_SECRET", "client_email": "[email protected]", "issuer_url": "https://auth.streamnative.cloud" }
Step 3: Connect to your cluster
For a complete example of how to connect to a cluster using the Pulsar Java client, see Java client examples.
Create a Java consumer to consume messages
You can create and configure a Java consumer to consume messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see parameters for OAuth2 authentication.
import java.net.URL;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
public class SNConsumer {
public static void main(String[] args) throws Exception
{
String issuerUrl = "https://auth.streamnative.cloud/";
String credentialsUrl = "{{ file://YOUR-KEY-FILE-PATH }}";
String audience = "urn:sn:pulsar:${orgName}:${instanceName}";
PulsarClient client = PulsarClient.builder()
.serviceUrl("${brokerServiceURL}")
.authentication(
AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl),
new URL(credentialsUrl),
audience))
.build();
Consumer consumer = client.newConsumer()
.topic("persistent://${tenant}/${namespace}/${topic}")
.subscriptionName("${subscription}")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
System.out.println("Receive message " + new String(msg.getData()));
}
consumer.close();
client.close();
}
}
Create a Java producer to produce messages
You can create and configure a Java producer to produce messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see parameters for OAuth2 authentication.
import java.net.URL;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
public class SNProducer {
public static void main(String[] args) throws Exception
{
String issuerUrl = "https://auth.streamnative.cloud/";
String credentialsUrl = "file:///YOUR-KEY-FILE-PATH"; // Absolute path of your downloaded key file
String audience = "urn:sn:pulsar:${orgName}:${instanceName}";
PulsarClient client = PulsarClient.builder()
.serviceUrl("${brokerServiceURL}")
.authentication(
AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl),
new URL(credentialsUrl),
audience))
.build();
Producer<byte[]> producer = client.newProducer()
.topic("persistent://${tenant}/${namespace}/${topic}")
.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
MessageId msgID = producer.send(message.getBytes());
System.out.println("Publish " + "my-message-" + i
+ " and message ID " + msgID);
}
producer.close();
client.close();
}
}
Parameters for OAuth2 authentication
credentialsUrl
: your downloaded OAuth2 credential. This parameter supports the following two pattern formats:file:///path/to/file
: the path to your downloaded OAuth2 credential file.data:application/json;base64,<base64-encoded value>
: the credential file content encoded into Base64 format.
audience
: the Uniform Resource Name (URN), which is a combination of theurn:sn:pulsar
, your organization name, and your Pulsar instance name.${orgName}
: the name of your organization.${instanceName}
: the name of your instance.
${brokerServiceUrl}
: the broker service URL of your StreamNative cluster.${tenant}/${namespace}/${topic}
: the full name of the topic for message production & consumption. It is a combination of the tenant name, the namespace name and the topic name.${subscription}
: the name of the subscription that will determine how messages are delivered.