同福

Springboot使用Canal监听MySQL的binlog日志实现实时获取数据增、删、改的变化【20220923】

介绍

介绍

之前福哥介绍过一个通过监听MySQL的binglog日志来实现实时获取数据增加、删除、修改的变化的工具php-mysql-replication,这个工具很棒,可惜的是只支持PHP语言。

今天福哥要带着大家学习一个更棒的工具阿里巴巴的Canal项目,Canal分为服务器端和客户端,服务器端是Java实现的,而客户端支持主流编程语言,包括:PHP、Java、Go、。

安装

服务器端

官方下载地址。

https://github.com/alibaba/canal/releases

福哥选择的是比较新的1.1.6版本。

home/topic/2022/0916/17/2ff43055c10fa4bc6915bdf6baad84b2.png

配置

MySQL

binlog

首先要开启MySQL的binlog支持,并且设置binlog的格式为ROW。

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权

建立canal用户,授权replication权限。

grant replication client, replication slave, select on *.* to canal@'%' identified by 'tongfu.net';

home/topic/2022/0922/15/558db1a16a2ca89039875d070944c5d2.png

依赖

添加Canal依赖,福哥选择的是1.1.3版本。

<!-- canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>

Service

canalClient

在service目录下面建立canalClient类。

public class canalClient {
    String canalHost;
    int canalPort;
    String canalUser;
    String canalPwd;
    String canalInstance;

    public canalClient(String host, int port, String user, String pwd, String instance){
        canalHost = host;
        canalPort = port;
        canalUser = user;
        canalPwd = pwd;
        canalInstance = instance;
    }

    private void process(List<CanalEntry.Entry> etries)
            throws Exception {
        for(CanalEntry.Entry entry : etries){
            if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
                continue;
            }
            CanalEntry.RowChange rowChange;
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(entry.getHeader().getTableName());
            switch (eventType){
                case INSERT:
                    for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
                        printData(rowData.getBeforeColumnsList());
                        printData(rowData.getAfterColumnsList());
                    }
                    break;
                case UPDATE:
                    for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
                        printData(rowData.getBeforeColumnsList());
                        printData(rowData.getAfterColumnsList());
                    }
                    break;
                case DELETE:
                    for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
                        printData(rowData.getBeforeColumnsList());
                        printData(rowData.getAfterColumnsList());
                    }
                    break;
            }
        }
    }

    private void printData(List<CanalEntry.Column> columns){
        for(CanalEntry.Column column : columns){
            System.out.println(String.format("%s: %s", column.getName(), column.getValue()));
        }
    }

    public void start(){
        InetSocketAddress inetSocketAddress = new InetSocketAddress(canalHost, canalPort);
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(inetSocketAddress, canalInstance, canalUser, canalPwd);
        try{
            canalConnector.connect();
            canalConnector.subscribe();
            canalConnector.rollback();
            while(true){
                Message message = canalConnector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if(batchId == -1 || size == 0){

                }else{
                    try{
                        process(message.getEntries());
                    }catch(Exception e){
                        break;
                    }
                }
                canalConnector.ack(batchId);
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            canalConnector.disconnect();
        }
    }
}

TfcanalApplication

修改TfcanalApplication对象,改成命令行模式。

在run方法里面调用canalClient对象,启动客户端。

@SpringBootApplication
public class TfcanalApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(TfcanalApplication.class, args);
    }

    @Override
    public void run(String... args)
            throws Exception {
        canalClient canalClient = new canalClient(
                "127.0.0.1",
                11111,
                "canal",
                "tongfu.net",
                "tfams"
        );

        canalClient.start();
    }
}

启动

启动服务器端

启动cmd,切到canal的bin目录,执行startup.bat。

home/topic/2022/0922/17/c3f94ac7f5b654f3d36bfc5abe4b87cd.png

总结

今天福哥带着大家学习了阿里巴巴的Canal的搭建方法,还学会了通过Springboot搭建Canal客户端,完成了通过Canal监听MySQL的biglog日志实现实时获取数据变化的目的。