同福

Springboot使用Canal监听MySQL的binlog日志实现实时获取数据增、删、改的变化【20220923】

介绍

介绍

之前福哥介绍过一个通过监听MySQL的binglog日志来实现实时获取数据增加、删除、修改的变化的工具php-mysql-replication,这个工具很棒,可惜的是只支持PHP语言。

今天福哥要带着大家学习一个更棒的工具阿里巴巴的Canal项目,Canal分为服务器端和客户端,服务器端是Java实现的,而客户端支持主流编程语言,包括:PHP、Java、Go、。

安装

服务器端

官方下载地址。

https://github.com/alibaba/canal/releases

福哥选择的是比较新的1.1.6版本。

home/topic/2022/0916/17/2ff43055c10fa4bc6915bdf6baad84b2.png

配置

MySQL

binlog

首先要开启MySQL的binlog支持,并且设置binlog的格式为ROW。

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权

建立canal用户,授权replication权限。

grant replication client, replication slave, select on *.* to canal@'%' identified by 'tongfu.net';

home/topic/2022/0922/17/8bcddfd3fe41cf185ad5e1243aaaa12f.png

服务器端

目录结构

解开压缩包可以得到如下一个目录结构。

home/topic/2022/0916/19/364bd4660dd6aa63bc85c238c8f6bcf7.png

打开conf目录,文件canal.properties是canal的配置文件,文件夹example是example实例的配置文件目录。

home/topic/2022/0916/19/9b8a56de3e6d85f23aa874f7c7fbb4d5.png

福哥复制example目录到tfams目录,然后我们来配置tfams实例。

tfams实例

打开tfams目录下的instance.properties文件。

canal.instance.master.address

设置mysql的主机和端口。

canal.instance.master.address=192.168.168.88:3306

canal.instance.dbUsername

设置mysql的登录名。

canal.instance.dbUsername=canal

canal.instance.dbPassword

设置mysql的密码。

canal.instance.dbPassword=tongfu.net

canal.instance.connectionCharset

设置mysql的连接数据编码。

canal.instance.connectionCharset = UTF-8

客户端

建立项目

建立一个Springboot项目,不需要勾选任何依赖。

home/topic/2022/0922/15/558db1a16a2ca89039875d070944c5d2.png

依赖

添加Canal依赖,福哥选择的是1.1.3版本。

<!-- canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>

Service

canalClient

在service目录下面建立canalClient类。

public class canalClient {
    String canalHost;
    int canalPort;
    String canalUser;
    String canalPwd;
    String canalInstance;

    public canalClient(String host, int port, String user, String pwd, String instance){
        canalHost = host;
        canalPort = port;
        canalUser = user;
        canalPwd = pwd;
        canalInstance = instance;
    }

    private void process(List<CanalEntry.Entry> etries)
            throws Exception {
        for(CanalEntry.Entry entry : etries){
            if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
                continue;
            }
            CanalEntry.RowChange rowChange;
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(entry.getHeader().getTableName());
            switch (eventType){
                case INSERT:
                    for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
                        printData(rowData.getBeforeColumnsList());
                        printData(rowData.getAfterColumnsList());
                    }
                    break;
                case UPDATE:
                    for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
                        printData(rowData.getBeforeColumnsList());
                        printData(rowData.getAfterColumnsList());
                    }
                    break;
                case DELETE:
                    for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
                        printData(rowData.getBeforeColumnsList());
                        printData(rowData.getAfterColumnsList());
                    }
                    break;
            }
        }
    }

    private void printData(List<CanalEntry.Column> columns){
        for(CanalEntry.Column column : columns){
            System.out.println(String.format("%s: %s", column.getName(), column.getValue()));
        }
    }

    public void start(){
        InetSocketAddress inetSocketAddress = new InetSocketAddress(canalHost, canalPort);
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(inetSocketAddress, canalInstance, canalUser, canalPwd);
        try{
            canalConnector.connect();
            canalConnector.subscribe();
            canalConnector.rollback();
            while(true){
                Message message = canalConnector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if(batchId == -1 || size == 0){

                }else{
                    try{
                        process(message.getEntries());
                    }catch(Exception e){
                        break;
                    }
                }
                canalConnector.ack(batchId);
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            canalConnector.disconnect();
        }
    }
}

TfcanalApplication

修改TfcanalApplication对象,改成命令行模式。

在run方法里面调用canalClient对象,启动客户端。

@SpringBootApplication
public class TfcanalApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(TfcanalApplication.class, args);
    }

    @Override
    public void run(String... args)
            throws Exception {
        canalClient canalClient = new canalClient(
                "127.0.0.1",
                11111,
                "canal",
                "tongfu.net",
                "tfams"
        );

        canalClient.start();
    }
}

启动

启动服务器端

启动cmd,切到canal的bin目录,执行startup.bat。

home/topic/2022/0922/17/b3aa4db2a55015713805c3a329e2dfa5.png

启动客户端

福哥是直接通过IDEA启动的。

home/topic/2022/0922/17/eefe64a53484facc2f75544d3d415380.png

测试

修改mysql数据

福哥更新了用户“鬼谷子叔叔”的密码。

home/topic/2022/0922/17/9000e39a3cf44feaf8bdf460d3018b3d.png

home/topic/2022/0922/17/23ab7822b03fb05a936bbb82ad8866b4.png

查看客户端

福哥在IDEA的输出框里看到了刚刚在mysql里更新的数据的变化,包括更改前的状态和更改后的状态。

home/topic/2022/0922/17/c3f94ac7f5b654f3d36bfc5abe4b87cd.png

总结

今天福哥带着大家学习了阿里巴巴的Canal的搭建方法,还学会了通过Springboot搭建Canal客户端,完成了通过Canal监听MySQL的biglog日志实现实时获取数据变化的目的。