一、Canal技术介绍:
1.1 calnal发展介绍:
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
1.2 calnal工作原理:
1.2.1 MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
1.2.2 canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
1.2.3 canal架构说明:
说明:server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
二、Linux环境安装Canal:
2.1 环境修改
2.1.1 进入容器,修改my.cnf内容如下(mysql支持binlog):
[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
提示: 如果mysql安装在docker下,可以执行如下命令,修改上面的配置文件: ① docker exec -it mysql /bin/bash ② vi /etc/mysql/my.cnf ③ 创建mysql容器时,可以使用如下命令: 【备注1:提前创建好相关的目录】
mkdir -p /mydata/mysql3/data /mydata/mysql3/conf /mydata/mysql3/logsdocker run -p 3308:3306 --restart=always -v /mydata/mysql3/conf:/etc/mysql/conf.d -v /mydata/mysql3/logs:/var/log/mysql -v /mydata/mysql3/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123 --name mysql -d mysql:5.7
【备注2:在/mydata/mysql3/conf中创建my.cnf文件内容如下:】
[mysql]default-character-set=utf8[mysqld]character-set-server=utf8log-bin=mysql-binbinlog-format=ROWserver_id=1
不要忘记重启mysql:
docker restart mysql3
【备注3:进行mysql内部容器,查看是否binlog日志添加成功】
docker exec -it mysql3 /bin/bashmysql -uroot -p123show variables like '%bin%'
【备注4:进行mysql内部容器,查看是否binlog日志添加成功】
因为starter-canal中使用的client-canal的版本为1.0.25,所以,最好下载canal的版本为1.0.25以下的版酊,否则,会失败
2.1.2 创建用户并授权:
CREATE USER canal IDENTIFIED BY 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;
说明: 如果不创建用户,也可以使用root用户,密码就是你的mysql密码
2.1.3 下载canal安装包:
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-{版本号}.tar.gz
说明: 下载地址
2.1.4 解压缩并修改配置文件:
修改canal/conf/canal.properties文件:
修改canal/conf/example/instance.properties文件:


mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:
- 所有表:. or .\..
2. canal schema下所有表: canal\..
3. canal下的以canal打头的表:canal\.canal.*
4. canal schema下的一张表:canal\.test1- 多个规则组合使用:canal\..,mysql.test1,mysql.test2 (逗号分隔) [*详细配置参见](https://github.com/alibaba/canal/wiki/AdminGuide)
2.2 启动运行:
2.2.1 执行canal/bin/startup.sh命令:
cd /usr/local/canalbin/startup.sh
2.2.2 查看日志
① canal/logs/canal/canal.log
② canal/logs/example/example.log
备注:下载的canal版本不能高于1.0.19版本
三、【实战】在项目中将广告数据库同步到redis中
3.1 从github下载第三方的canal的spring-boot-starter:
3.2 新建模块canal-demo
3.2.1 在模块中添加依赖:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--1.引入canal的依赖--><dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version></dependency></dependencies>
3.2.2 添加application.yml配置文件,
server:port: 7701canal:client:instances:example:host: 192.168.56.10port: 11111spring:application:name: canal
3.2.3 添加监听器:
/*** ------------------------------* 功能:* 作者:WF* 微信:hbxfwf13590332912* 创建时间:2021/8/9-20:46* ------------------------------*/@CanalEventListenerpublic class CanalEventDataListener {/*** 功能: 监听插入事件* 参数:* 返回值: void* 时间: 2021/8/10 13:07*/@InsertListenPointpublic void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {System.out.println("onEventInsert");System.out.println("------------------------------------------------------------------------------------");rowData.getAfterColumnsList().stream().forEach(c-> System.out.println(c.getName() + "->" + c.getValue()));}/*** 功能: 监听修改事件* 参数:* 返回值: void* 时间: 2021/8/10 13:07*/@UpdateListenPointpublic void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {System.out.println("onEventUpdate");System.out.println("------------------------------------------------------------------------------------");rowData.getAfterColumnsList().stream().forEach(c-> System.out.println(c.getName() + "->" + c.getValue()));}/*** 功能: 监听删除事件* 参数:* 返回值: void* 时间: 2021/8/10 13:07*/@DeleteListenPointpublic void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {System.out.println("onEventDelete");System.out.println("------------------------------------------------------------------------------------");rowData.getBeforeColumnsList().stream().forEach(c-> System.out.println( c.getName() + "->" + c.getValue()));}/*** 功能: 自定义监听* 参数:* 返回值: void* 时间: 2021/8/10 13:06*/@ListenPoint(destination = "example",schema = "zeyigoudb",table = {"tb_item"},eventType = {CanalEntry.EventType.UPDATE,CanalEntry.EventType.INSERT,CanalEntry.EventType.DELETE})public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {System.out.println("onEventCustomUpdate");System.out.println("------------------------------------------------------------------------------------");rowData.getAfterColumnsList().forEach((c) -> System.out.println( c.getName() + "->" + c.getValue()));}}
3.2.4 添加启动类:
/*** ------------------------------* 功能:* 作者:WF* 微信:hbxfwf13590332912* 创建时间:2021/8/9-20:55* ------------------------------*/@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)@EnableCanalClient //启用canalpublic class CancalDataApplication {public static void main(String[] args) {SpringApplication.run(CancalDataApplication.class);}}
3.3 实现当后台修改、添加、删除广告时,会自动同步到redis中
3.3.1 将原来content-service中的关于增、删、改广告时,删除redis的代码注掉:
/*** 功能: 添加广告* 参数:* 返回值: void* 时间: 2021/8/2 15:00*/@Overridepublic void add(ContentEntity content) {//1. 删除redis中的缓存//redisTemplate.delete("contents");//2. 添加到数据库中this.save(content);}@Overridepublic void update(ContentEntity content) {//1. 删除redis中的缓存//redisTemplate.delete("contents");//2. 调用 修改方法this.updateById(content);}@Overridepublic void delete(List<Long> ids) {//1. 删除redis中的缓存//redisTemplate.delete("contents");//2. 根据id删除广告this.removeByIds(ids);}
3.3.2 在canal-demo模块中,添加自定义的事件处理:
第一步:修改依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--1.引入canal的依赖--><dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version></dependency><!--2.引入redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies>
第二步:修改application.yml文件:
spring:redis:host: 192.168.56.10port: 6379
第三步:添加redis的序列化配置:
@Configurationpublic class RedisTemplateSerializerConfig {@Beanpublic RedisTemplate redisTemplate(RedisConnectionFactory factory){RedisTemplate redisTemplate = new RedisTemplate();redisTemplate.setConnectionFactory(factory);//1. 设置key的序列化器redisTemplate.setKeySerializer(new StringRedisSerializer());//2. 设置value的序列化器// redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());return redisTemplate;}}
第四步:定义事件:
/*** 功能: 自定义监听(测试)* 参数:* 返回值: void* 时间: 2021/8/10 13:06*/@ListenPoint(destination = "example",schema = "zeyigoudb",table = {"tb_content"},eventType = {CanalEntry.EventType.UPDATE,CanalEntry.EventType.INSERT,CanalEntry.EventType.DELETE})public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {//1. 根据事件得到不同的数据//1.1 得到添加或修改事件时的数据列表//List<CanalEntry.Column> columnsListAddOrUpdate = rowData.getAfterColumnsList();//1.2 得到删除时的数据列表//List<CanalEntry.Column> columnsListDelete = rowData.getBeforeColumnsList();System.out.println("正在同步广告列表...");//直接删除redis中的数据redisTemplate.delete("contents");}
第五步:运行效果(略)

