canal is Alibaba's incremental subscription & consumption component for the MySQL binlog. Aliyun DRDS, Alibaba TDDL secondary indexes, and small-table replication are powered by canal.

How It Works

(Figure: canal working-principle diagram, canal-1.png)

The principle is relatively simple (a minimal client sketch follows the list):

  1. canal emulates the MySQL slave interaction protocol, disguising itself as a MySQL slave and sending the dump protocol to the MySQL master
  2. the MySQL master receives the dump request and starts pushing the binary log to the slave (i.e., canal)
  3. canal parses the binary log objects (raw byte stream)
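
From the consumer side, the parsed entries are exposed through the canal client API (the same canal.client artifact used in the Spring Boot section below). A minimal consumption sketch, assuming a canal server on 127.0.0.1:11111 (the default port) and the default example destination:

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalConsumer {
    public static void main(String[] args) throws InterruptedException {
        // Address, port and destination are assumptions for a default local deployment
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        connector.subscribe(".*\\..*");   // subscribe to all schemas and tables
        connector.rollback();             // resume from the last acknowledged position
        try {
            while (true) {
                Message message = connector.getWithoutAck(100); // fetch up to 100 entries without auto-ack
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000L);  // nothing new yet, back off briefly
                    continue;
                }
                message.getEntries().forEach(entry ->
                        System.out.println(entry.getHeader().getSchemaName() + "."
                                + entry.getHeader().getTableName() + " -> " + entry.getEntryType()));
                connector.ack(batchId);   // acknowledge the batch once processed
            }
        } finally {
            connector.disconnect();
        }
    }
}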

Quick Start

MySQL Configuration

Since canal relies on the MySQL binlog, MySQL must have binlog writing enabled. The recommended binlog format is row.

Check whether binlog is already enabled (log_bin should be ON and binlog_format should be ROW):

show variables like 'log_%';
show variables like 'bin%';

Configuration:

[mysqld]
log-bin=mysql-bin
binlog-format=row
server_id=1 # required when configuring MySQL replication; must not clash with canal's slaveId

If you are replicating from a replica (cascading replication), also add:

[mysqld]
log_slave_updates=1 

Because canal works by posing as a MySQL slave, the account it uses must be granted the privileges required of a MySQL slave:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
-- Alternatively: GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'; the account must have the SHOW VIEW privilege

For MySQL master-slave replication, see: MySQL master-slave replication configuration

Startup Steps

Download and Install

Visit https://github.com/alibaba/canal/releases for the list of release packages; this example uses v1.1.2:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz

Extract:

mkdir /tmp/canal
tar zxvf canal.deployer-1.1.2.tar.gz -C /tmp/canal

Configuration Changes

Instance parameters:

vim conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change to your own database information
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# all tables under the `binlog` schema:
# binlog\\..*
# a specific table under the `binlog` schema:
# binlog\\.cal

Kafka configuration:

# mq config
canal.mq.topic=canal.example
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=10
#canal.mq.partitionHash=.*\\..*
#Hash rule: `schema.table:unique key`, e.g. mytest.person:id (version 1.1.3 adds a new syntax)
#With .*\\..* and no pk specified, every matched table is hashed by its table name: all rows of one table go to the same partition, while different tables are spread across partitions (a hot table can make one partition very large)
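
When the canal server is configured to ship binlog data to Kafka instead of serving it over TCP, messages arrive on the topic configured above and can be read with an ordinary Kafka consumer. A minimal sketch, assuming a broker at localhost:9092 and string payloads (both assumptions):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CanalTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
        props.put("group.id", "canal-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("canal.example")); // topic from canal.mq.topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each record value is one change event published by canal
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}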

Start

sh bin/startup.sh

View Logs

vim logs/canal/canal.log

Instance log:

vim logs/example/example.log

Stop the Service

sh bin/stop.sh

Adapters

Documentation: Client Adapter

Since version 1.1.1, canal has included client-side adapters (and a launcher) for landing data downstream. Currently supported:

  • REST interface for managing the sync tasks started by the client launcher
  • Logging adapter, as a demo
  • Relational database data sync (table-to-table), with ETL support
  • HBase data sync (table-to-table), with ETL support (planned)
  • ElasticSearch multi-table data sync, with ETL support

Using with Spring Boot

Project Setup

Add the canal dependency:

dependencies {
    // https://mvnrepository.com/artifact/com.alibaba.otter/canal.client
    compile('com.alibaba.otter:canal.client:1.1.2')
}
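
The client shown next reads its settings from a DatabaseProperties holder with a nested CanalConfig. That class is not included in the post; the following is a minimal sketch with field names inferred from the getters the client calls (the property prefix and default values are assumptions):

import java.time.Duration;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical configuration holder; only the getters used by CanalClient are implied by the post.
@Component
@ConfigurationProperties(prefix = "database")   // prefix is an assumption
public class DatabaseProperties {

    private CanalConfig canal;

    public CanalConfig getCanal() { return canal; }
    public void setCanal(CanalConfig canal) { this.canal = canal; }

    public static class CanalConfig {
        private String ip;                                  // canal server host
        private int port = 11111;                           // canal server port (deployer default)
        private String destination = "example";             // instance name under conf/
        private String username = "";
        private String password = "";
        private String subscribe = ".*\\..*";               // filter passed to CanalConnector#subscribe
        private int batchSize = 1000;                       // entries fetched per getWithoutAck call
        private Duration interval = Duration.ofSeconds(1);  // polling interval between handle() calls

        public String getIp() { return ip; }
        public void setIp(String ip) { this.ip = ip; }
        public int getPort() { return port; }
        public void setPort(int port) { this.port = port; }
        public String getDestination() { return destination; }
        public void setDestination(String destination) { this.destination = destination; }
        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }
        public String getPassword() { return password; }
        public void setPassword(String password) { this.password = password; }
        public String getSubscribe() { return subscribe; }
        public void setSubscribe(String subscribe) { this.subscribe = subscribe; }
        public int getBatchSize() { return batchSize; }
        public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
        public Duration getInterval() { return interval; }
        public void setInterval(Duration interval) { this.interval = interval; }
    }
}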

The canal client:

import java.net.InetSocketAddress;
import java.util.List;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;

// CanalService, DatabaseProperties, CleanServiceException and ExceptionCode are
// project-specific classes from the author's codebase.

/**
 * @author tangfan 2018/12/17 14:22
 */
@Component
public class CanalClient {
    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);

    private CanalService canalService;
    private DatabaseProperties.CanalConfig canalConfig;
    private CanalConnector canalConnector;
    private boolean started;

    @Autowired
    public CanalClient(CanalService canalService, DatabaseProperties databaseProperties) {
        this.canalService = canalService;
        this.canalConfig = databaseProperties.getCanal();
    }

    @PostConstruct
    public void start() {
        if (canalConfig != null && StringUtils.isNotEmpty(canalConfig.getIp())) {
            // Create the connection to the canal server
            this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()), canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
            logger.info("同步任务初始化完成。Canal 配置:{}", canalConfig);
            if (!this.started) {
                canalConnector.connect();
                canalConnector.subscribe(this.canalConfig.getSubscribe());
                canalConnector.rollback();
                this.started = true;
            }
        }
    }

    @PreDestroy
    public void stop() {
        if (this.started) {
            try {
                canalConnector.disconnect();
            } catch (CanalClientException exp) {
                logger.warn("Exception while stopping the canal client", exp);
            }
            this.started = false;
        }
    }

    /**
     * Runs one fetch-and-process cycle: pull a batch from canal, hand it to the service, then ack or roll back.
     */
    public void handle() {
        int batchSize = canalConfig.getBatchSize();
        if (batchSize < 0 || batchSize >= 9999) {
            throw new CleanServiceException(ExceptionCode.ARGUMENT_ERROR, "batch size for sync fetch is out of bounds");
        }
        if (this.canalConfig.getInterval() == null || this.canalConfig.getInterval().toMillis() <= 0) {
            throw new CleanServiceException(ExceptionCode.ARGUMENT_ERROR, "sync fetch interval is out of bounds");
        }
        long batchId;
        Message message = null;
        try {
            message = canalConnector.getWithoutAck(batchSize);
        } catch (Exception exp) {
            logger.error("获取canal数据失败", exp);
        }
        if (message != null) {
            batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId != -1 && size != 0) {
                try {
                    saveEntry(batchId, message.getEntries());
                    // Acknowledge the batch
                    canalConnector.ack(batchId);
                } catch (Exception exp) {
                    logger.error("canal service 处理失败,batchId:{}", batchId, exp);
                    // 处理失败, 回滚数据
                    if (batchId > 0) {
                        canalConnector.rollback(batchId);
                        logger.info("回滚批次:{},共计:{}项。", batchId, size);
                    }
                }
            }
        }
    }

    /**
     * Persists the entries of one batch.
     *
     * @param batchId the canal batch id
     * @param entrys  the entries to process
     */
    private void saveEntry(long batchId, List<CanalEntry.Entry> entrys) {
        if (entrys != null) {
            logger.info("批次:{}, 确认数量:{}项", batchId, entrys.size());
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                CanalEntry.RowChange rowChange = null;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## failed to parse RowChange event, data:" + entry.toString(),
                            e);
                }
                if (rowChange != null) {
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (rowChange.getRowDatasList() != null) {
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            // INSERT/UPDATE carry the new row in the after-image; DELETE carries the removed row in the before-image
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            String schemaName = entry.getHeader().getSchemaName();
                            String tableName = entry.getHeader().getTableName();
                            switch (eventType) {
                                case INSERT:
                                    canalService.insert(afterColumnsList, schemaName, tableName);
                                    break;
                                case UPDATE:
                                    canalService.update(afterColumnsList, schemaName, tableName);
                                    break;
                                case DELETE:
                                    canalService.delete(beforeColumnsList, schemaName, tableName);
                                    break;
                                default:
                                    logger.info("未知操作类型:{}", eventType);
                                    break;
                            }
                        }
                    }
                }
            }
        }
    }
}
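
The CanalService the client delegates to is also not included in the post. Its shape can be inferred from the calls in saveEntry; a hypothetical sketch:

import java.util.List;

import com.alibaba.otter.canal.protocol.CanalEntry;

// Hypothetical sink interface; method signatures are inferred from the calls in CanalClient#saveEntry.
public interface CanalService {
    void insert(List<CanalEntry.Column> afterColumns, String schemaName, String tableName);
    void update(List<CanalEntry.Column> afterColumns, String schemaName, String tableName);
    void delete(List<CanalEntry.Column> beforeColumns, String schemaName, String tableName);
}

In a Spring Boot application, handle() would typically be driven by a @Scheduled method or a dedicated polling thread that waits canalConfig.getInterval() between calls; the post does not show which approach the author uses.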

Using with Kafka
