1. StreamNative Cloud
  2. Compute

Flink SQL cookbook

The StreamNative Flink SQL cookbook is a collection of examples, patterns, and use cases of StreamNative Flink SQL.


This section lists some basic Flink SQL operations.

Create table

StreamNative Flink SQL operates against logical tables, just like a traditional database. The table consists of the logical schema that defines the columns and types in the table and is what queries operate against.

You can test whether the table is properly created by running a simple SELECT statement. In StreamNative Cloud Console, you can see the results printed to the SQL Editor.

This example shows how to create an orders table with a few table schemas.

    order_uid  BIGINT,
    product_id BIGINT,
    price      DECIMAL(32, 2),
    order_time TIMESTAMP(3),
    process_time AS PROCTIME(),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

Insert data into table

You can also use the pulsar-client CLI tool to generate data into topic directly. Ensure that the pulsar-client CLI tool uses a compatible schema (especially the time types) as the table schema.

  • This example shows how to insert multiple records into the orders table.

      (1, 100, 30.15, CURRENT_TIMESTAMP),
      (2, 200, 40, CURRENT_TIMESTAMP),
      (3, 300, 28000.56, CURRENT_TIMESTAMP),
      (4, 400, 42960.90, CURRENT_TIMESTAMP),
      (5, 500, 50000.1, CURRENT_TIMESTAMP),
      (6, 100, 688888888.7, CURRENT_TIMESTAMP),
      (7, 300, 20.99, CURRENT_TIMESTAMP),
      (8, 100, 6000, CURRENT_TIMESTAMP)
  • This example takes the orders table, filters for the order IDs and the order time, and writes these logs into another table called order_with_time.

    CREATE TABLE order_with_time (
      order_uid  BIGINT,
      order_time TIMESTAMP(3)
    INSERT INTO order_with_time
    SELECT order_uid, order_time
    FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */

Filter data

This example shows how to use a standard WHERE clause to filter the order IDs, product IDs, and order time out of orders table.

SELECT order_uid, product_id, order_time
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
WHERE price >= 1000

You can add the scan.startup.mode configuration option to specify the start position for your SQL queries. The valid values are:

  • earliest: start from the earliest position.
  • latest: start from the latest position.
  • external-subscription: start from the external subscription. For this startup mode, you need to set the scan.startup.sub-name parameter, such as scan.startup.sub-name=dev-subscription.
  • specific-offset: start from user-supplied specific offsets. For this startup mode, you need to set the scan.startup.specific-offsets parameter, such as scan.startup.specific-offsets=42:1012:0;44:1011:1;44:1008:2. The scan.startup.specific-offsets parameter is in a format of combination of the Pulsar message Id (ledgerId:entryId:partitionId) and the subscription positions in topic partitions. In the specific-offset startup mode, the source can only use topics, but does not support configuring the topic-pattern or multiple topics.

By default, the scan.startup.mode is set to latest.

Aggregate data

This example shows how to use the standard GROUP BY clause to aggregate the price data in the orders table based on the product_id in real time. A GROUP BY clause on a streaming table produces an updating result, so you can see the aggregated sum.

SELECT product_id, SUM(price) AS total_price
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY product_id

You can play around with other standard SQL aggregate functions (such as COUNT, AVG, MIN, MAX).

Encapsulate logic with (temporary) views

This example shows how to use (temporary) views to reuse codes and to structure long queries and scripts.

CREATE (TEMPORARY) VIEW defines a view from a query. A view is not physically materialized. Instead, the query runs every time when the view is referenced in a query.

Temporary views are useful to structure and decompose more complicated queries and to re-use queries within a longer script. Non-temporary views, stored in a persistent catalog, can also be used to share common queries within your organization.

This example creates a view on the orders where the product_id equals to 3. This view encapsulates the logic of filtering the data. This logic can subsequently be used by any query or script that has access to the catalog.

FROM orders
WHERE product_id = 3

Aggregations and analytics

This section lists Flink SQL aggregations and analytics operations.

Aggregate time-series data

Many streaming applications work with time-series data. In this example, to sum the total price every 30 seconds, rows need to be grouped based on the time. It is special to group based on time, because time always moves forward, which means Flink can generate final results after the interval is completed.

This example shows how to aggregate time-series data in real-time using a TUMBLE window. TUMBLE is a built-in function for grouping timestamps into time intervals (as known as windows). Unlike other aggregations, TUMBLE only produces a single final result for each key when the interval is completed.

If the product ID does not have a timestamp, a timestamp can be generated using a computed column. process_time AS PROCTIME() appends a column to the table with the current system time.

  TUMBLE_PROCTIME(process_time, INTERVAL '30' SECOND) AS window_interval,
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
  TUMBLE(process_time, INTERVAL '30' SECOND), product_id

Work with watermarks

This example shows how to use WATERMARK to work with timestamps in records. In this example, all records, with 30 seconds earlier than the order_time, are expected to be ready.

    TUMBLE_ROWTIME(order_time, INTERVAL '30' SECOND) AS order_time,
    COUNT(*) AS order_cnt
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
    TUMBLE(order_time, INTERVAL '30' SECOND),

Work with sessions in time-series data

To count the number of orders per user for a session, you can use the SESSION built-in group window function. In this example, a session is bounded by an interval of 10 seconds. This means that orders that occur within 10 seconds of the last seen order for each user are merged into the same session window. And any order that occurs outside of this interval triggers the creation of a new session window.

You can use the SESSION_START and SESSION_ROWTIME auxiliary functions to check the lower and upper bounds of session windows.

  SESSION_START(order_time, INTERVAL '10' SECOND) AS session_beg,
  SESSION_ROWTIME(order_time, INTERVAL '10' SECOND) AS session_end,
  COUNT(*) AS order_frequence
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
  SESSION(order_time, INTERVAL '10' SECOND)

Roll aggregations on time-series data

This example shows how to calculate an aggregate or cumulative value based on a group of rows using an OVER window.

The OVER aggregates compute an aggregated value for every input row over a range of ordered rows. In contrast to GROUP BY aggregates, the OVER aggregates do not reduce the number of result rows to a single row for every group. Instead, the OVER aggregates produce an aggregated value for every input row.

This example tries to calculate the total price based on the processing time and the product IDs.

SELECT product_id, COUNT(*) OVER w as cnt, SUM(price) OVER w as total
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
  PARTITION BY product_id
  ORDER BY process_time

Work with continuous top-N

This example shows how to use using an OVER window and the ROW_NUMBER() function to continuously calculate the "top-N" rows based on a given attribute.

SELECT product_id, order_cnt
    SELECT *,
    ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY orders DESC) AS row_num
    FROM (SELECT product_id, COUNT(*) AS order_cnt FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */ GROUP BY product_id)
