> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

> The JDBC sink connector pulls messages from Pulsar topics and persists the messages to MySQL or SQlite.

# Jdbc sink

The JDBC sink connector pulls messages from Pulsar topics and persists the messages to MySQL or SQlite.

> Currently, INSERT, DELETE and UPDATE operations are supported.

# Configuration

The configuration of the JDBC sink connector has the following properties.

## Property

| Name        | Type   | Required | Sensitive | Default            | Description                                                                                                              |
| ----------- | ------ | -------- | --------- | ------------------ | ------------------------------------------------------------------------------------------------------------------------ |
| `userName`  | String | false    | true      | " " (empty string) | The username used to connect to the database specified by `jdbcUrl`.<br /><br />**Note: `userName` is case-sensitive.**  |
| `password`  | String | false    | true      | " " (empty string) | The password used to connect to the database specified by `jdbcUrl`. <br /><br />**Note: `password` is case-sensitive.** |
| `jdbcUrl`   | String | true     | false     | " " (empty string) | The JDBC URL of the database to which the connector connects.                                                            |
| `tableName` | String | true     | false     | " " (empty string) | The name of the table to which the connector writes.                                                                     |
| `nonKey`    | String | false    | false     | " " (empty string) | A comma-separated list contains the fields used in updating events.                                                      |
| `key`       | String | false    | false     | " " (empty string) | A comma-separated list contains the fields used in `where` condition of updating and deleting events.                    |
| `timeoutMs` | int    | false    | false     | 500                | The JDBC operation timeout in milliseconds.                                                                              |
| `batchSize` | int    | false    | false     | 200                | The batch size of updates made to the database.                                                                          |

## Batch support (PostgreSQL)

When using PostgreSQL with batched writes (for example, setting `batchSize > 1`), add `reWriteBatchedInserts=true` to the `jdbcUrl` so the PostgreSQL JDBC driver rewrites batched inserts for better performance.

* Example: `jdbc:postgresql://<host>:5432/<db>?reWriteBatchedInserts=true`
* The parameter name is case-sensitive: `reWriteBatchedInserts`.

For details, see the PostgreSQL JDBC driver documentation: [https://jdbc.postgresql.org/documentation/use/#:\~:text=reWriteBatchedInserts%20(boolean)%20Default%20false%0AThis%20will](https://jdbc.postgresql.org/documentation/use/#:~:text=reWriteBatchedInserts%20\(boolean\)%20Default%20false%0AThis%20will)

## Example

Before using the JDBC sink connector, you need to create a configuration file through one of the following methods.

* JSON

  ```json theme={null}
  {
      "userName": "root",
      "password": "jdbc",
      "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink",
      "tableName": "pulsar_mysql_jdbc_sink"
  }
  ```

* YAML

  ```yaml theme={null}
  configs:
      userName: "root"
      password: "jdbc"
      jdbcUrl: "jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink"
      tableName: "pulsar_mysql_jdbc_sink"
  ```

# Usage

For more information about **how to use a JDBC sink connector**, see [connect Pulsar to Postgres](https://pulsar.apache.org/docs/io-quickstart/#connect-pulsar-to-postgresql).
