介绍
介绍
之前福哥介绍过一个通过监听MySQL的binglog日志来实现实时获取数据增加、删除、修改的变化的工具php-mysql-replication,这个工具很棒,可惜的是只支持PHP语言。
今天福哥要带着大家学习一个更棒的工具阿里巴巴的Canal项目,Canal分为服务器端和客户端,服务器端是Java实现的,而客户端支持主流编程语言,包括:PHP、Java、Go、。
安装
服务器端
官方下载地址。
https://github.com/alibaba/canal/releases
福哥选择的是比较新的1.1.6版本。
配置
MySQL
binlog
首先要开启MySQL的binlog支持,并且设置binlog的格式为ROW。
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权
建立canal用户,授权replication权限。
grant replication client, replication slave, select on *.* to canal@'%' identified by 'tongfu.net';
服务器端
目录结构
解开压缩包可以得到如下一个目录结构。
打开conf目录,文件canal.properties是canal的配置文件,文件夹example是example实例的配置文件目录。
福哥复制example目录到tfams目录,然后我们来配置tfams实例。
tfams实例
打开tfams目录下的instance.properties文件。
canal.instance.master.address
设置mysql的主机和端口。
canal.instance.master.address=192.168.168.88:3306
canal.instance.dbUsername
设置mysql的登录名。
canal.instance.dbUsername=canal
canal.instance.dbPassword
设置mysql的密码。
canal.instance.dbPassword=tongfu.net
canal.instance.connectionCharset
设置mysql的连接数据编码。
canal.instance.connectionCharset = UTF-8
客户端
建立项目
建立一个Springboot项目,不需要勾选任何依赖。
依赖
添加Canal依赖,福哥选择的是1.1.3版本。
<!-- canal --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
Service
canalClient
在service目录下面建立canalClient类。
public class canalClient { String canalHost; int canalPort; String canalUser; String canalPwd; String canalInstance; public canalClient(String host, int port, String user, String pwd, String instance){ canalHost = host; canalPort = port; canalUser = user; canalPwd = pwd; canalInstance = instance; } private void process(List<CanalEntry.Entry> etries) throws Exception { for(CanalEntry.Entry entry : etries){ if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){ continue; } CanalEntry.RowChange rowChange; rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); System.out.println(entry.getHeader().getTableName()); switch (eventType){ case INSERT: for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){ printData(rowData.getBeforeColumnsList()); printData(rowData.getAfterColumnsList()); } break; case UPDATE: for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){ printData(rowData.getBeforeColumnsList()); printData(rowData.getAfterColumnsList()); } break; case DELETE: for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){ printData(rowData.getBeforeColumnsList()); printData(rowData.getAfterColumnsList()); } break; } } } private void printData(List<CanalEntry.Column> columns){ for(CanalEntry.Column column : columns){ System.out.println(String.format("%s: %s", column.getName(), column.getValue())); } } public void start(){ InetSocketAddress inetSocketAddress = new InetSocketAddress(canalHost, canalPort); CanalConnector canalConnector = CanalConnectors.newSingleConnector(inetSocketAddress, canalInstance, canalUser, canalPwd); try{ canalConnector.connect(); canalConnector.subscribe(); canalConnector.rollback(); while(true){ Message message = canalConnector.getWithoutAck(1000); long batchId = message.getId(); int size = message.getEntries().size(); if(batchId == -1 || size == 0){ }else{ try{ process(message.getEntries()); }catch(Exception e){ break; } } canalConnector.ack(batchId); } }catch(Exception e){ e.printStackTrace(); }finally{ canalConnector.disconnect(); } } }
TfcanalApplication
修改TfcanalApplication对象,改成命令行模式。
在run方法里面调用canalClient对象,启动客户端。
@SpringBootApplication public class TfcanalApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(TfcanalApplication.class, args); } @Override public void run(String... args) throws Exception { canalClient canalClient = new canalClient( "127.0.0.1", 11111, "canal", "tongfu.net", "tfams" ); canalClient.start(); } }
启动
启动服务器端
启动cmd,切到canal的bin目录,执行startup.bat。
启动客户端
福哥是直接通过IDEA启动的。
测试
修改mysql数据
福哥更新了用户“鬼谷子叔叔”的密码。
查看客户端
福哥在IDEA的输出框里看到了刚刚在mysql里更新的数据的变化,包括更改前的状态和更改后的状态。
总结
今天福哥带着大家学习了阿里巴巴的Canal的搭建方法,还学会了通过Springboot搭建Canal客户端,完成了通过Canal监听MySQL的biglog日志实现实时获取数据变化的目的。