WHERE row_num <= 2


This example shows how to identify and filter out duplicated information in a stream of events and use deduplication to only keep the latest record for each order_uid.

  SELECT order_uid,
         ROW_NUMBER() OVER (PARTITION BY order_uid ORDER BY order_time) AS rownum
  FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
WHERE rownum = 1

Built-in functions and operators

This section lists a set of built-in functions for data transformations.

Work with time

This example shows how to use built-in date and time functions to manipulate temporal fields.

The start_date and end_date are Unix timestamps (i.e. epochs) — which are not very human-readable and should be converted. Also, you want to parse the payment_expiration timestamp into its corresponding day, month and year parts.

  • TO_TIMESTAMP(string[, format]): convert a STRING value to a TIMESTAMP using the specified format (By default, it is in a format of 'yyyy-MM-dd HH:mm:ss').

  • FROM_UNIXTIME(numeric[, string]): convert an epoch to a formatted STRING (By default, it is in a format of 'yyyy-MM-dd HH:mm:ss').

  • DATE_FORMAT(timestamp, string): convert a TIMESTAMP to a STRING using the specified format.

  • EXTRACT(timeintervalunit FROM temporal): return a LONG extracted from the specified date part of a temporal field ( DAY, MONTH, YEAR).

  • TIMESTAMPDIFF(unit, timepoint1, timepoint2): return the number of time units (SECOND, MINUTE, HOUR, DAY, MONTH or YEAR) between timepoint1 and timepoint2

  • CURRENT_TIMESTAMP: return the current SQL timestamp (UTC).

