kafka的脚本管理
topic管理
1.查看topic列表
1 2 [root@elk01:1 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --list
**2.创建topic **
1 2 3 4 5 6 7 8 9 10 11 12 13 [root@elk01:1 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic luay --partitions 3 --replication-factor 2 --create Created topic luay. --topic luay --partitions 3 --replication-factor 2 --create --delete --alter --describe
3.修改分区数量【只能由小变大,不能由大变小】
1 [root@elk01:1 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic luay --partitions 5 --alter
4.查看指定topic的详细信息
1 [root@elk01:1 ~]# kafka-topics.sh kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic luay --describe
5.删除topic
1 2 3 4 5 6 7 8 [root@elk01:1 ~]# kafka-topics.sh kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic luay --delete [root@elk01:1 ~]# ll /app/data/kafka/ drwxr-xr-x 2 root root 4096 Oct 29 18:15 luay-1.2baa4b610f314befbfbed58aaad84052-delete/ drwxr-xr-x 2 root root 4096 Oct 29 18:15 luay-2.0f44f4821a1849ccb7e0e37e1fe65ff4-delete/
producer管理
1 2 3 4 5 6 7 [root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic test >999999 [2024-10-29 20:49:30,357] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 7 : {test =LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [root@elk02:1 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --list test
kafka的consumer管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 [root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic test >12112112 >666666 [root@elk01:4 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.211:9092 --topic test 666666 [root@elk01:4 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.211:9092 --topic test --from-beginning 999999 12112112 666666
消费者组理念
1 2 3 4 5 6 7 8 9 10 11 12 13 - kafka的消费者组概念 1.kafka的offset存储位置 kafka早期版本 0.9-版本,offset记录存储在zookeeper集群。 从kafka 0.10+版本,默认的offset存储在kafka集群,存储在一个名为"__consumer_offsets" 内置的topic。(当消费者出现宕机,当前当前组里的其它消费者会从_consumer_offsets记录的偏移量里,找到已经取到哪里的数据,从此点继续往后取数据,而不是从新取所有数据,避免数据重复采集) 2.相关术语 consumer group: 消费者组,任意一个消费者都隶属于一个消费者组。 1.而"__consumer_offsets" 内置的topic记录的偏移量并不属于某个消费者,而是基于消费者进行记录的。 2.当消费者组的消费者数量发生变化时,会触发重平衡(Rebalance); 1.比如消费者组的消费者(C1/C2)新增或者下线,所谓的重平衡指的是该消费者组的消费者重新分配分区(0分区,1分区,2分区过程。 2.该消费者组若新增了消费者,也会触发重平衡; 3.当消费者组的数量多余partition数量时,则会导致该消费者组有空闲的消费者;(假设5个消费者,但是只有3个分区)
kafka消费者组数据延迟分析
由图解,kafka的消费者是logstash,怎么会出现数据延迟呢?
当生产者过多,logstash忙不过来时候收集,就会出现数据延迟可能
理论存在,分析实践
1. 查看现有的消费者组列表
1 2 3 4 [root@elk03:0 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.211:9092 --list console-consumer-10558 console-consumer-73547 console-consumer-4558
2.创建topic指定分区和副本
1 2 [root@elk03:0 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic new --partitions 3 --replication-factor 2 --create Created topic new.
3.启动生产者
1 2 [root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic new >999999
4.启动消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 [root@elk01:4 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.211:9092 --topic new --from-beginning --group dev [root@elk03:0 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.211:9092 --list dev console-consumer-10558 console-consumer-73547 [root@elk03:0 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.211:9092 --group dev --describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG dev new 0 0 0 0 dev new 1 0 0 0 dev new 2 1 0 0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 [root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic new >999999999 >aaaaaaaa >vbbbbbb >ccccc [root@elk02:1 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.211:9092 --topic new --from-beginning --group dev 此时没有新的数据,因为被同一个组(dev)的第一个消费者已经采集过了 [root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic new >999999999 >aaaaaaaa >vbbbbbb >ccccc >9999999999 >69696969696
停止所有消费者,只写入数据
1 2 3 4 5 6 7 [root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic new >h >h >asasasasasasas [root@elk03:0 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.211:9092 --group dev --describe
kafka数据丢失分析
见图解
1 2 3 4 5 6 7 8 9 10 11 kafka的ISR列表导致数据丢失的原因: ISR: 和leader副本同步的所有副本集合。 OSR: 和leader副本不同步所有副本集合。 AR: 所有副本,指的是leader + follower,即AR = ISR + OSR LEO: 英文全称为: LOG-END-OFFSET,表示每个partition最后一个Offset。 HW: 表示在ISR列表中所有LEO中最小的LEO。