1. Build Applications
  2. Kafka Clients

Connect to your cluster using the Kafka Java client

Note

This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

This document describes how to connect to a StreamNative cluster through a Kafka Java client, and how to use the Java producer and consumer to produce messages to and consume messages from a topic. The Java client supports connecting to a StreamNative cluster using either OAuth2 or API key authentication.

Prerequisites

Connect to your cluster using API keys

This section describes how to connect to your StreamNative cluster using the Kafka Java client with SASL/PLAIN authentication.

Before you begin

Note

  • Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  • For utilities such as kcat, the password is token:<API KEY>.

You can follow the instructions to create an API key for the service account you choose to use.

Steps

  1. Add Maven dependencies.

    <!-- Apache Kafka Java client: supplies the org.apache.kafka.clients.* and
         org.apache.kafka.common.* classes imported by the examples below. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.1</version>
    </dependency>
    
  2. Open a terminal and run a Kafka consumer to receive a message from the test-kafka-topic topic.

    package org.example;
    
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    /**
     * An API key (SASL/PLAIN) authentication example of a Kafka consumer
     * connecting to StreamNative Cloud.
     */
    public class SNCloudJWTTokenConsumer {
        /**
         * Subscribes to the topic, polls until the first non-empty batch arrives,
         * prints every record in that batch, and then exits.
         *
         * @param args unused
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "API-KEY";
            // The SASL/PLAIN password is the API key prefixed with "token:"
            String token = "token:" + jwtToken;
    
            final String topicName = "test-kafka-topic";
            // The namespace is passed as the SASL/PLAIN username
            String namespace = "public/default";
    
            // 1. Configure the consumer for API key authentication (SASL/PLAIN over TLS)
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
            // Start from the earliest offset when the group has no committed offset yet
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // 2. Create a consumer; try-with-resources closes it even if subscribe or poll throws
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton(topicName));
    
                // 3. Poll until the first non-empty batch arrives, print it, and quit
                boolean running = true;
                while (running) {
                    System.out.println("running");
                    final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    if (!records.isEmpty()) {
                        records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                                + record.topic() + "-" + record.partition() + "@" + record.offset()));
                        running = false;
                    }
                }
            }
        }
    }
    
    • SERVER-URL: the Kafka service URL of your StreamNative cluster.
    • API-KEY: an API key of your service account.
  3. Open another terminal and run a Kafka producer to send messages to the test-kafka-topic topic.

    package org.example;
    
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    /**
     * An API key (SASL/PLAIN) authentication example of a Kafka producer
     * connecting to StreamNative Cloud.
     */
    public class SNCloudJWTTokenProducer {
        /**
         * Sends five "hello world" messages to the topic, waiting for each send to
         * complete and printing its metadata before sending the next.
         *
         * @param args unused
         * @throws ExecutionException if a send fails
         * @throws InterruptedException if the thread is interrupted while waiting for a send
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "API-KEY";
            // The SASL/PLAIN password is the API key prefixed with "token:"
            String token = "token:" + jwtToken;
            final String topicName = "test-kafka-topic";
            // The namespace is passed as the SASL/PLAIN username
            String namespace = "public/default";
    
            // 1. Configure the producer for API key authentication (SASL/PLAIN over TLS)
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // 2. Create a producer; try-with-resources closes it even if a send fails
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // 3. Produce five messages, blocking on each result before sending the next
                for (int i = 0; i < 5; i++) {
                    String value = "hello world";
                    final Future<RecordMetadata> recordMetadataFuture =
                            producer.send(new ProducerRecord<>(topicName, value));
                    final RecordMetadata recordMetadata = recordMetadataFuture.get();
                    System.out.println("Send " + value + " to " + recordMetadata);
                }
            }
        }
    }
    
    • SERVER-URL: the Kafka service URL of your StreamNative cluster.
    • API-KEY: an API key of your service account.

Connect to your cluster using OAuth2 authentication

This section describes how to connect to your StreamNative cluster using the Kafka Java client with OAuth2 authentication.

Before you begin

  • Get the OAuth2 credential file.

    1. On the left navigation pane, click Service Accounts.
    2. In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
  • Get the service URL of your StreamNative cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).

Steps

  1. Add Maven dependencies.

    <!-- Apache Kafka Java client: supplies the org.apache.kafka.clients.* and
         org.apache.kafka.common.* classes imported by the examples below.
         NOTE(review): the API key steps on this page declare kafka-clients 3.6.1;
         confirm whether the two versions are meant to differ. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    <!-- Presumably the artifact that supplies OauthLoginCallbackHandler, which the
         examples below register as sasl.login.callback.handler.class; verify. -->
    <dependency>
        <groupId>io.streamnative.pulsar.handlers</groupId>
        <artifactId>oauth-client</artifactId>
        <version>3.1.0.1</version>
    </dependency>
    
  2. Open a terminal and run a Kafka consumer to receive a message from the test-topic topic.

    package org.example;
    
    import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * An OAuth2 (SASL/OAUTHBEARER) authentication example of a Kafka consumer
     * connecting to StreamNative Cloud.
     */
    public class SNCloudOAuth2Consumer {
        /**
         * Subscribes to the topic, polls until the first non-empty batch arrives,
         * prints every record in that batch, and then exits.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // replace these configs with your cluster
            String serverUrl = "YOUR-KAFKA-SERVICE-URL";
            String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
            String audience = "YOUR-AUDIENCE-STRING";
    
            // 1. Create properties for OAuth2 authentication (SASL/OAUTHBEARER over TLS)
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
            // Start from the earliest offset when the group has no committed offset yet
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "OAUTHBEARER");
            final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                    + " oauth.issuer.url=\"%s\""
                    + " oauth.credentials.url=\"%s\""
                    + " oauth.audience=\"%s\";";
            // The credential file is passed as a file:// URL built from its absolute path
            props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                    "https://auth.streamnative.cloud/",
                    "file://" + keyPath,
                    audience
            ));
    
            // 2. Create a consumer; try-with-resources closes it even if subscribe or poll throws
            final String topicName = "test-topic";
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton(topicName));
    
                // 3. Poll until the first non-empty batch arrives, print it, and quit
                boolean running = true;
                while (running) {
                    System.out.println("running");
                    final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    if (!records.isEmpty()) {
                        records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                                + record.topic() + "-" + record.partition() + "@" + record.offset()));
                        running = false;
                    }
                }
            }
        }
    }
    
    • YOUR-KEY-FILE-ABSOLUTE-PATH: the path to your downloaded OAuth2 credential file.
    • YOUR-KAFKA-SERVICE-URL: the Kafka service URL of your StreamNative cluster.
    • YOUR-AUDIENCE-STRING: the audience parameter, which combines the urn:sn:pulsar prefix, your organization name, and your Pulsar instance name, in the form urn:sn:pulsar:<organization-name>:<instance-name>.
  3. Open another terminal and run a Kafka producer to send a message to the test-topic topic.

    package org.example;
    
    import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * An OAuth2 (SASL/OAUTHBEARER) authentication example of a Kafka producer
     * connecting to StreamNative Cloud.
     */
    public class SNCloudOAuth2Producer {
        /**
         * Sends a single "hello" message to the topic, waits for the send to
         * complete, and prints the resulting record metadata.
         *
         * @param args unused
         * @throws ExecutionException if the send fails
         * @throws InterruptedException if the thread is interrupted while waiting for the send
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // 1. Create a producer with OAuth2 authentication (SASL/OAUTHBEARER over TLS)
            final Properties props = new Properties();
            // replace these configs with your cluster
            String serverUrl = "YOUR-KAFKA-SERVICE-URL";
            String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
            String audience = "YOUR-AUDIENCE-STRING";
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "OAUTHBEARER");
            final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                    + " oauth.issuer.url=\"%s\""
                    + " oauth.credentials.url=\"%s\""
                    + " oauth.audience=\"%s\";";
            // The credential file is passed as a file:// URL built from its absolute path
            props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                    "https://auth.streamnative.cloud/",
                    "file://" + keyPath,
                    audience
            ));
    
            // try-with-resources closes the producer even if the send fails
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // 2. Produce one message and block until the send completes
                final String topicName = "test-topic";
                final Future<RecordMetadata> recordMetadataFuture =
                        producer.send(new ProducerRecord<>(topicName, "hello"));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send hello to " + recordMetadata);
            }
        }
    }
    
    • YOUR-KEY-FILE-ABSOLUTE-PATH: the path to your downloaded OAuth2 credential file.
    • YOUR-KAFKA-SERVICE-URL: the Kafka service URL of your StreamNative cluster.
    • YOUR-AUDIENCE-STRING: the audience parameter, which combines the urn:sn:pulsar prefix, your organization name, and your Pulsar instance name, in the form urn:sn:pulsar:<organization-name>:<instance-name>.
Previous
Multi-Tenancy