Connect to your cluster using KSQL
Note
This QuickStart assumes that you have created a Pulsar cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
This document describes how to connect to your Pulsar cluster using KSQL with token authentication.
Before you begin
Note
- Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
- A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
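Because a token expires after 7 days, it can help to check how much lifetime a copied token has left before you put it into a configuration file. The following is a minimal Python sketch that assumes the service-account token is a standard three-part JWT carrying an exp claim (expiry time in Unix seconds). It decodes the payload without verifying the signature, so treat it only as a local convenience check; the function name token_seconds_remaining is hypothetical.

```python
import base64
import json
import time
from typing import Optional


def _b64url_decode(segment: str) -> bytes:
    """Decode a base64url JWT segment, restoring any stripped '=' padding."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def token_seconds_remaining(token: str, now: Optional[float] = None) -> float:
    """Return the seconds until the JWT's 'exp' claim (negative if already expired).

    Assumption: the token is header.payload.signature with an 'exp' claim.
    The signature is NOT verified; this only reads the expiry time.
    """
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a JWT: expected header.payload.signature")
    payload = json.loads(_b64url_decode(parts[1]))
    if "exp" not in payload:
        raise ValueError("JWT payload has no 'exp' claim")
    current = time.time() if now is None else now
    return payload["exp"] - current
```

For example, token_seconds_remaining(copied_token) / 86400 gives the remaining lifetime in days; if it is close to zero, generate a new token first.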
Get the JWT token.
- On the left navigation pane, click Service Accounts.
- In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.
Get the service URL of your Pulsar cluster.
- On the left navigation pane, in the Admin area, click Pulsar Clusters.
- Select the Overview tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
Steps
Open the etc/ksqldb/ksql-server.properties file and configure the KSQL server with the following properties:
#------ Kafka -------
# The set of Kafka brokers to bootstrap Kafka cluster information from:
bootstrap.servers=<SERVER-URL>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
  required username="public/default" \
  password="token:<YOUR-TOKEN>";
- bootstrap.servers: replace <SERVER-URL> with the Kafka service URL of your Pulsar cluster.
- password: replace <YOUR-TOKEN> with the token of your service account, keeping the token: prefix.
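If you script this setup, the same Kafka section can be generated from the service URL and token you copied earlier. The following Python sketch is one way to do it, assuming you replace the existing Kafka section of ksql-server.properties with its output; the function name render_ksql_kafka_properties is hypothetical, and the username value public/default is copied unchanged from the configuration above.

```python
def render_ksql_kafka_properties(server_url: str, token: str,
                                 username: str = "public/default") -> str:
    """Render the Kafka section of ksql-server.properties for token auth.

    Mirrors the configuration above: SASL_SSL with the PLAIN mechanism, and the
    token passed as the JAAS password in the form "token:<YOUR-TOKEN>".
    """
    if not server_url or not token:
        raise ValueError("server_url and token must both be non-empty")
    # An embedded double quote would terminate the quoted JAAS value early.
    if '"' in token or '"' in username:
        raise ValueError("token and username must not contain double quotes")
    # Trailing backslashes continue a .properties value onto the next line.
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule \\\n"
        f'  required username="{username}" \\\n'
        f'  password="token:{token}";'
    )
    lines = [
        "#------ Kafka -------",
        "# The set of Kafka brokers to bootstrap Kafka cluster information from:",
        f"bootstrap.servers={server_url}",
        "security.protocol=SASL_SSL",
        "sasl.mechanism=PLAIN",
        f"sasl.jaas.config={jaas}",
    ]
    return "\n".join(lines) + "\n"
```

Generating the file this way keeps the token: prefix and the quoting consistent, which are easy to get wrong when pasting a long token by hand.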
Start the KSQL server.
bin/ksql-server-start etc/ksqldb/ksql-server.properties
After the KSQL server is started, you should see the following output:
[2023-04-06 14:51:02,811] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:382)
===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================
Copyright 2017-2022 Confluent Inc.
Server 7.3.2 listening on http://0.0.0.0:8088
To access the KSQL CLI, run:
ksql http://0.0.0.0:8088
[2023-04-06 14:51:02,814] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:153)
[2023-04-06 14:51:04,117] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
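To check the server from a script instead of watching the log, you can query its REST API on port 8088. The following is a minimal Python sketch using only the standard library. It assumes the GET /info endpoint that ksqlDB servers expose and a response of the form {"KsqlServerInfo": {..., "serverStatus": "RUNNING"}}; the function names parse_server_status and fetch_server_status are hypothetical.

```python
import json
import urllib.request


def parse_server_status(body: bytes) -> str:
    """Extract serverStatus from a ksqlDB GET /info response body.

    Assumed shape: {"KsqlServerInfo": {"serverStatus": "...", ...}}.
    Returns "UNKNOWN" when the field is missing.
    """
    info = json.loads(body).get("KsqlServerInfo", {})
    return info.get("serverStatus", "UNKNOWN")


def fetch_server_status(base_url: str = "http://localhost:8088",
                        timeout: float = 5.0) -> str:
    """Call GET {base_url}/info and return the reported server status."""
    with urllib.request.urlopen(f"{base_url}/info", timeout=timeout) as resp:
        return parse_server_status(resp.read())
```

Separating the parsing from the HTTP call lets you check the parsing logic without a running server.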
Start the KSQL CLI tool.
LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
After the KSQL CLI tool is started, you should see the following output:
CLI v7.3.2, Server v7.3.2 located at http://localhost:8088
Server Status: RUNNING
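The KSQL CLI is a client of the server's REST API, so statements such as CREATE STREAM and CREATE TABLE can also be submitted programmatically. The following Python sketch uses only the standard library and assumes ksqlDB's POST /ksql endpoint with the application/vnd.ksql.v1+json content type; the helper names build_ksql_request and run_ksql are hypothetical. Push queries such as SELECT ... EMIT CHANGES are served by a separate query endpoint and are not covered here.

```python
import json
import urllib.request
from typing import Any

KSQL_CONTENT_TYPE = "application/vnd.ksql.v1+json"


def build_ksql_request(statements: str,
                       base_url: str = "http://localhost:8088") -> urllib.request.Request:
    """Build a POST /ksql request. Every statement must end with ';'."""
    body = json.dumps({"ksql": statements, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/ksql",
        data=body,
        method="POST",
        headers={
            "Content-Type": f"{KSQL_CONTENT_TYPE}; charset=utf-8",
            "Accept": KSQL_CONTENT_TYPE,
        },
    )


def run_ksql(statements: str, base_url: str = "http://localhost:8088") -> Any:
    """Send the statements to the server and return the decoded JSON response."""
    with urllib.request.urlopen(build_ksql_request(statements, base_url)) as resp:
        return json.loads(resp.read())
```

For example, run_ksql("SHOW STREAMS;") lists the streams defined on the server, which is a quick way to confirm the statements in the next step took effect.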
Create a stream and tables using the KSQL CLI tool.
a. Create a stream named riderLocations:
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='locations', value_format='json', partitions=1);
b. Create two tables as materialized views: currentLocation tracks the latest location of each rider, and ridersNearMountainView groups riders by their rounded distance from Mountain View.
CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;
CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
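The two tables can be pictured with a small in-memory model. The following Python sketch is an illustration, not how ksqlDB executes these queries: it replays events in offset order so that a later event for a profileId overwrites the earlier one (the effect LATEST_BY_OFFSET has here, ignoring null handling), then groups riders into buckets the way ROUND(..., -1), COLLECT_LIST, and COUNT(*) do. It assumes half-up rounding, and distance_fn is a hypothetical caller-supplied function standing in for GEO_DISTANCE.

```python
import math
from typing import Callable, Dict, Iterable, List, Tuple

Event = Tuple[str, float, float]  # (profileId, latitude, longitude), in offset order


def current_location(events: Iterable[Event]) -> Dict[str, Tuple[float, float]]:
    """Model currentLocation: keep the most recent (la, lo) per profileId."""
    table: Dict[str, Tuple[float, float]] = {}
    for profile_id, lat, lon in events:
        table[profile_id] = (lat, lon)  # a later offset overwrites the earlier row
    return table


def round_to_tens(x: float) -> int:
    """Mimic ROUND(x, -1) for non-negative x, assuming half-up rounding."""
    return int(math.floor(x / 10 + 0.5)) * 10


def riders_near(table: Dict[str, Tuple[float, float]],
                distance_fn: Callable[[float, float], float]
                ) -> Dict[int, Tuple[List[str], int]]:
    """Model ridersNearMountainView: bucket -> (riders, count)."""
    buckets: Dict[int, List[str]] = {}
    for profile_id, (la, lo) in table.items():
        buckets.setdefault(round_to_tens(distance_fn(la, lo)), []).append(profile_id)
    return {bucket: (riders, len(riders)) for bucket, riders in buckets.items()}
```

Because the view is computed from currentLocation rather than from the raw stream, a rider who moves appears in one bucket only (the one for their latest position) in this model, instead of once per reported location.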
Insert and query data.
a. Open a terminal to run a push query over the stream.
-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
b. Open another terminal, start a second KSQL CLI session, and insert data into the stream:
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
You should see the following output in the first terminal:
> WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+---------------------------------+---------------------------------+---------------------------------+
|PROFILEID                        |LATITUDE                         |LONGITUDE                        |
+---------------------------------+---------------------------------+---------------------------------+
|4ab5cbad                         |37.3952                          |-122.0813                        |
|8b6eae59                         |37.3944                          |-122.0813                        |
|4a7c7b41                         |37.4049                          |-122.0822                        |
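To see why only three of the six inserted riders appear, the push query's filter can be reproduced offline. The following Python sketch uses the haversine great-circle formula with an assumed Earth radius of 6371 km, so its distances may differ slightly from ksqlDB's GEO_DISTANCE. ksqlDB documents kilometers as the default unit of GEO_DISTANCE when no unit argument is given, and the output above agrees: rider 18f4ea86 is just over 5 km (roughly 3.3 miles) from Mountain View, so it is excluded by the <= 5 filter, which would not happen if the unit were miles. By the same reasoning, unless a unit argument is supplied, the distanceInMiles values in ridersNearMountainView are most likely kilometers as well.

```python
import math
from typing import List, Tuple

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius
MOUNTAIN_VIEW = (37.4133, -122.1162)

# The six rows inserted above: (profileId, latitude, longitude).
ROWS: List[Tuple[str, float, float]] = [
    ("c2309eec", 37.7877, -122.4205),
    ("18f4ea86", 37.3903, -122.0643),
    ("4ab5cbad", 37.3952, -122.0813),
    ("8b6eae59", 37.3944, -122.0813),
    ("4a7c7b41", 37.4049, -122.0822),
    ("4ddad000", 37.7857, -122.4011),
]


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometers between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def riders_within(rows: List[Tuple[str, float, float]], radius_km: float = 5.0,
                  center: Tuple[float, float] = MOUNTAIN_VIEW) -> List[str]:
    """Return profile IDs within radius_km of center, preserving input order."""
    lat0, lon0 = center
    return [pid for pid, lat, lon in rows
            if haversine_km(lat, lon, lat0, lon0) <= radius_km]


# Print each rider's distance from Mountain View.
for pid, lat, lon in ROWS:
    print(f"{pid}: {haversine_km(lat, lon, *MOUNTAIN_VIEW):.2f} km")
```

Calling riders_within(ROWS) returns the same three profile IDs, in the same order, as the push query output.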