kafka的脚本管理

topic管理

1.查看topic列表

1
2
[root@elk01:1 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --list 

**2.创建topic **

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@elk01:1 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic luay --partitions 3 --replication-factor 2 --create
Created topic luay.

#注释:
--topic luay #指定topic为luay
--partitions 3 #创建3个分区
--replication-factor 2 #创建2个副本
--create #动作--创建

#其它选项
--delete #删除
--alter #修改
--describe #输出详细信息

image-20241029180717295

3.修改分区数量【只能由小变大,不能由大变小】

1
[root@elk01:1 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic luay --partitions 5 --alter

image-20241029180957289

4.查看指定topic的详细信息

1
[root@elk01:1 ~]# kafka-topics.sh kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic luay --describe

image-20241029181157071

5.删除topic

1
2
3
4
5
6
7
8
[root@elk01:1 ~]# kafka-topics.sh kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic luay --delete


#kafka数据目录里topic会被标记-delete,等待一会便会自动删除
[root@elk01:1 ~]# ll /app/data/kafka/
drwxr-xr-x 2 root root 4096 Oct 29 18:15 luay-1.2baa4b610f314befbfbed58aaad84052-delete/
drwxr-xr-x 2 root root 4096 Oct 29 18:15 luay-2.0f44f4821a1849ccb7e0e37e1fe65ff4-delete/

producer管理

1
2
3
4
5
6
7
[root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic test
>999999
[2024-10-29 20:49:30,357] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 7 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

#首次写入时,若topic不存在,则kafka集群默认会自动创建。
[root@elk02:1 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --list
test

kafka的consumer管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#生产者发送消息
[root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic test
>12112112
>666666

#消费者接收消息---接收现在开始的消息
[root@elk01:4 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.211:9092 --topic test
666666

#消费者接收消息---从头接收消息 --from-beginning
[root@elk01:4 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.211:9092 --topic test --from-beginning
999999
12112112
666666

消费者组理念

1
2
3
4
5
6
7
8
9
10
11
12
13
- kafka的消费者组概念
1.kafka的offset存储位置
kafka早期版本 0.9-版本,offset记录存储在zookeeper集群。
从kafka 0.10+版本,默认的offset存储在kafka集群,存储在一个名为"__consumer_offsets"内置的topic。(当消费者出现宕机,当前当前组里的其它消费者会从_consumer_offsets记录的偏移量里,找到已经取到哪里的数据,从此点继续往后取数据,而不是从新取所有数据,避免数据重复采集)

2.相关术语
consumer group: 消费者组,任意一个消费者都隶属于一个消费者组。

1.而"__consumer_offsets"内置的topic记录的偏移量并不属于某个消费者,而是基于消费者进行记录的。
2.当消费者组的消费者数量发生变化时,会触发重平衡(Rebalance);
1.比如消费者组的消费者(C1/C2)新增或者下线,所谓的重平衡指的是该消费者组的消费者重新分配分区(0分区,1分区,2分区过程。
2.该消费者组若新增了消费者,也会触发重平衡;
3.当消费者组的数量多余partition数量时,则会导致该消费者组有空闲的消费者;(假设5个消费者,但是只有3个分区)

consumer_group

kafka消费者组数据延迟分析

由图解,kafka的消费者是logstash,怎么会出现数据延迟呢?

ElasticStack_MQ

当生产者过多,logstash忙不过来时候收集,就会出现数据延迟可能

理论存在,分析实践

1. 查看现有的消费者组列表

1
2
3
4
[root@elk03:0 ~]# kafka-consumer-groups.sh --bootstrap-server  10.0.0.211:9092 --list
console-consumer-10558
console-consumer-73547
console-consumer-4558

2.创建topic指定分区和副本

1
2
[root@elk03:0 ~]# kafka-topics.sh --bootstrap-server 10.0.0.211:9092 --topic new --partitions 3 --replication-factor 2 --create
Created topic new.

3.启动生产者

1
2
[root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic new
>999999

4.启动消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#1. 启动第一个消费者,指定消费者组
[root@elk01:4 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.211:9092 --topic new --from-beginning --group dev

# 2.查看消费者列表
[root@elk03:0 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.211:9092 --list
dev
console-consumer-10558
console-consumer-73547

# 3.查看指定消费者的详细信息
[root@elk03:0 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.211:9092 --group dev --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
dev new 0 0 0 0
dev new 1 0 0 0
dev new 2 1 0 0

image-20241029224849054

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 4.生产者继续写入测试数据
[root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic new
>999999999
>aaaaaaaa
>vbbbbbb
>ccccc

# 5. 启动新的消费者
[root@elk02:1 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.211:9092 --topic new --from-beginning --group dev

此时没有新的数据,因为被同一个组(dev)的第一个消费者已经采集过了

# 6. 写入测试数据
[root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic new
>999999999
>aaaaaaaa
>vbbbbbb
>ccccc
>9999999999
>69696969696

停止所有消费者,只写入数据

1
2
3
4
5
6
7
[root@elk01:5 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.211:9092 --topic new
>h
>h
>asasasasasasas

# 再次查看详细数据
[root@elk03:0 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.211:9092 --group dev --describe

image-20241029230457830

kafka数据丢失分析

见图解

1
2
3
4
5
6
7
8
9
10
11
kafka的ISR列表导致数据丢失的原因:
ISR:
和leader副本同步的所有副本集合。
OSR:
和leader副本不同步所有副本集合。
AR:
所有副本,指的是leader + follower,即AR = ISR + OSR
LEO:
英文全称为: LOG-END-OFFSET,表示每个partition最后一个Offset。
HW:
表示在ISR列表中所有LEO中最小的LEO。

kafka-ISR