1. Process Data Streams
  2. Kafka Streams & KSQL

Connect to your cluster using the Kafka Stream

Note

This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

If you are using a Ursa-Engine powered cluster, please note that KStreams and KSQLDB support in Ursa Engine has certain limitations. It doesn't support functionalities that require transactions and topic compaction.

This document describes how to connect to your StreamNative cluster using the Kafka Stream with OAuth2 authentication.

Before you begin

  • Get the OAuth2 credential file.

    1. On the left navigation pane, click Service Accounts.
    2. In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
  • Get the service URL of your StreamNative cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
  • Create two topics for the kafka Stream application, named <kafka-stream-application-name>-counts-store-repartition and <kafka-stream-application-name>-counts-store-changelog.

Steps

  1. Add the Kafka Stream and OAuth Maven dependencies.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    <dependency>
        <groupId>io.streamnative.pulsar.handlers</groupId>
        <artifactId>oauth-client</artifactId>
        <version>3.1.0.1</version>
    </dependency>
    
    <!-- Optional: add an SLF4J dependency if you want to see the log output -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    
  2. (Optional) Add the RocksDB dependency if you encounter the java.lang.UnsatisfiedLinkError error when starting the Kafka Stream application.

    <dependency>
        <groupId>org.rocksdb</groupId>
        <artifactId>rocksdbjni</artifactId>
        <version>7.0.3</version>
    </dependency>
    
  3. Build a Kafka Stream application.

    This example builds a Kafka Stream application named wordcount-application.

    import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
    import java.util.concurrent.CountDownLatch;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.clients.CommonClientConfigs;
    
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.log4j.BasicConfigurator;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    
    public class SNWordCountApplication {
    
        public static void main(final String[] args) {
    
            BasicConfigurator.configure();
            Logger.getRootLogger().setLevel(Level.INFO);
    
            // Step 1: replace with your configurations
            String serverUrl = "SERVER-URL";
            String keyPath = "YOUR-KEY-FILE-PATH";
            String audience = "YOUR-AUDIENCE-STRING";
    
            // Step 2: create Kafka Stream properties
            Properties props = new Properties();
            // stream application name
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
            // OAuth config
            props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "OAUTHBEARER");
            final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                    + " oauth.issuer.url=\"%s\""
                    + " oauth.credentials.url=\"%s\""
                    + " oauth.audience=\"%s\";";
            props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                    "https://auth.streamnative.cloud/",
                    "file://" + keyPath,
                    audience
            ));
    
            // Step 3: build the Kafka Stream process
            String inputTopic = "TextLinesTopic";
    
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> textLines = builder.stream(inputTopic);
            KTable<String, Long> wordCounts = textLines
                    .flatMapValues(textLine -> {
                        System.out.println("stream application receive: " + textLine);
                        return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                    })
                    .groupBy((key, word) -> word)
                    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
            wordCounts.toStream()
                    .foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
    
            KafkaStreams streams = new KafkaStreams(builder.build(), props);
    
            final CountDownLatch latch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread("stream") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                // Step 4: start the Kafka Stream
                streams.start();
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.exit(0);
        }
    }
    
    • serverUrl: the Kafka service URL of your StreamNative cluster.
    • keyPath: the path to your downloaded OAuth2 credential file.
    • audience: the audience parameter is a combination of the urn:sn:pulsar, your organization name, and your Pulsar instance name.
  4. Run the Kafka Stream application to check the connectivity.

    1. Open another terminal and use the Kafka CLI tool to send some messages to the TextLinesTopic topic.

      ./bin/kafka-console-producer.sh \
          --bootstrap-server `SERVER-URL` \
          --producer.config ./kafka.properties \
          --topic TextLinesTopic
      
    2. Type some texts.

      hello world
      hello world hello world
      

    Then, you should see the following input:

    stream application receive: hello world
    stream application receive: hello world hello world
    word: hello -> 3
    word: world -> 3
    
Previous
Overview