The Canal source connector pulls messages from MySQL to Pulsar topics.
Configuration
The Canal source connector has the following configuration properties.
Property
Name | Required | Sensitive | Default | Description |
---|---|---|---|---|
username | true | true | None | Canal server account (not MySQL). |
password | true | true | None | Canal server password (not MySQL). |
destination | true | false | None | Source destination that Canal source connector connects to. |
singleHostname | false | false | None | Canal server address. |
singlePort | false | false | None | Canal server port. |
cluster | true | false | false | Whether to enable cluster mode based on the Canal server configuration.<br/><br/>true: cluster mode. The connector talks to zkServers to figure out the actual database host.<br/><br/>false: standalone mode. The connector connects to the Canal server specified by singleHostname and singlePort. |
zkServers | true | false | None | Address and port of the ZooKeeper that the Canal source connector talks to in order to figure out the actual database host. |
batchSize | false | false | 1000 | Batch size to fetch from Canal. |
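The rules in the table can be checked before the connector is started. The following is a minimal sketch, not part of Pulsar or Canal: `validate_canal_config` is a hypothetical helper, and it assumes that "required" means the key must be present (the examples in this document use empty strings for some required values). The connector itself may validate its configuration differently.

```python
# Hypothetical pre-flight check for a Canal source config (not a Pulsar API).
REQUIRED = ("username", "password", "destination", "cluster", "zkServers")


def validate_canal_config(config):
    """Return a list of problems; an empty list means the config looks usable."""
    problems = []
    # Assumption: a required property only has to be present, it may be empty.
    for key in REQUIRED:
        if key not in config:
            problems.append(f"missing required property: {key}")
    if config.get("cluster") is True:
        # Cluster mode looks up the host through ZooKeeper.
        if not config.get("zkServers"):
            problems.append("cluster mode needs a non-empty zkServers")
    elif config.get("cluster") is False:
        # Standalone mode connects to singleHostname:singlePort directly.
        for key in ("singleHostname", "singlePort"):
            if not config.get(key):
                problems.append(f"standalone mode needs {key}")
    return problems
```

Calling `validate_canal_config(config)` on a dictionary loaded from the JSON or YAML file returns the problems as plain strings, which keeps the check easy to log or print.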
Example
Before using the Canal connector, create a configuration file in one of the following formats.
JSON
    {
        "zkServers": "127.0.0.1:2181",
        "batchSize": "5120",
        "destination": "example",
        "username": "",
        "password": "",
        "cluster": false,
        "singleHostname": "127.0.0.1",
        "singlePort": "11111"
    }
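Strict JSON parsers reject small slips such as a trailing comma after the last property, so it can be safer to generate the file than to edit it by hand. The sketch below assumes the same property values as the JSON example; `render_canal_config` is a hypothetical helper name, not a Pulsar tool.

```python
import json


def render_canal_config(**overrides):
    """Return the example Canal source config as a JSON string."""
    config = {
        "zkServers": "127.0.0.1:2181",
        "batchSize": "5120",
        "destination": "example",
        "username": "",
        "password": "",
        "cluster": False,
        "singleHostname": "127.0.0.1",
        "singlePort": "11111",
    }
    # Keyword arguments replace individual values, e.g. destination="test".
    config.update(overrides)
    # json.dumps always emits well-formed JSON (no trailing commas).
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    print(render_canal_config())
```

Redirecting the printed output to a file gives a configuration file that any JSON parser accepts.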
YAML
Create a YAML file and copy the contents below into it.

    configs:
        zkServers: "127.0.0.1:2181"
        batchSize: 5120
        destination: "example"
        username: ""
        password: ""
        cluster: false
        singleHostname: "127.0.0.1"
        singlePort: 11111
Usage
The following example stores MySQL data in Pulsar using a configuration file like the one above.
Start a MySQL server.
    $ docker pull mysql:5.7
    $ docker run -d -it --rm --name pulsar-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=canal -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw mysql:5.7
Create a configuration file mysqld.cnf.

    [mysqld]
    pid-file = /var/run/mysqld/mysqld.pid
    socket = /var/run/mysqld/mysqld.sock
    datadir = /var/lib/mysql
    #log-error = /var/log/mysql/error.log
    # By default we only accept connections from localhost
    #bind-address = 127.0.0.1
    # Disabling symbolic-links is recommended to prevent assorted security risks
    symbolic-links=0
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
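Canal reads the MySQL binary log, so the last three settings in mysqld.cnf (log-bin, binlog-format=ROW, and server_id) are the ones this walkthrough depends on. The sketch below checks for them with Python's standard `configparser`. `binlog_problems` is a hypothetical helper, and it only inspects the `[mysqld]` section of a single file, not the effective server configuration.

```python
import configparser


def binlog_problems(cnf_text):
    """Return a list of binlog-related problems found in a my.cnf-style text."""
    parser = configparser.ConfigParser(
        allow_no_value=True,   # MySQL allows flag-style options without a value
        interpolation=None,    # treat '%' literally
        strict=False,          # tolerate repeated options
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    # MySQL accepts '-' and '_' interchangeably in option names; normalize to '-'.
    options = {k.replace("_", "-"): v for k, v in parser["mysqld"].items()}
    problems = []
    if not options.get("log-bin"):
        problems.append("log-bin is not set")
    if (options.get("binlog-format") or "").upper() != "ROW":
        problems.append("binlog-format is not ROW")
    if not options.get("server-id"):
        problems.append("server-id is not set")
    return problems
```

For example, `binlog_problems(open("mysqld.cnf").read())` returns an empty list for the file shown above.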
Copy the configuration file mysqld.cnf to the MySQL server.

    $ docker cp mysqld.cnf pulsar-mysql:/etc/mysql/mysql.conf.d/
Restart the MySQL server.
    $ docker restart pulsar-mysql
Create a test database in MySQL server.
    $ docker exec -it pulsar-mysql /bin/bash
    $ mysql -h 127.0.0.1 -uroot -pcanal -e 'create database test;'
Start a Canal server and connect to MySQL server.
    $ docker pull canal/canal-server:v1.1.2
    $ docker run -d -it --link pulsar-mysql \
        -e canal.auto.scan=false \
        -e canal.destinations=test \
        -e canal.instance.master.address=pulsar-mysql:3306 \
        -e canal.instance.dbUsername=root \
        -e canal.instance.dbPassword=canal \
        -e canal.instance.connectionCharset=UTF-8 \
        -e canal.instance.tsdb.enable=true \
        -e canal.instance.gtidon=false \
        --name=pulsar-canal-server \
        -p 8000:8000 -p 2222:2222 -p 11111:11111 -p 11112:11112 \
        -m 4096m canal/canal-server:v1.1.2
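Because `docker run -d` returns as soon as the container is created, the Canal server may not be accepting connections yet when the next steps run. The sketch below polls a TCP port until it opens, using only the Python standard library. `wait_for_port` is a hypothetical helper, and an open port only shows that something is listening, not that Canal has finished initializing.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means a listener is bound to the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # 11111 is the Canal port published by the docker run command above.
    print(wait_for_port("127.0.0.1", 11111, timeout=5.0))
```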
Start Pulsar standalone.
    $ docker pull apachepulsar/pulsar:2.3.0
    $ docker run -d -it --link pulsar-canal-server -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone
Modify the configuration file canal-mysql-source-config.yaml.

    configs:
        zkServers: ""
        batchSize: "5120"
        destination: "test"
        username: ""
        password: ""
        cluster: false
        singleHostname: "pulsar-canal-server"
        singlePort: "11111"
Create a consumer file pulsar-client.py.

    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('my-topic', subscription_name='my-sub')

    while True:
        msg = consumer.receive()
        print("Received message: '%s'" % msg.data())
        consumer.acknowledge(msg)

    client.close()
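Under Python 3, `msg.data()` returns bytes, so the consumer above prints the payload as a bytes literal. The sketch below turns a payload into readable text. It assumes the payload is UTF-8 text, which may or may not be JSON; `format_payload` is a hypothetical helper, and no particular message schema is assumed.

```python
import json


def format_payload(data):
    """Decode a message payload and pretty-print it when it is valid JSON."""
    # Replace undecodable bytes instead of raising, so the consumer keeps running.
    text = data.decode("utf-8", errors="replace")
    try:
        parsed = json.loads(text)
    except ValueError:
        # Not JSON: return the decoded text unchanged.
        return text
    return json.dumps(parsed, indent=2, sort_keys=True)
```

In the consumer loop, the print line could then become `print(format_payload(msg.data()))`.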
Copy the configuration file canal-mysql-source-config.yaml and the consumer file pulsar-client.py to the Pulsar server.

    $ docker cp canal-mysql-source-config.yaml pulsar-standalone:/pulsar/conf/
    $ docker cp pulsar-client.py pulsar-standalone:/pulsar/
Download a Canal connector and start it.
    $ docker exec -it pulsar-standalone /bin/bash
    $ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-canal-2.3.0.nar -P connectors
    $ ./bin/pulsar-admin source localrun \
        --archive ./connectors/pulsar-io-canal-2.3.0.nar \
        --classname org.apache.pulsar.io.canal.CanalStringSource \
        --tenant public \
        --namespace default \
        --name canal \
        --destination-topic-name my-topic \
        --source-config-file /pulsar/conf/canal-mysql-source-config.yaml \
        --parallelism 1
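When the same connector is started repeatedly with different topics or config files, it can help to build the command line in code. The sketch below assembles the argument list for the `localrun` command shown above, using only the flags that appear there. `localrun_command` is a hypothetical helper and does not run anything by itself.

```python
import shlex


def localrun_command(archive, topic, config_file, name="canal",
                     tenant="public", namespace="default", parallelism=1):
    """Build the argv list for `pulsar-admin source localrun`."""
    return [
        "./bin/pulsar-admin", "source", "localrun",
        "--archive", archive,
        "--classname", "org.apache.pulsar.io.canal.CanalStringSource",
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        "--destination-topic-name", topic,
        "--source-config-file", config_file,
        "--parallelism", str(parallelism),
    ]


if __name__ == "__main__":
    cmd = localrun_command(
        "./connectors/pulsar-io-canal-2.3.0.nar",
        "my-topic",
        "/pulsar/conf/canal-mysql-source-config.yaml",
    )
    # shlex.join quotes arguments safely for display or for pasting into a shell.
    print(shlex.join(cmd))
```

The returned list can be passed to `subprocess.run(cmd, check=True)` from the Pulsar installation directory inside the container.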
Consume data from MySQL.
    $ docker exec -it pulsar-standalone /bin/bash
    $ python pulsar-client.py

Open another window and log in to the MySQL server.

    $ docker exec -it pulsar-mysql /bin/bash
    $ mysql -h 127.0.0.1 -uroot -pcanal
Create a table, and insert, delete, and update data in MySQL server.
    mysql> use test;
    mysql> show tables;
    mysql> CREATE TABLE IF NOT EXISTS `test_table`(`test_id` INT UNSIGNED AUTO_INCREMENT,`test_title` VARCHAR(100) NOT NULL, `test_author` VARCHAR(40) NOT NULL, `test_date` DATE,PRIMARY KEY ( `test_id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8;
    mysql> INSERT INTO test_table (test_title, test_author, test_date) VALUES("a", "b", NOW());
    mysql> UPDATE test_table SET test_title='c' WHERE test_title='a';
    mysql> DELETE FROM test_table WHERE test_title='c';