Connect to your cluster using Kafka Streams
Note
This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account `produce` and `consume` permissions to a namespace for the target topic.
If you are using an Ursa-Engine-powered cluster, note that Kafka Streams and ksqlDB support in Ursa Engine has certain limitations: it does not support functionality that requires transactions or topic compaction.
This document describes how to connect to your StreamNative cluster using Kafka Streams with OAuth2 authentication.
Before you begin
Get the OAuth2 credential file.
- On the left navigation pane, click Service Accounts.
- In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
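The downloaded key file is a JSON document containing your service account's OAuth2 credentials. A hypothetical example with placeholder values (the exact set of fields may vary):

```json
{
  "type": "sn_service_account",
  "client_id": "your-client-id",
  "client_secret": "your-client-secret",
  "client_email": "your-service-account@your-org.auth.streamnative.cloud",
  "issuer_url": "https://auth.streamnative.cloud"
}
```

Keep this file secret; anyone holding it can authenticate as the service account.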
Get the service URL of your StreamNative cluster.
- On the left navigation pane, in the Admin area, click Pulsar Clusters.
- Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
- Create two topics for the Kafka Streams application, named `<kafka-stream-application-name>-counts-store-repartition` and `<kafka-stream-application-name>-counts-store-changelog`, for example with the CLI commands shown below.
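For example, assuming the application is named `wordcount-application` (as in the sample later in this guide) and that `kafka.properties` contains your OAuth2 settings (see the producer step below), the two topics could be created with the Kafka CLI:

```bash
./bin/kafka-topics.sh \
  --bootstrap-server SERVER-URL \
  --command-config ./kafka.properties \
  --create \
  --topic wordcount-application-counts-store-repartition

./bin/kafka-topics.sh \
  --bootstrap-server SERVER-URL \
  --command-config ./kafka.properties \
  --create \
  --topic wordcount-application-counts-store-changelog
```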
Steps
Add the Kafka Streams and OAuth Maven dependencies.
```xml
<!-- Kafka Streams, required for the org.apache.kafka.streams API used below -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>io.streamnative.pulsar.handlers</groupId>
    <artifactId>oauth-client</artifactId>
    <version>3.1.0.1</version>
</dependency>
<!-- Optional: add an SLF4J dependency if you want to see the log output -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.30</version>
</dependency>
```
(Optional) Add the RocksDB dependency if you encounter the `java.lang.UnsatisfiedLinkError` error when starting the Kafka Streams application.

```xml
<dependency>
    <groupId>org.rocksdb</groupId>
    <artifactId>rocksdbjni</artifactId>
    <version>7.0.3</version>
</dependency>
```
Build a Kafka Streams application.
This example builds a Kafka Streams application named `wordcount-application`.

```java
import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class SNWordCountApplication {

    public static void main(final String[] args) {
        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.INFO);

        // Step 1: replace with your configurations
        String serverUrl = "SERVER-URL";
        String keyPath = "YOUR-KEY-FILE-PATH";
        String audience = "YOUR-AUDIENCE-STRING";

        // Step 2: create the Kafka Streams properties
        Properties props = new Properties();
        // stream application name
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // OAuth config
        props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " oauth.issuer.url=\"%s\""
                + " oauth.credentials.url=\"%s\""
                + " oauth.audience=\"%s\";";
        props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
                "https://auth.streamnative.cloud/",
                "file://" + keyPath,
                audience
        ));

        // Step 3: build the Kafka Streams topology
        String inputTopic = "TextLinesTopic";
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream(inputTopic);
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> {
                    System.out.println("stream application receive: " + textLine);
                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                })
                .groupBy((key, word) -> word)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream()
                .foreach((word, count) -> System.out.println("word: " + word + " -> " + count));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            // Step 4: start the Kafka Streams application
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
```
- `serverUrl`: the Kafka service URL of your StreamNative cluster.
- `keyPath`: the path to your downloaded OAuth2 credential file.
- `audience`: a combination of `urn:sn:pulsar`, your organization name, and your Pulsar instance name.
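As an illustration, the three values might look like the following; the host name, file path, and names are hypothetical placeholders, and the `audience` segments are assumed to be colon-separated:

```java
String serverUrl = "your-broker.your-cluster.streamnative.cloud:9093"; // Kafka Service URL (TCP) copied from the console
String keyPath = "/path/to/your-service-account-key.json";             // local path of the downloaded key file
String audience = "urn:sn:pulsar:your-org:your-instance";              // urn:sn:pulsar + organization name + instance name
```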
Run the Kafka Streams application to check the connectivity.
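For example, if the project is built with Maven, one way to launch the class (a sketch, assuming the class sits in the default package and the `exec-maven-plugin` can be resolved) is:

```bash
mvn compile exec:java -Dexec.mainClass="SNWordCountApplication"
```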
Open another terminal and use the Kafka CLI tool to send some messages to the `TextLinesTopic` topic.

```bash
./bin/kafka-console-producer.sh \
  --bootstrap-server SERVER-URL \
  --producer.config ./kafka.properties \
  --topic TextLinesTopic
```
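The `kafka.properties` file passed via `--producer.config` should mirror the OAuth2 settings used in the Java application. A minimal sketch, assuming the `oauth-client` jar is on the CLI tool's classpath and using placeholder values for the key path and audience:

```properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
# JAAS config: same issuer URL, credentials file, and audience as in the Java code
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.issuer.url="https://auth.streamnative.cloud/" \
  oauth.credentials.url="file:///path/to/your-service-account-key.json" \
  oauth.audience="urn:sn:pulsar:your-org:your-instance";
```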
Type some text, for example:

```
hello world
hello world hello world
```
Then, you should see the following output:
```
stream application receive: hello world
stream application receive: hello world hello world
word: hello -> 3
word: world -> 3
```