This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
If you are using an Ursa Engine-powered cluster, note that KStreams and ksqlDB support in Ursa Engine has limitations: it does not support functionality that requires transactions or topic compaction.
This document describes how to connect to your StreamNative cluster using KSQL with SASL/PLAIN authentication.
Before you begin
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- The password for Kafka client utilities, such as kcat, is token:<YOUR-API-KEY>.
You can follow the instructions to create an API key for the service account you choose to use.
Steps
- Open the etc/ksqldb/ksql-server.properties file and configure the KSQL server with the following properties:
#------ Kafka -------
# The set of Kafka brokers to bootstrap Kafka cluster information from:
bootstrap.servers=<SERVER-URL>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="public/default" \
password="token:<YOUR-API-KEY>";
bootstrap.servers: the Kafka service URL of your StreamNative cluster.
password: token:<YOUR-API-KEY>, where <YOUR-API-KEY> is an API key of your service account.
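As a minimal sketch, the Kafka section above could also be generated from shell variables so that the API key is not pasted into the file by hand. The function name render_kafka_props and both of its arguments are hypothetical; the property values mirror the ones shown in this step, with the JAAS entry written on a single line.

```shell
#!/bin/sh
# Hypothetical helper (not part of ksqlDB or StreamNative tooling):
# prints the Kafka section of ksql-server.properties to stdout.
#   $1 = Kafka service URL of the cluster
#   $2 = API key of the service account
render_kafka_props() {
  server_url=$1
  api_key=$2
  cat <<EOF
bootstrap.servers=${server_url}
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="public/default" password="token:${api_key}";
EOF
}

# Usage (placeholders as in the step above):
#   render_kafka_props "<SERVER-URL>" "<YOUR-API-KEY>"
```

Review the output before adding it to etc/ksqldb/ksql-server.properties. If the file already sets bootstrap.servers, update that line rather than adding a second one.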
- Start the KSQL server.
bin/ksql-server-start etc/ksqldb/ksql-server.properties
After the KSQL server is started, you should see the following output:
[2023-04-06 14:51:02,811] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:382)
===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================
Copyright 2017-2022 Confluent Inc.
Server 7.3.2 listening on http://0.0.0.0:8088
To access the KSQL CLI, run:
ksql http://0.0.0.0:8088
[2023-04-06 14:51:02,814] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:153)
[2023-04-06 14:51:04,117] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
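To confirm that the server is reachable before starting the CLI, one option is to query the /info endpoint of the ksqlDB REST API with curl. This is a sketch that assumes the server listens on port 8088, as in the log above; the function name ksql_server_info is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: requests the server info document from a ksqlDB server.
#   $1 = base URL of the server (assumed default: http://localhost:8088)
ksql_server_info() {
  base_url=${1:-http://localhost:8088}
  # --fail makes curl exit non-zero on HTTP errors; --max-time avoids hanging.
  curl --silent --show-error --fail --max-time 5 "${base_url}/info"
}

# Usage (needs a running server):
#   ksql_server_info http://localhost:8088
```

A non-zero exit status suggests that the server is not up yet or is listening on a different address.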
- Start the KSQL CLI tool.
LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
After the KSQL CLI tool is started, you should see the following output:
CLI v7.3.2, Server v7.3.2 located at http://localhost:8088
Server Status: RUNNING
- Create a stream and tables using the KSQL CLI tool.
a. Create a stream named riderLocations:
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
WITH (kafka_topic='locations', value_format='json', partitions=1);
b. Create two tables, currentLocation and ridersNearMountainView, as materialized views that track the latest location of each rider and group riders by their distance from Mountain View.
CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;

CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
- Insert and query data.
a. Open a terminal to run a push query over the stream.
-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
b. Open another terminal, start a second KSQL CLI session, and insert data into the stream.
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
You should see the following output in the first terminal:
> WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+---------------------------------+---------------------------------+---------------------------------+
|PROFILEID                        |LATITUDE                         |LONGITUDE                        |
+---------------------------------+---------------------------------+---------------------------------+
|4ab5cbad                         |37.3952                          |-122.0813                        |
|8b6eae59                         |37.3944                          |-122.0813                        |
|4a7c7b41                         |37.4049                          |-122.0822                        |
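The rows above can be cross-checked outside ksqlDB. The sketch below assumes that GEO_DISTANCE returns a great-circle (haversine) distance in kilometers by default, and it uses a mean Earth radius of 6371 km. The function name within_km and the input layout (id, latitude, longitude on each line) are illustrative only.

```shell
#!/bin/sh
# Hypothetical helper: reads "id latitude longitude" lines on stdin and prints
# the ids whose haversine distance to the reference point is within the radius.
#   $1 = reference latitude, $2 = reference longitude, $3 = radius in km
within_km() {
  awk -v lat0="$1" -v lon0="$2" -v radius="$3" '
    # Convert degrees to radians.
    function rad(d) { return d * 3.141592653589793 / 180 }
    # Haversine distance in km, assuming a mean Earth radius of 6371 km.
    function haversine_km(lat1, lon1, lat2, lon2,    dlat, dlon, a) {
      dlat = rad(lat2 - lat1)
      dlon = rad(lon2 - lon1)
      a = sin(dlat / 2) ^ 2 + cos(rad(lat1)) * cos(rad(lat2)) * sin(dlon / 2) ^ 2
      return 2 * 6371 * atan2(sqrt(a), sqrt(1 - a))
    }
    haversine_km(lat0, lon0, $2, $3) <= radius + 0 { print $1 }
  '
}

# The six riders inserted in this step, checked against Mountain View.
within_km 37.4133 -122.1162 5 <<'EOF'
c2309eec 37.7877 -122.4205
18f4ea86 37.3903 -122.0643
4ab5cbad 37.3952 -122.0813
8b6eae59 37.3944 -122.0813
4a7c7b41 37.4049 -122.0822
4ddad000 37.7857 -122.4011
EOF
# prints 4ab5cbad, 8b6eae59, 4a7c7b41 (one per line)
```

Under these assumptions, rider 18f4ea86 is just over 5 km from the reference point, which is consistent with its absence from the push query output.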