1. Enable the MySQL binlog

https://github.com/alibaba/canal/wiki/QuickStart

[MySQL] Location of the config file for a mysql installed with brew on Mac

https://www.jianshu.com/p/acac5124991d

The file is at: /usr/local/etc/my.cnf

Add the following:

[mysqld]

log-bin=mysql-bin # enable binlog

binlog-format=ROW # use ROW format

server_id=1 # required for MySQL replication; must not clash with canal's slaveId
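A typo in my.cnf is easy to miss, so the sketch below checks the fragment above before the restart. It is only an illustration: the function names are made up for this note, it parses the text with Python's configparser, and it assumes the file has no `!include` directives (which configparser cannot read).

```python
import configparser


def read_mysqld_options(cnf_text):
    """Return the [mysqld] options as a dict, with '-' normalised to '_'."""
    parser = configparser.ConfigParser(
        allow_no_value=True,             # my.cnf allows bare flags
        inline_comment_prefixes=("#",),  # strip trailing "# ..." comments
        strict=False,                    # tolerate repeated keys
        interpolation=None,              # treat '%' literally
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return {}
    # MySQL accepts both log-bin and log_bin, so compare on one spelling.
    return {key.replace("-", "_"): value for key, value in parser.items("mysqld")}


def check_binlog_settings(cnf_text):
    """List what is missing for canal; an empty list means the fragment looks fine."""
    opts = read_mysqld_options(cnf_text)
    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format is not ROW")
    if not opts.get("server_id"):
        problems.append("server_id is not set")
    return problems
```

Calling `check_binlog_settings(open("/usr/local/etc/my.cnf").read())` should return an empty list once the three lines are in place.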

Restart the service:

lemonhall@yunings-Mini etc % brew services restart mysql

Stopping `mysql`... (might take a while)

==> Successfully stopped `mysql` (label: homebrew.mxcl.mysql)

==> Successfully started `mysql` (label: homebrew.mxcl.mysql)

lemonhall@yunings-Mini etc %

=====================================================

2. Create a MySQL account for canal and grant privileges

mysql -u root

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

=====================================================

3. Download and extract

mkdir deployer

cp canal.deployer-1.1.4.tar.gz deployer

cd deployer

tar zxvf canal.deployer-1.1.4.tar.gz

=====================================================

4. Edit the config file

vi conf/example/instance.properties

Start

sh bin/startup.sh

Check the server log

vi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.

2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]

2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Check the instance log

vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]

2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]

2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example

2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

Stop

sh bin/stop.sh

All of these steps threw errors on my machine; something was wrong.

=====================================================

5. Fix the connection error

2019-10-26 06:01:22.279 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /127.0.0.1:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
    at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: caching_sha2_password Auth failed
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257)
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80)
    ... 4 more

https://www.linuxidc.com/Linux/2018-11/155500.htm

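If your MySQL installation must serve pre-8.0 clients and you run into compatibility problems after upgrading to MySQL 8.0 or later, the simplest way to resolve them and restore pre-8.0 compatibility is to reconfigure the server to fall back to the previous default authentication plugin (mysql_native_password). For example, use these lines in the my.cnf config file: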

[mysqld]

default_authentication_plugin=mysql_native_password

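This setting lets pre-8.0 clients connect to an 8.0 server until the clients and connectors in use are upgraded to support caching_sha2_password. However, it should be treated as a temporary measure, not a long-term or permanent solution, because new accounts created with this setting in effect give up the improved authentication security that caching_sha2_password provides.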

Then I followed:

https://blog.csdn.net/u010142437/article/details/82423628

ALTER USER canal IDENTIFIED WITH mysql_native_password BY 'canal';

FLUSH PRIVILEGES;

With that, the MySQL 8.0 authentication problem is solved.

=====================================================

6. Install a client

http://www.sequelpro.com/

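Annoyingly, it also failed to connect. The fix is the same switch to mysql_native_password, this time for root: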

ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '';

FLUSH PRIVILEGES;

=====================================================

7. Fix the Kafka error

java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:120) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_221]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]

Config file:

https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

7.1 Install a Kafka tool as well

http://www.kafkatool.com/download.html

That fixed it. There was really only one problem: the port number in the config file has to be correct.

##################################################

######### MQ #############

##################################################

canal.mq.servers = 127.0.0.1:9092

canal.mq.retries = 0

canal.mq.batchSize = 16384

canal.mq.maxRequestSize = 1048576

canal.mq.lingerMs = 100

canal.mq.bufferMemory = 33554432

canal.mq.canalBatchSize = 50

canal.mq.canalGetTimeout = 100

canal.mq.flatMessage = true

canal.mq.compressionType = none

canal.mq.acks = all

#canal.mq.properties. =

canal.mq.producerGroup = test

# Set this value to "cloud", if you want open message trace feature in aliyun.

canal.mq.accessChannel = local

# aliyun mq namespace

#canal.mq.namespace =

# tcp, kafka, RocketMQ

canal.serverMode = kafka

After a restart, inserting a row into the table produces one more message in Kafka.

Of course, the payload is raw bytes, so it isn't human-readable.
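Since the metadata timeout came down to a wrong port in canal.mq.servers, a quick reachability check before starting canal can catch the mistake early. The sketch below is only an illustration: `broker_reachable` is a name made up for this note, and a successful TCP connect shows that something is listening on that port, not that it is actually a Kafka broker.

```python
import socket


def broker_reachable(servers, timeout=3.0):
    """Try a TCP connect to each host:port in a canal.mq.servers-style string.

    Returns a dict mapping each entry to True (connect succeeded) or False.
    """
    results = {}
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        if not host:
            # No "host:port" shape at all, so treat it as a bad entry.
            results[entry] = False
            continue
        try:
            with socket.create_connection((host, int(port)), timeout=timeout):
                results[entry] = True
        except (OSError, ValueError):
            # OSError: refused, timed out, unresolvable; ValueError: port is not a number.
            results[entry] = False
    return results
```

For the config above, `broker_reachable("127.0.0.1:9092")` should map the entry to True once Kafka is up on that port.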

=====================================================

8. Start parsing

https://www.cnblogs.com/createweb/p/10580281.html

https://www.cnblogs.com/createweb/p/10523780.html

First, the messages can be observed with the command-line tool:

kafka-console-consumer --bootstrap-server localhost:9092 --topic example

*�7

mysql-bin.000002� *UTF-80ȑ̩�-8BJPW #�

mysql-bin.000002� *UTF-80ȑ̩�-8BexampleJfirstP>Xb

rowsCount1�Pb�

id (0B2RINT(11) unsigned

'

name (0BshengliR

VARCHAR(256)id (0B2RINT(11) unsigned%

name (0BHelloR

VARCHAR(256):

mysql-bin.000002� *UTF-80ȑ̩�-8BJP580

As expected, the output is unreadable binary.

Ignore it for now.

=====================================================

9. It turned out none of that trouble was necessary

The canal.properties config file has this setting:

canal.mq.flatMessage = true

With it enabled, the messages in Kafka are plain JSON:

{"data":[{"id":"2","name":"lemonhall"}],"database":"example","es":1572045535000,"id":3,"isDdl":false,"mysqlType":{"id":"INT(11) unsigned","name":"VARCHAR(256)"},"old":[{"name":"qiqi"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"first","ts":1572045535280,"type":"UPDATE"}

With JSON, I can write the processing logic directly in JS:

{
    "data": [
        {
            "id": "2",
            "name": "lemonhall"
        }
    ],
    "database": "example",
    "es": 1572045535000,
    "id": 3,
    "isDdl": false,
    "mysqlType": {
        "id": "INT(11) unsigned",
        "name": "VARCHAR(256)"
    },
    "old": [
        {
            "name": "qiqi"
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12
    },
    "table": "first",
    "ts": 1572045535280,
    "type": "UPDATE"
}
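To show how little code the flat JSON needs, here is a minimal sketch that pulls the changed columns out of an UPDATE message. The notes mention JS; the same logic is written in Python here. The `changed_columns` helper is made up for this note. It also assumes, based only on the sample above, that each entry in `old` lines up by index with the entry in `data` and holds just the previous values of the columns that changed.

```python
import json

# The UPDATE message captured above, as it arrives from the Kafka topic.
SAMPLE = (
    '{"data":[{"id":"2","name":"lemonhall"}],"database":"example",'
    '"es":1572045535000,"id":3,"isDdl":false,'
    '"mysqlType":{"id":"INT(11) unsigned","name":"VARCHAR(256)"},'
    '"old":[{"name":"qiqi"}],"pkNames":["id"],"sql":"",'
    '"sqlType":{"id":4,"name":12},"table":"first",'
    '"ts":1572045535280,"type":"UPDATE"}'
)


def changed_columns(message_text):
    """Return [(primary_key, {column: (before, after)})] for an UPDATE message.

    Any other message type, or a DDL statement, yields an empty list.
    """
    msg = json.loads(message_text)
    if msg.get("isDdl") or msg.get("type") != "UPDATE":
        return []
    changes = []
    rows = msg.get("data") or []
    olds = msg.get("old") or []
    for row, old in zip(rows, olds):
        # Identify the row by its primary-key columns.
        key = {name: row.get(name) for name in msg.get("pkNames") or []}
        # Assumption: `old` carries only the columns whose value changed.
        diff = {col: (before, row.get(col)) for col, before in old.items()}
        changes.append((key, diff))
    return changes


if __name__ == "__main__":
    for key, diff in changed_columns(SAMPLE):
        print(key, diff)
```

Run against the sample, this reports that the row with id "2" had its name changed from "qiqi" to "lemonhall".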