跳到主要内容

09、Kafka 实战 - kafka消费者offset维护

消费者offset维护

offset维护

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便恢复后继续消费。

消费者是按照消费者组来保存offset的,不是按照消费者单独保存的,如果某个消费者挂掉了,按消费者保存就无法获取上之前消费到的offset

之后,是按消费者组+Topic+Partition来确定唯一的一个offset

kafka0.9版本之前,consumer默认将offset保存在Zk中,从0.9版本开始,consumer默认将offset保存在kafka一个内置的topic中,该topic为_consumer_offsets

offset存在zk

创建一个新topic

bin/kafka-topics.sh --zookeeper hll4:2181 --create --topic bigdata --partitions 2 --replication-factor 2

启动一个生产者

bin/kafka-console-producer.sh --broker-list hll1:9092 --topic bigdata

启动一个消费者

bin/kafka-console-consumer.sh --zookeeper hll4:2181 --topic bigdata

连接Zk观察

zkCli.sh

# ls /根目录可以看到kafka注册到zk的信息
[zk: localhost:2181(CONNECTED) 16] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config]

#consumers就是启动的消费者组,注意这是消费者组
[zk: localhost:2181(CONNECTED) 17] ls /consumers
[console-consumer-7153]
[zk: localhost:2181(CONNECTED) 18] ls /consumers/console-consumer-7153
[ids, owners, offsets]

从zk查询到的offsets中就可以看到消费者组存储的offset信息

通过刚刚启动生产者生产一条消息,消费者也正常消费成功

 

 

进入到zk中,查看刚刚的offsets

[zk: localhost:2181(CONNECTED) 29] ls /consumers/console-consumer-7153/offsets           
[bigdata]主题
[zk: localhost:2181(CONNECTED) 30] ls /consumers/console-consumer-7153/offsets/bigdata
[0, 1]这个0和1是分区
[zk: localhost:2181(CONNECTED) 31] get /consumers/console-consumer-7153/offsets/bigdata/0
1offset
cZxid = 0x200000042
ctime = Tue Jan 25 19:13:16 PST 2022
mZxid = 0x200000042
mtime = Tue Jan 25 19:13:16 PST 2022
pZxid = 0x200000042
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0

通过get /consumers/console-consumer-7153/offsets/bigdata/0,就可以看到当前消费的offset

 

查看get /consumers/console-consumer-7153/offsets/bigdata/1,offset是0,因为刚只生产了一条消息然后被消费

 

现在通过生产者,再生产一条消息被消费者消息后,再get /consumers/console-consumer-7153/offsets/bigdata/1,变成1了

注意:由于zk客户端的延迟,需要重新连接zkCli.sh才会观察到更新的数据

 

offset存kafka本地

修改配置文件consumer.properties

exclude.internal.topics=false

这时消费者服务需要用bootstrap-server来启动:

bin/kafka-console-consumer.sh --bootstrap-server hll1:9092 --topic bigdata

查看offset

bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hll4:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

 

重新执行查看offset命令异常的话,需要修改下consumer.properties修改下group.id配置

Group + Topic + Partition确定唯一的offset