
Connect to your cluster using KSQL

Note

This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

If you are using an Ursa Engine-powered cluster, note that Kafka Streams and ksqlDB support in Ursa Engine is limited: it does not support features that require transactions or topic compaction.

This document describes how to connect to your StreamNative cluster using KSQL with SASL/PLAIN authentication.

Before you begin

Note

  • Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  • The password for utilities such as kcat is token:<API KEY>.

You can follow the instructions to create an API key for the service account you choose to use.

Steps

  1. Open the etc/ksqldb/ksql-server.properties file and configure the KSQL server with the following properties:

    #------ Kafka -------
    # The set of Kafka brokers to bootstrap Kafka cluster information from:
    bootstrap.servers=<SERVER-URL>
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
      required username="public/default" \
      password="token:<YOUR-API-KEY>";
    
    • bootstrap.servers: the Kafka service URL of your StreamNative cluster.
    • password: the API key of your service account, prefixed with token: as shown above.
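
    To avoid pasting the API key into the file by hand, the same properties can be generated from environment variables. The following is a minimal sketch, not part of the product tooling: the function name render_sasl_properties and the variables SN_BOOTSTRAP_SERVERS, SN_USERNAME, and SN_API_KEY are hypothetical names chosen for this example.

```shell
#!/usr/bin/env bash
# Sketch: print the Kafka security properties for ksql-server.properties,
# reading the values from environment variables instead of hardcoding them.
# render_sasl_properties, SN_BOOTSTRAP_SERVERS, SN_USERNAME, and SN_API_KEY
# are hypothetical names used only in this example.
render_sasl_properties() {
  # Stop with a clear message if a required variable is unset or empty.
  : "${SN_BOOTSTRAP_SERVERS:?set SN_BOOTSTRAP_SERVERS to the Kafka service URL}"
  : "${SN_API_KEY:?set SN_API_KEY to the API key of the service account}"
  # The default mirrors the username used in the properties above.
  local username="${SN_USERNAME:-public/default}"

  printf 'bootstrap.servers=%s\n' "$SN_BOOTSTRAP_SERVERS"
  printf 'security.protocol=SASL_SSL\n'
  printf 'sasl.mechanism=PLAIN\n'
  # The JAAS entry is written on one line; the password is token:<API key>.
  printf 'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="%s" password="token:%s";\n' \
    "$username" "$SN_API_KEY"
}
```

    After setting the variables, call render_sasl_properties, review the printed lines, and copy them into etc/ksqldb/ksql-server.properties.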
  2. Start the KSQL server.

    bin/ksql-server-start etc/ksqldb/ksql-server.properties
    

    After the KSQL server is started, you should see the following output:

    [2023-04-06 14:51:02,811] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:382)
    
                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2022 Confluent Inc.
    
    Server 7.3.2 listening on http://0.0.0.0:8088
    
    To access the KSQL CLI, run:
    ksql http://0.0.0.0:8088
    
    [2023-04-06 14:51:02,814] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:153)
    [2023-04-06 14:51:04,117] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
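
    Besides reading the log, you can confirm that the server is up by calling its /info REST endpoint. The sketch below assumes the default listener on port 8088 and a single-line JSON response that contains a serverStatus field; extract_server_status and ksql_server_status are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Sketch: check that the ksqlDB server is up through its /info REST endpoint.
# extract_server_status and ksql_server_status are hypothetical helper names;
# the default URL assumes the listener shown in the startup output.

# Read an /info JSON response on stdin and print the serverStatus value.
# sed keeps the example dependency-free; a JSON tool such as jq is more robust.
extract_server_status() {
  sed -n 's/.*"serverStatus"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Query the server and print its status, for example RUNNING.
ksql_server_status() {
  local url="${1:-http://localhost:8088}"
  curl -sS --fail "${url}/info" | extract_server_status
}
```

    Running ksql_server_status with no arguments queries http://localhost:8088; pass a different base URL as the first argument if your server listens elsewhere.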
    
  3. Start the KSQL CLI tool.

    LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
    

    After the KSQL CLI tool is started, you should see the following output:

    CLI v7.3.2, Server v7.3.2 located at http://localhost:8088
    Server Status: RUNNING
    
  4. Create a stream and tables using the KSQL CLI tool.

    a. Create a stream named riderLocations:

    CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
      WITH (kafka_topic='locations', value_format='json', partitions=1);
    

    b. Create two tables (currentLocation and ridersNearMountainView) as materialized views that track the latest location of the riders.

    CREATE TABLE currentLocation AS
      SELECT profileId,
            LATEST_BY_OFFSET(latitude) AS la,
            LATEST_BY_OFFSET(longitude) AS lo
      FROM riderLocations
      GROUP BY profileId
      EMIT CHANGES;
    
    CREATE TABLE ridersNearMountainView AS
      SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
            COLLECT_LIST(profileId) AS riders,
            COUNT(*) AS count
      FROM currentLocation
      GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
    
  5. Insert and query data.

    a. Open a terminal to run a push query over the stream.

    -- Mountain View lat, long: 37.4133, -122.1162
    SELECT * FROM riderLocations
      WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
    

    b. Open another terminal, start a second KSQL CLI session, and insert data into the stream.

    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
    

    You should see the following output in the first terminal:

    >  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
    +---------------------------------+---------------------------------+---------------------------------+
    |PROFILEID                        |LATITUDE                         |LONGITUDE                        |
    +---------------------------------+---------------------------------+---------------------------------+
    |4ab5cbad                         |37.3952                          |-122.0813                        |
    |8b6eae59                         |37.3944                          |-122.0813                        |
    |4a7c7b41                         |37.4049                          |-122.0822                        |
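
    The ridersNearMountainView table created in step 4 can also be read with a pull query, which returns the current state and then terminates. The following sketch sends one through the ksqlDB REST API /query endpoint. It assumes the local server from step 2 and a single-line SQL statement; build_query_payload and run_pull_query are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Sketch: run a pull query against a table through the ksqlDB REST API.
# build_query_payload and run_pull_query are hypothetical helper names;
# the default URL assumes the local server started in step 2.

# Wrap a single-line SQL statement in the JSON body for the /query endpoint,
# escaping backslashes and double quotes so the JSON stays valid.
build_query_payload() {
  local escaped
  escaped="$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')"
  printf '{"ksql": "%s", "streamsProperties": {}}\n' "$escaped"
}

# POST the statement in $1 to the server at $2 (default: localhost:8088).
run_pull_query() {
  local url="${2:-http://localhost:8088}"
  curl -sS --fail -X POST "${url}/query" \
    -H 'Accept: application/vnd.ksql.v1+json' \
    -d "$(build_query_payload "$1")"
}

# Usage (requires the running server):
#   run_pull_query 'SELECT * FROM ridersNearMountainView WHERE distanceInMiles <= 10;'
```

    The same statement can be typed directly into the KSQL CLI; the REST form is convenient for scripts.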
    