# Connect to your cluster using the Kafka Java client

<Note title="Note">
  This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account `produce` and `consume` permissions to a namespace for the target topic.
</Note>

This document describes how to connect to a StreamNative cluster through a Kafka Java client, and use the Java producer and consumer to produce and consume messages to and from a topic. The Java client supports connecting to a StreamNative cluster using either [OAuth2](#use-oauth2) or [API Keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#kafka-clients) authentication.

## Prerequisites

* Install [Java 17](https://www.oracle.com/java/technologies/downloads/#java17). For details, see [overview of JDK installation](https://docs.oracle.com/en/java/javase/17/install/overview-jdk-installation.html).

* Install Maven. For details, see [installing Apache Maven](https://maven.apache.org/install.html).

## Connect to your cluster using API keys

<span id="use-apikeys" />

This section describes how to connect to your StreamNative cluster using the Kafka Java client with [SASL/PLAIN authentication](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#kafka-clients).

### Before you begin

<Note title="Note">
  * Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  * For utilities such as `kcat`, set the password to `token:<API KEY>`.
</Note>

To create an API key for the service account you want to use, follow the instructions in [create an API key](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#using-api-keys-to-connect-to-your-cluster).
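
The SASL/PLAIN settings used in the steps below can be collected in a small helper. This is a minimal sketch, not part of any StreamNative or Kafka library: the class name `ApiKeyClientConfig` and its method names are hypothetical, and plain string keys are used in place of the `ConsumerConfig`/`ProducerConfig` constants so the snippet compiles without the Kafka client on the classpath. The username is passed through unchanged; the examples on this page pass the namespace.

```java
import java.util.Properties;

/** Sketch: builds SASL/PLAIN client properties for an API key (hypothetical helper). */
final class ApiKeyClientConfig {

    private ApiKeyClientConfig() {
    }

    /** Builds the JAAS line for PlainLoginModule; the password is "token:" followed by the API key. */
    static String jaasConfig(String username, String apiKey) {
        return String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"token:%s\";",
                username, apiKey);
    }

    /** Returns the connection and authentication settings shared by producers and consumers. */
    static Properties saslPlainProperties(String serverUrl, String username, String apiKey) {
        Properties props = new Properties();
        props.put("bootstrap.servers", serverUrl);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(username, apiKey));
        return props;
    }
}
```

A producer or consumer would then add only its own serializer, deserializer, and group settings on top of the returned `Properties`.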

### Steps

1. Add Maven dependencies.

   ```xml theme={null}
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>3.6.1</version>
   </dependency>
   ```

2. Open a terminal and run a Kafka consumer to receive a message from the `test-kafka-topic` topic.

   ```java theme={null}
   package org.example;

   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.CommonClientConfigs;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.StringDeserializer;

   /**
    * A JWT token (API key) authentication example of a Kafka consumer connecting to StreamNative Cloud
    */
   public class SNCloudJWTTokenConsumer {
       public static void main(String[] args) {
           // Replace these values with the ones for your cluster
           String serverUrl = "SERVER-URL";
           String jwtToken = "API-KEY";
           // The SASL/PLAIN password is the API key prefixed with "token:"
           String token = "token:" + jwtToken;

           final String topicName = "test-kafka-topic";
           String namespace = "public/default";

           // Configure the consumer to authenticate with SASL/PLAIN over TLS
           final Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
           props.put("sasl.mechanism", "PLAIN");
           props.put("sasl.jaas.config", String.format(
                   "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                   namespace, token));

           // Create a consumer
           final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
           consumer.subscribe(Collections.singleton(topicName));

           // Poll until the first non-empty batch arrives, print it, then exit
           boolean running = true;
           while (running) {
               System.out.println("running");
               final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
               if (!records.isEmpty()) {
                   records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                           + record.topic() + "-" + record.partition() + "@" + record.offset()));
                   running = false;
               }
           }
           consumer.close();
       }
   }
   ```

   * `SERVER-URL`: the Kafka service URL of your StreamNative cluster.
   * `API-KEY`: an API key of your service account.

3. Open another terminal and run a Kafka producer to send messages to the `test-kafka-topic` topic.

   ```java theme={null}
   package org.example;

   import java.util.Properties;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;
   import org.apache.kafka.clients.CommonClientConfigs;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.clients.producer.RecordMetadata;
   import org.apache.kafka.common.serialization.StringSerializer;

   /**
    * A JWT token (API key) authentication example of a Kafka producer connecting to StreamNative Cloud
    */
   public class SNCloudJWTTokenProducer {
       public static void main(String[] args) throws ExecutionException, InterruptedException {
           // Replace these values with the ones for your cluster
           String serverUrl = "SERVER-URL";
           String jwtToken = "API-KEY";
           // The SASL/PLAIN password is the API key prefixed with "token:"
           String token = "token:" + jwtToken;
           final String topicName = "test-kafka-topic";
           String namespace = "public/default";

           // 1. Configure token authentication, which uses the SASL/PLAIN mechanism in Kafka
           final Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
           props.put("sasl.mechanism", "PLAIN");
           props.put("sasl.jaas.config", String.format(
                   "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                   namespace, token));

           // 2. Create a producer
           final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

           // 3. Produce five messages, waiting for each send to be acknowledged
           for (int i = 0; i < 5; i++) {
               String value = "hello world";
               final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
               final RecordMetadata recordMetadata = recordMetadataFuture.get();
               System.out.println("Send " + value + " to " + recordMetadata);
           }
           producer.close();
       }
   }
   ```

   * `SERVER-URL`: the Kafka service URL of your StreamNative cluster.
   * `API-KEY`: an API key of your service account.
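
The examples above hard-code `SERVER-URL` and `API-KEY` for brevity. One way to keep the API key out of source code is to read both values from the environment. The following is a rough sketch under stated assumptions: the class `ClusterSettings` and the variable names `SN_KAFKA_SERVER_URL` and `SN_API_KEY` are hypothetical and are not defined by StreamNative or Kafka.

```java
import java.util.Map;

/** Sketch: connection settings read from environment variables (names are hypothetical). */
final class ClusterSettings {
    final String serverUrl;
    final String apiKey;

    private ClusterSettings(String serverUrl, String apiKey) {
        this.serverUrl = serverUrl;
        this.apiKey = apiKey;
    }

    /** Reads both settings from the given map and fails fast if either one is missing or blank. */
    static ClusterSettings fromEnv(Map<String, String> env) {
        return new ClusterSettings(
                require(env, "SN_KAFKA_SERVER_URL"),
                require(env, "SN_API_KEY"));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        // Trim stray whitespace that often sneaks in when values are copied from a console
        return value.trim();
    }
}
```

In `main`, `ClusterSettings.fromEnv(System.getenv())` would then replace the two hard-coded strings. Taking the map as a parameter keeps the lookup easy to exercise without touching the real environment.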

## Connect to your cluster using OAuth2 authentication

<span id="use-oauth2" />

This section describes how to connect to your StreamNative cluster using the Kafka Java client with OAuth2 authentication.

### Before you begin

* Get the OAuth2 credential file.

  1. On the left navigation pane, click **Service Accounts**.
  2. In the row of the service account you want to use, in the **Key File** column, click the **Download** icon to download the OAuth2 credential file to your local directory.

* Get the service URL of your StreamNative cluster.

  1. On the left navigation pane, in the **Admin** area, click **Pulsar Clusters**.
  2. Select the **Details** tab, and in the **Access Points** area, click **Copy** at the end of the row of the **Kafka Service URL (TCP)**.
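
The credential file ends up in the client's `sasl.jaas.config` setting as a `file://` URL. As a minimal sketch of how that string is assembled, the hypothetical helper below (`OAuth2JaasConfig` is not part of any library) uses the option names and issuer URL from the examples on this page. It assumes a Unix-style absolute path, because prefixing `file://` to a relative path would not yield a usable URL.

```java
/** Sketch: assembles the OAUTHBEARER JAAS line from a key file path and an audience (hypothetical helper). */
final class OAuth2JaasConfig {
    // Issuer URL used by the examples on this page
    static final String ISSUER_URL = "https://auth.streamnative.cloud/";

    private OAuth2JaasConfig() {
    }

    static String build(String keyFilePath, String audience) {
        // Assumption: Unix-style absolute path, so that "file://" + path is a valid file URL
        if (keyFilePath == null || !keyFilePath.startsWith("/")) {
            throw new IllegalArgumentException("Expected an absolute path, got: " + keyFilePath);
        }
        return String.format(
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                        + " oauth.issuer.url=\"%s\""
                        + " oauth.credentials.url=\"%s\""
                        + " oauth.audience=\"%s\";",
                ISSUER_URL, "file://" + keyFilePath, audience);
    }
}
```

The returned string is what the producer and consumer would pass as the value of `sasl.jaas.config`.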

### Steps

1. Add Maven dependencies.

   ```xml theme={null}
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>3.4.0</version>
   </dependency>
   <dependency>
       <groupId>io.streamnative.pulsar.handlers</groupId>
       <artifactId>oauth-client</artifactId>
       <version>3.1.0.1</version>
   </dependency>
   ```

2. Open a terminal and run a Kafka consumer to receive a message from the `test-topic` topic.

   ```java theme={null}
   package org.example;

   import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;

   /**
   * An OAuth2 authentication example of Kafka consumer to StreamNative Cloud
   */
   public class SNCloudOAuth2Consumer {
       public static void main(String[] args) {
           // Replace these values with the ones for your cluster
           String serverUrl = "YOUR-KAFKA-SERVICE-URL";
           String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
           String audience = "YOUR-AUDIENCE-STRING";

           // 1. Create properties for OAuth2 authentication, which uses the SASL/OAUTHBEARER mechanism in Kafka
           final Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
           props.setProperty("security.protocol", "SASL_SSL");
           props.setProperty("sasl.mechanism", "OAUTHBEARER");
           final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                   + " oauth.issuer.url=\"%s\""
                   + " oauth.credentials.url=\"%s\""
                   + " oauth.audience=\"%s\";";
           props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                   "https://auth.streamnative.cloud/",
                   "file://" + keyPath,
                   audience
           ));

           // 2. Create a consumer
           final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
           final String topicName = "test-topic";
           consumer.subscribe(Collections.singleton(topicName));

           // 3. Poll until the first non-empty batch arrives, print it, then exit
           boolean running = true;
           while (running) {
               System.out.println("running");
               final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
               if (!records.isEmpty()) {
                   records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                           + record.topic() + "-" + record.partition() + "@" + record.offset()));
                   running = false;
               }
           }
           consumer.close();
       }
   }
   ```

   * `YOUR-KEY-FILE-ABSOLUTE-PATH`: the absolute path to your downloaded OAuth2 credential file.
   * `YOUR-KAFKA-SERVICE-URL`: the Kafka service URL of your StreamNative cluster.
   * `YOUR-AUDIENCE-STRING`: the audience, which combines `urn:sn:pulsar`, your organization name, and your Pulsar instance name.

3. Open another terminal and run a Kafka producer to send a message to the `test-topic` topic.

   ```java theme={null}
   package org.example;

   import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.clients.producer.RecordMetadata;
   import org.apache.kafka.common.serialization.StringSerializer;
   import java.io.IOException;
   import java.util.Properties;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;

   /**
   * An OAuth2 authentication example of Kafka producer to StreamNative Cloud
   */
   public class SNCloudOAuth2Producer {
       public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
           // 1. Create a producer with OAuth2 authentication, which uses the SASL/OAUTHBEARER mechanism in Kafka
           final Properties props = new Properties();
           // Replace these values with the ones for your cluster
           String serverUrl = "YOUR-KAFKA-SERVICE-URL";
           String keyPath = "YOUR-KEY-FILE-ABSOLUTE-PATH";
           String audience = "YOUR-AUDIENCE-STRING";

           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
           props.setProperty("security.protocol", "SASL_SSL");
           props.setProperty("sasl.mechanism", "OAUTHBEARER");
           final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                   + " oauth.issuer.url=\"%s\""
                   + " oauth.credentials.url=\"%s\""
                   + " oauth.audience=\"%s\";";
           props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                   "https://auth.streamnative.cloud/",
                   "file://" + keyPath,
                   audience
           ));
           final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

           // 2. Produce one message
           final String topicName = "test-topic";
           final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, "hello"));
           final RecordMetadata recordMetadata = recordMetadataFuture.get();
           System.out.println("Send hello to " + recordMetadata);
           producer.close();
       }
   }
   ```

   * `YOUR-KEY-FILE-ABSOLUTE-PATH`: the absolute path to your downloaded OAuth2 credential file.
   * `YOUR-KAFKA-SERVICE-URL`: the Kafka service URL of your StreamNative cluster.
   * `YOUR-AUDIENCE-STRING`: the audience, which combines `urn:sn:pulsar`, your organization name, and your Pulsar instance name.
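
The audience string can be built from its three parts. The sketch below assumes the parts are joined with colons, which this page does not state explicitly, so check the result against the value shown for your cluster before relying on it. The class name `AudienceString` is hypothetical.

```java
/** Sketch: builds the audience from its parts, assuming a colon separator (hypothetical helper). */
final class AudienceString {
    private static final String PREFIX = "urn:sn:pulsar";

    private AudienceString() {
    }

    static String of(String organization, String instance) {
        if (organization == null || organization.isBlank()
                || instance == null || instance.isBlank()) {
            throw new IllegalArgumentException("Organization and instance names are required");
        }
        // Assumption: the three parts are separated by ':'
        return String.join(":", PREFIX, organization.trim(), instance.trim());
    }
}
```

For example, `AudienceString.of("my-org", "my-instance")` returns `urn:sn:pulsar:my-org:my-instance`, where both names are placeholders.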