-- prepare subscriptions table
CREATE TABLE subscriptions (
    id STRING,
    start_date INT,
    end_date INT,
    payment_expiration TIMESTAMP(3)

-- select data by specifying the ID and the time
  TO_TIMESTAMP(FROM_UNIXTIME(start_date)) AS start_date,
  TO_TIMESTAMP(FROM_UNIXTIME(end_date)) AS end_date,
  DATE_FORMAT(payment_expiration,'YYYYww') AS exp_yweek,
  EXTRACT(DAY FROM payment_expiration) AS exp_day,     --same as DAYOFMONTH(ts)
  EXTRACT(MONTH FROM payment_expiration) AS exp_month, --same as MONTH(ts)
  EXTRACT(YEAR FROM payment_expiration) AS exp_year    --same as YEAR(ts)
FROM subscriptions

Build the union of multiple streams

This example combines the product ID, price, and processing time by using the UNION ALL set operation. The UNION ALL does not combine equivalent rows.

select product_id, price, process_time from orders /*+ OPTIONS('scan.startup.mode'='earliest') */
union all
select product_id, price, process_time from orders /*+ OPTIONS('scan.startup.mode'='earliest') */


Flink SQL supports complex and flexible join operations over continuous tables. There are several different types of joins to account for the wide variety of semantics queries may require.

Regular join

Regular joins are the most generic and flexible type of join. These include the standard INNER and [FULL|LEFT|RIGHT] OUTER joins that are available in most modern databases.

This example shows how to use joins to correlate the order_uid row of the orders table to the product table.

-- prepare product table
CREATE TABLE product (
  product_id BIGINT,
  name STRING

  (0, 'x'), (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'),
  (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h'), (9, 'i'),
  (10, 'j'), (11, 'k'), (12, 'l'), (13, 'm'), (14, 'n'),
  (15, 'o'), (16, 'p'), (17, 'q'), (18, 'r'), (19, 's')

-- join for order product name
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
INNER JOIN product /*+ OPTIONS('scan.startup.mode'='earliest') */ ON orders.product_id = product.product_id

Interval join

An interval join is defined by a join predicate that checks if the time attributes of the input events are within certain time constraints. The interval join is more useful to keep resource utilization from growing indefinitely.

This example shows how to join events of two tables (order_simple and ship) that correlate to each other. To reduce the number of input rows, Flink has to retain and optimize the join operation. You can define a time constraint in the WHERE clause to bound the time on both sides to that specific interval using a BETWEEN predicate.

-- prepare order_simple table
CREATE TABLE order_simple (
    order_id BIGINT,
    order_time TIMESTAMP(3)

-- insert values into order_simple table
  (1, TIMESTAMP '2021-04-27 11:22:33'),
  (2, TIMESTAMP '2021-04-27 11:22:33'),
  (3, TIMESTAMP '2021-04-27 11:22:33'),
  (4, TIMESTAMP '2021-04-27 11:22:33'),
  (5, TIMESTAMP '2021-04-27 11:22:33'),
  (6, TIMESTAMP '2021-04-27 11:22:33')

-- prepare ship table
    order_id BIGINT,
    ship_time TIMESTAMP(3)

-- insert values into ship table
  (1, TIMESTAMP '2021-04-27 11:22:33'),
  (2, TIMESTAMP '2021-04-27 11:22:34'),
  (3, TIMESTAMP '2021-04-27 11:22:35'),
  (4, TIMESTAMP '2021-04-30 11:22:35'),
  (5, TIMESTAMP '2021-04-30 11:22:35'),
  (6, TIMESTAMP '2021-04-30 11:22:35')

  o.order_id AS order_id,
FROM order_simple /*+ OPTIONS('scan.startup.mode'='earliest') */ o
JOIN ship /*+ OPTIONS('scan.startup.mode'='earliest') */ s ON o.order_id = s.order_id
    o.order_time BETWEEN s.ship_time - INTERVAL '3' DAY AND s.ship_time
Flink SQL