1. Build Applications
  2. Kafka Clients

Connect to your cluster using KSQL

Note

This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

This document describes how to connect to your StreamNative cluster using KSQL with SASL/PLAIN authentication.

Before you begin

Note

  • Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  • The password for different utilities as kcat will be equal to token:<API KEY>.

You can follow the instructions to create an API key for the service account you choose to use.

Steps

  1. Open the etc/ksqldb/ksql-server.properties file and configure the KSQL server with the following properties:

    #------ Kafka -------
    # The set of Kafka brokers to bootstrap Kafka cluster information from:
    bootstrap.servers=<SERVER-URL>
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
      required username="public/default" \
      password="token:<YOUR-API-KEY>";
    
    • bootstrap.servers: the Kafka service URL of your StreamNative cluster.
    • password: an API key of your service account.
  2. Start the KSQL server.

    bin/ksql-server-start etc/ksqldb/ksql-server.properties
    

    After the KSQL server is started, you should see the following output:

    [2023-04-06 14:51:02,811] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:382)
    
                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2022 Confluent Inc.
    
    Server 7.3.2 listening on http://0.0.0.0:8088
    
    To access the KSQL CLI, run:
    ksql http://0.0.0.0:8088
    
    [2023-04-06 14:51:02,814] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:153)
    [2023-04-06 14:51:04,117] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
    
  3. Start the KSQL CLI tool.

    LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
    

    After the KSQL CLI tool is started, you should see the following output:

    CLI v7.3.2, Server v7.3.2 located at http://localhost:8088
    Server Status: RUNNING
    
  4. Create a stream and tables using the KSQL CLI tool.

    a. Create a stream named riderLocations:

    CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
      WITH (kafka_topic='locations', value_format='json', partitions=1);
    

    b. Create two tables (currentLocation and ridersNearMountainView ) to track the latest location of the riders using a materialized view.

    CREATE TABLE currentLocation AS
      SELECT profileId,
            LATEST_BY_OFFSET(latitude) AS la,
            LATEST_BY_OFFSET(longitude) AS lo
      FROM riderlocations
      GROUP BY profileId
      EMIT CHANGES;
    
    CREATE TABLE ridersNearMountainView AS
      SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
            COLLECT_LIST(profileId) AS riders,
            COUNT(*) AS count
      FROM currentLocation
      GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
    
  5. Insert and query data.

    a. Open a terminal to run a push query over the stream.

    -- Mountain View lat, long: 37.4133, -122.1162
    SELECT * FROM riderLocations
      WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
    

    b. Open another terminal to start another KSQL CLI tool and insert data into the stream.

    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
    

    You should see the following output in the first terminal:

    >  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
    +---------------------------------+---------------------------------+---------------------------------+
    |PROFILEID                        |LATITUDE                         |LONGITUDE                        |
    +---------------------------------+---------------------------------+---------------------------------+
    |4ab5cbad                         |37.3952                          |-122.0813                        |
    |8b6eae59                         |37.3944                          |-122.0813                        |
    |4a7c7b41                         |37.4049                          |-122.0822                        |
    
Previous
Kafka Stream