> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Connect to your cluster using the Kafka Stream

<Note title="Note">
  This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account `produce` and `consume` permissions to a namespace for the target topic.

  If you are using a [Ursa-Engine](/cloud/overview/data-streaming-engine) powered cluster, please note that KStreams and KSQLDB support in Ursa Engine has certain limitations. It doesn't support functionalities that require transactions and topic compaction.
</Note>

This document describes how to connect to your StreamNative cluster using the [Kafka Stream](https://kafka.apache.org/documentation/streams/) with OAuth2 authentication.

## Before you begin

* Get the OAuth2 credential file.

  1. On the left navigation pane, click **Service Accounts**.
  2. In the row of the service account you want to use, in the **Key File** column, click the **Download** icon to download the OAuth2 credential file to your local directory.

* Get the service URL of your StreamNative cluster.

  1. On the left navigation pane, in the **Admin** area, click **Pulsar Clusters**.
  2. Select the **Details** tab, and in the **Access Points** area, click **Copy** at the end of the row of the **Kafka Service URL (TCP)**.

- Create two topics for the kafka Stream application, named `<kafka-stream-application-name>-counts-store-repartition` and `<kafka-stream-application-name>-counts-store-changelog`.

## Steps

1. Add the Kafka Stream and OAuth Maven dependencies.

   ```xml theme={null}
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>3.4.0</version>
   </dependency>
   <dependency>
       <groupId>io.streamnative.pulsar.handlers</groupId>
       <artifactId>oauth-client</artifactId>
       <version>3.1.0.1</version>
   </dependency>

   <!-- Optional: add an SLF4J dependency if you want to see the log output -->
   <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <version>1.7.30</version>
   </dependency>
   ```

2. (Optional) Add the [RocksDB](https://rocksdb.org/) dependency if you encounter the `java.lang.UnsatisfiedLinkError` error when starting the Kafka Stream application.

   ```xml theme={null}
   <dependency>
       <groupId>org.rocksdb</groupId>
       <artifactId>rocksdbjni</artifactId>
       <version>7.0.3</version>
   </dependency>
   ```

3. Build a Kafka Stream application.

   This example builds a Kafka Stream application named `wordcount-application`.

   ```java theme={null}
   import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
   import java.util.concurrent.CountDownLatch;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.KTable;
   import org.apache.kafka.streams.kstream.Materialized;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.clients.CommonClientConfigs;

   import java.util.Arrays;
   import java.util.Properties;
   import org.apache.log4j.BasicConfigurator;
   import org.apache.log4j.Level;
   import org.apache.log4j.Logger;

   public class SNWordCountApplication {

       public static void main(final String[] args) {

           BasicConfigurator.configure();
           Logger.getRootLogger().setLevel(Level.INFO);

           // Step 1: replace with your configurations
           String serverUrl = "SERVER-URL";
           String keyPath = "YOUR-KEY-FILE-PATH";
           String audience = "YOUR-AUDIENCE-STRING";

           // Step 2: create Kafka Stream properties
           Properties props = new Properties();
           // stream application name
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
           props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
           props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

           // OAuth config
           props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
           props.setProperty("security.protocol", "SASL_SSL");
           props.setProperty("sasl.mechanism", "OAUTHBEARER");
           final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                   + " oauth.issuer.url=\"%s\""
                   + " oauth.credentials.url=\"%s\""
                   + " oauth.audience=\"%s\";";
           props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                   "https://auth.streamnative.cloud/",
                   "file://" + keyPath,
                   audience
           ));

           // Step 3: build the Kafka Stream process
           String inputTopic = "TextLinesTopic";

           StreamsBuilder builder = new StreamsBuilder();
           KStream<String, String> textLines = builder.stream(inputTopic);
           KTable<String, Long> wordCounts = textLines
                   .flatMapValues(textLine -> {
                       System.out.println("stream application receive: " + textLine);
                       return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                   })
                   .groupBy((key, word) -> word)
                   .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
           wordCounts.toStream()
                   .foreach((word, count) -> System.out.println("word: " + word + " -> " + count));

           KafkaStreams streams = new KafkaStreams(builder.build(), props);

           final CountDownLatch latch = new CountDownLatch(1);
           Runtime.getRuntime().addShutdownHook(new Thread("stream") {
               @Override
               public void run() {
                   streams.close();
                   latch.countDown();
               }
           });

           try {
               // Step 4: start the Kafka Stream
               streams.start();
               latch.await();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.exit(0);
       }
   }
   ```

   * `serverUrl`: the Kafka service URL of your StreamNative cluster.
   * `keyPath`: the path to your downloaded OAuth2 credential file.
   * `audience`: the `audience` parameter is a combination of the `urn:sn:pulsar`, your organization name, and your Pulsar instance name.

4. Run the Kafka Stream application to check the connectivity.

   1. Open another terminal and use the Kafka CLI tool to send some messages to the `TextLinesTopic` topic.

      ```bash theme={null}
      ./bin/kafka-console-producer.sh \
          --bootstrap-server `SERVER-URL` \
          --producer.config ./kafka.properties \
          --topic TextLinesTopic
      ```

   2. Type some texts.

      ```bash theme={null}
      hello world
      hello world hello world
      ```

   Then, you should see the following input:

   ```bash theme={null}
   stream application receive: hello world
   stream application receive: hello world hello world
   word: hello -> 3
   word: world -> 3
   ```
