Canal实现MySQL实时增量数据传输

前言

最近在做公司大数据相关业务,需要在不侵入业务的情况下完成数据的同步,调研了一些框架考虑到后续的业务扩展以及灵活性最终决定选用中间件canal

canal 简介

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

应用场景

基于日志增量订阅和消费的业务包括:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理
  • 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

多语言

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

准备工作

需要 pull 如下 Docker 镜像:

  1. mysql:8.0
  2. canal/canal-server:v1.1.4
  3. lensesio/fast-data-dev:latest

具体版本可以根据实际需求调整

1
2
3
$ docker pull mysql:8.0
$ docker pull canal/canal-server:v1.1.4
$ docker pull lensesio/fast-data-dev:latest

启动MySQL

1
2
3
4
$ docker run -d \
-v /data/mysqldata:/var/lib/mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=1234567890 mysql:8.0

需要修改配置,开启 binlog

1
2
$ docker exec -id 容器id /bin/bash
$ vim /etc/mysql/my.cnf

在 [mysqld] 增加如下配置

1
2
3
4
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

重启容器

1
$ docker restart 容器id

启动 Kafka

此镜像用户快速部署 Kafka 开发环境,其中包括Web管理页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ docker run -d \
-p 3181:3181 \
-p 3040:3040 \
-p 7081:7081 \
-p 7082:7082 \
-p 7083:7083 \
-p 7092:7092 \
-e ZK_PORT=3181 \
-e WEB_PORT=3040 \
-e REGISTRY_PORT=8081 \
-e REST_PORT=7082 \
-e CONNECT_PORT=7083 \
-e BROKER_PORT=7092 \
-e ADV_HOST=127.0.0.1 lensesio/fast-data-dev

启动之后访问 http://127.0.0.1:3040 即可,如下图所示:

启动 canal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ git clone https://github.com/alibaba/canal --depth=1
$ cd ./canal/docker
$ sh run.sh -e canal.auto.scan=false \
-e canal.serverMode=kafka \
-e canal.mq.servers=127.0.0.1:7092 \
-e canal.mq.canalBatchSize=1 \
-e canal.mq.dynamicTopic=.*\\..* \
-e canal.destinations=test \
-e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false

参数解释:

1
2
3
4
5
6
7
8
9
10
// cannal 模式 Tcp, Kafka, RocketMQ
-e canal.serverMode=kafka
// MQ 的地址
-e canal.mq.servers=127.0.0.1:7092
// 获取 canal 数据的批次大小(一次推送到 Kafka 中的数据)
-e canal.mq.canalBatchSize=1
// MQ 里的动态 topic 规则
-e canal.mq.dynamicTopic=.*\\..*
// eg
-e canal.mq.dynamicTopic=topic_name:databases_name.table_name

动态 topic 规则:

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

  • 例子1:test\.test 指定匹配的单表,发送到以test_test为名字的topic上
  • 例子2:.\… 匹配所有表,则每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\.test 指定匹配的单表,发送到以test为名字的topic上
  • 例子2: test:.\… 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
  • 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
  • 例子4:testA:test\.* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
  • 例子5:test0:test,test1:test1\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值

大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

启动之后执行如下命令查看日志:

1
$ docker logs -f 容器id

看到如下内容则启动成功:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Starting sshd: [ OK ]
Starting crond: [ OK ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

之后你就可以修改Mysql中的数据,在 Kafka web页面中查看数据变更了,举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
[
{
"topic": "test_database.oss", // topic
"key": null,
"value":
{
"data": [ // 变更后的数据
{
"id": "112398380415582208",
"file_name": "test.png",
"file_key": "7ca8dfe915684afdad8b2229aa42d55b",
"suffix": ".png",
"url": "http://test.com/oss-api/file/7ca8dfe915684afdad8b2229aa42d55b",
"content_type": "image/png",
"expires": "2029-11-08 08:51:02",
"create_time": "2019-11-07 16:51:02",
"create_by": "-1",
"create_by_name": "-1",
"file_length": "5666",
"bucket_name": "png"
}],
"database": "test_database", // 数据库名字
"es": 1592550488000,
"id": 2,
"isDdl": false,
"mysqlType":
{
"id": "bigint(20)",
"file_name": "varchar(255)",
"file_key": "varchar(255)",
"suffix": "varchar(32)",
"url": "varchar(255)",
"content_type": "varchar(255)",
"expires": "timestamp",
"create_time": "datetime",
"create_by": "bigint(255)",
"create_by_name": "varchar(32)",
"file_length": "bigint(20)",
"bucket_name": "varchar(255)"
},
"old": [ // 修改前的字段值
{
"file_name": "1.png"
}],
"pkNames": ["id"],
"sql": "",
"sqlType":
{
"id": -5,
"file_name": 12,
"file_key": 12,
"suffix": 12,
"url": 12,
"content_type": 12,
"expires": 93,
"create_time": 93,
"create_by": -5,
"create_by_name": 12,
"file_length": -5,
"bucket_name": 12
},
"table": "oss",
"ts": 1592550488571,
"type": "UPDATE" // 修改操作 INSERT DELETE UPDATE
},
"partition": 0,
"offset": 0,
"$$hashKey": "object:743"
}
]

canal 的 Tcp 模式 Java-client 简单实现

在启动 canal 的时候指定 serverMode 为 tcp 模式 [详见](启动 canal),顺利启动完开始编写代码:

引入如下以来,注意要你的 canal 版本保持一致

1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import cn.hutool.core.lang.Console;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接 注意 test 对应 canal.destinations=test 参数配置
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "test", "root", "1234567890");
int batchSize = 1;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = Integer.MAX_VALUE;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
//System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
Console.log("闲置太久了,关闭监听");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
Console.log("binlog:[{}:{}], name:[{},{}], eventType:{}",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType);
Console.log("");
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printBeforeColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printBeforeColumn(rowData.getAfterColumnsList());
} else {
Console.log(">>>> before:");
printBeforeColumn(rowData.getBeforeColumnsList());
Console.log(">>>> after:");
printAfterColumn(rowData.getAfterColumnsList());
Console.log("------------------------------------------------------------------------");
Console.log("");
Console.log("");
}
}
}
}
private static void printBeforeColumn(List<Column> columns) {
for (Column column : columns) {
Console.log("{} : {}", column.getName(), column.getValue());
}
Console.log("");
}
private static void printAfterColumn(List<Column> columns) {
for (Column column : columns) {
Console.log("{} : {} > 是否更新:{}", column.getName(), column.getValue(), column.getUpdated());
}
Console.log("");
}
}

canal 的 Tcp 模式 go 简单实现

安装依赖:

1
2
3
$ git clone https://github.com/withlin/canal-go.git
$ export GO111MODULE=on
$ go mod vendor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
connector := client.NewSimpleCanalConnector("192.168.199.17", 11111, "", "", "example", 60000, 60*60*1000)
err :=connector.Connect()
if err != nil {
log.Println(err)
os.Exit(1)
}
err = connector.Subscribe(".*\\..*")
if err != nil {
log.Println(err)
os.Exit(1)
}
for {
message,err := connector.Get(100, nil, nil)
if err != nil {
log.Println(err)
os.Exit(1)
}
batchId := message.Id
if batchId == -1 || len(message.Entries) <= 0 {
time.Sleep(300 * time.Millisecond)
fmt.Println("===没有数据了===")
continue
}
printEntry(message.Entries)
}
----本文结束 感谢您的阅读----