前置条件
```yaml## mysql serverIdcanal.instance.mysql.slaveId = 1234#position info,需要改成自己的数据库信息canal.instance.master.address = 127.0.0.1:3306canal.instance.master.journal.name =canal.instance.master.position =canal.instance.master.timestamp =#username/password,需要改成自己的数据库信息canal.instance.dbUsername = canalcanal.instance.dbPassword = canalcanal.instance.defaultDatabaseName =canal.instance.connectionCharset = UTF-8#table regexcanal.instance.filter.regex = .\*\\\\..\*
整合RocketMQ的QuickStart
instance.properties配置
example/instance.properties文件配置 ```bashsalveId
canal.instance.mysql.slaveId=1234指定数据库地址
canal.instance.master.address=127.0.0.1:3306
数据库用户名密码
canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8
enable druid Decrypt database password
canal.instance.enableDruid=false
table regex 监听哪些表和库
canal.instance.filter.regex=.\..
table black regex 屏蔽哪些库和表
canal.instance.filter.black.regex=mysql\.slave_.*
发送mq到哪个Topic
canal.mq.topic=example canal.mq.partition=0
<a name="wykEW"></a>## canal.properties配置1. `canal.properties`修改内容展示```bash# tcp, kafka, rocketMQ, rabbitMQ 修改接收方位RocketMQcanal.serverMode = rocketMQ...省略..########################################################### RocketMQ ################################################################指定生产者组名称rocketmq.producer.group = testrocketmq.enable.message.trace = falserocketmq.customized.trace.topic =rocketmq.namespace =#指定rocketMq的naverserverrocketmq.namesrv.addr = 192.168.2.1:9876rocketmq.retry.times.when.send.failed = 0rocketmq.vip.channel.enabled = false#设置标签rocketmq.tag = gx
成果展示
动态Topic
前面的配置中我们将所有收集到的binlog日志都发送到了example主题上。如果我们希望区分他们,则可以通过配置动态主题来解决(配置example下的instance.properties文件)
动态主题的配置方式就是,指定哪个库哪个表,canal会自动发送到库名_表名的主题上。
例1
mytest.user:数据库mytest1中的数据变化都会发送到主题为 mytest_user中
例2
mytest2\..*:数据库mytest2中的表变化会发送到主题 mytest2_${表名}中
canal.instance.mysql.slaveId=1234# 指定数据库地址canal.instance.master.address=127.0.0.1:3306# 数据库用户名密码canal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false# table regex 监听哪些表和库canal.instance.filter.regex=.*\\..*# table black regex 屏蔽哪些库和表canal.instance.filter.black.regex=mysql\\.slave_.*# 发送mq到哪个Topiccanal.mq.topic=examplecanal.mq.dynamicTopic=mytest.user,mytest2\\..*canal.mq.partition=0
- 若符合
canal.mq.dynamicTopic配置项中指定的库和表,则对应发送到${库名}_${表名}的topic中 - 若没有符合
canal.mq.dynamicTopic配置项中指定的库和表,则还是到exampleTopic中

