
Connect to a cluster through the Kafka Java client

This document describes how to connect to your Pulsar cluster with the Kafka Java client through the OAuth2 authentication plugin.

Prerequisites

This document assumes that you have created a Pulsar cluster and a service account, and have granted the service account produce and consume permissions to a namespace on your Pulsar cluster. For details, see get started with the Kafka protocol.

Steps

  1. Add Maven dependencies.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>io.streamnative.pulsar.handlers</groupId>
        <artifactId>oauth-client</artifactId>
        <version>2.8.3.1</version>
    </dependency>
    
  2. Open a terminal and run a Kafka consumer to receive a message from the test-topic topic.

    package org.example;
    
    import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * An example of a Kafka consumer that connects to StreamNative Cloud with OAuth2 authentication.
     */
    public class SNCloudOAuth2Consumer {
        public static void main(String[] args) {
            // Replace these configs with your own cluster settings
            String serverUrl = "your-pulsar-service-url";
            String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
            String audience = "YOUR-AUDIENCE-STRING";
    
            // 1. Create the properties for OAuth2 authentication, which uses the SASL/OAUTHBEARER mechanism in Kafka
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "OAUTHBEARER");
            final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                    + " oauth.issuer.url=\"%s\""
                    + " oauth.credentials.url=\"%s\""
                    + " oauth.audience=\"%s\";";
            props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                    "https://auth.streamnative.cloud/",
                    "file://" + keyPath,
                    audience
            ));
    
            // 2. Create a consumer
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            final String topicName = "persistent://public/default/test-topic";
            consumer.subscribe(Collections.singleton(topicName));
    
            // 3. Consume messages until the first non-empty batch, then quit
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
            consumer.close();
        }
    }
    
    • keyPath: the absolute path to your downloaded OAuth2 credential file. For details about how to get the OAuth2 credential file, see get an OAuth2 credential file.
    • serverUrl: the broker service URL of your Pulsar cluster. For details, see get a service URL.
    • audience: the audience parameter, which combines urn:sn:pulsar, your organization name, and your Pulsar instance name in the form urn:sn:pulsar:<organization-name>:<instance-name>. Example values for all three placeholders are sketched below.
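
    For illustration only, the three placeholders might look roughly like the following. These values are hypothetical; substitute the service URL, key file path, and audience of your own cluster.

    // Hypothetical example values; replace all three with your own cluster details
    String serverUrl = "your-broker-hostname:9093";                      // Kafka service URL of your cluster
    String keyPath = "/absolute/path/to/your-service-account-key.json";  // downloaded OAuth2 credential file
    String audience = "urn:sn:pulsar:my-org:my-instance";                // urn:sn:pulsar:<organization>:<instance>
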
  3. Open another terminal and run a Kafka producer to send a message to the test-topic topic.

    package org.example;
    
    import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * An example of a Kafka producer that connects to StreamNative Cloud with OAuth2 authentication.
     */
    public class SNCloudOAuth2Producer {
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // 1. Create a producer with OAuth2 authentication, which uses the SASL/OAUTHBEARER mechanism in Kafka
            final Properties props = new Properties();
            // Replace these configs with your own cluster settings
            String serverUrl = "your-pulsar-service-url";
            String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
            String audience = "YOUR-AUDIENCE-STRING";
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "OAUTHBEARER");
            final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                    + " oauth.issuer.url=\"%s\""
                    + " oauth.credentials.url=\"%s\""
                    + " oauth.audience=\"%s\";";
            props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                    "https://auth.streamnative.cloud/",
                    "file://" + keyPath,
                    audience
            ));
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            // 2. Produce one message
            final String topicName = "persistent://public/default/test-topic";
            final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, "hello"));
            final RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("Send hello to " + recordMetadata);
            producer.close();
        }
    }
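
    Both classes above build the same set of SASL/OAUTHBEARER properties. If you want to avoid that duplication, the shared authentication settings can be collected in a small helper class. The sketch below is only an illustration; the OAuthProps class and its buildAuthProps method are hypothetical names, not part of the Kafka or StreamNative client APIs.

    package org.example;

    import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
    import java.util.Properties;

    /**
     * Hypothetical helper that builds the OAuth2/SASL properties shared by the
     * consumer and producer examples above.
     */
    public class OAuthProps {
        public static Properties buildAuthProps(String serverUrl, String keyPath, String audience) {
            final Properties props = new Properties();
            props.setProperty("bootstrap.servers", serverUrl);
            props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "OAUTHBEARER");
            final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                    + " oauth.issuer.url=\"%s\""
                    + " oauth.credentials.url=\"%s\""
                    + " oauth.audience=\"%s\";";
            props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                    "https://auth.streamnative.cloud/",
                    "file://" + keyPath,
                    audience
            ));
            return props;
        }
    }

    Each client then only adds its own serializer or deserializer settings, plus the group and offset configuration for the consumer, on top of the returned Properties object.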
    