使用canal连接kafka
本文最后更新于:2023年1月12日 下午
使用canal连接kafka
这篇主要是项目还原,目的是记录构建时遇到的各种奇葩坑,避免下次迷路。废话不多说,直接上手。
默认已安装
docker
,docker-compose
,nodejs
,yarn
,typescript
- 首先根据 kafka-docker 这个官方的仓库下的
docker-compose.yml
复制一份到自己的项目中
1 |
|
将kafka
下的build
项,更改为kafka
镜像,可以从dockerhub中查找指定版本的kafka
,这里使用wurstmeister/kafka:2.13-2.7.0
在environment
下添加配置属性
1 |
|
然后拉取镜像,并运行起来
1 |
|
编写
Producer
和Customer
kafkajs
版1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86//config.ts //简单的配置
const Config = {
brokers: [
"localhost:9092" //kafka的服务器
],
topic: 'test' //与kafka添加的topcs一样
}
export default Config;
//kafka.ts //实例化一个kafkajs对象
import { Kafka } from "kafkajs";
import Config from "./config";
const kafka = new Kafka({
clientId: 'kafkajs',
brokers: Config.brokers
});
export default kafka;
//producer.ts //kafka Producer
import { Message } from "kafkajs";
import Config from "./config";
import kafka from "./kafka";
async function producer(messages: Message[]) {
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: Config.topic,
messages
});
await producer.disconnect()
}
export default producer;
//consumer.ts //kafka Consumer
import kafka from "./kafka";
import Config from "./config";
import { ConsumerConfig, EachMessagePayload } from "kafkajs";
async function consumer(config: ConsumerConfig) {
const consumer = kafka.consumer(config);
await consumer.connect()
await consumer.subscribe({
topic: Config.topic,
fromBeginning: true
});
await consumer.run({
eachMessage: async ({topic, partition, message}: EachMessagePayload): Promise<void> => {
console.log({
value: message.value.toString(),
topic,
partition
})
}
})
}
export default consumer;
//index.ts //函数入口
import producer from "./producer";
import consumer from './consumer';
async function start() {
await producer([
{value: 'Hello'},
{value: ','},
{value: 'I\'m'},
{value: 'kafkajs'}
])
await consumer({
groupId: 'consumer-1'
})
await consumer({
groupId: 'consumer-2'
})
}
start()
.catch(console.log)然后编译文件,并运行,可以看到我们消息从
Producer
发送到了Consumer
接入
canal
修改
docker-compose.yml
,添加canal
的镜像和相关配置,同时添加一个测试的mysql
镜像(注,由于项目需求,我还配置了wordpress
镜像)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33...
canal:
image: canal/canal-server:v1.1.4
environment:
- canal.instance.mysql.slaveId=54321 #slave id 不要与mysql的一样就行
- canal.instance.master.address=mysql:3306 #mysql地址
- canal.instance.dbUsername=kafka #mysql 对应的用户名
- canal.instance.dbPassword=kafka #mysql 对应的密码
- canal.instance.parser.parallel=false #由于我用的虚拟机,cpu为1G,所以设为false
- canal.instance.filter.regex=kafka\.user #数据库中要监听的表,详细看官方说明
- canal.mq.dynamicTopic=.*\..* #动态生成topic
- canal.zkServers=zookeeper:2181 #链接zookeeper集群的链接信息
#canal.properties 配置
- canal.serverMode=kafka #MQ使用的kafka
- canal.mq.servers=kafka:9092 #kafka地址
depends_on:
- zookeeper
- kafka
mysql:
image: mysql:5.7
restart: always
volumes:
- ./configuration/conf.d/binlog.cnf:/etc/mysql/conf.d/binlog.cnf #为了让mysql开启bin_log模式的配置
restart: always
environment:
MYSQL_ROOT_PASSWORD: root_password_can_you
MYSQL_DATABASE: kafkadb
MYSQL_USER: kafka
MYSQL_PASSWORD: kafka
ports:
- 3306:3306binlog.cnf
配置文件内容,canal官方说明1
2
3
4[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复拉取镜像,启动项目
1
$ docker-compose up
更改
mysql
权限 ,使用root
登录到mysql
1
2
3
4CREATE USER kafka IDENTIFIED BY 'kafka'; # 创建与docker-compose.yml中对应的用户和密码
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'kafka'@'%'; #给mysql用户权限
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; #也可以给所有权限
FLUSH PRIVILEGES;创建一个数据库
kafkadb
并添加一个user
表向
user
表插入数据1
INSERT INTO user ( `id`, `username`) VALUES ( 1, 'yan');
好像没有数据过来(至少我的是这样)
排查问题
首先查看是否镜像运行正常
1
$ docker ps
发现没有问题,只有依次排查每个镜像日志,先从
canal
查起1
2
3
4
5
6$ docker exec -it <canal 镜像> bash
#然后进入canal-server文件夹
$ cd canal-server
$ cat logs/example/example.log
#发现出错了,以下为片段
# Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.百度后,发现和这个问题很像,那应该就是我们前面说的
kafka
的ip
设置成localhost
导致的,尝试更改一下,问题解决再插入数据,可以看到数据被接收到了
后记
其实在部署之间,遇到了很多问题,由于这次是问题重现,有些问题并没有再出现
例如有自己写的Producer
程序推送消息时,报错There is no leader for this topic-partition as we are in the middle of a leadership election
这是由于,没有设置KAFKA_BROKER_ID
导致每次构建项目,都重新生成了brokder id
,可以在构建项目时在其后添加--no-recreate
,可以再这里找到 。记得使用docker-compose rm -vfs
删除后再构建项目
也有zookeeper
报错Zookeeper Report Error:KeeperErrorCode = NoNode
,可以再这里找到
等等