
Get started with pfSQL

Note

This feature is currently in alpha. If you want to try it out or have any questions, submit a ticket to the support team.

This tutorial walks you through an example of using pfSQL queries in Pulsar. You create a data generator source connector, and then filter, route, and transform the messages it generates.

Step 1: Create a source connector

Assume you want to create a data generator source connector named “pipeline-source” that writes messages to the “pipeline-datagen-source” topic and waits 500 ms between consecutive messages.

Enter the following SQL when creating a query.

CREATE SOURCE `pipeline-source` FROM `data-generator` OUTPUT `pipeline-datagen-source` WITH("configs/sleepBetweenMessages"='500')
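The sleepBetweenMessages setting is a pause between messages, not a throughput figure. A 500 ms pause means the connector emits at most about two messages per second. The Python sketch below shows that pacing. It rests on some assumptions: generate_messages and max_messages_per_second are hypothetical helpers, not the connector's implementation, and the record shape is invented. The later queries only imply an age field.

```python
import itertools
import random
import time
from typing import Dict, Iterator


def generate_messages(sleep_between_messages_ms: int, seed: int = 0) -> Iterator[Dict[str, int]]:
    """Hypothetical stand-in for the data-generator source (not its real code).

    Emits one record, then pauses for sleep_between_messages_ms before emitting
    the next, which is the pacing that configs/sleepBetweenMessages controls.
    """
    rng = random.Random(seed)
    for message_id in itertools.count():
        # Assumed record shape: the later queries only rely on an `age` field.
        yield {"id": message_id, "age": rng.randint(1, 90)}
        time.sleep(sleep_between_messages_ms / 1000)


def max_messages_per_second(sleep_between_messages_ms: int) -> float:
    # Upper bound on throughput when the configured pause is the only delay.
    return 1000 / sleep_between_messages_ms


print(max_messages_per_second(500))  # 2.0

# Use a 0 ms pause so the demo returns immediately.
for message in itertools.islice(generate_messages(0), 3):
    print(message)
```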

Step 2: Use queries to process messages

After creating the source connector, you can process its messages in any of the following ways.

  • Filter messages
  • Route messages
  • Transform messages

Filter messages

To filter the messages in the “pipeline-datagen-source” topic and insert only those whose age field is greater than or equal to 18 into another topic, do the following:

Enter the following SQL when creating a query.

INSERT INTO `pipeline-datagen-age-filter` SELECT * FROM `pipeline-datagen-source` WHERE `age` >= 18

After the query is deployed, you can see the filtered data in the “pipeline-datagen-age-filter” topic.
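The WHERE clause keeps a message only when its age field is 18 or greater. Because of SELECT *, every field of a kept message passes through unchanged. The Python sketch below mimics that behavior on a few hand-written records. The filter_adults helper and the sample records are illustrative assumptions, not part of pfSQL.

```python
from typing import Any, Dict, List


def filter_adults(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Mimic: SELECT * FROM `pipeline-datagen-source` WHERE `age` >= 18."""
    # SELECT * keeps every field of each matching record unchanged.
    return [r for r in records if r["age"] >= 18]


# Hypothetical sample messages; real generated records may carry other fields.
source = [
    {"name": "a", "age": 12},
    {"name": "b", "age": 18},
    {"name": "c", "age": 45},
]

print(filter_adults(source))  # [{'name': 'b', 'age': 18}, {'name': 'c', 'age': 45}]
```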

Route messages

To route the messages from the “pipeline-datagen-source” topic to different topics based on the age field, implement the following logic:

  • If age is less than 18, the message goes to the “pipeline-datagen-age-routing-1” topic.
  • If age is 18 or greater but less than 60, the message goes to the “pipeline-datagen-age-routing-2” topic.
  • Otherwise, the message goes to the “pipeline-datagen-age-routing-3” topic.

Enter the following SQL when creating a query.

INSERT MULTI IF `age` < 18 THEN INTO `pipeline-datagen-age-routing-1`, IF `age` >= 18 AND `age` < 60 THEN INTO `pipeline-datagen-age-routing-2`, ELSE INTO `pipeline-datagen-age-routing-3` SELECT * FROM `pipeline-datagen-source`

After the query is deployed, you can see the routed data in the “pipeline-datagen-age-routing-1”, “pipeline-datagen-age-routing-2”, and “pipeline-datagen-age-routing-3” topics.
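The three branches of this query do not overlap, and together they cover every numeric age. As a result, each message lands in exactly one routing topic. The Python sketch below reproduces that decision logic, including the boundaries at 18 (inclusive) and 60 (exclusive). The route_topic function and the sample ages are illustrative assumptions. The sketch does not model how pfSQL evaluates INSERT MULTI internally, and it does not handle missing age values.

```python
def route_topic(age: int) -> str:
    """Hypothetical helper: the topic the INSERT MULTI query sends this age to."""
    if age < 18:
        return "pipeline-datagen-age-routing-1"
    if 18 <= age < 60:  # 18 inclusive, 60 exclusive
        return "pipeline-datagen-age-routing-2"
    return "pipeline-datagen-age-routing-3"  # ELSE branch: 60 and older


# Boundary values show which side of each comparison a message falls on.
for age in (17, 18, 59, 60):
    print(age, route_topic(age))
```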

Transform messages

To extract the age field from messages in the “pipeline-datagen-source” topic whose age is greater than or equal to 18, and insert only that field into another topic, do the following:

Enter the following SQL when creating a query.

INSERT INTO `pipeline-datagen-age-select` SELECT `age` FROM `pipeline-datagen-source` WHERE `age` >= 18

After the query is deployed, you can see the transformed data in the “pipeline-datagen-age-select” topic.
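This query combines a filter with a projection. The WHERE clause keeps records whose age is 18 or greater, and the SELECT list names only age, so every other field is dropped. The Python sketch below shows the combined effect on hypothetical records. The select_age_of_adults helper and the sample data are assumptions for illustration only. The sketch says nothing about the schema that pfSQL assigns to the output topic.

```python
from typing import Any, Dict, List


def select_age_of_adults(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Mimic: SELECT `age` FROM `pipeline-datagen-source` WHERE `age` >= 18."""
    # Filter first (WHERE), then keep only the projected field (SELECT `age`).
    return [{"age": r["age"]} for r in records if r["age"] >= 18]


# Hypothetical sample messages with an extra field that the projection removes.
source = [
    {"name": "a", "age": 12},
    {"name": "b", "age": 18},
    {"name": "c", "age": 45},
]

print(select_age_of_adults(source))  # [{'age': 18}, {'age': 45}]
```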

Step 3: Access query results

To receive the processed messages, use the pulsar-client tool to subscribe to an output topic. For example, to read the results of the transform query, run the following command:

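pulsar-client consume -s my-subscription -p Earliest -n 0 persistent://public/default/pipeline-datagen-age-select

Here, -s sets the subscription name, -p Earliest starts reading from the earliest message in the topic, and -n 0 keeps the consumer running until you stop it. The topic is passed as a positional argument; -t sets the subscription type, not the topic.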

You can also use the pfsql query preview command to preview query results in the console. This works for Avro and JSON schemas only.

Step 4: Clean up queries

When you no longer need a query, you can delete it. For details, see Delete a query.
