介绍
介绍
之前福哥介绍过一个通过监听MySQL的binglog日志来实现实时获取数据增加、删除、修改的变化的工具php-mysql-replication,这个工具很棒,可惜的是只支持PHP语言。
今天福哥要带着大家学习一个更棒的工具阿里巴巴的Canal项目,Canal分为服务器端和客户端,服务器端是Java实现的,而客户端支持主流编程语言,包括:PHP、Java、Go、。
安装
服务器端
官方下载地址。
https://github.com/alibaba/canal/releases
福哥选择的是比较新的1.1.6版本。

配置
MySQL
binlog
首先要开启MySQL的binlog支持,并且设置binlog的格式为ROW。
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权
建立canal用户,授权replication权限。
grant replication client, replication slave, select on *.* to canal@'%' identified by 'tongfu.net';

服务器端
目录结构
解开压缩包可以得到如下一个目录结构。

打开conf目录,文件canal.properties是canal的配置文件,文件夹example是example实例的配置文件目录。

福哥复制example目录到tfams目录,然后我们来配置tfams实例。
tfams实例
打开tfams目录下的instance.properties文件。
canal.instance.master.address
设置mysql的主机和端口。
canal.instance.master.address=192.168.168.88:3306
canal.instance.dbUsername
设置mysql的登录名。
canal.instance.dbUsername=canal
canal.instance.dbPassword
设置mysql的密码。
canal.instance.dbPassword=tongfu.net
canal.instance.connectionCharset
设置mysql的连接数据编码。
canal.instance.connectionCharset = UTF-8
客户端
建立项目
建立一个Springboot项目,不需要勾选任何依赖。

依赖
添加Canal依赖,福哥选择的是1.1.3版本。
<!-- canal --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
Service
canalClient
在service目录下面建立canalClient类。
public class canalClient {
String canalHost;
int canalPort;
String canalUser;
String canalPwd;
String canalInstance;
public canalClient(String host, int port, String user, String pwd, String instance){
canalHost = host;
canalPort = port;
canalUser = user;
canalPwd = pwd;
canalInstance = instance;
}
private void process(List<CanalEntry.Entry> etries)
throws Exception {
for(CanalEntry.Entry entry : etries){
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
CanalEntry.RowChange rowChange;
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(entry.getHeader().getTableName());
switch (eventType){
case INSERT:
for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
printData(rowData.getBeforeColumnsList());
printData(rowData.getAfterColumnsList());
}
break;
case UPDATE:
for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
printData(rowData.getBeforeColumnsList());
printData(rowData.getAfterColumnsList());
}
break;
case DELETE:
for(CanalEntry.RowData rowData : rowChange.getRowDatasList()){
printData(rowData.getBeforeColumnsList());
printData(rowData.getAfterColumnsList());
}
break;
}
}
}
private void printData(List<CanalEntry.Column> columns){
for(CanalEntry.Column column : columns){
System.out.println(String.format("%s: %s", column.getName(), column.getValue()));
}
}
public void start(){
InetSocketAddress inetSocketAddress = new InetSocketAddress(canalHost, canalPort);
CanalConnector canalConnector = CanalConnectors.newSingleConnector(inetSocketAddress, canalInstance, canalUser, canalPwd);
try{
canalConnector.connect();
canalConnector.subscribe();
canalConnector.rollback();
while(true){
Message message = canalConnector.getWithoutAck(1000);
long batchId = message.getId();
int size = message.getEntries().size();
if(batchId == -1 || size == 0){
}else{
try{
process(message.getEntries());
}catch(Exception e){
break;
}
}
canalConnector.ack(batchId);
}
}catch(Exception e){
e.printStackTrace();
}finally{
canalConnector.disconnect();
}
}
}TfcanalApplication
修改TfcanalApplication对象,改成命令行模式。
在run方法里面调用canalClient对象,启动客户端。
@SpringBootApplication
public class TfcanalApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(TfcanalApplication.class, args);
}
@Override
public void run(String... args)
throws Exception {
canalClient canalClient = new canalClient(
"127.0.0.1",
11111,
"canal",
"tongfu.net",
"tfams"
);
canalClient.start();
}
}启动
启动服务器端
启动cmd,切到canal的bin目录,执行startup.bat。

启动客户端
福哥是直接通过IDEA启动的。

测试
修改mysql数据
福哥更新了用户“鬼谷子叔叔”的密码。


查看客户端
福哥在IDEA的输出框里看到了刚刚在mysql里更新的数据的变化,包括更改前的状态和更改后的状态。

总结
今天福哥带着大家学习了阿里巴巴的Canal的搭建方法,还学会了通过Springboot搭建Canal客户端,完成了通过Canal监听MySQL的biglog日志实现实时获取数据变化的目的。