- Build Applications
- Kafka Clients
Connect to your cluster using KSQL
Note
This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce
and consume
permissions to a namespace for the target topic.
This document describes how to connect to your StreamNative cluster using KSQL with SASL/PLAIN authentication.
Before you begin
Note
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- The password for different utilities as
kcat
will be equal totoken:<API KEY>
.
You can follow the instructions to create an API key for the service account you choose to use.
Steps
Open the
etc/ksqldb/ksql-server.properties
file and configure the KSQL server with the following properties:#------ Kafka ------- # The set of Kafka brokers to bootstrap Kafka cluster information from: bootstrap.servers=<SERVER-URL> security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \ required username="public/default" \ password="token:<YOUR-API-KEY>";
bootstrap.servers
: the Kafka service URL of your StreamNative cluster.password
: an API key of your service account.
Start the KSQL server.
bin/ksql-server-start etc/ksqldb/ksql-server.properties
After the KSQL server is started, you should see the following output:
[2023-04-06 14:51:02,811] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:382) =========================================== = _ _ ____ ____ = = | | _____ __ _| | _ \| __ ) = = | |/ / __|/ _` | | | | | _ \ = = | <\__ \ (_| | | |_| | |_) | = = |_|\_\___/\__, |_|____/|____/ = = |_| = = The Database purpose-built = = for stream processing apps = =========================================== Copyright 2017-2022 Confluent Inc. Server 7.3.2 listening on http://0.0.0.0:8088 To access the KSQL CLI, run: ksql http://0.0.0.0:8088 [2023-04-06 14:51:02,814] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:153) [2023-04-06 14:51:04,117] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
Start the KSQL CLI tool.
LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
After the KSQL CLI tool is started, you should see the following output:
CLI v7.3.2, Server v7.3.2 located at http://localhost:8088 Server Status: RUNNING
Create a stream and tables using the KSQL CLI tool.
a. Create a stream named
riderLocations
:CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) WITH (kafka_topic='locations', value_format='json', partitions=1);
b. Create two tables (
currentLocation
andridersNearMountainView
) to track the latest location of the riders using a materialized view.CREATE TABLE currentLocation AS SELECT profileId, LATEST_BY_OFFSET(latitude) AS la, LATEST_BY_OFFSET(longitude) AS lo FROM riderlocations GROUP BY profileId EMIT CHANGES;
CREATE TABLE ridersNearMountainView AS SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles, COLLECT_LIST(profileId) AS riders, COUNT(*) AS count FROM currentLocation GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
Insert and query data.
a. Open a terminal to run a push query over the stream.
-- Mountain View lat, long: 37.4133, -122.1162 SELECT * FROM riderLocations WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
b. Open another terminal to start another KSQL CLI tool and insert data into the stream.
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
You should see the following output in the first terminal:
> WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES; +---------------------------------+---------------------------------+---------------------------------+ |PROFILEID |LATITUDE |LONGITUDE | +---------------------------------+---------------------------------+---------------------------------+ |4ab5cbad |37.3952 |-122.0813 | |8b6eae59 |37.3944 |-122.0813 | |4a7c7b41 |37.4049 |-122.0822 |