sink
JDBC ClickHouse Sink
The JDBC ClickHouse sink connector pulls messages from Pulsar topics and persists the messages to ClickHouse.
Authored by
ASF
Support type
StreamNative
License
Apache License 2.0

This document describes how to create a JDBC ClickHouse sink connector and get it up and running.

Quick start

Prerequisites

Before you connect a JDBC ClickHouse sink connector to external systems, complete the following steps:

  1. Start a ClickHouse server. You can start a single-node ClickHouse server by running these commands:
curl https://clickhouse.com/ | sh
./clickhouse server
  2. Create a table. You can use ./clickhouse client to open a SQL shell.
CREATE TABLE users (
name String,
age UInt8,
city String
) ENGINE = MergeTree()
ORDER BY (name, age);

1. Create a connector

The following command shows how to use pulsarctl to create a builtin connector. If you want to create a non-builtin connector, you need to replace --sink-type jdbc-clickhouse with --archive /path/to/pulsar-io-jdbc-clickhouse.nar. You can find the button to download the nar package at the beginning of the document.

For StreamNative Cloud User

If you are a StreamNative Cloud user, you need to set up your environment first.

pulsarctl sinks create \
  --sink-type jdbc-clickhouse \
  --name jdbc-clickhouse-sink \
  --tenant public \
  --namespace default \
  --inputs "Your topic name" \
  --parallelism 1 \
  --sink-config \
  '{
    "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/default",
    "tableName": "users"
  }'

The --sink-config is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own. If you want to configure more parameters, see Configuration Properties for reference.
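As a rough illustration of configuring more parameters, the JSON below extends the minimal configuration with a few optional properties from the Configuration Properties table. The user name `default` and the empty password are assumptions about a freshly installed local ClickHouse server, and the batching values simply restate the documented defaults; replace all of them with your own settings.

```json
{
  "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/default",
  "tableName": "users",
  "userName": "default",
  "password": "",
  "useJdbcBatch": true,
  "batchSize": 200,
  "timeoutMs": 500
}
```

Pass this JSON as the single-quoted value of --sink-config, in the same way as the minimal example.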

Note

You can also use other tools to create a connector.

2. Send messages to the topic

Note

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.

The current implementation supports structured schema types, such as Avro, JSON, Protobuf, and Protobuf_native. The following example uses a JSON schema.

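import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

// The fields match the columns of the users table, so each message maps to one row.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProducerTest {

    private String name;
    private int age;
    private String city;

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("{{Your Pulsar URL}}")
                .build();

        // Use a JSON schema so that the sink receives a structured record.
        Producer<ProducerTest> producer = client.newProducer(Schema.JSON(ProducerTest.class))
                .topic("{{Your topic name}}")
                .create();

        // send() blocks until the broker acknowledges the message.
        MessageId msgID = producer.send(new ProducerTest("John Doe", 30, "New York"));
        System.out.println("Publish message and message ID " + msgID);

        // Release resources before exiting.
        producer.flush();
        producer.close();
        client.close();
    }
}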

3. Check the data in ClickHouse

SELECT *
FROM users

Query id: b555a027-a781-47bc-b3dd-c7ffb30dc513

   ┌─name─────┬─age─┬─city─────┐
1. │ John Doe │  30 │ New York │
   └──────────┴─────┴──────────┘

1 row in set. Elapsed: 0.002 sec.
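If you prefer to verify the result from code rather than the SQL shell, the following sketch reads the table back over JDBC. It assumes that the ClickHouse JDBC driver is on the classpath and that the default user can connect without a password. The class name `ClickHouseRowCheck` and its `formatRow` helper are hypothetical and exist only for this illustration.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper class for illustration; it is not part of the connector.
class ClickHouseRowCheck {

    // Same JDBC URL as the sink configuration used in this guide.
    static final String JDBC_URL = "jdbc:clickhouse://127.0.0.1:8123/default";

    // Reads back the three columns of the users table created in the prerequisites.
    static final String QUERY = "SELECT name, age, city FROM users ORDER BY name, age";

    // Formats one row as "name | age | city".
    static String formatRow(String name, int age, String city) {
        return name + " | " + age + " | " + city;
    }

    public static void main(String[] args) throws SQLException {
        // Assumption: the ClickHouse JDBC driver is on the classpath and
        // the default user needs no password.
        try (Connection conn = DriverManager.getConnection(JDBC_URL);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(QUERY)) {
            while (rs.next()) {
                System.out.println(formatRow(
                        rs.getString("name"), rs.getInt("age"), rs.getString("city")));
            }
        }
    }
}
```

Running `main` against the server from the prerequisites should print one line per row written by the sink.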

Configuration Properties

The configuration of the JDBC sink connector has the following properties.

| Name | Type | Required | Sensitive | Default | Description |
|------|------|----------|-----------|---------|-------------|
| userName | String | false | true | " " (empty string) | The username used to connect to the database specified by jdbcUrl. Note: userName is case-sensitive. |
| password | String | false | true | " " (empty string) | The password used to connect to the database specified by jdbcUrl. Note: password is case-sensitive. |
| jdbcUrl | String | true | false | " " (empty string) | The JDBC URL of the database to which the connector connects. |
| tableName | String | true | false | " " (empty string) | The name of the table to which the connector writes. |
| key | String | false | false | " " (empty string) | A comma-separated list of the fields used in the WHERE condition of update and delete events. |
| nonKey | String | false | false | " " (empty string) | A comma-separated list of the fields used in update events. |
| insertMode | enum | false | false | INSERT | Options: INSERT, UPSERT, and UPDATE. If it is configured as UPSERT, the sink uses upsert semantics rather than plain INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence. |
| nullValueAction | enum | false | false | FAIL | How to handle records with null values. Options: FAIL, DELETE. |
| useTransactions | boolean | false | false | false | Enable transactions of the database. |
| excludeNonDeclaredFields | boolean | false | false | false | All the table fields are discovered automatically. excludeNonDeclaredFields indicates whether the table fields that are not explicitly listed in nonKey and key are excluded from the query. By default, all the table fields are included. To take advantage of table field defaults during insertion, set this value to true. |
| useJdbcBatch | boolean | false | false | false | Use the JDBC batch API. This option is suggested to improve write performance. |
| timeoutMs | int | false | false | 500 | The JDBC operation timeout in milliseconds. |
| batchSize | int | false | false | 200 | The batch size of updates made to the database. |
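
To make the roles of key and nonKey more concrete, the sketch below builds generic parameterized statements from the two comma-separated lists: nonKey fields appear in the SET clause of an update, and key fields appear in the WHERE clause of updates and deletes. This is an illustration of the property descriptions only, not the connector's source code. The class `SinkStatementSketch` and its methods are hypothetical, and the statements that the connector actually issues against ClickHouse may use different syntax.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: a hypothetical helper, not the connector's implementation.
class SinkStatementSketch {

    // Splits a comma-separated property value such as "name,age" into trimmed field names.
    static List<String> fields(String csv) {
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Builds "a = ? AND b = ?" from the key fields.
    static String whereClause(String key) {
        return fields(key).stream()
                .map(f -> f + " = ?")
                .collect(Collectors.joining(" AND "));
    }

    // Lists key fields first, then non-key fields, with one placeholder per field.
    static String insertSql(String table, String key, String nonKey) {
        List<String> all = new ArrayList<>(fields(key));
        all.addAll(fields(nonKey));
        String columns = String.join(", ", all);
        String marks = all.stream().map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + marks + ")";
    }

    // Non-key fields are updated; key fields select the row.
    static String updateSql(String table, String key, String nonKey) {
        String sets = fields(nonKey).stream()
                .map(f -> f + " = ?")
                .collect(Collectors.joining(", "));
        return "UPDATE " + table + " SET " + sets + " WHERE " + whereClause(key);
    }

    // Only key fields are needed to select the row to delete.
    static String deleteSql(String table, String key) {
        return "DELETE FROM " + table + " WHERE " + whereClause(key);
    }

    public static void main(String[] args) {
        // Example: key = "name,age" and nonKey = "city" for the users table.
        System.out.println(insertSql("users", "name,age", "city"));
        System.out.println(updateSql("users", "name,age", "city"));
        System.out.println(deleteSql("users", "name,age"));
    }
}
```

With key set to name,age and nonKey set to city, the update statement in this sketch changes only city and selects the row by name and age.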