Get started with pfSQL
This feature is currently in alpha. If you want to try it out or have any questions, submit a ticket to the support team.
This tutorial walks you through an example of using SQL queries to create a source connector in Pulsar, covering filtering, routing, and transformation operations on the generated data.
Step 1: Create a source connector
Assume you want to create a data generator source connector named “pipeline-source” that outputs data to the “pipeline-datagen-source” topic, sending one message every 500 ms.
Enter the following SQL when creating a query.
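A minimal sketch of what this statement might look like, assuming a generic streaming-SQL DDL style; the CREATE SOURCE CONNECTOR keywords and the property names (type, topic, sendRate) are illustrative assumptions rather than confirmed pfSQL syntax, so check the pfSQL syntax reference for the exact form.

```sql
-- Illustrative sketch only: the DDL keywords and property names below are assumptions,
-- not confirmed pfSQL syntax. The intent is a data generator source named
-- "pipeline-source" that writes to the "pipeline-datagen-source" topic every 500 ms.
CREATE SOURCE CONNECTOR "pipeline-source" WITH (
    'type' = 'data-generator',
    'topic' = 'pipeline-datagen-source',
    'sendRate' = '500ms'
);
```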
Run the command to submit this query to the pfSQL gateway.
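For example, using the pfsql run query form described in the note below; the SQL text is the illustrative sketch from above, passed as a single --query argument with the inner quotes escaped.

```bash
# Submit the connector-creation statement to the pfSQL gateway.
# The SQL text is the illustrative sketch from above; inner double quotes are
# backslash-escaped so the whole statement is passed as one --query argument.
pfsql run query --query "CREATE SOURCE CONNECTOR \"pipeline-source\" WITH ('type' = 'data-generator', 'topic' = 'pipeline-datagen-source', 'sendRate' = '500ms')"
```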
If you want to submit the query to a specific tenant and namespace rather than the default one, you can use pfsql run query --query '$query' -p tenant=$tenant -p namespace=$namespace to pass the information via CLI properties.
Use a backslash (\) to escape single quotes or double quotes. For more information, see String identifiers.
Step 2: Use queries to process messages
After creating a source connector, you can use any of the following approaches to continue processing these messages.
- Filter messages
- Route messages
- Transform messages
Filter messages
To filter the data from the “pipeline-datagen-source” topic and insert it into another topic according to specific age criteria (age greater than or equal to 18), do the following:
Enter the following SQL when creating a query.
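A minimal sketch, assuming an INSERT INTO … SELECT form where the destination and source topics are referenced by name; the exact pfSQL syntax may differ.

```sql
-- Illustrative sketch; the exact pfSQL syntax may differ.
-- Keep only messages whose age field is at least 18 and write them to the
-- "pipeline-datagen-age-filter" topic.
INSERT INTO "pipeline-datagen-age-filter"
SELECT * FROM "pipeline-datagen-source"
WHERE age >= 18;
```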
Run the command to submit this filtering query to the pfSQL gateway.
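For example, where the SQL text is the sketch from above:

```bash
# Submit the filtering query to the pfSQL gateway as a single --query argument.
pfsql run query --query 'INSERT INTO "pipeline-datagen-age-filter" SELECT * FROM "pipeline-datagen-source" WHERE age >= 18'
```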
You can use the --preview flag to preview the query result (for Avro and JSON schemas only) and push it to the console.
Once the filtering query is submitted, you will get a query ID with the pfsql-pfr prefix. With the query ID, you can get the status and stats of the query with pfsql query status ${queryId} and pfsql query stats ${queryId}.
You may also use pfsql query preview ${queryId} to preview the query result (for Avro and JSON schemas only) and push it to the console.
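For example, using the query ID returned on submission (the ID below is a placeholder):

```bash
# Check the status and runtime statistics of the query, then preview its output
# in the console. Replace the placeholder ID with the one returned when you
# submitted the query.
pfsql query status pfsql-pfr-1k1u04hs8d5k5-e1023304
pfsql query stats pfsql-pfr-1k1u04hs8d5k5-e1023304
pfsql query preview pfsql-pfr-1k1u04hs8d5k5-e1023304
```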
After the query is deployed, you can see the filtered data in the “pipeline-datagen-age-filter” topic.
Route messages
To route the messages from the “pipeline-datagen-source” topic to multiple topics according to specific age criteria, you can implement the following logic:
- If the age field is less than 18, the messages go to the “pipeline-datagen-age-routing-1” topic.
- If the age field is at least 18 and less than 60, the messages go to the “pipeline-datagen-age-routing-2” topic.
- Otherwise (age 60 or above), the messages are routed to the “pipeline-datagen-age-routing-3” topic.
Enter the following SQL when creating a query.
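A minimal sketch, assuming the routing can be expressed as one INSERT INTO … SELECT statement per destination topic; pfSQL may offer a more compact routing construct.

```sql
-- Illustrative sketch; pfSQL may express multi-topic routing differently.
-- Route each age range to its own topic.
INSERT INTO "pipeline-datagen-age-routing-1"
SELECT * FROM "pipeline-datagen-source" WHERE age < 18;

INSERT INTO "pipeline-datagen-age-routing-2"
SELECT * FROM "pipeline-datagen-source" WHERE age >= 18 AND age < 60;

INSERT INTO "pipeline-datagen-age-routing-3"
SELECT * FROM "pipeline-datagen-source" WHERE age >= 60;
```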
Run the command to submit this routing query to the pfSQL gateway.
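For example, submitting the first routing statement from the sketch above; whether several statements can be combined into one --query string is an assumption, so submit them one at a time if in doubt.

```bash
# Submit a routing statement to the pfSQL gateway; repeat for the other destinations.
pfsql run query --query 'INSERT INTO "pipeline-datagen-age-routing-1" SELECT * FROM "pipeline-datagen-source" WHERE age < 18'
```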
You can use the --preview flag to preview the query result (for Avro and JSON schemas only) and push it to the console.
Once the routing query is submitted, you will get a query ID with the pfsql-pfr prefix. With the query ID, you can get the status and stats of the query with pfsql query status ${queryId} and pfsql query stats ${queryId}.
You may also use pfsql query preview ${queryId} to preview the query result (for Avro and JSON schemas only) and push it to the console.
After the query is deployed, you can see the routed data in the “pipeline-datagen-age-routing-1”, “pipeline-datagen-age-routing-2”, and “pipeline-datagen-age-routing-3” topics.
Transform messages
To extract a field from the “pipeline-datagen-source” topic and insert the field into another topic according to specific age criteria, you can do the following:
Enter the following SQL when creating a query.
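A minimal sketch, assuming the message schema has a name field to extract and reusing the age >= 18 criterion from the filtering example; both are illustrative assumptions, and the exact pfSQL syntax may differ.

```sql
-- Illustrative sketch; the "name" field and the age criterion are example assumptions,
-- and the exact pfSQL syntax may differ. Extract a single field from qualifying
-- messages and write it to the "pipeline-datagen-age-select" topic.
INSERT INTO "pipeline-datagen-age-select"
SELECT name FROM "pipeline-datagen-source"
WHERE age >= 18;
```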
Run the command to submit this transformation query to the pfSQL gateway.
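For example, where the SQL text is the sketch from above:

```bash
# Submit the transformation query to the pfSQL gateway as a single --query argument.
pfsql run query --query 'INSERT INTO "pipeline-datagen-age-select" SELECT name FROM "pipeline-datagen-source" WHERE age >= 18'
```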
You can use the --preview flag to preview the query result (for Avro and JSON schemas only) and push it to the console.
Once the transformation query is submitted, you will get a query ID with the pfsql-pfr prefix. With the query ID, you can get the status and stats of the query with pfsql query status ${queryId} and pfsql query stats ${queryId}.
You may also use pfsql query preview ${queryId} to preview the query result (for Avro and JSON schemas only) and push it to the console.
After the query is deployed, you can see the transformed data in the “pipeline-datagen-age-select” topic.
Step 3: Access query results
You can use the pulsar-client tool to subscribe to the output topic and receive the processed messages.
For example, to access the query results after transforming the messages, you can run the following command:
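A minimal sketch, assuming the output topic lives in the default public/default namespace and using a placeholder subscription name:

```bash
# Subscribe to the transformed output topic and keep receiving messages (-n 0 = no limit).
# The subscription name and the public/default namespace are assumptions; adjust as needed.
pulsar-client consume -s my-subscription -n 0 persistent://public/default/pipeline-datagen-age-select
```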
You can also use the pfsql query preview command to preview the query result (for Avro and JSON schemas only) and push it to the console.
Step 4: Clean up queries
After running the SQL queries, you can delete queries that you don’t need anymore.
Please refer to Delete a query.
1. Run the pfsql query list command to get a list of all queries that have been executed.
2. Copy the returned query ID of each query you want to delete.
3. Run the pfsql query delete ${queryId} command to delete each query one by one. Replace ${queryId} with the query ID that you copied in step 2.
For example, if the query IDs are pfsql-pfr-1k1u04hs8d5k5-e1023304 and pfsql-pfr-1h7tpcxofd3kz-ef81fe6c, you can run the following commands to delete them:
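```bash
# Delete the two queries by their IDs.
pfsql query delete pfsql-pfr-1k1u04hs8d5k5-e1023304
pfsql query delete pfsql-pfr-1h7tpcxofd3kz-ef81fe6c
```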