Connect to your cluster using KSQL

Note

This QuickStart assumes that you have created a Pulsar cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

This document describes how to use KSQL to connect to your Pulsar cluster with token authentication.

Before you begin

Note

  • Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
  • A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
  • For utilities such as kcat, the password is token:TOKEN, where TOKEN is the token of your service account.
  • Get the JWT token.

    1. On the left navigation pane, click Service Accounts.

    2. In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.

  • Get the service URL of your Pulsar cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
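
Before configuring KSQL, the copied service URL and token can be tried with kcat, which the note above mentions. The following is a minimal sketch rather than a definitive procedure: the function names are hypothetical, the arguments stand in for the values copied in the steps above, and the username public/default is an assumption borrowed from the KSQL server configuration in this document.

```shell
#!/usr/bin/env bash
# Sketch only: function names are hypothetical; the username is an assumption.

# Build the SASL password in the token:TOKEN form described in the note.
kcat_password() {
  printf 'token:%s' "$1"
}

# List cluster metadata (-L) over SASL_SSL to confirm that the service URL
# and the token are accepted. Requires kcat to be installed.
check_cluster() {
  server_url="$1"
  token="$2"
  kcat -L -b "$server_url" \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X sasl.username=public/default \
    -X sasl.password="$(kcat_password "$token")"
}

# Example call (placeholders, not real values):
# check_cluster "<SERVER-URL>" "<YOUR-TOKEN>"
```

If the call prints broker and topic metadata, the same URL and token should be usable in the KSQL server properties.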

Steps

  1. Open the etc/ksqldb/ksql-server.properties file and configure the KSQL server with the following properties:

    #------ Kafka -------
    # The set of Kafka brokers to bootstrap Kafka cluster information from:
    bootstrap.servers=<SERVER-URL>
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
      required username="public/default" \
      password="token:<YOUR-TOKEN>";
    
    • bootstrap.servers: the Kafka service URL of your Pulsar cluster.
    • password: the string token: followed by the token of your service account.
  2. Start the KSQL server.

    bin/ksql-server-start etc/ksqldb/ksql-server.properties
    

    After the KSQL server starts, you should see output similar to the following:

    [2023-04-06 14:51:02,811] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:382)
    
                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2022 Confluent Inc.
    
    Server 7.3.2 listening on http://0.0.0.0:8088
    
    To access the KSQL CLI, run:
    ksql http://0.0.0.0:8088
    
    [2023-04-06 14:51:02,814] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:153)
    [2023-04-06 14:51:04,117] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
    
  3. Start the KSQL CLI tool.

    LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
    

    After the KSQL CLI tool starts, you should see output similar to the following:

    CLI v7.3.2, Server v7.3.2 located at http://localhost:8088
    Server Status: RUNNING
    
  4. Create a stream and tables using the KSQL CLI tool.

    a. Create a stream named riderLocations:

    CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
      WITH (kafka_topic='locations', value_format='json', partitions=1);
    

    b. Create two tables as materialized views: currentLocation, which tracks the latest location of each rider, and ridersNearMountainView, which groups riders by their distance from Mountain View.

    CREATE TABLE currentLocation AS
      SELECT profileId,
            LATEST_BY_OFFSET(latitude) AS la,
            LATEST_BY_OFFSET(longitude) AS lo
      FROM riderLocations
      GROUP BY profileId
      EMIT CHANGES;
    
    CREATE TABLE ridersNearMountainView AS
      SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
            COLLECT_LIST(profileId) AS riders,
            COUNT(*) AS count
      FROM currentLocation
      GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
    
  5. Insert and query data.

    a. In the terminal where the KSQL CLI tool is running, run a push query over the stream.

    -- Mountain View lat, long: 37.4133, -122.1162
    SELECT * FROM riderLocations
      WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
    

    b. Open another terminal, start a second KSQL CLI session, and insert data into the stream.

    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
    

    You should see output similar to the following in the first terminal, which lists only the riders that match the distance filter:

    >  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
    +---------------------------------+---------------------------------+---------------------------------+
    |PROFILEID                        |LATITUDE                         |LONGITUDE                        |
    +---------------------------------+---------------------------------+---------------------------------+
    |4ab5cbad                         |37.3952                          |-122.0813                        |
    |8b6eae59                         |37.3944                          |-122.0813                        |
    |4a7c7b41                         |37.4049                          |-122.0822                        |
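
The server started in the steps above can also be checked without the CLI, through the ksqlDB REST API on the address shown in the server log. The following is a minimal sketch, assuming curl is installed and the server listens on http://localhost:8088 as in this walkthrough. The function names and the KSQL_URL variable are hypothetical, and the body builder does no JSON escaping, so it only suits statements that contain no double quotes or backslashes.

```shell
#!/usr/bin/env bash
# Sketch only: KSQL_URL and the function names are hypothetical helpers.
KSQL_URL="${KSQL_URL:-http://localhost:8088}"

# Build the JSON body for the /ksql endpoint from a single statement.
# No escaping is done, so avoid statements with double quotes or backslashes.
ksql_body() {
  printf '{"ksql": "%s", "streamsProperties": {}}' "$1"
}

# Return server information, which confirms the server is reachable.
ksql_info() {
  curl -sS "$KSQL_URL/info"
}

# Submit one statement (for example SHOW STREAMS;) to the /ksql endpoint.
ksql_statement() {
  curl -sS -X POST "$KSQL_URL/ksql" \
    -H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
    -d "$(ksql_body "$1")"
}

# Example calls against a running server:
# ksql_info
# ksql_statement 'SHOW STREAMS;'
# ksql_statement 'SHOW TABLES;'
```

After step 4, the responses to SHOW STREAMS; and SHOW TABLES; should include the riderLocations stream and the two tables created there.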
    