Understand pfSQL
Note
This feature is currently in alpha. If you want to try it out or have any questions, submit a ticket to the support team.
This section walks you through the basic pfSQL elements, syntax and typical examples.
pfSQL identifiers
pfSQL identifiers are used to reference objects, such as Pulsar topics, structural data fields, functions, or connectors.
pfSQL supports the following three types of identifiers.
Unquoted identifiers
Unquoted identifiers must begin with a letter or underscore (_) and cannot contain extended characters or blank spaces.
Some examples of valid unquoted identifiers include:
customer
order_details
_id
email_address
For example, in a SQL query, you might reference a column using an unquoted identifier like this:
INSERT INTO `output-topic` SELECT customer_name, order_date FROM `input-topic`;
In this example, customer_name and order_date are both unquoted identifiers that refer to column names.
Note
Don’t use reserved keywords (such as SELECT, INSERT, and so on) to name columns or tables. Instead, you can use backquotes (`) to enclose reserved keywords or use a different name for your object.
Backquoted identifiers
Backquoted identifiers are case-sensitive and can start with or contain any valid characters, including:
- Numbers
- Special characters (., ', !, @, #, $, %, ^, &, *, and so on)
- Extended ASCII and non-ASCII characters
- Blank spaces
- Reserved keywords
We highly recommend using backquoted identifiers throughout a query, including references to Pulsar topics, structural data fields, and so on.
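As a sketch of this recommendation, the following query backquotes every identifier. The field names `order date` and `select` are hypothetical, chosen only to show a blank space and a reserved keyword inside backquotes.

```sql
-- Hypothetical fields: `order date` contains a blank space, `select` is a reserved keyword.
-- Comment lines are annotations only; remove them if your pfSQL environment rejects them.
INSERT INTO `output-topic` SELECT `order date`, `select` FROM `input-topic`;
```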
String identifiers
Unlike unquoted and backquoted identifiers, string identifiers are string-type constants, such as text and dates.
String identifiers can contain any combination of characters, including whitespace, punctuation, and special characters.
In general, each string identifier must be enclosed in double quotes (") or single quotes ('). For example, 'foo' in the following query is a string that represents the text "foo".
INSERT INTO `output-topic` SELECT customer_name, order_date FROM `input-topic` WHERE customer_name = 'foo';
If you want to include a single quote or double quote within the string itself, you need to escape it using a backslash (\).
The following is an example of using a backslash to escape a single quote.
INSERT INTO `output-topic` SELECT customer_name, order_date FROM `input-topic` WHERE description = "Apple\'s product";
Logical/Boolean operators
The following table outlines the SQL logical/boolean operators that pfSQL supports.
Operator | Description | Example |
---|---|---|
AND | Returns TRUE if both operands are TRUE. | KEY = 'key1' AND PROPERTIES[ROUTING] = 'true' |
OR | Returns TRUE if either operand is TRUE. | KEY = 'key1' OR KEY = 'key2' |
NOT | Returns TRUE if the operand is FALSE. | NOT KEY = 'key1' |
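For illustration, the expressions from the table can be placed in a WHERE clause as follows. This is a minimal sketch; the topic name is a placeholder.

```sql
-- Keep messages whose key is key1 or key2.
SELECT * FROM `input-topic` WHERE KEY = 'key1' OR KEY = 'key2';
-- Keep messages whose key is key1 and whose ROUTING property is 'true'.
SELECT * FROM `input-topic` WHERE KEY = 'key1' AND PROPERTIES[ROUTING] = 'true';
```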
Comparison operators
The following table outlines the SQL comparison operators that pfSQL supports.
Operator | Description | Example | Result |
---|---|---|---|
= | Checks whether the values of two operands are equal. | 1 = 2 | FALSE |
!= | Checks whether the values of two operands are not equal. | 1 != 2 | TRUE |
> | Checks if the value of the left operand is greater than the value of the right operand. | 1 > 2 | FALSE |
< | Checks if the value of the left operand is less than the value of the right operand. | 1 < 2 | TRUE |
>= | Checks if the value of the left operand is greater than or equal to the value of the right operand. | 1 >= 2 | FALSE |
<= | Checks if the value of the left operand is less than or equal to the value of the right operand. | 1 <= 2 | TRUE |
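A comparison operator typically appears in a WHERE clause. The sketch below assumes a hypothetical numeric payload field named `amount`; it is not part of any schema described here.

```sql
-- `amount` is a hypothetical numeric field; only messages with amount >= 100 are routed.
INSERT INTO `output-topic` SELECT * FROM `input-topic` WHERE `amount` >= 100;
```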
Arithmetic operators
Unary operators
Unary arithmetic operators are used to perform arithmetic operations on a single operand. The unary arithmetic operators are:
- + (unary plus)
- - (unary minus)
The unary plus operator does not change the sign of the operand. It is included for completeness.
The unary minus operator changes the sign of the operand.
Supported input data types: INT32, INT64, FLOAT, DOUBLE
The output data type is the same as the input data type.
Binary operators
Binary arithmetic operators are used to perform arithmetic operations on two operands. The binary arithmetic operators are:
- + (addition)
- - (subtraction)
- * (multiplication)
- / (division)
- % (modulo)
Supported input data types: INT32, INT64, FLOAT, DOUBLE
The output data type follows these precedence rules:
- If both operands are INT32, the output data type is INT32.
- If both operands are INT64, the output data type is INT64.
- If both operands are FLOAT, the output data type is FLOAT.
- If both operands are DOUBLE, the output data type is DOUBLE.
- If one operand is INT32 and the other operand is INT64, the output data type is INT64.
- If one operand is INT32 and the other operand is FLOAT, the output data type is FLOAT.
- If one operand is INT32 and the other operand is DOUBLE, the output data type is DOUBLE.
- If one operand is INT64 and the other operand is FLOAT, the output data type is FLOAT.
- If one operand is INT64 and the other operand is DOUBLE, the output data type is DOUBLE.
- If one operand is FLOAT and the other operand is DOUBLE, the output data type is DOUBLE.
The modulo operator is only supported for INT32 and INT64 operands. The output data type is the same as the input data type.
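To make the precedence rules concrete, here is a hedged sketch. It assumes two hypothetical fields, `quantity` (INT32) and `unit_price` (DOUBLE), and it assumes that arithmetic expressions are accepted inside a WHERE clause, which this section does not state explicitly.

```sql
-- Assumption: `quantity` is INT32 and `unit_price` is DOUBLE (hypothetical fields).
-- Under the precedence rules, `quantity` * `unit_price` has the type DOUBLE.
-- Assumption: arithmetic expressions are allowed in WHERE conditions.
INSERT INTO `output-topic` SELECT * FROM `input-topic` WHERE `quantity` * `unit_price` > 100;
```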
pfSQL statements
The basic statements of pfSQL include SELECT, INSERT INTO, and INSERT MULTI. You can use a combination of them for routing, filtering, and projection purposes.
Note
The statements are case-insensitive. It's highly recommended to use uppercase for reserved keywords, such as SELECT, INSERT, and so on.
SELECT
The SELECT statement is used to select data from a topic. It is the key statement of a pfSQL query to define the message scope and processing conditions.
Syntax scheme
SELECT `selectItem0`[, `selectItem1`, ...]
FROM `fromTopic0`[, `fromTopic1`, ...]
[WHERE {query_where}]
[OPTIONS {query_options}]
Note
- Both the WHERE clause and OPTIONS are optional.
- The WHERE clause allows you to define a set of conditions that the data needs to match in order to be returned. If you don't want to add any conditions to limit your query results, you can skip it. For more information about the supported operators, see Logical/Boolean operators.
Example
The following example selects the qualifying messages from the input topic. Messages that do not satisfy both conditions are excluded.
SELECT * FROM `input_topic` WHERE `field0` = "1" AND `field1` = "demo"
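The syntax scheme also allows individual select items instead of `*`. As a minimal sketch that reuses the field names from the example above, a projection might look like this:

```sql
-- Return only field0 and field1 from messages where field0 equals "1".
SELECT `field0`, `field1` FROM `input_topic` WHERE `field0` = "1"
```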
OPTIONS
On top of the SELECT statement, you can use OPTIONS for specific use cases.
KeyValue Schema Support
KeyValue is a schema type introduced by Apache Pulsar. For more details, see https://pulsar.apache.org/docs/3.1.x/schema-understand/#keyvalue-schema.
When using pfSQL to query data in KeyValue schema, you need to use either OPTIONS UNWRAP KEY or OPTIONS UNWRAP VALUE to unwrap the KeyValue schema, or OPTIONS MERGE to merge the KeyValue schema into a single message.
For example, if you have a topic with KeyValue schema, and you want to query the key and value separately, you can use the following SQL:
SELECT * FROM `input` OPTIONS UNWRAP KEY;
SELECT * FROM `input` OPTIONS UNWRAP VALUE;
Note
With UNWRAP KEY, the SELECT statement only affects the message keys. Take a KeyValue<AVRO, AVRO> message for example: SELECT * FROM `topic` OPTIONS UNWRAP KEY only selects the message key and ignores the message value. Similarly, UNWRAP VALUE takes the message value as the output message and ignores the key.
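OPTIONS MERGE is mentioned in this section without an example. By analogy with the two UNWRAP queries, it would presumably be written as shown below. Treat the exact form as an assumption and verify it in your environment.

```sql
-- Assumed form, by analogy with OPTIONS UNWRAP KEY and OPTIONS UNWRAP VALUE.
SELECT * FROM `input` OPTIONS MERGE;
```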
Passing Pulsar Function Configs
You can pass Pulsar Function configs to pfSQL by using OPTIONS. Available configs are limited to the following:
processingGuarantees
cleanupSubscription
subscriptionPosition
deadLetterTopic
maxMessageRetries
retainOrdering
retainKeyOrdering
subName
For example, if you want to pass subscriptionPosition, subName and cleanupSubscription to pfSQL, you can use the following SQL:
SELECT * FROM `input` OPTIONS ('subscriptionPosition'=Latest, 'cleanupSubscription'=true, 'subName'='test-sub-name');
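Other configs from the list can presumably be passed in the same way. The following sketch sets a dead letter topic and a retry limit. The topic name `my-dlq-topic` is a placeholder, and writing the numeric value unquoted is an assumption based on how `true` and `Latest` are written above.

```sql
-- 'my-dlq-topic' is a placeholder topic name.
-- Assumption: numeric values are written unquoted, like true and Latest above.
SELECT * FROM `input` OPTIONS ('deadLetterTopic'='my-dlq-topic', 'maxMessageRetries'=3);
```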
Using them together
You can use all the above features together. For example, if you want to query data from a topic with KeyValue schema, and you want to pass some configs as well, you can use the following SQL:
SELECT * FROM `input` OPTIONS ('subscriptionPosition'=Latest, 'cleanupSubscription'=true, 'subName'='test-sub-name') UNWRAP KEY;
INSERT INTO
INSERT INTO routes messages from one topic to another. It is followed by a SELECT statement that specifies the data source along with any filtering or transformation conditions.
Syntax scheme
INSERT INTO `topic_name`
SELECT {statement}
Example
The following example routes all messages from the input topic to the output topic.
INSERT INTO `output_topic`
SELECT * FROM `input_topic`
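Because the SELECT part can carry filtering and projection, routing can be combined with both. A minimal sketch, assuming the hypothetical fields `field0` and `field1` exist in the input messages:

```sql
-- Route only messages where field1 equals "demo", keeping two fields.
INSERT INTO `output_topic`
SELECT `field0`, `field1` FROM `input_topic` WHERE `field1` = "demo"
```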
INSERT MULTI
INSERT MULTI routes messages from one topic to multiple topics based on specific criteria. It is followed by a SELECT statement that specifies the data source along with any filtering or transformation conditions.
Syntax scheme
INSERT MULTI
IF {when_condition} THEN INTO `topic_name`,
...
ELSE INTO `topic_name`
SELECT {statement}
Example
The following example routes qualifying messages from the input topic to three output topics based on the value of the key and the routing property.
INSERT MULTI
IF KEY="us" THEN INTO `output_topic0`,
IF KEY="cn" THEN INTO `output_topic1`,
ELSE INTO `output_topic2`
SELECT * FROM `input_topic` WHERE KEY = "us" OR KEY = "cn" AND PROPERTIES[ROUTING] = "true"
CREATE SOURCE
To feed data from external systems into your Pulsar cluster, you can use the CREATE SOURCE statement to create a Pulsar source connector.
Note
You can still use pulsar-admin, pulsarctl, terraform, or Cloud Console to manage source connectors. CREATE SOURCE is just another way to create source connectors.
Syntax scheme
CREATE SOURCE `source_name` [("source_property_name"="source_property_value", ...)]
FROM ( `source_type` | `source_package_url` )
OUTPUT `output_topic_name`
Example
CREATE SOURCE `source-connector`("configs/sleepBetweenMessages"="500") FROM `data-generator` OUTPUT `topic0`
CREATE SINK
To feed data from your Pulsar cluster into external systems, use the CREATE SINK statement to create a Pulsar sink connector.
Note
You can still use pulsar-admin, pulsarctl, terraform, or Cloud Console to manage sink connectors. CREATE SINK is just another way to create sink connectors.
Syntax scheme
CREATE SINK `sink_name`[("sink_property_name"="sink_property_value", ...)]
FROM ( `sink_type` | `sink_package_url` )
INPUT `input_topic`, ...
Example
CREATE SINK `sink-connector`("topic_to_serde_className/input1"="serde1", "topic_to_serde_className/input2"="serde2") FROM `data-generator` INPUT `topic0`
Pulsar Schema in pfSQL
pfSQL loads the schema of the input record and creates an output record with the same schema type. For example, if the input topic is in the AVRO schema, then the output topic is in the AVRO schema as well.
When using the SELECT statement to transform messages, the generated messages use the same schema type as the input topic but with new schema info based on the transformation rules.
pfSQL currently supports the schema types of AVRO, JSON, and Key/Value when accessing the payload fields. Protobuf and Protobuf Native are not supported yet.