# Get started with pfSQL

<Note title="Note">
  This feature is currently in alpha. If you want to try it out or have any questions, [submit a ticket](https://support.streamnative.io/hc/en-us/requests/new) to the support team.
</Note>

This tutorial walks you through using SQL queries to create a source connector in Pulsar and then filter, route, and transform the data it generates.

## Step 1: Create a source connector

Assume you want to create a data generator source connector named “pipeline-source” that outputs data to the “pipeline-datagen-source” topic with an interval of 500 ms between messages.

<Tabs>
  <Tab title="StreamNative Cloud Console">
    Enter the following SQL when [creating a query](/cloud/process/pfsql/pfsql-work-with-cloud-console#create-a-query).

    ```sql theme={null}
    CREATE SOURCE `pipeline-source` FROM `data-generator` OUTPUT `pipeline-datagen-source` WITH("configs/sleepBetweenMessages"='500')
    ```
  </Tab>

  <Tab title="pfSQL CLI">
    Run the following command to submit this query to the pfSQL gateway.

    ```bash theme={null}
    pfsql query run --query "CREATE SOURCE \`pipeline-source\` FROM \`data-generator\` OUTPUT \`pipeline-datagen-source\` WITH(\"configs/sleepBetweenMessages\"='500')"
    ```

    To submit the query to a specific tenant and namespace rather than the default ones, pass them as CLI properties: `pfsql query run --query '$query' -p tenant=$tenant -p namespace=$namespace`.

    <Note title="Note">
      Inside a double-quoted query string, use a backslash (`\`) to escape backticks and double quotes so that the shell passes them through unchanged. For more information, see [String identifiers](/cloud/process/pfsql/pfsql-understand#string-identifiers).
    </Note>
  </Tab>
</Tabs>
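If the escaping becomes hard to read, one alternative is to keep the SQL in a quoted here-document, which the shell reads verbatim. This is a plain shell technique rather than a pfSQL feature, and the variable name `query` is only illustrative. A minimal sketch:

```bash
# Read the SQL verbatim: the quoted delimiter ('SQL') disables all shell
# expansion, so backticks and double quotes need no backslashes.
query=$(cat <<'SQL'
CREATE SOURCE `pipeline-source` FROM `data-generator` OUTPUT `pipeline-datagen-source` WITH("configs/sleepBetweenMessages"='500')
SQL
)

# Show the exact string that would be passed as the --query argument.
printf '%s\n' "$query"

# To submit it, pass the variable in double quotes:
# pfsql query run --query "$query"
```

The printed string should match the SQL shown in the StreamNative Cloud Console tab character for character.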

## Step 2: Use queries to process messages

After creating a source connector, you can process its messages in any of the following ways.

* Filter messages
* Route messages
* Transform messages

### Filter messages

To filter the data from the “pipeline-datagen-source” topic and insert it into another topic according to specific age criteria (greater than or equal to 18), you can do the following:

<Tabs>
  <Tab title="StreamNative Cloud Console">
    Enter the following SQL when [creating a query](/cloud/process/pfsql/pfsql-work-with-cloud-console#create-a-query).

    ```sql theme={null}
    INSERT INTO `pipeline-datagen-age-filter` SELECT * FROM `pipeline-datagen-source` WHERE `age` >= 18
    ```
  </Tab>

  <Tab title="pfSQL CLI">
    Run the command to submit this filtering query to the pfSQL gateway.

    ```bash theme={null}
    pfsql query run --query \
          "INSERT INTO \`pipeline-datagen-age-filter\` \
           SELECT * FROM \`pipeline-datagen-source\` \
           WHERE \`age\` >= 18"
    ```

    <Note title="Note">
      You can use the `--preview` flag to preview the query result (for Avro and JSON schemas only) and push it to the console.
    </Note>

    Once the filtering query is submitted, you will get a query ID with the `pfsql-pfr` prefix. With the query ID, you can get the status and stats of the query with `pfsql query status ${queryId}` and `pfsql query stats ${queryId}`.

    You may also use `pfsql query preview ${queryId}` to preview the query result (for Avro and JSON schemas only) and push it to the console.
  </Tab>
</Tabs>

After the query is deployed, you can see the filtered data in the “pipeline-datagen-age-filter” topic.
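It can help to check what the shell actually hands to `pfsql` for the CLI form. Inside double quotes, `` \` `` becomes a literal backtick and a backslash at the end of a line joins it to the next one, so the multi-line command collapses into a single-line statement. The sketch below uses only shell built-ins and does not call pfSQL; the variable name `query` is illustrative.

```bash
# The same double-quoted string as in the CLI tab, assigned to a variable.
# Continuation lines start at column 0 here so no extra spaces are added.
query="INSERT INTO \`pipeline-datagen-age-filter\` \
SELECT * FROM \`pipeline-datagen-source\` \
WHERE \`age\` >= 18"

# Print what the shell would pass as the --query argument.
printf '%s\n' "$query"
```

The result is the same one-line SQL as in the Console tab. If the continuation lines are indented, that indentation stays in the string as additional spaces between keywords.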

### Route messages

To route the messages from the “pipeline-datagen-source” topic to multiple topics according to specific age criteria, you can implement the following logic:

* If the `age` field is less than 18, the messages go to the “pipeline-datagen-age-routing-1” topic.
* If it is at least 18 and less than 60, the messages go to the “pipeline-datagen-age-routing-2” topic.
* Otherwise, the messages go to the “pipeline-datagen-age-routing-3” topic.

<Tabs>
  <Tab title="StreamNative Cloud Console">
    Enter the following SQL when [creating a query](/cloud/process/pfsql/pfsql-work-with-cloud-console#create-a-query).

    ```sql theme={null}
    INSERT MULTI IF `age` < 18 THEN INTO `pipeline-datagen-age-routing-1`, IF `age` >= 18 AND `age` < 60 THEN INTO `pipeline-datagen-age-routing-2`, ELSE INTO `pipeline-datagen-age-routing-3` SELECT * FROM `pipeline-datagen-source`
    ```
  </Tab>

  <Tab title="pfSQL CLI">
    Run the following command to submit this routing query to the pfSQL gateway.

    ```bash theme={null}
    pfsql query run --query \
      "INSERT MULTI \
         IF \`age\` < 18 THEN INTO \`pipeline-datagen-age-routing-1\`, \
         IF \`age\` >= 18 AND \`age\` < 60 THEN INTO \`pipeline-datagen-age-routing-2\`, \
         ELSE INTO \`pipeline-datagen-age-routing-3\` \
       SELECT * FROM \`pipeline-datagen-source\`"
    ```

    <Note title="Note">
      You can use the `--preview` flag to preview the query result (for Avro and JSON schemas only) and push it to the console.
    </Note>

    Once the routing query is submitted, you will get a query ID with the `pfsql-pfr` prefix. With the query ID, you can get the status and stats of the query with `pfsql query status ${queryId}` and `pfsql query stats ${queryId}`.

    You may also use `pfsql query preview ${queryId}` to preview the query result (for Avro and JSON schemas only) and push it to the console.
  </Tab>
</Tabs>

After the query is deployed, you can see the routed data in the “pipeline-datagen-age-routing-1”, “pipeline-datagen-age-routing-2”, and “pipeline-datagen-age-routing-3” topics.
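To make the boundary values explicit, the routing conditions can be mirrored in a small shell function. This is only an illustration of the `IF ... ELSE` logic in the query, not how pfSQL evaluates it; `route_topic` is a hypothetical helper name.

```bash
# Hypothetical helper: print the topic a given age would be routed to,
# following the same conditions as the INSERT MULTI query.
route_topic() {
  age=$1
  if [ "$age" -lt 18 ]; then
    echo "pipeline-datagen-age-routing-1"
  elif [ "$age" -lt 60 ]; then
    # Reached only when age >= 18, so this branch covers 18 through 59.
    echo "pipeline-datagen-age-routing-2"
  else
    echo "pipeline-datagen-age-routing-3"
  fi
}

# Try the values on both sides of each boundary.
for sample_age in 17 18 59 60; do
  printf '%s -> %s\n' "$sample_age" "$(route_topic "$sample_age")"
done
```

An age of exactly 18 falls into the second topic and an age of exactly 60 into the third.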

### Transform messages

To extract a single field from the messages in the “pipeline-datagen-source” topic and insert it into another topic for messages that meet specific age criteria (greater than or equal to 18), you can do the following:

<Tabs>
  <Tab title="StreamNative Cloud Console">
    Enter the following SQL when [creating a query](/cloud/process/pfsql/pfsql-work-with-cloud-console#create-a-query).

    ```sql theme={null}
    INSERT INTO `pipeline-datagen-age-select` SELECT `age` FROM `pipeline-datagen-source` WHERE `age` >= 18
    ```
  </Tab>

  <Tab title="pfSQL CLI">
    Run the following command to submit this transformation query to the pfSQL gateway.

    ```bash theme={null}
    pfsql query run --query "INSERT INTO \`pipeline-datagen-age-select\` SELECT \`age\` FROM \`pipeline-datagen-source\` WHERE \`age\` >= 18"
    ```

    <Note title="Note">
      You can use the `--preview` flag to preview the query result (for Avro and JSON schemas only) and push it to the console.
    </Note>

    Once the transformation query is submitted, you will get a query ID with the `pfsql-pfr` prefix. With the query ID, you can get the status and stats of the query with `pfsql query status ${queryId}` and `pfsql query stats ${queryId}`.

    You may also use `pfsql query preview ${queryId}` to preview the query result (for Avro and JSON schemas only) and push it to the console.
  </Tab>
</Tabs>

After the query is deployed, you can see the transformed data in the “pipeline-datagen-age-select” topic.

## Step 3: Access query results

You can use the `pulsar-client` tool to subscribe to an output topic and receive the processed messages.

For example, to access the query results after transforming the messages, you can run the following command:

```bash theme={null}
pulsar-client consume -s my-subscription -p Earliest -n 0 persistent://public/default/pipeline-datagen-age-select
```

You can also use the `pfsql query preview` command to preview the query result (for Avro and JSON schemas only) and push it to the console.
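The topic in the command above is a fully qualified Pulsar topic name of the form `persistent://<tenant>/<namespace>/<topic>`, here using the `public` tenant and `default` namespace. If you submitted the query with different `tenant` and `namespace` properties, the output topic is presumably located there instead; this page does not state that explicitly, so treat it as an assumption. A sketch for assembling the name, with illustrative variable names:

```bash
# Hypothetical values; replace them with the tenant and namespace you used.
tenant="public"
namespace="default"
topic="pipeline-datagen-age-select"

# Fully qualified Pulsar topic name: persistent://<tenant>/<namespace>/<topic>
topic_name="persistent://${tenant}/${namespace}/${topic}"
printf '%s\n' "$topic_name"
```

Pass the printed name as the topic when you run `pulsar-client consume`.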

## Step 4: Clean up queries

After running the SQL queries, you can delete the ones that you no longer need.

<Tabs>
  <Tab title="StreamNative Cloud Console">
    Please refer to [Delete a query](/cloud/process/pfsql/pfsql-work-with-cloud-console#delete-a-query).
  </Tab>

  <Tab title="pfSQL CLI">
    1. Run the `pfsql query list` command to get a list of all queries that have been executed.
    2. Copy the returned query ID of each query you want to delete.
    3. Run the `pfsql query delete ${queryId}` command to delete each query one by one. Replace `${queryId}` with the query ID that you copied in step 2.

    For example, if the query IDs are `pfsql-pfr-1k1u04hs8d5k5-e1023304` and `pfsql-pfr-1h7tpcxofd3kz-ef81fe6c`, you can run the following commands to delete them:

    ```bash theme={null}
    pfsql query delete pfsql-pfr-1k1u04hs8d5k5-e1023304
    pfsql query delete pfsql-pfr-1h7tpcxofd3kz-ef81fe6c
    ```
  </Tab>
</Tabs>
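When there are many queries to remove, the per-ID commands can be generated in a loop. The sketch below is a dry run that only prints the commands so you can review them first; `print_delete_commands` is a hypothetical helper name, and the IDs are the example IDs from the CLI tab.

```bash
# Hypothetical helper: print one delete command per query ID (dry run).
print_delete_commands() {
  for query_id in "$@"; do
    echo "pfsql query delete ${query_id}"
  done
}

# Review the generated commands before running any of them.
print_delete_commands \
  pfsql-pfr-1k1u04hs8d5k5-e1023304 \
  pfsql-pfr-1h7tpcxofd3kz-ef81fe6c
```

Once the list looks right, you can run the printed commands as they are, or replace `echo "pfsql query delete ${query_id}"` with `pfsql query delete "${query_id}"` to delete directly.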

## What’s next?

* [Understand pfSQL](/cloud/process/pfsql/pfsql-understand)
* [Built-in UDFs](/cloud/process/pfsql/pfsql-built-in-udfs)
