
Understand pfSQL

Note

This feature is currently in alpha. If you want to try it out or have any questions, submit a ticket to the support team.

This section walks you through the basic pfSQL elements, syntax and typical examples.

pfSQL identifiers

pfSQL identifiers are used to reference objects, such as Pulsar topics, structural data fields, functions, or connectors.

pfSQL supports the following three types of identifiers.

Unquoted identifiers

Unquoted identifiers must begin with a letter or underscore (_) and cannot contain extended characters or blank spaces.

Some examples of valid unquoted identifiers include:

  • customer
  • order_details
  • _id
  • email_address

For example, in a SQL query, you might reference a column using an unquoted identifier like this:

INSERT INTO `output-topic` SELECT customer_name, order_date FROM `input-topic`;

In this example, customer_name and order_date are both unquoted identifiers that refer to column names.

Note

Don't use reserved keywords (such as SELECT and INSERT) to name columns or topics. If you need a reserved keyword as a name, enclose it in backquotes (`); otherwise, choose a different name for your object.
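The rules for unquoted identifiers can be approximated with a small checker. This is a sketch under stated assumptions, not pfSQL's parser: it assumes that characters after the first may be ASCII letters, digits, or underscores, and `RESERVED_KEYWORDS` is a hypothetical, partial list used only for illustration.

```python
import re

# Assumed approximation of the unquoted-identifier rule: the first character
# is an ASCII letter or underscore, and the remaining characters are ASCII
# letters, digits, or underscores (no extended characters, no blank spaces).
UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

# Hypothetical, partial keyword list for illustration only; it is not the
# complete set of pfSQL reserved keywords.
RESERVED_KEYWORDS = {"SELECT", "INSERT", "INTO", "MULTI", "FROM", "WHERE", "OPTIONS"}


def is_valid_unquoted_identifier(name: str) -> bool:
    """Return True if `name` looks usable without backquotes."""
    if UNQUOTED_IDENTIFIER.fullmatch(name) is None:
        return False
    # Keywords are matched case-insensitively, so `select` is rejected too.
    return name.upper() not in RESERVED_KEYWORDS


for candidate in ["customer", "order_details", "_id", "email_address",
                  "order date", "2nd_field", "select"]:
    print(candidate, is_valid_unquoted_identifier(candidate))
```

Names that fail this check, such as `order date` or `select`, are the ones you would enclose in backquotes.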

Backquoted identifiers

Backquoted identifiers are case-sensitive and can start with or contain any valid characters, including:

  • Numbers
  • Special characters (., ', !, @, #, $, %, ^, &, *, and so on)
  • Extended ASCII and non-ASCII characters
  • Blank spaces
  • Reserved keywords

We recommend using backquoted identifiers throughout the query, including references to Pulsar topics, structural data fields, and so on.
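If you generate pfSQL queries from code, a helper can backquote every identifier as recommended. This is a minimal sketch with hypothetical helper names (`backquote`, `build_select`). How pfSQL escapes a backquote inside a backquoted identifier is not described here, so the sketch rejects such names instead of guessing.

```python
from typing import List


def backquote(identifier: str) -> str:
    """Wrap an identifier in backquotes.

    Names that themselves contain a backquote are rejected, because the
    escaping rule for that case is not documented.
    """
    if "`" in identifier:
        raise ValueError(f"cannot safely backquote {identifier!r}")
    return f"`{identifier}`"


def build_select(columns: List[str], topic: str) -> str:
    """Build a simple SELECT that backquotes every column and the topic."""
    cols = ", ".join(backquote(c) for c in columns)
    return f"SELECT {cols} FROM {backquote(topic)}"


print(build_select(["customer name", "Order-Date", "select"], "input-topic"))
# SELECT `customer name`, `Order-Date`, `select` FROM `input-topic`
```

Because backquoted identifiers are case-sensitive, the helper passes names through unchanged rather than normalizing their case.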

String identifiers

Unlike unquoted and backquoted identifiers, string identifiers are string-type constants, such as text and dates.

String identifiers can contain any combination of characters, including whitespace, punctuation, and special characters.

In general, each string identifier must be enclosed in double quotes (") or single quotes ('). For example, 'foo' in the following query is a string that represents the text "foo".

INSERT INTO `output-topic` SELECT customer_name, order_date FROM `input-topic` WHERE customer_name = 'foo';

To include a single quote or double quote within the string itself, escape it with a backslash (\).

The following is an example of using a backslash to escape a single quote.

INSERT INTO `output-topic` SELECT customer_name, order_date FROM `input-topic` WHERE description = "Apple\'s product";
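When a string value comes from application code, you can apply the escaping rule above before embedding it in a query. The following `quote_string_literal` function is a hypothetical helper that backslash-escapes single and double quotes, as described. How pfSQL treats a literal backslash in a string is not covered here, so the sketch leaves backslashes untouched.

```python
def quote_string_literal(text: str, quote: str = "'") -> str:
    """Return `text` as a string literal, escaping quotes with a backslash."""
    if quote not in ("'", '"'):
        raise ValueError("quote must be ' or \"")
    # Escape both quote characters, per the rule described above.
    escaped = text.replace("'", "\\'").replace('"', '\\"')
    return f"{quote}{escaped}{quote}"


literal = quote_string_literal("Apple's product", quote='"')
print(f"... WHERE description = {literal};")
# ... WHERE description = "Apple\'s product";
```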

Logical/Boolean operators

The following table outlines the SQL logical/boolean operators that pfSQL supports.

| Operator | Description | Example |
|----------|-------------|---------|
| AND | Returns TRUE if both operands are TRUE. | KEY = 'key1' AND PROPERTIES[ROUTING] = 'true' |
| OR | Returns TRUE if either operand is TRUE. | KEY = 'key1' OR KEY = 'key2' |
| NOT | Returns TRUE if the operand is FALSE. | NOT KEY = 'key1' |
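To see how each example in the table behaves, you can model it with Python's `and`, `or`, and `not`. This is a conceptual model, not pfSQL itself: a message is represented as a key plus a dict of properties, and `PROPERTIES[ROUTING]` is assumed to read a property named `ROUTING`.

```python
# Each function mirrors one Example from the table above.

def and_example(key: str, properties: dict) -> bool:
    # KEY = 'key1' AND PROPERTIES[ROUTING] = 'true'
    return key == "key1" and properties.get("ROUTING") == "true"


def or_example(key: str, properties: dict) -> bool:
    # KEY = 'key1' OR KEY = 'key2'
    return key == "key1" or key == "key2"


def not_example(key: str, properties: dict) -> bool:
    # NOT KEY = 'key1'
    return not key == "key1"


for key, props in [("key1", {"ROUTING": "true"}), ("key1", {}), ("key2", {})]:
    print(key, props, and_example(key, props), or_example(key, props),
          not_example(key, props))
```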

Comparison operators

The following table outlines the SQL comparison operators that pfSQL supports.

| Operator | Description | Example | Result |
|----------|-------------|---------|--------|
| = | Checks whether the values of two operands are equal. | 1 = 2 | FALSE |
| != | Checks whether the values of two operands are not equal. | 1 != 2 | TRUE |
| > | Checks whether the value of the left operand is greater than the value of the right operand. | 1 > 2 | FALSE |
| < | Checks whether the value of the left operand is less than the value of the right operand. | 1 < 2 | TRUE |
| >= | Checks whether the value of the left operand is greater than or equal to the value of the right operand. | 1 >= 2 | FALSE |
| <= | Checks whether the value of the left operand is less than or equal to the value of the right operand. | 1 <= 2 | TRUE |
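The comparison operators map one-to-one onto Python's `operator` module, which makes it easy to reproduce the Result column of the table. This sketch only illustrates the semantics for numeric operands; it says nothing about how pfSQL compares mixed or non-numeric types.

```python
import operator

# Each pfSQL comparison operator from the table, paired with Python's equivalent.
COMPARISON_OPERATORS = {
    "=": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}

for symbol, fn in COMPARISON_OPERATORS.items():
    # Prints e.g. "1 = 2 -> FALSE", matching the table's Result column.
    print(f"1 {symbol} 2 -> {str(fn(1, 2)).upper()}")
```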

Arithmetic operators

Unary operators

Unary arithmetic operators are used to perform arithmetic operations on a single operand. The unary arithmetic operators are:

  • + (unary plus)
  • - (unary minus)

The unary plus operator does not change the sign of the operand. It is included for completeness.

The unary minus operator changes the sign of the operand.

Supported input data types: INT32, INT64, FLOAT, DOUBLE

Output data type is the same as the input data type.

Binary operators

Binary arithmetic operators are used to perform arithmetic operations on two operands. The binary arithmetic operators are:

  • + (addition)
  • - (subtraction)
  • * (multiplication)
  • / (division)
  • % (modulo)

Supported input data types: INT32, INT64, FLOAT, DOUBLE

The output data type follows these precedence rules:

  • If both operands are INT32, the output data type is INT32.
  • If both operands are INT64, the output data type is INT64.
  • If both operands are FLOAT, the output data type is FLOAT.
  • If both operands are DOUBLE, the output data type is DOUBLE.
  • If one operand is INT32 and the other operand is INT64, the output data type is INT64.
  • If one operand is INT32 and the other operand is FLOAT, the output data type is FLOAT.
  • If one operand is INT32 and the other operand is DOUBLE, the output data type is DOUBLE.
  • If one operand is INT64 and the other operand is FLOAT, the output data type is FLOAT.
  • If one operand is INT64 and the other operand is DOUBLE, the output data type is DOUBLE.
  • If one operand is FLOAT and the other operand is DOUBLE, the output data type is DOUBLE.

The modulo operator is only supported for INT32 and INT64 operands. The output data type is the same as the input data type.
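The ten rules above reduce to a single widening order, INT32 < INT64 < FLOAT < DOUBLE: the result takes the wider of the two operand types. The following sketch encodes that order and the modulo restriction. It is an illustration of the listed rules, not pfSQL's type checker. For a mixed INT32 % INT64 expression, the text does not state the result type, so the sketch assumes the same widening applies (INT64).

```python
# Widening order implied by the rules above: INT32 < INT64 < FLOAT < DOUBLE.
TYPE_RANK = {"INT32": 0, "INT64": 1, "FLOAT": 2, "DOUBLE": 3}
INTEGER_TYPES = {"INT32", "INT64"}
BINARY_OPERATORS = {"+", "-", "*", "/", "%"}


def binary_result_type(op: str, left: str, right: str) -> str:
    """Return the output type of `left op right` under the rules listed above."""
    if op not in BINARY_OPERATORS:
        raise ValueError(f"unknown binary operator: {op}")
    if left not in TYPE_RANK or right not in TYPE_RANK:
        raise TypeError(f"unsupported operand types: {left}, {right}")
    if op == "%" and not {left, right} <= INTEGER_TYPES:
        raise TypeError("% supports only INT32 and INT64 operands")
    # The wider of the two operand types wins (assumed also for mixed %).
    return max(left, right, key=TYPE_RANK.__getitem__)


print(binary_result_type("+", "INT32", "FLOAT"))  # FLOAT
```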

pfSQL statements

The basic statements of pfSQL include SELECT, INSERT INTO, and INSERT MULTI. You can use a combination of them for routing, filtering, and projection purposes.

Note

pfSQL keywords are case-insensitive. We recommend writing reserved keywords, such as SELECT and INSERT, in uppercase.

SELECT

The SELECT statement is used to select data from a topic. It is the key statement of a pfSQL query to define the message scope and processing conditions.

Syntax scheme

SELECT `selectItem0`[, `selectItem1`, ...]
    FROM `fromTopic0`[, `fromTopic1`, ...]
    [WHERE {query_where}]
    [OPTIONS {query_options}]

Note

  1. Both the WHERE clause and the OPTIONS clause are optional.
  2. The WHERE clause defines the conditions that a message must match to be returned. If you don't want to limit your query results, omit it. For more information about the supported operators, see Logical/Boolean operators and Comparison operators.

Example

The following example selects messages from the input topic whose field0 is "1" and whose field1 is "demo". Messages that do not satisfy both conditions are excluded.

SELECT * FROM `input_topic` WHERE `field0` = "1" AND `field1` = "demo"
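Conceptually, this query acts as a filter over the stream: each message is tested against the WHERE conditions, and matching messages pass through with all of their fields because of `SELECT *`. The sketch below models messages as Python dicts with hypothetical contents; it illustrates the semantics only and is not how pfSQL runs the query.

```python
# Conceptual model of:
#   SELECT * FROM `input_topic` WHERE `field0` = "1" AND `field1` = "demo"

def select_where(messages):
    for msg in messages:
        if msg.get("field0") == "1" and msg.get("field1") == "demo":
            yield msg  # SELECT * keeps every field of a matching message


# Hypothetical sample messages.
input_topic = [
    {"field0": "1", "field1": "demo", "field2": 42},
    {"field0": "2", "field1": "demo", "field2": 7},
    {"field0": "1", "field1": "other", "field2": 0},
]
print(list(select_where(input_topic)))
# [{'field0': '1', 'field1': 'demo', 'field2': 42}]
```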

OPTIONS

In a SELECT statement, you can add OPTIONS for the following specific use cases.

KeyValue Schema Support

KeyValue is a schema type introduced by Apache Pulsar. For more details, see https://pulsar.apache.org/docs/3.1.x/schema-understand/#keyvalue-schema.

When using pfSQL to query data in KeyValue schema, you need to use either OPTIONS UNWRAP KEY or OPTIONS UNWRAP VALUE to unwrap the KeyValue schema, or OPTIONS MERGE to merge the KeyValue schema into a single message.

For example, to query the key and the value of a topic with a KeyValue schema separately, use the following SQL:

SELECT * FROM `input` OPTIONS UNWRAP KEY;

SELECT * FROM `input` OPTIONS UNWRAP VALUE;

Note

With UNWRAP KEY, the SELECT statement applies only to the message keys. For example, for a KeyValue<AVRO, AVRO> message, SELECT * FROM `topic` OPTIONS UNWRAP KEY selects only the message key and ignores the message value. Similarly, UNWRAP VALUE takes the message value as the output message and ignores the key.
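The difference between UNWRAP KEY and UNWRAP VALUE can be pictured by treating a KeyValue message as a (key, value) pair of records. This is a conceptual model with hypothetical field names; it does not model OPTIONS MERGE, whose field-merging rules are not described here.

```python
from typing import Any, Dict, Tuple

# A KeyValue message modeled as a (key record, value record) pair.
KeyValueMessage = Tuple[Dict[str, Any], Dict[str, Any]]


def unwrap(message: KeyValueMessage, part: str) -> Dict[str, Any]:
    """Model OPTIONS UNWRAP KEY / UNWRAP VALUE: keep one side, drop the other."""
    key, value = message
    if part == "KEY":
        return key
    if part == "VALUE":
        return value
    raise ValueError("part must be 'KEY' or 'VALUE'")


message = ({"user_id": 7}, {"amount": 9.5, "currency": "EUR"})
print(unwrap(message, "KEY"))    # the SELECT then sees only the key record
print(unwrap(message, "VALUE"))  # the SELECT then sees only the value record
```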

Passing Pulsar Function Configs

You can pass Pulsar Function configs to pfSQL by using OPTIONS. Available configs are limited to the following:

  • processingGuarantees
  • cleanupSubscription
  • subscriptionPosition
  • deadLetterTopic
  • maxMessageRetries
  • retainOrdering
  • retainKeyOrdering
  • subName

For example, if you want to pass subscriptionPosition, subName and cleanupSubscription to pfSQL, you can use the following SQL:

SELECT * FROM `input` OPTIONS ('subscriptionPosition'=Latest, 'cleanupSubscription'=true, 'subName'='test-sub-name');
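If you assemble the OPTIONS clause in code, you can reject configs outside the supported list before submitting the query. The `options_clause` function below is a hypothetical helper. Because the quoting rules for config values are not spelled out (the example leaves Latest and true unquoted but quotes 'test-sub-name'), the sketch passes values through verbatim and leaves quoting to the caller.

```python
from typing import Dict

# The Pulsar Function configs that OPTIONS accepts, per the list above.
ALLOWED_OPTIONS = {
    "processingGuarantees", "cleanupSubscription", "subscriptionPosition",
    "deadLetterTopic", "maxMessageRetries", "retainOrdering",
    "retainKeyOrdering", "subName",
}


def options_clause(configs: Dict[str, str]) -> str:
    """Render configs as an OPTIONS (...) clause; values are used verbatim."""
    unknown = set(configs) - ALLOWED_OPTIONS
    if unknown:
        raise ValueError(f"unsupported configs: {sorted(unknown)}")
    pairs = ", ".join(f"'{name}'={value}" for name, value in configs.items())
    return f"OPTIONS ({pairs})"


statement = "SELECT * FROM `input` " + options_clause({
    "subscriptionPosition": "Latest",
    "cleanupSubscription": "true",
    "subName": "'test-sub-name'",
}) + ";"
print(statement)
```

With these inputs, the helper reproduces the example query above.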

Using them together

You can combine all the features above. For example, to query data from a topic with a KeyValue schema and also pass some configs, use the following SQL:

SELECT * FROM `input` OPTIONS ('subscriptionPosition'=Latest, 'cleanupSubscription'=true, 'subName'='test-sub-name') UNWRAP KEY;

INSERT INTO

INSERT INTO routes messages from one topic to another. It is followed by a SELECT statement that specifies the data source and any filtering or transformation conditions.

Syntax scheme

INSERT INTO `topic_name`
SELECT {statement}

Example

The following example routes all messages from the input topic to the output topic.

INSERT INTO `output_topic`
SELECT * FROM `input_topic`

INSERT MULTI

INSERT MULTI routes messages from one topic to multiple topics based on specific criteria. It is followed by a SELECT statement that specifies the data source and any filtering or transformation conditions.

Syntax scheme

INSERT MULTI
IF {when_condition} THEN INTO `topic_name`,
...
ELSE INTO `topic_name`
SELECT {statement}

Example

The following example routes qualified messages from the input topic to three output topics. The WHERE clause filters messages by key and by the routing property, and the IF conditions choose the output topic based on the key.

INSERT MULTI
IF KEY="us" THEN INTO `output_topic0`,
IF KEY="cn" THEN INTO `output_topic1`,
ELSE INTO `output_topic2`
SELECT * FROM `input_topic` WHERE KEY = "us" OR KEY = "cn" AND PROPERTIES[ROUTING] = "true"
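The routing decision in this example can be modeled as a function from a message to an output topic. This sketch rests on assumptions that the text does not confirm: the WHERE clause is evaluated with standard SQL precedence (AND binds tighter than OR), IF branches are checked in order with the first match winning, and PROPERTIES[ROUTING] reads a property named `ROUTING`.

```python
from typing import Optional


def passes_where(key: str, properties: dict) -> bool:
    # KEY = "us" OR KEY = "cn" AND PROPERTIES[ROUTING] = "true"
    # read with standard precedence as: us OR (cn AND routing)
    return key == "us" or (key == "cn" and properties.get("ROUTING") == "true")


def route(key: str, properties: dict) -> Optional[str]:
    """Return the output topic for a message, or None if WHERE filters it out."""
    if not passes_where(key, properties):
        return None
    if key == "us":
        return "output_topic0"
    if key == "cn":
        return "output_topic1"
    return "output_topic2"  # ELSE branch


print(route("us", {}))                   # output_topic0
print(route("cn", {"ROUTING": "true"}))  # output_topic1
print(route("cn", {}))                   # None (filtered out by WHERE)
```

Under these assumptions, every message that passes this particular WHERE clause matches one of the IF branches, so the ELSE branch only receives messages when the filter admits other keys.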

CREATE SOURCE

To feed data from external systems into your Pulsar cluster, you can use the CREATE SOURCE statement to create a Pulsar source connector.

Note

You can still use pulsar-admin, pulsarctl, Terraform, or Cloud Console to manage source connectors. CREATE SOURCE is just another way to create source connectors.

Syntax scheme

CREATE SOURCE `source_name` [("source_property_name"="source_property_value", ...)]
    FROM ( `source_type` | `source_package_url` )
    OUTPUT `output_topic_name`

Example

CREATE SOURCE `source-connector`("configs/sleepBetweenMessages"="500") FROM `data-generator` OUTPUT `topic0`
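When you create connectors from scripts, you can assemble the statement from its parts. The `create_source` function below is a hypothetical helper that follows the shape of the example above: properties as double-quoted name/value pairs, and the source and output topic in backquotes. It does not escape quotes or backquotes inside values, so it assumes plain names such as those in the example.

```python
from typing import Dict, Optional


def create_source(name: str, source: str, output_topic: str,
                  properties: Optional[Dict[str, str]] = None) -> str:
    """Assemble a CREATE SOURCE statement.

    `source` is either a source type or a source package URL.
    """
    props = ""
    if properties:
        pairs = ", ".join(f'"{k}"="{v}"' for k, v in properties.items())
        props = f"({pairs})"
    return f"CREATE SOURCE `{name}`{props} FROM `{source}` OUTPUT `{output_topic}`"


statement = create_source("source-connector", "data-generator", "topic0",
                          {"configs/sleepBetweenMessages": "500"})
print(statement)
```

With these arguments, the helper reproduces the example statement above.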

CREATE SINK

To feed data from your Pulsar cluster into external systems, use the CREATE SINK statement to create a Pulsar sink connector.

Note

You can still use pulsar-admin, pulsarctl, Terraform, or Cloud Console to manage sink connectors. CREATE SINK is just another way to create sink connectors.

Syntax scheme

CREATE SINK `sink_name`[("sink_property_name"="sink_property_value", ...)]
    FROM ( `sink_type` | `sink_package_url` )
    INPUT `input_topic`, ...

Example

CREATE SINK `sink-connector`("topic_to_serde_className/input1"="serde1", "topic_to_serde_className/input2"="serde2") FROM `data-generator` INPUT `topic0`

Pulsar Schema in pfSQL

pfSQL loads the schema of the input record and creates an output record with the same schema type. For example, if the input topic is in the AVRO schema, then the output topic is in the AVRO schema as well.

When using the SELECT statement to transform messages, the generated messages use the same schema type as the input topic but with new schema info based on the transformation rules.

pfSQL currently supports the AVRO, JSON, and KeyValue schema types when accessing payload fields. Protobuf and Protobuf Native are not supported yet.
