> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Aiven JDBC Kafka Connect Source

> The Aiven JDBC Kafka Connect Source connector.

The Aiven JDBC source connector reads data from any JDBC-compliant database and writes data to Kafka topics.

<Note title="✅ Available on StreamNative Cloud">
  This connector is available as a built-in connector on StreamNative Cloud.
</Note>

### Prerequisites

* Valid credentials for the source database.
* The `connection.url` for your database.

### Quick Start

1. Setup the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Create a MySQL database in your Kubernetes cluster:

```shell theme={null}
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install mysql bitnami/mysql \
  --set auth.rootPassword=secretpassword \
  --set auth.database=test
kubectl wait -l app.kubernetes.io/instance=mysql --for=condition=Ready pod --timeout=5m
# Create a table and insert some data
kubectl run -i --rm --tty mysql-client --image=mysql:8.0 --restart=Never -- mysql -h mysql.default.svc.cluster.local -uroot -psecretpassword -Dtest -e "CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(255)); INSERT INTO test_table (id, name) VALUES (1, 'test-user');"
```

3. Create a JSON file like the following:

```json theme={null}
{
    "name": "jdbc-source",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://mysql.default.svc.cluster.local:3306/test",
        "connection.user": "root",
        "connection.password": "secretpassword",
        "mode": "incrementing",
        "table.whitelist": "test_table",
        "incrementing.column.name": "id",
        "topic.prefix": "jdbc_",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "tasks.max": "1"
    }
}
```

4. Run the following command to create the connector:

```shell theme={null}
kcctl apply -f <filename>.json
```

### Configuration

Configure the Aiven JDBC source connector with the following properties:

| Property                      | Required | Default | Description                                                                                                                                                 |
| ----------------------------- | -------- | ------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `connection.url`              | true     |         | JDBC connection URL.                                                                                                                                        |
| `connection.user`             | true     | null    | JDBC connection user.                                                                                                                                       |
| `connection.password`         | true     | null    | JDBC connection password.                                                                                                                                   |
| `connection.attempts`         | false    | 3       | Maximum number of attempts to retrieve a valid JDBC connection.                                                                                             |
| `connection.backoff.ms`       | false    | 10000   | Backoff time in milliseconds between connection attempts.                                                                                                   |
| `table.whitelist`             | false    | ""      | List of tables to include in copying.                                                                                                                       |
| `table.blacklist`             | false    | ""      | List of tables to exclude from copying.                                                                                                                     |
| `catalog.pattern`             | false    | null    | Catalog pattern to fetch table metadata from the database.                                                                                                  |
| `schema.pattern`              | false    | null    | Schema pattern to fetch table metadata from the database.                                                                                                   |
| `numeric.precision.mapping`   | false    | false   | Whether or not to attempt mapping NUMERIC values by precision to integral types. (deprecated)                                                               |
| `numeric.mapping`             | false    | null    | Map NUMERIC values by precision and optionally scale to integral or decimal types.                                                                          |
| `table.names.qualify`         | false    | true    | Whether to use fully-qualified table names when querying the database.                                                                                      |
| `db.timezone`                 | false    | UTC     | Name of the JDBC timezone that should be used in the connector when querying with time-based criteria.                                                      |
| `dialect.name`                | false    | ""      | The name of the database dialect that should be used for this connector.                                                                                    |
| `sql.quote.identifiers`       | false    | true    | Whether to delimit identifiers in SQL statements.                                                                                                           |
| `mode`                        | true     |         | The mode for updating a table each time it is polled. Valid Values: \[bulk, timestamp, incrementing, timestamp+incrementing]                                |
| `incrementing.column.name`    | false    | ""      | The name of the strictly incrementing column to use to detect new rows.                                                                                     |
| `timestamp.column.name`       | false    | ""      | Comma separated list of one or more timestamp columns to detect new or modified rows.                                                                       |
| `validate.non.null`           | false    | true    | By default, the JDBC connector will validate that all incrementing and timestamp tables have NOT NULL set for the columns being used as their ID/timestamp. |
| `query`                       | false    | ""      | If specified, the query to perform to select new or updated rows.                                                                                           |
| `timestamp.initial.ms`        | false    | 0       | The initial value of timestamp when selecting records.                                                                                                      |
| `incrementing.initial`        | false    | -1      | For the incrementing column, consider only the rows that have the value greater than this.                                                                  |
| `table.types`                 | false    | TABLE   | A comma-separated list of table types to extract.                                                                                                           |
| `poll.interval.ms`            | true     | 5000    | Frequency in ms to poll for new data in each table.                                                                                                         |
| `batch.max.rows`              | false    | 100     | Maximum number of rows to include in a single batch when polling for new data.                                                                              |
| `table.poll.interval.ms`      | false    | 60000   | Frequency in ms to poll for new or removed tables.                                                                                                          |
| `topic.prefix`                | true     |         | Prefix to prepend to table names to generate the name of the Kafka topic.                                                                                   |
| `timestamp.delay.interval.ms` | true     | 0       | How long to wait after a row with certain timestamp appears before we include it in the result.                                                             |

For full details, see the [Aiven JDBC source connector configs](https://github.com/Aiven-Open/jdbc-connector-for-apache-kafka/blob/master/docs/source-connector-config-options.rst)
