Connect to your cluster using the Kafka Java client
Note
This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions on the namespace that contains the target topic.
This document describes how to connect to a StreamNative cluster through the Kafka Java client, and how to use the Java producer and consumer to produce and consume messages to and from a topic. The Java client supports connecting to a StreamNative cluster with either OAuth2 or API key authentication.
Prerequisites
Install Java 17. For details, see overview of JDK installation.
Install Maven. For details, see installing Apache Maven.
Connect to your cluster using API keys
This section describes how to connect to your StreamNative cluster using the Kafka Java client with SASL/PLAIN authentication.
Before you begin
Note
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- For command-line utilities such as `kcat`, the password is `token:<API KEY>`.
You can follow the instructions to create an API key for the service account you choose to use.
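As a minimal sketch of how the API key is turned into SASL/PLAIN credentials: the tenant/namespace acts as the username, and the password is the literal prefix `token:` followed by the API key, embedded in the JAAS configuration string. The class name, namespace, and `EXAMPLE-API-KEY` below are placeholders for illustration, not values from your cluster.

```java
/**
 * Sketch: assembling the SASL/PLAIN JAAS configuration for API key
 * authentication. "public/default" and "EXAMPLE-API-KEY" are placeholders.
 */
public class ApiKeyJaasConfig {

    /** Builds the value passed to the sasl.jaas.config client property. */
    static String jaasConfig(String namespace, String apiKey) {
        // The password is always the "token:" prefix plus the raw API key;
        // tools such as kcat use this same password format.
        String password = "token:" + apiKey;
        return String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            namespace, password);
    }

    public static void main(String[] args) {
        System.out.println(jaasConfig("public/default", "EXAMPLE-API-KEY"));
    }
}
```

The consumer and producer examples below build exactly this string before creating the client.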
Steps
Add Maven dependencies.
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>
```
Open a terminal and run a Kafka consumer to receive messages from the `test-kafka-topic` topic.

```java
package org.example;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * An API key (SASL/PLAIN) authentication example of a Kafka consumer for StreamNative Cloud.
 */
public class SNCloudJWTTokenConsumer {

    public static void main(String[] args) {
        // Replace these configs with your cluster's values.
        String serverUrl = "SERVER-URL";
        String apiKey = "API-KEY";
        String token = "token:" + apiKey;
        String namespace = "public/default";
        final String topicName = "test-kafka-topic";

        // 1. Create consumer properties with SASL/PLAIN authentication.
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
            namespace, token));

        // 2. Create a consumer and subscribe to the topic.
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(topicName));

        // 3. Consume some messages, then quit.
        boolean running = true;
        while (running) {
            System.out.println("running");
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                records.forEach(record -> System.out.println("Receive record: " + record.value()
                    + " from " + record.topic() + "-" + record.partition() + "@" + record.offset()));
                running = false;
            }
        }
        consumer.close();
    }
}
```
- `SERVER-URL`: the Kafka service URL of your StreamNative cluster.
- `API-KEY`: an API key of your service account.
Open another terminal and run a Kafka producer to send messages to the `test-kafka-topic` topic.

```java
package org.example;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * An API key (SASL/PLAIN) authentication example of a Kafka producer for StreamNative Cloud.
 */
public class SNCloudJWTTokenProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Replace these configs with your cluster's values.
        String serverUrl = "SERVER-URL";
        String apiKey = "API-KEY";
        String token = "token:" + apiKey;
        String namespace = "public/default";
        final String topicName = "test-kafka-topic";

        // 1. Create producer properties with token authentication, which uses the SASL/PLAIN mechanism in Kafka.
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
            namespace, token));

        // 2. Create the producer.
        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 3. Produce a few messages synchronously.
        for (int i = 0; i < 5; i++) {
            String value = "hello world";
            final Future<RecordMetadata> recordMetadataFuture =
                producer.send(new ProducerRecord<>(topicName, value));
            final RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("Send " + value + " to " + recordMetadata);
        }
        producer.close();
    }
}
```
- `SERVER-URL`: the Kafka service URL of your StreamNative cluster.
- `API-KEY`: an API key of your service account.
Connect to your cluster using OAuth2 authentication
This section describes how to connect to your StreamNative cluster using the Kafka Java client with OAuth2 authentication.
Before you begin
Get the OAuth2 credential file.
- On the left navigation pane, click Service Accounts.
- In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
Get the service URL of your StreamNative cluster.
- On the left navigation pane, in the Admin area, click Pulsar Clusters.
- Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
Steps
Add Maven dependencies.
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>io.streamnative.pulsar.handlers</groupId>
    <artifactId>oauth-client</artifactId>
    <version>3.1.0.1</version>
</dependency>
```
Open a terminal and run a Kafka consumer to receive messages from the `test-topic` topic.

```java
package org.example;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * An OAuth2 authentication example of a Kafka consumer for StreamNative Cloud.
 */
public class SNCloudOAuth2Consumer {

    public static void main(String[] args) {
        // Replace these configs with your cluster's values.
        String serverUrl = "YOUR-KAFKA-SERVICE-URL";
        String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
        String audience = "YOUR-AUDIENCE-STRING";

        // 1. Create consumer properties with OAuth2 authentication, which uses the SASL/OAUTHBEARER mechanism in Kafka.
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
            + " oauth.issuer.url=\"%s\""
            + " oauth.credentials.url=\"%s\""
            + " oauth.audience=\"%s\";";
        props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
            "https://auth.streamnative.cloud/",
            "file://" + keyPath,
            audience));

        // 2. Create a consumer and subscribe to the topic.
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        final String topicName = "test-topic";
        consumer.subscribe(Collections.singleton(topicName));

        // 3. Consume some messages, then quit.
        boolean running = true;
        while (running) {
            System.out.println("running");
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                records.forEach(record -> System.out.println("Receive record: " + record.value()
                    + " from " + record.topic() + "-" + record.partition() + "@" + record.offset()));
                running = false;
            }
        }
        consumer.close();
    }
}
```
- `YOUR-KEY-FILE-ABSOLUTE-PATH`: the path to your downloaded OAuth2 credential file.
- `YOUR-KAFKA-SERVICE-URL`: the Kafka service URL of your StreamNative cluster.
- `YOUR-AUDIENCE-STRING`: the audience parameter, which is a combination of `urn:sn:pulsar`, your organization name, and your Pulsar instance name.
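To make the audience format concrete, here is a minimal sketch of combining the fixed `urn:sn:pulsar` prefix with the organization and instance names, joined by colons. The class name and the `my-org` / `my-instance` values are placeholders; substitute the names shown in your StreamNative Cloud console.

```java
/**
 * Sketch: assembling the OAuth2 audience string for a StreamNative cluster.
 * "my-org" and "my-instance" are placeholder values.
 */
public class AudienceBuilder {

    /** Joins the urn:sn:pulsar prefix, organization name, and instance name with colons. */
    static String audience(String organization, String instance) {
        return String.join(":", "urn:sn:pulsar", organization, instance);
    }

    public static void main(String[] args) {
        // Prints the audience string to pass as YOUR-AUDIENCE-STRING.
        System.out.println(audience("my-org", "my-instance"));
    }
}
```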
Open another terminal and run a Kafka producer to send a message to the `test-topic` topic.

```java
package org.example;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * An OAuth2 authentication example of a Kafka producer for StreamNative Cloud.
 */
public class SNCloudOAuth2Producer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Replace these configs with your cluster's values.
        String serverUrl = "YOUR-KAFKA-SERVICE-URL";
        String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
        String audience = "YOUR-AUDIENCE-STRING";

        // 1. Create producer properties with OAuth2 authentication, which uses the SASL/OAUTHBEARER mechanism in Kafka.
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
            + " oauth.issuer.url=\"%s\""
            + " oauth.credentials.url=\"%s\""
            + " oauth.audience=\"%s\";";
        props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
            "https://auth.streamnative.cloud/",
            "file://" + keyPath,
            audience));

        // 2. Create the producer.
        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 3. Produce one message synchronously.
        final String topicName = "test-topic";
        final Future<RecordMetadata> recordMetadataFuture =
            producer.send(new ProducerRecord<>(topicName, "hello"));
        final RecordMetadata recordMetadata = recordMetadataFuture.get();
        System.out.println("Send hello to " + recordMetadata);
        producer.close();
    }
}
```
- `YOUR-KEY-FILE-ABSOLUTE-PATH`: the path to your downloaded OAuth2 credential file.
- `YOUR-KAFKA-SERVICE-URL`: the Kafka service URL of your StreamNative cluster.
- `YOUR-AUDIENCE-STRING`: the audience parameter, which is a combination of `urn:sn:pulsar`, your organization name, and your Pulsar instance name.