This feature is currently in alpha. If you want to try it out or have any questions, submit a ticket to the support team.
Step1: Create a source connector
Assume you want to create a data generator source connector named “pipeline-source” that outputs data to the “pipeline-datagen-source” topic with a message-sending rate of 500ms.Enter the following SQL when creating a query.
Step2: Use queries to process messages
After creating a source connector, you can select either of the following ways to continue processing these messages.- Filter messages
- Route messages
- Transform messages
Filter messages
To filter the data from the “pipeline-datagen-source” topic and inserts it into another topic according to specific age criteria (greater than or equal to 18), you can do the following:Enter the following SQL when creating a query.
Route messages
To route the messages from the “pipeline-datagen-source” topic to multiple topics according to specific age criteria, you can implement the following logic: If theage
field is less than 18, the messages go to the “pipeline-datagen-age-routing-1” topic.
If it is between 18 and 60 (exclusive), the messages go to the “pipeline-datagen-age-routing-2” topic.
Otherwise, the messages are routed to the “pipeline-datagen-age-routing-3” topic.
Enter the following SQL when creating a query.
Transform messages
To extract a field from the “pipeline-datagen-source” topic and insert the field into another topic according to specific age criteria, you can do the following:Enter the following SQL when creating a query.
Step3: Access query results
You can use the ‘pulsar-client’ tool and run the following command to subscribe to the output topic and receive the processed messages. For example, to access the query results after transforming the messages, you can run the following command:pfsql query preview
command to preview the query result (for Avro and JSON schemas only) and push it to the console.
Step4: Clean up queries
After running the SQL queries, you can delete queries that you don’t need anymore.Please refer to Delete a query.