- StreamNative Cloud
- Connect
Connect to cluster through Kafka Java client
This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to a namespace on your Pulsar cluster. For details, see get started with the Kafka protocol.
This document describes how to connect to your Pulsar cluster using the Kafka Java client through the OAuth2 authentication plugin.
Prerequisites
Get the OAuth2 credential file. For details, see get an OAuth2 credential file.
Get the broker service URL of your Pulsar cluster. For details, see get a service URL.
Install Java 17. For details, see overview of JDK installation.
Install Maven. For details, see installing Apache Maven.
Steps
Add Maven dependencies.
<!-- Apache Kafka Java client library -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.1.0</version>
</dependency>
<!-- StreamNative OAuth client; presumably supplies the OauthLoginCallbackHandler imported by the examples (confirm) -->
<dependency>
  <groupId>io.streamnative.pulsar.handlers</groupId>
  <artifactId>oauth-client</artifactId>
  <version>2.8.3.1</version>
</dependency>
Open a terminal and run a Kafka consumer to receive a message from the `test-topic`
topic.

package org.example;

import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * An OAuth2 authentication example of Kafka consumer to StreamNative Cloud.
 *
 * <p>The program subscribes to {@code persistent://public/default/test-topic},
 * polls with a one-second timeout until a poll returns at least one record,
 * prints every record of that batch, and then exits.
 */
public class SNCloudOAuth2Consumer {

    /**
     * Runs the consumer example.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Replace these values with the settings of your own cluster.
        String serverUrl = "your-pulsar-service-url";
        String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
        String audience = "YOUR-AUDIENCE-STRING";

        // 1. Create the consumer properties. OAuth2 authentication is carried over
        //    the SASL/OAUTHBEARER mechanism on a SASL_SSL connection.
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
        // Start from the earliest offset when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        // The three placeholders are, in order: issuer URL, credentials file URL, audience.
        final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " oauth.issuer.url=\"%s\""
                + " oauth.credentials.url=\"%s\""
                + " oauth.audience=\"%s\";";
        props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                "https://auth.streamnative.cloud/",
                "file://" + keyPath,
                audience
        ));

        // 2. Create a consumer. try-with-resources closes it even when
        //    subscribe() or poll() throws, so the client is never leaked.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            final String topicName = "persistent://public/default/test-topic";
            consumer.subscribe(Collections.singleton(topicName));

            // 3. Consume some messages and quit immediately after the first non-empty batch.
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
        }
    }
}
keyPath: the path to your downloaded OAuth2 credential file. For details about how to get the OAuth2 credential file, see get an OAuth2 credential file.
serverUrl: the broker service URL of your Pulsar cluster. For details, see get a service URL.
audience: the audience parameter is a combination of the `urn:sn:pulsar` prefix, your organization name, and your Pulsar instance name (for example, `urn:sn:pulsar:<organization-name>:<instance-name>`).
Open another terminal and run a Kafka producer to send a message to the `test-topic`
topic.

package org.example;

import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * An OAuth2 authentication example of Kafka producer to StreamNative Cloud.
 *
 * <p>The program sends a single {@code "hello"} record to
 * {@code persistent://public/default/test-topic}, waits for the send to
 * complete, prints the returned metadata, and then exits.
 */
public class SNCloudOAuth2Producer {

    /**
     * Runs the producer example.
     *
     * @param args command-line arguments (not used)
     * @throws ExecutionException if the send future completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting for the send result
     * @throws IOException declared by the original example and kept for compatibility
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // 1. Create the producer properties. OAuth2 authentication is carried over
        //    the SASL/OAUTHBEARER mechanism on a SASL_SSL connection.
        final Properties props = new Properties();
        // Replace these values with the settings of your own cluster.
        String serverUrl = "your-pulsar-service-url";
        String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
        String audience = "YOUR-AUDIENCE-STRING";

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        // The three placeholders are, in order: issuer URL, credentials file URL, audience.
        final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " oauth.issuer.url=\"%s\""
                + " oauth.credentials.url=\"%s\""
                + " oauth.audience=\"%s\";";
        props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                "https://auth.streamnative.cloud/",
                "file://" + keyPath,
                audience
        ));

        // try-with-resources closes the producer even when send() or get() throws,
        // so the client is never leaked.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 2. Produce one message and block until the broker acknowledges it.
            final String topicName = "persistent://public/default/test-topic";
            final Future<RecordMetadata> recordMetadataFuture =
                    producer.send(new ProducerRecord<>(topicName, "hello"));
            final RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("Send hello to " + recordMetadata);
        }
    }
}