介绍
介绍
之前福哥介绍过一个通过监听MySQL的binglog日志来实现实时获取数据增加、删除、修改的变化的工具php-mysql-replication,这个工具很棒,可惜的是只支持PHP语言。
今天福哥要带着大家学习一个更棒的工具阿里巴巴的Canal项目,Canal分为服务器端和客户端,服务器端是Java实现的,而客户端支持主流编程语言,包括:PHP、Java、Go、。
安装
服务器端
官方下载地址。
https://github.com/alibaba/canal/releases
福哥选择的是比较新的1.1.6版本。
配置
MySQL
binlog
首先要开启MySQL的binlog支持,并且设置binlog的格式为ROW。
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权
建立canal用户,授权replication权限。
grant replication client, replication slave, select on *.* to canal@'%' identified by 'tongfu.net';
依赖
添加Canal依赖,福哥选择的是1.1.3版本。
<!-- canal --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
Service
canalClient
在service目录下面建立canalClient类。
public class canalClient { String canalHost; int canalPort; String canalUser; String canalPwd; String canalInstance; public canalClient(String host, int port, String user, String pwd, String instance){ canalHost = host; canalPort = port; canalUser = user; canalPwd = pwd; canalInstance = instance; } private void process(List<CanalEntry.Entry> etries) throws Exception { for(CanalEntry.Entry entry : etries){ if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){ continue; } CanalEntry.RowChange rowChange; rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); System.out.println(entry.getHeader().getTableName()); switch (eventType){ case INSERT: for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){ printData(rowData.getBeforeColumnsList()); printData(rowData.getAfterColumnsList()); } break; case UPDATE: for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){ printData(rowData.getBeforeColumnsList()); printData(rowData.getAfterColumnsList()); } break; case DELETE: for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){ printData(rowData.getBeforeColumnsList()); printData(rowData.getAfterColumnsList()); } break; } } } private void printData(List<CanalEntry.Column> columns){ for(CanalEntry.Column column : columns){ System.out.println(String.format("%s: %s", column.getName(), column.getValue())); } } public void start(){ InetSocketAddress inetSocketAddress = new InetSocketAddress(canalHost, canalPort); CanalConnector canalConnector = CanalConnectors.newSingleConnector(inetSocketAddress, canalInstance, canalUser, canalPwd); try{ canalConnector.connect(); canalConnector.subscribe(); canalConnector.rollback(); while(true){ Message message = canalConnector.getWithoutAck(1000); long batchId = message.getId(); int size = message.getEntries().size(); if(batchId == -1 || size == 0){ }else{ try{ process(message.getEntries()); }catch(Exception e){ break; } } canalConnector.ack(batchId); } }catch(Exception e){ e.printStackTrace(); }finally{ canalConnector.disconnect(); } } }
TfcanalApplication
修改TfcanalApplication对象,改成命令行模式。
在run方法里面调用canalClient对象,启动客户端。
@SpringBootApplication public class TfcanalApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(TfcanalApplication.class, args); } @Override public void run(String... args) throws Exception { canalClient canalClient = new canalClient( "127.0.0.1", 11111, "canal", "tongfu.net", "tfams" ); canalClient.start(); } }
启动
启动服务器端
启动cmd,切到canal的bin目录,执行startup.bat。
总结
今天福哥带着大家学习了阿里巴巴的Canal的搭建方法,还学会了通过Springboot搭建Canal客户端,完成了通过Canal监听MySQL的biglog日志实现实时获取数据变化的目的。