1. Enable the MySQL binlog
https://github.com/alibaba/canal/wiki/QuickStart
[MySQL] Where the config file lives for a mysql installed with brew on a Mac
https://www.jianshu.com/p/acac5124991d
The file is at: /usr/local/etc/my.cnf
Add the following:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
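As a quick sanity check before restarting, the sketch below scans the text of a my.cnf-style file and reports which of the three settings above are missing from the [mysqld] section. It is only an illustration: `checkBinlogConfig` is a hypothetical helper of my own, not part of MySQL or canal, and the parsing is deliberately naive (it strips anything after `#` and treats `-` and `_` in option names as the same, as MySQL option files do).

```javascript
// Hypothetical helper: checks my.cnf text for the binlog settings canal needs.
function checkBinlogConfig(text) {
  const settings = {};
  let section = null;
  for (const rawLine of text.split(/\r?\n/)) {
    // Drop trailing "# ..." comments and surrounding whitespace.
    const line = rawLine.replace(/#.*$/, '').trim();
    if (line === '' || line.startsWith(';')) continue;
    const header = line.match(/^\[(.+)\]$/);
    if (header) {
      section = header[1].trim();
      continue;
    }
    if (section !== 'mysqld') continue;
    const eq = line.indexOf('=');
    // Normalize "log-bin" and "log_bin" to the same key.
    const key = (eq === -1 ? line : line.slice(0, eq)).trim().replace(/-/g, '_');
    const value = eq === -1 ? '' : line.slice(eq + 1).trim();
    settings[key] = value;
  }
  const problems = [];
  if (!('log_bin' in settings)) problems.push('log-bin is not set');
  if ((settings.binlog_format || '').toUpperCase() !== 'ROW') {
    problems.push('binlog-format is not ROW');
  }
  if (!/^[1-9]\d*$/.test(settings.server_id || '')) {
    problems.push('server_id is missing or not a positive integer');
  }
  return problems;
}

const sample = [
  '[mysqld]',
  'log-bin=mysql-bin # enable binlog',
  'binlog-format=ROW # use ROW mode',
  'server_id=1',
].join('\n');

// An empty array means all three settings were found.
console.log(checkBinlogConfig(sample));
```

This only checks the file text; whether the running server picked the settings up still has to be confirmed after the restart.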
Restart the service:
lemonhall@yunings-Mini etc % brew services restart mysql
Stopping `mysql`... (might take a while)
==> Successfully stopped `mysql` (label: homebrew.mxcl.mysql)
==> Successfully started `mysql` (label: homebrew.mxcl.mysql)
lemonhall@yunings-Mini etc %
=====================================================
2. Add a MySQL account for canal and grant privileges
mysql -u root
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
=====================================================
3. Download and extract
mkdir deployer
cp canal.deployer-1.1.4.tar.gz deployer
cd deployer
tar zxvf canal.deployer-1.1.4.tar.gz
=====================================================
4. Edit the config file
vi conf/example/instance.properties
Start
sh bin/startup.sh
Check the server log
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
Check the instance log
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
Stop
sh bin/stop.sh
Every one of these steps gave me errors; something was wrong
=====================================================
5. Fix the connection error
2019-10-26 06:01:22.279 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /127.0.0.1:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
    at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: caching_sha2_password Auth failed
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257)
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80)
    ... 4 more
https://www.linuxidc.com/Linux/2018-11/155500.htm
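If your MySQL installation must serve pre-8.0 clients and you run into compatibility issues after upgrading to MySQL 8.0 or higher, the simplest way to resolve them and restore pre-8.0 compatibility is to reconfigure the server to revert to the previous default authentication plugin (mysql_native_password). For example, use these lines in the config file my.cnf: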
[mysqld]
default_authentication_plugin=mysql_native_password
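This setting lets pre-8.0 clients connect to 8.0 servers until the clients and connectors in use at your installation are upgraded to support caching_sha2_password. However, it should be treated as temporary, not as a long-term or permanent solution, because it causes new accounts created with the setting in effect to forgo the improved authentication security that caching_sha2_password provides.
Then I followed: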
https://blog.csdn.net/u010142437/article/details/82423628
ALTER USER canal IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;
OK, that solved the 8.0 authentication problem;
=====================================================
6. Install a client
And then, damn it, that failed with a connection error too; same fix, this time for root:
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '';
FLUSH PRIVILEGES;
=====================================================
7. Fix the Kafka error
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:120) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_221]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]
Config file:
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
7.1 Install a Kafka tool as well
http://www.kafkatool.com/download.html
OK, fixed. There was really only one problem: the port number in the config file has to be right
##################################################
######### MQ #############
##################################################
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
# tcp, kafka, RocketMQ
canal.serverMode = kafka
After a restart, adding a row to a table produces one more message in Kafka
Of course, since it's raw bytes, it can't be read
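Since the whole problem came down to the broker address, a small check of the `canal.mq.servers` value can catch a malformed entry before restarting canal. The sketch below is an illustration under assumptions: `parseProperties` and `checkMqServers` are hypothetical helper names, the parser only handles simple `key = value` lines like the ones above, and the value is assumed to be a comma-separated host:port list in the style of Kafka's bootstrap.servers. It checks syntax only; it cannot tell whether a broker is actually listening on that port.

```javascript
// Hypothetical helper: parse simple "key = value" lines, skipping "#" comments.
function parseProperties(text) {
  const props = {};
  for (const rawLine of text.split(/\r?\n/)) {
    const line = rawLine.trim();
    if (line === '' || line.startsWith('#')) continue;
    const eq = line.indexOf('=');
    if (eq === -1) continue;
    props[line.slice(0, eq).trim()] = line.slice(eq + 1).trim();
  }
  return props;
}

// Hypothetical helper: list problems with the Kafka-related settings.
function checkMqServers(props) {
  const problems = [];
  if (props['canal.serverMode'] !== 'kafka') {
    problems.push('canal.serverMode is not kafka');
  }
  // Assumption: a comma-separated list of host:port entries.
  const servers = (props['canal.mq.servers'] || '')
    .split(',')
    .map(s => s.trim())
    .filter(s => s !== '');
  if (servers.length === 0) problems.push('canal.mq.servers is empty');
  for (const server of servers) {
    const m = server.match(/^([^:\s]+):(\d+)$/);
    // The regex only admits digits, so the port is a non-negative integer here.
    if (!m || Number(m[2]) === 0 || Number(m[2]) > 65535) {
      problems.push('bad host:port entry: ' + server);
    }
  }
  return problems;
}

const sampleProps = parseProperties([
  '# tcp, kafka, RocketMQ',
  'canal.serverMode = kafka',
  'canal.mq.servers = 127.0.0.1:9092',
].join('\n'));

// An empty array means the two settings look well-formed.
console.log(checkMqServers(sampleProps));
```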
=====================================================
8. Start parsing
https://www.cnblogs.com/createweb/p/10580281.html
https://www.cnblogs.com/createweb/p/10523780.html
First, the messages can be watched with the command-line tool:
kafka-console-consumer --bootstrap-server localhost:9092 --topic example
*�7
mysql-bin.000002� *UTF-80ȑ̩�-8BJPW #�
mysql-bin.000002� *UTF-80ȑ̩�-8BexampleJfirstP>Xb
rowsCount1�Pb�
id (0B2RINT(11) unsigned
'
name (0BshengliR
VARCHAR(256)id (0B2RINT(11) unsigned%
name (0BHelloR
VARCHAR(256):
mysql-bin.000002� *UTF-80ȑ̩�-8BJP580
Of course, it comes out garbled
Never mind that
=====================================================
9. Later I found it doesn't have to be that much trouble
The canal.properties config file has this setting:
canal.mq.flatMessage = true
With it, the messages in Kafka are JSON:
{"data":[{"id":"2","name":"lemonhall"}],"database":"example","es":1572045535000,"id":3,"isDdl":false,"mysqlType":{"id":"INT(11) unsigned","name":"VARCHAR(256)"},"old":[{"name":"qiqi"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"first","ts":1572045535280,"type":"UPDATE"}
Since it's JSON, I can write the processing logic directly in JS. The same message, pretty-printed:
{
    "data":[
        {
            "id":"2",
            "name":"lemonhall"
        }
    ],
    "database":"example",
    "es":1572045535000,
    "id":3,
    "isDdl":false,
    "mysqlType":{
        "id":"INT(11) unsigned",
        "name":"VARCHAR(256)"
    },
    "old":[
        {
            "name":"qiqi"
        }
    ],
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "name":12
    },
    "table":"first",
    "ts":1572045535280,
    "type":"UPDATE"
}
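As a rough sketch of what that JS logic might look like, the function below takes one flat message like the one above and lists, for each row, which columns changed and from what value to what value. `describeChange` is a hypothetical name of my own, and the code assumes that `old[i]` holds only the previous values of the columns that changed in `data[i]`. That matches this sample, but I have not checked it against canal's documentation. Reading from Kafka is left out; the message is pasted in as a string.

```javascript
// Hypothetical helper: summarize one canal flat message (a JSON string).
function describeChange(json) {
  const msg = JSON.parse(json);
  const rows = msg.data || [];
  const oldRows = msg.old || [];
  return rows.map((row, i) => {
    const before = oldRows[i] || {};
    // Assumption: "old" contains only the columns whose values changed.
    const changes = Object.keys(before).map(
      col => col + ': ' + before[col] + ' -> ' + row[col]
    );
    // Identify the row by its primary key columns.
    const key = (msg.pkNames || []).map(pk => pk + '=' + row[pk]).join(',');
    return [msg.type, msg.database + '.' + msg.table, '[' + key + ']', changes.join('; ')]
      .filter(part => part !== '')
      .join(' ');
  });
}

const sampleMessage = '{"data":[{"id":"2","name":"lemonhall"}],"database":"example","es":1572045535000,"id":3,"isDdl":false,"mysqlType":{"id":"INT(11) unsigned","name":"VARCHAR(256)"},"old":[{"name":"qiqi"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"first","ts":1572045535280,"type":"UPDATE"}';

// One summary line per row; for this sample:
// UPDATE example.first [id=2] name: qiqi -> lemonhall
for (const line of describeChange(sampleMessage)) {
  console.log(line);
}
```

Returning plain strings keeps the sketch easy to eyeball; real logic would more likely branch on `type` and work with the row objects directly.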