The Aiven JDBC source connector reads data from any JDBC-compliant database and writes data to Kafka topics.

Prerequisites

  • Valid credentials for the source database.
  • The connection.url for your database.

Quick Start

  1. Set up the kcctl client.
  2. Create a MySQL database in your Kubernetes cluster:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install mysql bitnami/mysql \
  --set auth.rootPassword=secretpassword \
  --set auth.database=test
kubectl wait -l app.kubernetes.io/instance=mysql --for=condition=Ready pod --timeout=5m
# Create a table and insert some data
kubectl run -i --rm --tty mysql-client --image=mysql:8.0 --restart=Never -- mysql -h mysql.default.svc.cluster.local -uroot -psecretpassword -Dtest -e "CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(255)); INSERT INTO test_table (id, name) VALUES (1, 'test-user');"
  3. Create a JSON file like the following:
{
    "name": "jdbc-source",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://mysql.default.svc.cluster.local:3306/test",
        "connection.user": "root",
        "connection.password": "secretpassword",
        "mode": "incrementing",
        "table.whitelist": "test_table",
        "incrementing.column.name": "id",
        "topic.prefix": "jdbc_",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "tasks.max": "1"
    }
}
  4. Run the following command to create the connector:
kcctl create -f <filename>.json
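Because the connector runs in incrementing mode, each poll selects only rows whose id exceeds the largest value seen so far, starting from the incrementing.initial default of -1. A minimal Python sketch of that polling logic (illustrative only, not the connector's actual implementation):

```python
def poll_incrementing(rows, last_id):
    """Return rows with id strictly greater than last_id, plus the new offset.

    Illustrative sketch of incrementing-mode polling; the real connector
    issues an SQL query of roughly the form:
        SELECT * FROM test_table WHERE id > ? ORDER BY id ASC
    """
    new_rows = sorted((r for r in rows if r["id"] > last_id), key=lambda r: r["id"])
    new_offset = new_rows[-1]["id"] if new_rows else last_id
    return new_rows, new_offset

# First poll: the stored offset starts at incrementing.initial (-1),
# so the row inserted in the quick start above is picked up.
table = [{"id": 1, "name": "test-user"}]
batch, offset = poll_incrementing(table, last_id=-1)
print([r["id"] for r in batch], offset)  # → [1] 1

# Later polls return only rows beyond the stored offset.
table.append({"id": 2, "name": "another-user"})
batch, offset = poll_incrementing(table, offset)
print([r["id"] for r in batch], offset)  # → [2] 2
```

This is why incrementing mode detects new rows but not updates to existing rows; to also capture modified rows, use timestamp or timestamp+incrementing mode with a timestamp column.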

Configuration

Configure the Aiven JDBC source connector with the following properties:
Property | Required | Default | Description
connection.url | true | (none) | JDBC connection URL.
connection.user | true | null | JDBC connection user.
connection.password | true | null | JDBC connection password.
connection.attempts | false | 3 | Maximum number of attempts to retrieve a valid JDBC connection.
connection.backoff.ms | false | 10000 | Backoff time in milliseconds between connection attempts.
table.whitelist | false | "" | List of tables to include in copying.
table.blacklist | false | "" | List of tables to exclude from copying.
catalog.pattern | false | null | Catalog pattern to fetch table metadata from the database.
schema.pattern | false | null | Schema pattern to fetch table metadata from the database.
numeric.precision.mapping | false | false | Whether to attempt mapping NUMERIC values by precision to integral types (deprecated).
numeric.mapping | false | null | Map NUMERIC values by precision and optionally scale to integral or decimal types.
table.names.qualify | false | true | Whether to use fully-qualified table names when querying the database.
db.timezone | false | UTC | Name of the JDBC timezone the connector uses when querying with time-based criteria.
dialect.name | false | "" | Name of the database dialect to use for this connector.
sql.quote.identifiers | false | true | Whether to delimit identifiers in SQL statements.
mode | true | (none) | Mode for updating a table each time it is polled. Valid values: bulk, timestamp, incrementing, timestamp+incrementing.
incrementing.column.name | false | "" | Name of the strictly incrementing column used to detect new rows.
timestamp.column.name | false | "" | Comma-separated list of one or more timestamp columns used to detect new or modified rows.
validate.non.null | false | true | Whether to validate that the columns used as the incrementing/timestamp columns are declared NOT NULL.
query | false | "" | If specified, the query to perform to select new or updated rows.
timestamp.initial.ms | false | 0 | Initial timestamp value when selecting records.
incrementing.initial | false | -1 | For the incrementing column, consider only rows with a value greater than this.
table.types | false | TABLE | Comma-separated list of table types to extract.
poll.interval.ms | true | 5000 | Frequency in milliseconds to poll for new data in each table.
batch.max.rows | false | 100 | Maximum number of rows to include in a single batch when polling for new data.
table.poll.interval.ms | false | 60000 | Frequency in milliseconds to poll for new or removed tables.
topic.prefix | true | (none) | Prefix prepended to table names to generate the names of the Kafka topics.
timestamp.delay.interval.ms | true | 0 | How long to wait after a row with a certain timestamp appears before including it in the result.
For full details, see the Aiven JDBC source connector configuration reference.
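Several of the properties above only make sense in combination: the mode determines which column properties are needed, and table.whitelist and table.blacklist cannot both be set. A small pre-flight check along these lines can catch mistakes before submitting the config; this is an illustrative sketch using the property names from the table, not the connector's own validation logic:

```python
# Required properties per the configuration table above; connector.class is
# required by Kafka Connect itself.
REQUIRED = {"connector.class", "connection.url", "mode", "topic.prefix"}
VALID_MODES = {"bulk", "timestamp", "incrementing", "timestamp+incrementing"}


def validate_jdbc_source(config: dict) -> list:
    """Return a list of problems found in a JDBC source connector config.

    Illustrative pre-flight check only; the connector performs the
    authoritative validation when the config is submitted.
    """
    problems = [f"missing required property: {k}" for k in sorted(REQUIRED - config.keys())]
    mode = config.get("mode")
    if mode is not None and mode not in VALID_MODES:
        problems.append(f"invalid mode: {mode!r}")
    if mode in ("incrementing", "timestamp+incrementing") and not config.get("incrementing.column.name"):
        problems.append("incrementing mode requires incrementing.column.name")
    if mode in ("timestamp", "timestamp+incrementing") and not config.get("timestamp.column.name"):
        problems.append("timestamp mode requires timestamp.column.name")
    if config.get("table.whitelist") and config.get("table.blacklist"):
        problems.append("table.whitelist and table.blacklist are mutually exclusive")
    return problems


config = {
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql.default.svc.cluster.local:3306/test",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc_",
}
print(validate_jdbc_source(config))  # → []
```

For example, removing incrementing.column.name from the config above would produce "incrementing mode requires incrementing.column.name" rather than a deployment-time failure.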