¶前言
最近在做公司大数据相关业务,需要在不侵入业务的情况下完成数据的同步,调研了一些框架考虑到后续的业务扩展以及灵活性最终决定选用中间件canal
¶canal 简介
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
¶应用场景
基于日志增量订阅和消费的业务包括:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
- 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
¶工作原理
¶MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
¶canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
¶多语言
canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。
¶准备工作
需要 pull 如下 Docker 镜像:
- mysql:8.0
- canal/canal-server:v1.1.4
- lensesio/fast-data-dev:latest
具体版本可以根据实际需求调整
|
|
¶启动MySQL
|
|
需要修改配置,开启 binlog
|
|
在 [mysqld] 增加如下配置
|
|
重启容器
|
|
¶启动 Kafka
此镜像用户快速部署 Kafka 开发环境,其中包括Web管理页面
|
|
启动之后访问 http://127.0.0.1:3040 即可,如下图所示:
¶启动 canal
|
|
参数解释:
|
|
动态 topic 规则:
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔
- 例子1:test\.test 指定匹配的单表,发送到以test_test为名字的topic上
- 例子2:.\… 匹配所有表,则每个表都会发送到各自表名的topic上
- 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
- 例子4:test\.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
- 例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值
为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table
- 例子1: test:test\.test 指定匹配的单表,发送到以test为名字的topic上
- 例子2: test:.\… 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
- 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
- 例子4:testA:test\.* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
- 例子5:test0:test,test1:test1\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值
大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力
启动之后执行如下命令查看日志:
|
|
看到如下内容则启动成功:
|
|
之后你就可以修改Mysql中的数据,在 Kafka web页面中查看数据变更了,举个例子:
|
|
¶canal 的 Tcp 模式 Java-client 简单实现
在启动 canal 的时候指定 serverMode 为 tcp 模式 [详见](启动 canal),顺利启动完开始编写代码:
引入如下以来,注意要你的 canal 版本保持一致
|
|
|
|
¶canal 的 Tcp 模式 go 简单实现
安装依赖:
|
|
|
|