This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce
and consume
permissions to a namespace for the target topic.
This document describes how to connect to a StreamNative cluster through a Kafka Java client, and use the Java producer and consumer to produce and consume messages to and from a topic. The Java client supports connecting to a StreamNative cluster using either OAuth2 or API Keys authentication.
Prerequisites
Connect to your cluster using API keys
This section describes how to connect to your StreamNative cluster using the Kafka Java client with SASL/PLAIN authentication.
Before you begin
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- For utilities such as `kcat`, the password is `token:<API KEY>`.
You can follow the instructions to create an API key for the service account you choose to use.
Steps
-
Add Maven dependencies.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
-
Open a terminal and run a Kafka consumer to receive a message from the test-kafka-topic
topic.
package org.example;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Example Kafka consumer that connects to StreamNative Cloud with an API key
 * (a JWT token) using the SASL/PLAIN mechanism over SASL_SSL.
 *
 * <p>It subscribes to {@code test-kafka-topic}, polls once per second until the first
 * non-empty batch arrives, prints every record of that batch, and then exits.
 *
 * <p>No logging backend is configured here; the Kafka client logs through SLF4J, so add
 * an SLF4J binding to the classpath if you want to see client logs.
 */
public class SNCloudJWTTokenConsumer {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // replace these configs for your cluster
        String serverUrl = "SERVER-URL";
        String jwtToken = "API-KEY";
        // The SASL/PLAIN password has the form "token:<API KEY>".
        String token = "token:" + jwtToken;
        final String topicName = "test-kafka-topic";
        // The namespace is passed as the SASL/PLAIN username.
        String namespace = "public/default";

        // 1. Build the consumer properties (only consumer-side settings are needed).
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));

        // 2. Create the consumer; try-with-resources closes it even if poll() throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton(topicName));

            // 3. Poll until the first non-empty batch, print it, then quit.
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
        }
    }
}
SERVER-URL
: the Kafka service URL of your StreamNative cluster.
API-KEY
: an API key of your service account.
-
Open another terminal and run a Kafka producer to send a message to the test-kafka-topic
topic.
package org.example;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Example Kafka producer that connects to StreamNative Cloud with an API key
 * (a JWT token) using the SASL/PLAIN mechanism over SASL_SSL.
 *
 * <p>It sends the string {@code "hello world"} to {@code test-kafka-topic} five times,
 * waiting for each send to be acknowledged and printing the returned record metadata.
 *
 * <p>No logging backend is configured here; the Kafka client logs through SLF4J, so add
 * an SLF4J binding to the classpath if you want to see client logs.
 */
public class SNCloudJWTTokenProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // replace these configs for your cluster
        String serverUrl = "SERVER-URL";
        String jwtToken = "API-KEY";
        // The SASL/PLAIN password has the form "token:<API KEY>".
        String token = "token:" + jwtToken;
        final String topicName = "test-kafka-topic";
        // The namespace is passed as the SASL/PLAIN username.
        String namespace = "public/default";

        // 1. Build the producer properties with token authentication (SASL/PLAIN).
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));

        // 2. Create the producer; try-with-resources closes it even if a send fails.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Produce five messages, blocking on each acknowledgement.
            final int messageCount = 5;
            for (int i = 0; i < messageCount; i++) {
                String value = "hello world";
                final Future<RecordMetadata> recordMetadataFuture =
                        producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
        }
    }
}
SERVER-URL
: the Kafka service URL of your StreamNative cluster.
API-KEY
: an API key of your service account.
Connect to your cluster using OAuth2 authentication
This section describes how to connect to your StreamNative cluster using the Kafka Java client with OAuth2 authentication.
Before you begin
Steps
-
Add Maven dependencies.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>oauth-client</artifactId>
<version>3.1.0.1</version>
</dependency>
-
Open a terminal and run a Kafka consumer to receive a message from the test-topic
topic.
package org.example;
import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Example Kafka consumer that connects to StreamNative Cloud with OAuth2
 * authentication (SASL mechanism {@code OAUTHBEARER} over SASL_SSL).
 *
 * <p>It subscribes to {@code test-topic}, polls once per second until the first
 * non-empty batch arrives, prints every record of that batch, and then exits.
 */
public class SNCloudOAuth2Consumer {
    public static void main(String[] args) {
        // replace these configs with your cluster
        String serverUrl = "YOUR-KAFKA-SERVICE-URL";
        String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
        String audience = "YOUR-AUDIENCE-STRING";

        // 1. Build the consumer properties with OAuth2 authentication (SASL/OAUTHBEARER).
        //    Only consumer-side settings are needed here.
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " oauth.issuer.url=\"%s\""
                + " oauth.credentials.url=\"%s\""
                + " oauth.audience=\"%s\";";
        props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                "https://auth.streamnative.cloud/",
                "file://" + keyPath,
                audience
        ));

        // 2. Create the consumer; try-with-resources closes it even if poll() throws.
        final String topicName = "test-topic";
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton(topicName));

            // 3. Poll until the first non-empty batch, print it, then quit.
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
        }
    }
}
YOUR-KEY-FILE-ABSOLUTE-PATH
: the path to your downloaded OAuth2 credential file.
YOUR-KAFKA-SERVICE-URL
: the Kafka service URL of your StreamNative cluster.
YOUR-AUDIENCE-STRING
: the `audience` parameter, which combines the prefix `urn:sn:pulsar`, your organization name, and your Pulsar instance name (for example, `urn:sn:pulsar:<org-name>:<instance-name>`).
-
Open another terminal and run a Kafka producer to send a message to the test-topic
topic.
package org.example;
import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Example Kafka producer that connects to StreamNative Cloud with OAuth2
 * authentication (SASL mechanism {@code OAUTHBEARER} over SASL_SSL).
 *
 * <p>It sends the string {@code "hello"} to {@code test-topic} once, waits for the
 * send to be acknowledged, and prints the returned record metadata.
 */
public class SNCloudOAuth2Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // replace these configs with your cluster
        String serverUrl = "YOUR-KAFKA-SERVICE-URL";
        String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
        String audience = "YOUR-AUDIENCE-STRING";

        // 1. Build the producer properties with OAuth2 authentication (SASL/OAUTHBEARER).
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " oauth.issuer.url=\"%s\""
                + " oauth.credentials.url=\"%s\""
                + " oauth.audience=\"%s\";";
        props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                "https://auth.streamnative.cloud/",
                "file://" + keyPath,
                audience
        ));

        // 2. Create the producer; try-with-resources closes it even if the send fails.
        final String topicName = "test-topic";
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Produce one message and block until it is acknowledged.
            final Future<RecordMetadata> recordMetadataFuture =
                    producer.send(new ProducerRecord<>(topicName, "hello"));
            final RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("Send hello to " + recordMetadata);
        }
    }
}
YOUR-KEY-FILE-ABSOLUTE-PATH
: the path to your downloaded OAuth2 credential file.
YOUR-KAFKA-SERVICE-URL
: the Kafka service URL of your StreamNative cluster.
YOUR-AUDIENCE-STRING
: the `audience` parameter, which combines the prefix `urn:sn:pulsar`, your organization name, and your Pulsar instance name (for example, `urn:sn:pulsar:<org-name>:<instance-name>`).