Flink SQL cookbook
The StreamNative Flink SQL cookbook is a collection of examples, patterns, and use cases of StreamNative Flink SQL.
Foundations
This section lists some basic Flink SQL operations.
Create table
StreamNative Flink SQL operates against logical tables, just like a traditional database. A table consists of a logical schema that defines its columns and their types, and it is what queries operate against.
You can test whether the table is properly created by running a simple SELECT statement. In StreamNative Cloud Console, the results are printed to the SQL Editor.
This example shows how to create an orders table with four physical columns, a computed processing-time column, and a watermark on order_time.
CREATE TABLE orders (
order_uid BIGINT,
product_id BIGINT,
price DECIMAL(32, 2),
order_time TIMESTAMP(3),
process_time AS PROCTIME(),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
)
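A minimal sketch of the check described above, assuming the orders table has just been created. The earliest startup hint is the one used in the later examples of this cookbook, so the query also reads records that already exist in the topic.

```sql
-- Verify that the table exists and inspect its rows,
-- including the computed process_time column
SELECT order_uid, product_id, price, order_time, process_time
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
```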
Insert data into table
Besides INSERT INTO statements, you can use the pulsar-client CLI tool to produce data to the topic directly. Ensure that the pulsar-client CLI tool uses a schema that is compatible with the table schema, especially for the time types.
This example shows how to insert multiple records into the orders table.
INSERT INTO orders VALUES
(1, 100, 30.15, CURRENT_TIMESTAMP),
(2, 200, 40, CURRENT_TIMESTAMP),
(3, 300, 28000.56, CURRENT_TIMESTAMP),
(4, 400, 42960.90, CURRENT_TIMESTAMP),
(5, 500, 50000.1, CURRENT_TIMESTAMP),
(6, 100, 688888888.7, CURRENT_TIMESTAMP),
(7, 300, 20.99, CURRENT_TIMESTAMP),
(8, 100, 6000, CURRENT_TIMESTAMP)
This example takes the orders table, selects the order IDs and order times, and writes them into another table called order_with_time.
CREATE TABLE order_with_time (
order_uid BIGINT,
order_time TIMESTAMP(3)
)
INSERT INTO order_with_time
SELECT order_uid, order_time
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
Filter data
This example shows how to use a standard WHERE clause to return the order IDs, product IDs, and order times of the rows in the orders table whose price is at least 1000.
SELECT order_uid, product_id, order_time
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
WHERE price >= 1000
You can add the scan.startup.mode configuration option to specify the start position for your SQL queries. The valid values are:
- earliest: start from the earliest position.
- latest: start from the latest position.
- external-subscription: start from the external subscription. For this startup mode, you need to set the scan.startup.sub-name parameter, such as scan.startup.sub-name=dev-subscription.
- specific-offset: start from user-supplied specific offsets. For this startup mode, you need to set the scan.startup.specific-offsets parameter, such as scan.startup.specific-offsets=42:1012:0;44:1011:1;44:1008:2. The value combines Pulsar message IDs (ledgerId:entryId:partitionId) with the subscription positions in the topic partitions. In the specific-offset startup mode, the source can only read from a single topic; the topic-pattern option and multiple topics are not supported.
By default, scan.startup.mode is set to latest.
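As a sketch of a non-default startup mode, the hint below combines scan.startup.mode with scan.startup.sub-name. It assumes that a Pulsar subscription named dev-subscription (the example name used above) already exists on the topic behind the orders table.

```sql
-- Resume reading from the position tracked by an existing subscription
SELECT order_uid, product_id, order_time
FROM orders /*+ OPTIONS('scan.startup.mode'='external-subscription', 'scan.startup.sub-name'='dev-subscription') */
```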
Aggregate data
This example shows how to use the standard GROUP BY clause to aggregate the price data in the orders table by product_id in real time. A GROUP BY clause on a streaming table produces an updating result, so the aggregated sum for each product changes as new orders arrive.
SELECT product_id, SUM(price) AS total_price
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY product_id
You can also try other standard SQL aggregate functions, such as COUNT, AVG, MIN, and MAX.
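The aggregate functions named above can be combined in a single query. The following is an illustrative sketch against the same orders table; the column aliases are arbitrary names chosen for this example.

```sql
-- Several standard aggregates per product; each result row is updated
-- whenever a new order for that product arrives
SELECT
  product_id,
  COUNT(*) AS order_cnt,
  AVG(price) AS avg_price,
  MIN(price) AS min_price,
  MAX(price) AS max_price
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY product_id
```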
Encapsulate logic with (temporary) views
This example shows how to use (temporary) views to reuse code and to structure long queries and scripts.
CREATE (TEMPORARY) VIEW defines a view from a query. A view is not physically materialized. Instead, the query runs each time the view is referenced in a query.
Temporary views are useful for structuring and decomposing more complicated queries and for reusing queries within a longer script. Non-temporary views, stored in a persistent catalog, can also be used to share common queries within your organization.
This example creates a view on the orders table that contains only the rows where product_id equals 3. This view encapsulates the filtering logic, which can subsequently be used by any query or script that has access to the catalog.
CREATE VIEW p3_order AS
SELECT *
FROM orders
WHERE product_id = 3
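Once defined, a view can be queried like a table. The sketch below first reads from the p3_order view and then defines a temporary variant. The name expensive_orders and its price filter are hypothetical, chosen only to illustrate the TEMPORARY keyword.

```sql
-- Query the view like a regular table; the underlying filter runs each time
SELECT order_uid, price, order_time
FROM p3_order;

-- Hypothetical temporary view that is not stored in a persistent catalog
CREATE TEMPORARY VIEW expensive_orders AS
SELECT order_uid, product_id, price
FROM orders
WHERE price >= 1000;
```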
Aggregations and analytics
This section lists Flink SQL aggregations and analytics operations.
Aggregate time-series data
Many streaming applications work with time-series data. In this example, to sum the total price every 30 seconds, rows need to be grouped based on time. Grouping on time is special: because time always moves forward, Flink can emit a final result once the interval is completed.
This example shows how to aggregate time-series data in real time using a TUMBLE window. TUMBLE is a built-in function for grouping timestamps into fixed time intervals (also known as windows). Unlike other aggregations, TUMBLE produces only a single final result for each key when the interval is completed.
If the records do not carry a timestamp, one can be generated using a computed column. process_time AS PROCTIME() appends a column to the table with the current system time.
SELECT
product_id,
TUMBLE_PROCTIME(process_time, INTERVAL '30' SECOND) AS window_interval,
SUM(price)
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY
TUMBLE(process_time, INTERVAL '30' SECOND), product_id
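To make the window boundaries visible in the output, the same aggregation can be written with the TUMBLE_START and TUMBLE_END auxiliary functions. This is a sketch under the same assumptions as the query above; the column aliases are illustrative.

```sql
-- Emit the start and end of each 30-second window next to the sum
SELECT
  product_id,
  TUMBLE_START(process_time, INTERVAL '30' SECOND) AS window_start,
  TUMBLE_END(process_time, INTERVAL '30' SECOND) AS window_end,
  SUM(price) AS total_price
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY
  TUMBLE(process_time, INTERVAL '30' SECOND), product_id
```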
Work with watermarks
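This example shows how to use WATERMARK to work with timestamps in records. The orders table declares its watermark as order_time - INTERVAL '5' SECOND, so records are expected to arrive at most 5 seconds out of order. The query counts the orders for each product in 30-second tumbling windows based on order_time.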
SELECT
product_id,
TUMBLE_ROWTIME(order_time, INTERVAL '30' SECOND) AS order_time,
COUNT(*) AS order_cnt
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY
TUMBLE(order_time, INTERVAL '30' SECOND),
product_id
Work with sessions in time-series data
To count the number of orders per product for a session, you can use the SESSION built-in group window function. In this example, a session is bounded by a gap of 10 seconds. This means that orders that occur within 10 seconds of the last seen order for the same product are merged into the same session window, and any order that occurs outside of this interval starts a new session window.
You can use the SESSION_START and SESSION_ROWTIME auxiliary functions to check the lower and upper bounds of session windows.
SELECT
product_id,
SESSION_START(order_time, INTERVAL '10' SECOND) AS session_beg,
SESSION_ROWTIME(order_time, INTERVAL '10' SECOND) AS session_end,
COUNT(*) AS order_frequence
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY
product_id,
SESSION(order_time, INTERVAL '10' SECOND)
Roll aggregations on time-series data
This example shows how to calculate an aggregate or cumulative value over a group of rows using an OVER window.
OVER aggregates compute an aggregated value for every input row over a range of ordered rows. In contrast to GROUP BY aggregates, OVER aggregates do not reduce the result to a single row per group. Instead, they produce an aggregated value for every input row.
This example computes, for each order, the order count and total price over the current row and the two preceding rows of the same product, ordered by processing time.
SELECT product_id, COUNT(*) OVER w as cnt, SUM(price) OVER w as total
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
WINDOW w AS (
PARTITION BY product_id
ORDER BY process_time
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
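An OVER window can also be bounded by time instead of by row count. The following sketch assumes the event-time attribute order_time from the orders table and an arbitrary one-minute range; the alias total_last_minute is illustrative.

```sql
-- For every order, sum the prices of the same product's orders
-- from the preceding minute up to and including the current row
SELECT
  product_id,
  order_time,
  SUM(price) OVER w AS total_last_minute
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
WINDOW w AS (
  PARTITION BY product_id
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW)
```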
Work with continuous top-N
This example shows how to use an OVER window and the ROW_NUMBER() function to continuously calculate the "top-N" rows based on a given attribute.
SELECT product_id, order_cnt
FROM (
SELECT *,
-- rank all products by their order count, highest first
ROW_NUMBER() OVER (ORDER BY order_cnt DESC) AS row_num
FROM (SELECT product_id, COUNT(*) AS order_cnt FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */ GROUP BY product_id)
)
-- keep the two products with the most orders
WHERE row_num <= 2
Deduplication
This example shows how to identify and filter out duplicated information in a stream of events and use deduplication to only keep the latest record for each order_uid
.
SELECT
order_uid,
order_time
FROM (
SELECT order_uid,
order_time,
ROW_NUMBER() OVER (PARTITION BY order_uid ORDER BY order_time DESC) AS rownum
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
)
WHERE rownum = 1
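The sort direction inside the OVER clause decides which record survives deduplication: ascending order on the time attribute keeps the first record per key, and descending order keeps the last one. As a sketch, the following variant keeps the earliest record for each order_uid.

```sql
-- Keep only the first (earliest) record seen for each order_uid
SELECT
  order_uid,
  order_time
FROM (
  SELECT order_uid,
    order_time,
    ROW_NUMBER() OVER (PARTITION BY order_uid ORDER BY order_time ASC) AS rownum
  FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
)
WHERE rownum = 1
```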
Built-in functions and operators
This section lists a set of built-in functions for data transformations.
Work with time
This example shows how to use built-in date and time functions to manipulate temporal fields.
The start_date and end_date are Unix timestamps (i.e. epochs), which are not very human-readable and should be converted. Also, you want to parse the payment_expiration timestamp into its corresponding day, month and year parts.
- TO_TIMESTAMP(string[, format]): convert a STRING value to a TIMESTAMP using the specified format (by default, 'yyyy-MM-dd HH:mm:ss').
- FROM_UNIXTIME(numeric[, string]): convert an epoch to a formatted STRING (by default, 'yyyy-MM-dd HH:mm:ss').
- DATE_FORMAT(timestamp, string): convert a TIMESTAMP to a STRING using the specified format.
- EXTRACT(timeintervalunit FROM temporal): return a LONG extracted from the specified date part of a temporal field (DAY, MONTH, YEAR).
- TIMESTAMPDIFF(unit, timepoint1, timepoint2): return the number of time units (SECOND, MINUTE, HOUR, DAY, MONTH or YEAR) between timepoint1 and timepoint2.
- CURRENT_TIMESTAMP: return the current SQL timestamp (UTC).
-- prepare subscriptions table
CREATE TABLE subscriptions (
id STRING,
start_date INT,
end_date INT,
payment_expiration TIMESTAMP(3)
)
-- select data by specifying the ID and the time
SELECT
id,
TO_TIMESTAMP(FROM_UNIXTIME(start_date)) AS start_date,
TO_TIMESTAMP(FROM_UNIXTIME(end_date)) AS end_date,
DATE_FORMAT(payment_expiration,'YYYYww') AS exp_yweek,
EXTRACT(DAY FROM payment_expiration) AS exp_day, --same as DAYOFMONTH(ts)
EXTRACT(MONTH FROM payment_expiration) AS exp_month, --same as MONTH(ts)
EXTRACT(YEAR FROM payment_expiration) AS exp_year --same as YEAR(ts)
FROM subscriptions
WHERE
TIMESTAMPDIFF(DAY,CURRENT_TIMESTAMP,payment_expiration) < 30;
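To try the functions listed above without any table, they can be applied to literals. This is only a sketch: the timestamp literals are borrowed from the sample data later in this cookbook, the epoch value is arbitrary, and the string produced by FROM_UNIXTIME depends on the session time zone.

```sql
-- Apply each function to a constant to inspect its result type and format
SELECT
  TO_TIMESTAMP('2021-04-27 11:22:33') AS parsed_ts,
  FROM_UNIXTIME(1619522553) AS formatted_epoch,
  DATE_FORMAT(TIMESTAMP '2021-04-27 11:22:33', 'yyyy-MM-dd') AS day_string,
  EXTRACT(DAY FROM TIMESTAMP '2021-04-27 11:22:33') AS day_of_month,
  TIMESTAMPDIFF(DAY, TIMESTAMP '2021-04-27 11:22:33', TIMESTAMP '2021-04-30 11:22:35') AS days_between
```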
Build the union of multiple streams
This example combines the product ID, price, and processing time columns of two queries by using the UNION ALL set operation. UNION ALL keeps duplicate rows instead of merging them.
select product_id, price, process_time from orders /*+ OPTIONS('scan.startup.mode'='earliest') */
union all
select product_id, price, process_time from orders /*+ OPTIONS('scan.startup.mode'='earliest') */
Join
Flink SQL supports complex and flexible join operations over continuous tables. There are several different types of joins to account for the wide variety of semantics queries may require.
Regular join
Regular joins are the most generic and flexible type of join. These include the standard INNER
and [FULL|LEFT|RIGHT] OUTER
joins that are available in most modern databases.
This example shows how to use a join to look up the product name for each order_uid by correlating the orders table with the product table on product_id.
-- prepare product table
CREATE TABLE product (
product_id BIGINT,
name STRING
)
INSERT INTO product VALUES
(0, 'x'), (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'),
(5, 'e'), (6, 'f'), (7, 'g'), (8, 'h'), (9, 'i'),
(10, 'j'), (11, 'k'), (12, 'l'), (13, 'm'), (14, 'n'),
(15, 'o'), (16, 'p'), (17, 'q'), (18, 'r'), (19, 's')
-- join for order product name
SELECT
order_uid,
name
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */
INNER JOIN product /*+ OPTIONS('scan.startup.mode'='earliest') */ ON orders.product_id = product.product_id
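With the sample rows shown in this cookbook, the product IDs in orders (100 to 500) do not appear in product (0 to 19), so the inner join only returns rows once matching products are inserted. As a sketch of one of the outer variants mentioned above, a LEFT JOIN keeps every order and returns NULL for the name while no matching product exists. The aliases o and p are illustrative.

```sql
-- Keep all orders; name is NULL until a matching product row arrives
SELECT
  o.order_uid,
  p.name
FROM orders /*+ OPTIONS('scan.startup.mode'='earliest') */ o
LEFT JOIN product /*+ OPTIONS('scan.startup.mode'='earliest') */ p
  ON o.product_id = p.product_id
```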
Interval join
An interval join is defined by a join predicate that checks if the time attributes of the input events are within certain time constraints. Because the time constraint bounds how long each row has to be retained, an interval join keeps resource utilization from growing indefinitely.
This example shows how to join events of two tables (order_simple and ship) that correlate to each other. To reduce the number of input rows that Flink has to retain and to optimize the join operation, you can define a time constraint in the WHERE clause that bounds the time on both sides to a specific interval using a BETWEEN predicate.
-- prepare order_simple table
CREATE TABLE order_simple (
order_id BIGINT,
order_time TIMESTAMP(3)
)
-- insert values into order_simple table
INSERT INTO order_simple VALUES
(1, TIMESTAMP '2021-04-27 11:22:33'),
(2, TIMESTAMP '2021-04-27 11:22:33'),
(3, TIMESTAMP '2021-04-27 11:22:33'),
(4, TIMESTAMP '2021-04-27 11:22:33'),
(5, TIMESTAMP '2021-04-27 11:22:33'),
(6, TIMESTAMP '2021-04-27 11:22:33')
-- prepare ship table
CREATE TABLE ship (
order_id BIGINT,
ship_time TIMESTAMP(3)
)
-- insert values into ship table
INSERT INTO ship VALUES
(1, TIMESTAMP '2021-04-27 11:22:33'),
(2, TIMESTAMP '2021-04-27 11:22:34'),
(3, TIMESTAMP '2021-04-27 11:22:35'),
(4, TIMESTAMP '2021-04-30 11:22:35'),
(5, TIMESTAMP '2021-04-30 11:22:35'),
(6, TIMESTAMP '2021-04-30 11:22:35')
SELECT
o.order_id AS order_id,
o.order_time,
s.ship_time
FROM order_simple /*+ OPTIONS('scan.startup.mode'='earliest') */ o
JOIN ship /*+ OPTIONS('scan.startup.mode'='earliest') */ s ON o.order_id = s.order_id
WHERE
o.order_time BETWEEN s.ship_time - INTERVAL '3' DAY AND s.ship_time