记录生活、记录历史

Canal 安装与使用

2019.12.28

canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS、阿里巴巴TDDL 二级索引、小表复制powerd by canal.

工作原理

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

快速开始

mysql配置

由于canal的原理是基于mysql binlog技术,所以mysql 必须开启 binlog 写入功能。建议 binlog 模式为 row。

查看是否已开启binlog

1
2
show variables like 'log_%';
show variables like 'bin%';

配置:

1
2
3
4
[mysqld]
log-bin=mysql-bin
binlog-format=row
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

如果是从从同步,那需要加上配置项:

1
2
[mysqld]
log_slave_updates=

canal 的原理是模拟自己为mysql slave,所以这里一定要做 mysql slave 的相关权限。

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; 需要具有SHOW VIEW 权限

关于 MySql 主从同步可参考:MySQL主从同步配置

启动步骤

下载与安装

直接访问地址https://github.com/alibaba/canal/releases,列出所有的下载包,此处使用v1.1.2作为示例:

1
wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz

解压:

1
2
mkdir /tmp/canal
tar zxvf canal.deployer-1.1.2.tar.gz -C /tmp/canal

配置修改

应用参数:

1
vim conf/example/instance.properties
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\..
#binlog 下的所有表
# binlog\\..* 
#binlog 下指定表
# binlog\\.cal

kafka 配置:

1
2
3
4
5
6
7
8
# mq config
canal.mq.topic=canal.example
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=10
#canal.mq.partitionHash=.*\\..*
#散列规则定义 库名.表名 : 唯一主键,比如mytest.person: id 1.1.3版本支持新语法,见下文
#使用 .*\\..*时,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)

说明:

启动

1
sh bin/startup.sh

查看日志

1
vim logs/canal/canal.log

实例日志:

1
vim logs/example/example.log

关闭服务

1
sh bin/stop.sh

适配器

文档:客户端适配器

canal 1.1.1版本之后, 增加客户端数据落地的适配及启动功能, 目前支持功能:

  • 客户端启动器同步管理REST接口
  • 日志适配器, 作为DEMO
  • 关系型数据库的数据同步(表对表同步), ETL功能
  • HBase的数据同步(表对表同步), ETL功能(后续支持)
  • ElasticSearch多表数据同步,ETL功能

与 Spring Boot 结合使用

项目准备

引用 canal

1
2
3
4
dependencies {
    // https://mvnrepository.com/artifact/com.alibaba.otter/canal.client
    compile('com.alibaba.otter:canal.client:1.1.2')
}

canal 客户端:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
 * @author tangfan 2018/12/17 14:22
 */
@Component
public class CanalClient {
    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);

    private CanalService canalService;
    private DatabaseProperties.CanalConfig canalConfig;
    private CanalConnector canalConnector;
    private boolean started;

    @Autowired
    public CanalClient(CanalService canalService, DatabaseProperties databaseProperties) {
        this.canalService = canalService;
        this.canalConfig = databaseProperties.getCanal();
    }

    @PostConstruct
    public void start() {
        if (canalConfig != null && StringUtils.isNotEmpty(canalConfig.getIp())) {
            // 创建链接
            this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()), canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
            logger.info("同步任务初始化完成。Canal 配置:{}", canalConfig);
            if (!this.started) {
                canalConnector.connect();
                canalConnector.subscribe(this.canalConfig.getSubscribe());
                canalConnector.rollback();
                this.started = true;
            }
        }
    }

    @PreDestroy
    public void stop() {
        if (this.started) {
            try {
                canalConnector.disconnect();
            } catch (CanalClientException exp) {
                logger.warn("canal 客户端停止异常", exp);
            }
            this.started = false;
        }
    }

    /**
     * 执行一次
     */
    public void handle() {
        int batchSize = canalConfig.getBatchSize();
        if (batchSize < 0 || batchSize >= 9999) {
            throw new CleanServiceException(ExceptionCode.ARGUMENT_ERROR, "同步获取批次参数设置超过界限");
        }
        if (this.canalConfig.getInterval() == null || this.canalConfig.getInterval().toMillis() <= 0) {
            throw new CleanServiceException(ExceptionCode.ARGUMENT_ERROR, "同步获取批次参数设置时间间隔超过界限");
        }
        long batchId;
        Message message = null;
        try {
            message = canalConnector.getWithoutAck(batchSize);
        } catch (Exception exp) {
            logger.error("获取canal数据失败", exp);
        }
        if (message != null) {
            batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId != -1 && size != 0) {
                try {
                    saveEntry(batchId, message.getEntries());
                    // 提交确认
                    canalConnector.ack(batchId);
                } catch (Exception exp) {
                    logger.error("canal service 处理失败,batchId:{}", batchId, exp);
                    // 处理失败, 回滚数据
                    if (batchId > 0) {
                        canalConnector.rollback(batchId);
                        logger.info("回滚批次:{},共计:{}项。", batchId, size);
                    }
                }
            }
        }
    }

    /**
     * 保存同步实体
     *
     * @param entrys
     */
    private void saveEntry(long batchId, List<CanalEntry.Entry> entrys) {
        if (entrys != null) {
            logger.info("批次:{}, 确认数量:{}项", batchId, entrys.size());
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                CanalEntry.RowChange rowChage = null;
                try {
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
                if (rowChage != null) {
                    CanalEntry.EventType eventType = rowChage.getEventType();
                    if (rowChage.getRowDatasList() != null) {
                        for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            String schemaName = entry.getHeader().getSchemaName();
                            String tableName = entry.getHeader().getTableName();
                            switch (eventType) {
                                case INSERT:
                                    canalService.insert(afterColumnsList, schemaName, tableName);
                                    break;
                                case UPDATE:
                                    canalService.update(afterColumnsList, schemaName, tableName);
                                    break;
                                case DELETE:
                                    canalService.delete(afterColumnsList, schemaName, tableName);
                                    break;
                                default:
                                    logger.info("未知操作类型:{}", eventType);
                                    break;
                            }
                        }
                    }
                }
            }
        }
    }
}

与 Kafka 结合使用