Connect to your cluster using the Kafka Java client

Note

This QuickStart assumes that you have created a Pulsar cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions on the namespace of the target topic.

This document describes how to connect to a Pulsar cluster through the Kafka Java client and use a Java producer and consumer to publish and consume messages on a topic. The Java client can authenticate to a Pulsar cluster with either OAuth2 or Token authentication.

Connect to your cluster through OAuth2 authentication

This section describes how to connect to your Pulsar cluster using the Kafka Java client through OAuth2 authentication.

Before you begin

  • Get the OAuth2 credential file.

    1. On the left navigation pane, click Service Accounts.
    2. In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
  • Get the service URL of your Pulsar cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).

Steps

  1. Add Maven dependencies.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    <dependency>
        <groupId>io.streamnative.pulsar.handlers</groupId>
        <artifactId>oauth-client</artifactId>
        <version>3.1.0.1</version>
    </dependency>
    
  2. Open a terminal and run a Kafka consumer to receive a message from the test-topic topic.

    package org.example;
    
    import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * An OAuth2 authentication example of a Kafka consumer connecting to StreamNative Cloud
     */
    public class SNCloudOAuth2Consumer {
        public static void main(String[] args) {
            // replace these configs with your cluster
            String serverUrl = "your-pulsar-service-url";
            String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
            String audience = "YOUR-AUDIENCE-STRING";
    
            // 1. Create properties for OAuth2 authentication, which uses the SASL/OAUTHBEARER mechanism in Kafka
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "OAUTHBEARER");
            final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                    + " oauth.issuer.url=\"%s\""
                    + " oauth.credentials.url=\"%s\""
                    + " oauth.audience=\"%s\";";
            props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                    "https://auth.streamnative.cloud/",
                    "file://" + keyPath,
                    audience
            ));
    
            // 2. Create a consumer
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            final String topicName = "test-topic";
            consumer.subscribe(Collections.singleton(topicName));
    
            // 3. Consume some messages and quit immediately
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
            consumer.close();
        }
    }
    
    • serverUrl: the Kafka Service URL (TCP) of your Pulsar cluster.
    • keyPath: the absolute path to the OAuth2 credential file that you downloaded.
    • audience: the audience string of your Pulsar instance, in the format urn:sn:pulsar:<organization-name>:<instance-name>. See the example values below.
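
    For illustration, the placeholder values might be filled in as follows. These values are hypothetical; copy the real Kafka Service URL (TCP) from the StreamNative Cloud console, use the path of your own credential file, and build the audience from your own organization and instance names.

    // Hypothetical placeholder values -- replace them with the details of your own cluster
    String serverUrl = "your-kafka-service-hostname:9093";        // Kafka Service URL (TCP) copied from the console
    String keyPath = "/absolute/path/to/oauth2-key-file.json";    // downloaded OAuth2 credential file
    String audience = "urn:sn:pulsar:my-org:my-instance";         // urn:sn:pulsar:<organization-name>:<instance-name>
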
  3. Open another terminal and run a Kafka producer to send a message to the test-topic topic.

    package org.example;
    
    import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * An OAuth2 authentication example of a Kafka producer connecting to StreamNative Cloud
     */
    public class SNCloudOAuth2Producer {
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // 1. Create a producer with OAuth2 authentication, which uses the SASL/OAUTHBEARER mechanism in Kafka
            final Properties props = new Properties();
            // replace these configs with your cluster
            String serverUrl = "your-pulsar-service-url";
            String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
            String audience = "YOUR-AUDIENCE-STRING";
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "OAUTHBEARER");
            final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                    + " oauth.issuer.url=\"%s\""
                    + " oauth.credentials.url=\"%s\""
                    + " oauth.audience=\"%s\";";
            props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                    "https://auth.streamnative.cloud/",
                    "file://" + keyPath,
                    audience
            ));
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            // 2. Produce one message
            final String topicName = "test-topic";
            final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, "hello"));
            final RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("Send hello to " + recordMetadata);
            producer.close();
        }
    }
    

Connect to your cluster through Token authentication

This section describes how to connect to your Pulsar cluster using the Kafka Java client through Token authentication.

Before you begin

Note

  • Before getting the token of a service account, verify that the service account has been authorized as a superuser or as an admin of the target tenants and namespaces.
  • A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
  • For Kafka utilities such as kcat, the password is token:TOKEN, where TOKEN is the JWT token of your service account.
  • Get the JWT token.

    1. On the left navigation pane, click Service Accounts.

    2. In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.

  • Get the service URL of your Pulsar cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).

Steps

  1. Add Maven dependencies.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
  2. Open a terminal and run a Kafka consumer to receive a message from the test-kafka-topic topic.

    package org.example;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    /**
     * A JWT token authentication example of Kafka consumer to StreamNative Cloud
     */
    public class SNCloudJWTTokenConsumer {
        public static void main(String[] args) {
    
            // replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "YOUR-TOKEN";
            String token = "token:" + jwtToken;
    
            final String topicName = "test-kafka-topic";
            String namespace = "public/default";
    
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // Create a consumer
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topicName));
    
            // Consume some messages and quit immediately
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
            consumer.close();
        }
    }
    
    • serverUrl: the Kafka Service URL (TCP) of your Pulsar cluster.
    • jwtToken: the token of your service account.
    • namespace: the tenant/namespace that contains the topic; it is used as the SASL username. See the sketch below for loading the token from the environment.
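
    To avoid hard-coding the token in source code, you can read it from an environment variable instead. A minimal sketch, assuming the token is exported in a hypothetical SN_PULSAR_TOKEN variable:

    // Read the JWT token from a hypothetical SN_PULSAR_TOKEN environment variable
    String jwtToken = System.getenv("SN_PULSAR_TOKEN");
    if (jwtToken == null || jwtToken.isEmpty()) {
        throw new IllegalStateException("SN_PULSAR_TOKEN is not set");
    }
    // The SASL/PLAIN password is the JWT token prefixed with "token:"
    String token = "token:" + jwtToken;
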
  3. Open another terminal and run a Kafka producer to send a message to the test-kafka-topic topic.

    package org.example;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    /**
     * A JWT token authentication example of Kafka producer to StreamNative Cloud
     */
    public class SNCloudJWTTokenProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            // replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "YOUR-TOKEN";
            String token = "token:" + jwtToken;
            final String topicName = "test-kafka-topic";
            String namespace = "public/default";
    
            // 1. Create properties for token authentication, which uses the SASL/PLAIN mechanism in Kafka
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // 2. Create a producer
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            // 3. Produce a few messages
            for (int i = 0; i < 5; i++) {
                String value = "hello world";
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
            producer.close();
        }
    }
    