Connect to your cluster using KSQL
Note
This quickstart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions on the namespace of the target topic.
If you are using a cluster powered by Ursa Engine, note that Kafka Streams and ksqlDB support in Ursa Engine is limited: features that require transactions or topic compaction are not supported.
This document describes how to connect to your StreamNative cluster using KSQL with SASL/PLAIN authentication.
Before you begin
Note
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- For utilities such as kcat, the password is token:<API KEY>.
You can follow the instructions to create an API key for the service account you choose to use.
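If you want to check the API key with kcat before configuring KSQL, the same SASL/PLAIN credentials can be passed as librdkafka properties with -X. The following Python sketch only assembles the command line. The helper name kcat_metadata_args is hypothetical, and the default username public/default is an assumption copied from the ksql-server.properties example in the steps; adjust it for your cluster.

```python
import shlex


def kcat_metadata_args(bootstrap: str, api_key: str,
                       username: str = "public/default") -> list[str]:
    """Build a kcat command that lists cluster metadata over SASL/PLAIN.

    The password is the API key with a "token:" prefix. The default
    username is an assumption taken from the KSQL properties example.
    """
    return [
        "kcat",
        "-b", bootstrap,                        # bootstrap broker(s)
        "-X", "security.protocol=SASL_SSL",     # librdkafka client properties
        "-X", "sasl.mechanisms=PLAIN",
        "-X", f"sasl.username={username}",
        "-X", f"sasl.password=token:{api_key}",
        "-L",                                   # list brokers, topics, partitions
    ]


if __name__ == "__main__":
    # Placeholder values; substitute your Kafka service URL and API key.
    print(shlex.join(kcat_metadata_args("<SERVER-URL>", "<API KEY>")))
    # To actually run it: subprocess.run(kcat_metadata_args(url, key), check=True)
```

Returning an argument list rather than a shell string avoids quoting problems if the API key contains shell-special characters.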
Steps
Open the etc/ksqldb/ksql-server.properties file and configure the KSQL server with the following properties:
#------ Kafka -------
# The set of Kafka brokers to bootstrap Kafka cluster information from:
bootstrap.servers=<SERVER-URL>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
  required username="public/default" \
  password="token:<YOUR-API-KEY>";
- bootstrap.servers: the Kafka service URL of your StreamNative cluster.
- password: replace <YOUR-API-KEY> with an API key of your service account, keeping the token: prefix.
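If you manage several environments, you may prefer to generate the Kafka section of ksql-server.properties instead of editing it by hand. A minimal Python sketch, assuming the same SASL/PLAIN settings shown above; render_ksql_kafka_properties is a hypothetical helper, not part of KSQL or StreamNative tooling, and its default username is taken from the example.

```python
def render_ksql_kafka_properties(bootstrap_servers: str, api_key: str,
                                 username: str = "public/default") -> str:
    """Render the Kafka connection properties for ksql-server.properties."""
    for value in (bootstrap_servers, api_key, username):
        # A double quote or newline would break the JAAS string or the file.
        if '"' in value or "\n" in value:
            raise ValueError("values must not contain double quotes or newlines")
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="token:{api_key}";'
    )
    lines = [
        "#------ Kafka -------",
        f"bootstrap.servers={bootstrap_servers}",
        "security.protocol=SASL_SSL",
        "sasl.mechanism=PLAIN",
        f"sasl.jaas.config={jaas}",  # written on one line, no continuation needed
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_ksql_kafka_properties("<SERVER-URL>", "<YOUR-API-KEY>"), end="")
```

The sketch writes sasl.jaas.config on a single line; that is equivalent to the backslash-continued form in the example, since Java properties files join continuation lines.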
Start the KSQL server.
bin/ksql-server-start etc/ksqldb/ksql-server.properties
After the KSQL server is started, you should see the following output:
[2023-04-06 14:51:02,811] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:382)
===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================
Copyright 2017-2022 Confluent Inc.
Server 7.3.2 listening on http://0.0.0.0:8088
To access the KSQL CLI, run:
ksql http://0.0.0.0:8088
[2023-04-06 14:51:02,814] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:153)
[2023-04-06 14:51:04,117] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
Start the KSQL CLI tool.
LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
After the KSQL CLI tool is started, you should see the following output:
CLI v7.3.2, Server v7.3.2 located at http://localhost:8088
Server Status: RUNNING
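The CLI reads the server status from the KSQL server's REST API. To run the same check from a script, you can query the /info endpoint. This Python sketch assumes the response wraps its fields in a KsqlServerInfo object with version and serverStatus keys, so verify the shape against your server version; parse_server_info and fetch_server_info are hypothetical helper names.

```python
import json
import urllib.request


def parse_server_info(body: str) -> tuple[str, str]:
    """Return (version, serverStatus) from a /info response body."""
    info = json.loads(body)["KsqlServerInfo"]
    # serverStatus is assumed to be present; report UNKNOWN if it is missing.
    return info["version"], info.get("serverStatus", "UNKNOWN")


def fetch_server_info(base_url: str = "http://localhost:8088",
                      timeout: float = 5.0) -> tuple[str, str]:
    """GET <base_url>/info from a running KSQL server and parse the body."""
    with urllib.request.urlopen(f"{base_url}/info", timeout=timeout) as resp:
        return parse_server_info(resp.read().decode("utf-8"))


if __name__ == "__main__":
    # Illustrative body only; a real server reports its own IDs.
    sample = ('{"KsqlServerInfo": {"version": "7.3.2", "kafkaClusterId": "example", '
              '"ksqlServiceId": "default_", "serverStatus": "RUNNING"}}')
    print(parse_server_info(sample))
    # Against the server started above: print(fetch_server_info())
```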
Create a stream and tables using the KSQL CLI tool.
a. Create a stream named riderLocations:
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='locations', value_format='json', partitions=1);
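You can also submit the same statement without the CLI by posting it to the KSQL server's /ksql REST endpoint. This Python sketch builds, but does not send, such a request. It assumes the server from the previous steps listens on http://localhost:8088; build_ksql_request is a hypothetical helper.

```python
import json
import urllib.request

KSQL_CONTENT_TYPE = "application/vnd.ksql.v1+json; charset=utf-8"

# The CREATE STREAM statement from step a, unchanged.
CREATE_STREAM = (
    "CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) "
    "WITH (kafka_topic='locations', value_format='json', partitions=1);"
)


def build_ksql_request(statement: str,
                       base_url: str = "http://localhost:8088") -> urllib.request.Request:
    """Build a POST /ksql request carrying one or more KSQL statements."""
    body = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/ksql",
        data=body,
        headers={"Content-Type": KSQL_CONTENT_TYPE,
                 "Accept": "application/vnd.ksql.v1+json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_ksql_request(CREATE_STREAM)
    print(req.get_method(), req.full_url)
    # To submit it to a running server:
    # with urllib.request.urlopen(req) as resp:
    #     print(json.loads(resp.read()))
```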
b. Create two tables as materialized views: currentLocation tracks the latest location of each rider, and ridersNearMountainView groups riders by their distance from Mountain View.
CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;
CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
GEO_DISTANCE returns kilometers unless a unit argument is passed, so despite its name, the distanceInMiles column holds the distance in kilometers rounded to the nearest 10.
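To see what the two materialized views contain, the following Python sketch recomputes their final state from a list of rider events. It is an approximation, not ksqlDB's implementation: it assumes a haversine distance with a 6371 km mean Earth radius, GEO_DISTANCE's default kilometer unit, and half-up rounding for ROUND(x, -1). The function names are illustrative, and EVENTS holds the rows inserted in the next step.

```python
import math

MOUNTAIN_VIEW = (37.4133, -122.1162)
EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius


def geo_distance_km(lat1, lon1, lat2, lon2):
    """Haversine great-circle distance in kilometers."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp, dl = p2 - p1, math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def round_to_tens(x):
    """Approximate ROUND(x, -1) for non-negative x: half up to a multiple of 10."""
    return math.floor(x / 10 + 0.5) * 10


def materialize(events):
    """Final state of both tables from (profileId, latitude, longitude) events."""
    # currentLocation: like LATEST_BY_OFFSET, a later event for a key wins.
    current = {}
    for profile_id, lat, lon in events:
        current[profile_id] = (lat, lon)
    # ridersNearMountainView: GROUP BY rounded distance, COLLECT_LIST + COUNT(*).
    buckets = {}
    for profile_id, (lat, lon) in current.items():
        distance = round_to_tens(geo_distance_km(lat, lon, *MOUNTAIN_VIEW))
        buckets.setdefault(distance, []).append(profile_id)
    near = {d: (riders, len(riders)) for d, riders in buckets.items()}
    return current, near


EVENTS = [
    ("c2309eec", 37.7877, -122.4205),
    ("18f4ea86", 37.3903, -122.0643),
    ("4ab5cbad", 37.3952, -122.0813),
    ("8b6eae59", 37.3944, -122.0813),
    ("4a7c7b41", 37.4049, -122.0822),
    ("4ddad000", 37.7857, -122.4011),
]

if __name__ == "__main__":
    _, near = materialize(EVENTS)
    for distance in sorted(near):
        riders, count = near[distance]
        print(distance, riders, count)
```

ksqlDB maintains both tables incrementally as events arrive; this sketch only recomputes the end state from scratch, which is enough to sanity-check what the tables should contain.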
Insert and query data.
a. Open a terminal to run a push query over the stream.
-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
b. Open another terminal, start a second KSQL CLI session, and insert data into the stream:
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
You should see the following output in the first terminal:
> WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+---------------------------------+---------------------------------+---------------------------------+
|PROFILEID                        |LATITUDE                         |LONGITUDE                        |
+---------------------------------+---------------------------------+---------------------------------+
|4ab5cbad                         |37.3952                          |-122.0813                        |
|8b6eae59                         |37.3944                          |-122.0813                        |
|4a7c7b41                         |37.4049                          |-122.0822                        |
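Only three of the six riders appear because GEO_DISTANCE measures in kilometers by default. The Python sketch below replays the push query's WHERE clause over the inserted rows to show this. It assumes a haversine distance with mean Earth radii of 6371 km and 3958.8 mi, and the names geo_distance and push_query are illustrative. Rider 18f4ea86 is just over 5 km (about 3.3 mi) from Mountain View, so it would only pass a 5-mile filter.

```python
import math

MOUNTAIN_VIEW = (37.4133, -122.1162)
RADIUS = {"km": 6371.0, "mi": 3958.8}  # assumed mean Earth radius per unit

# Rows inserted in step b, as (profileId, latitude, longitude).
INSERTED = [
    ("c2309eec", 37.7877, -122.4205),
    ("18f4ea86", 37.3903, -122.0643),
    ("4ab5cbad", 37.3952, -122.0813),
    ("8b6eae59", 37.3944, -122.0813),
    ("4a7c7b41", 37.4049, -122.0822),
    ("4ddad000", 37.7857, -122.4011),
]


def geo_distance(lat1, lon1, lat2, lon2, unit="km"):
    """Haversine great-circle distance between two points in decimal degrees."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp, dl = p2 - p1, math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * RADIUS[unit] * math.asin(math.sqrt(a))


def push_query(rows, max_distance=5, unit="km"):
    """Mimic WHERE GEO_DISTANCE(...) <= 5, keeping rows in arrival order."""
    return [pid for pid, lat, lon in rows
            if geo_distance(lat, lon, *MOUNTAIN_VIEW, unit=unit) <= max_distance]


if __name__ == "__main__":
    for pid, lat, lon in INSERTED:
        print(pid, round(geo_distance(lat, lon, *MOUNTAIN_VIEW), 2), "km")
    print(push_query(INSERTED))
```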