
Canal服务端安装
服务端需要下载压缩包,下载地址:https://github.com/alibaba/canal/releases
目前最新的是v1.1.5 ,点击下载:
下载完成解压,目录如下:
本文使用Canal+RabbitMQ 进行数据的同步,因此下面步骤完全按照这个base进行。
1、打开MySQL的binlog日志
修改MySQL的日志文件,my.cnf 配置如下:
[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
2、设置MySQL的配置
需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。
一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。
修改canal.deployer-1.1.5\conf\example\instance.properties配置文件
# urlcanal.instance.master.address=127.0.0.1:3306# username/passwordcanal.instance.dbUsername=rootcanal.instance.dbPassword=root# 监听的数据库canal.instance.defaultDatabaseName=test# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有canal.instance.filter.regex=.*\\..*
3、设置RabbitMQ的配置
服务端默认的传输方式是tcp ,需要在配置文件中设置MQ 的相关信息。
这里需要修改两处配置文件,如下;
1、canal.deployer-1.1.5\conf\canal.properties
这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码…
# 传输方式:tcp, kafka, rocketMQ, rabbitMQcanal.serverMode = rabbitMQ########################################################## RabbitMQ ###############################################################rabbitmq.host = 127.0.0.1rabbitmq.virtual.host =/# exchangerabbitmq.exchange =canal.exchange# 用户名、密码rabbitmq.username =guestrabbitmq.password =guest# 是否持久化rabbitmq.deliveryMode = 2
2、canal.deployer-1.1.5\conf\example\instance.properties
这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:
canal.mq.topic=canal.routing.key
4、RabbitMQ新建exchange和Queue
在RabbitMQ中需要新建一个canal.exchange (必须和配置中的相同)的exchange和一个名称为 canal.queue (名称随意)的队列。
其中绑定的路由KEY为:canal.routing.key (必须和配置中的相同),如下图:
5、启动服务端
点击bin目录下的脚本,windows直接双击startup.bat ,启动成功如下:
6、测试
在本地数据库test 中的oauth_client_details 插入一条数据,如下:
INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');
此时查看MQ中的canal.queue 已经有了数据,如下:
其实就是一串JSON数据,这个JSON如下:
{"data": [{"client_id": "myjszl","resource_ids": "res1","client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W","scope": "all","authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit","web_server_redirect_uri": "http://www.baidu.com","authorities": null,"access_token_validity": "1000","refresh_token_validity": "1000","additional_information": null,"autoapprove": "false"}],"database": "test","es": 1640337532000,"id": 7,"isDdl": false,"mysqlType": {"client_id": "varchar(48)","resource_ids": "varchar(256)","client_secret": "varchar(256)","scope": "varchar(256)","authorized_grant_types": "varchar(256)","web_server_redirect_uri": "varchar(256)","authorities": "varchar(256)","access_token_validity": "int(11)","refresh_token_validity": "int(11)","additional_information": "varchar(4096)","autoapprove": "varchar(256)"},"old": null,"pkNames": ["client_id"],"sql": "","sqlType": {"client_id": 12,"resource_ids": 12,"client_secret": 12,"scope": 12,"authorized_grant_types": 12,"web_server_redirect_uri": 12,"authorities": 12,"access_token_validity": 4,"refresh_token_validity": 4,"additional_information": 12,"autoapprove": 12},"table": "oauth_client_details","ts": 1640337532520,"type": "INSERT"}
每个字段的意思已经很清楚了,有表名称、方法、参数、参数类型、参数值…..
客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。
Canal客户端搭建
客户端很简单实现,要做的就是消费Canal服务端传递过来的消息,监听canal.queue 这个队列。
1、创建消息实体类
MQ传递过来的是JSON数据,当然要创建个实体类接收数据,如下:
/*** Canal消息接收实体类*/@NoArgsConstructor@Datapublic class CanalMessage<T> {@JsonProperty("type")private String type;@JsonProperty("table")private String table;@JsonProperty("data")private List<T> data;@JsonProperty("database")private String database;@JsonProperty("es")private Long es;@JsonProperty("id")private Integer id;@JsonProperty("isDdl")private Boolean isDdl;@JsonProperty("old")private List<T> old;@JsonProperty("pkNames")private List<String> pkNames;@JsonProperty("sql")private String sql;@JsonProperty("ts")private Long ts;}
2、MQ消息监听业务
接下来就是监听队列,一旦有Canal服务端有数据推送能够及时的消费。
代码很简单,只是给出个接收的案例,具体的业务逻辑可以根据业务实现,如下:
import cn.hutool.json.JSONUtil;import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/*** 监听MQ获取Canal增量的数据消息*/@Component@Slf4j@RequiredArgsConstructorpublic class CanalRabbitMQListener {@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "canal.queue", durable = "true"),exchange = @Exchange(value = "canal.exchange"),key = "canal.routing.key")})public void handleDataChange(String message) {//将message转换为CanalMessageCanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);String tableName = canalMessage.getTable();log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);//TODO 业务逻辑自己完善...............}}
3、测试
下面向表中插入数据,看下接收的消息是什么样的,SQL如下:
INSERT INTO `oauth_client_details`VALUES( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );

