kafka介绍

1
2
Kafka 被称为下一代分布式消息系统,由 Scala 和 Java编写,是非营利性组织ASF(Apache Software Foundation)基金会中的一个开源项目,比如:HTTP Server、Tomcat、Hadoop、ActiveMQ等开源软件都属于 Apache基金会的开源软件,类似的消息系统还有RabbitMQ、ActiveMQ、ZeroMQ。
Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家组织中同时投入生产协同工作。

kafka常见术语

:dango: 1.

1
2
3
4
5
6
7
8
9
10
- topic:
表示是主题,对应是逻辑存储单元,一般用于区分业务类型。可以和ES的索引对应。
- partition
分区,一个topic对应一个或多个partition。
- replica:
是数据的实际载体,真正存储数据的资源,分为leader和follower,其中leader对外提供读写,而follower负责数据的同步。
- producer:
生产者,往kafka集群写数据的一方。
- consumer:
消费者,从kafka读取数据的一方。

topic

读取数据只能去leader读取,follow值负责同步数据

:dango:2. 角色介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
(1)Producer:Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker
(2)Consumer:消费者,用于消费消息,即处理消息

Broker:Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如: broker-0、broker-1等……

(3)Topic :消息的主题,可以理解为消息的分类,一个Topic相当于数据库中的一张表,一条消息相当于关系数据库的一条记录,一个Topic或者相当于Redis中列表类型的一个Key,一条消息即为列表中的一个元素。kafka的数据就保存在topic。在每个broker上都可以创建多个topic。物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic的消息虽然保存于一个或多个broker 上, 但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,topic 在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic 才能消费topic中的消息

(4)Consumer group: 每个consumer 属于一个特定的consumer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的group),同一topic的一条消息只能被同一个consumer group 内的一个consumer 消费,类似于一对一的单播机制,但多个consumer group 可同时消费这一消息,类似于一对多的多播机制

(5)Partition :是物理上的概念,每个topic 分割为一个或多个partition,即一个topic切分为多份.创建 topic时可指定 partition 数量,partition的表现形式就是一个一个的文件夹,该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数,注意同一个partition数据是有顺序的,但不同的partition则是无序的

(6)Replication: 同样数据的副本,包括leader和follower的副本数,基本于数据安全,建议至少2个,是Kafka的高可靠性的保障,和ES的副本有所不同,Kafka中的副本数包括主分片数,而ES中的副本数不包括主分片数

为了实现数据的高可用,比如将分区 0 的数据分散到不同的kafka 节点,每一个分区都有一个 broker 作为 Leader 和一个 broker 作为Follower,类似于ES中的主分片和副本分片。

假设分区为 3, 即分三个分区0-2,副本为3,即每个分区都有一个 leader,再加两个follower,分区 0 的leader为服务器A,则服务器 B 和服务器 C 为 A 的follower,而分区 1 的leader为服务器B,则服务器 A 和C 为服务器B 的follower,而分区 2 的leader 为C,则服务器A 和 B 为C 的follower。

AR: Assigned Replicas,分区中的所有副本的统称,包括leader和 follower,AR= lSR+ OSR
lSR:ln Sync Replicas,所有与leader副本保持同步的副本 follower和leader本身组成的集合,包括leader和 follower,是AR的子集
OSR:out-of-Sync Replied,所有与leader副本同步不能同步的 follower的集合,是AR的子集

:dango: 3. 分区和副本的优势

1
2
3
实现存储空间的横向扩容,即将多个kafka服务器的空间组合利用
提升性能,多服务器并行读写
实现高可用,每个分区都有一个主分区即 leader 分布在不同的kafka 服务器,并且有对应follower 分布在和leader不同的服务器上

:dango: 4. kafka写入消息流程

1
2
3
4
5
6
生产者(producter)先从kafka集群获取分区的leader
生产者(producter)将消息发送给leader
leader将消息写入本地文件
followers从leader pull消息
followers将消息写入本地后向leader发送ACK
leader收到所有副本的ACK后向producter发送ACK

:dango: 5. kafka特点和优势

特点:
分布式: 多机实现,不允许单机
分区: 一个消息.可以拆分出多个,分别存储在多个位置
多副本: 防止信息丢失,可以多来几个备份
多订阅者: 可以有很多应用连接kafka
Zookeeper: 早期版本的Kafka依赖于zookeeper, 2021年4月19日Kafka 2.8.0正式发布,此版本包括了很多重要改动,最主要的是kafka通过自我管理的仲裁来替代ZooKeeper,即Kafka将不再需要ZooKeeper!!!

优势:
高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka 服务器分区消息。
分布式: Kafka 基于分布式集群实现高可用的容错机制,可以实现自动的故障转移。
顺序保证:在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。 Kafka保证一个Partiton内的消息的有序性(分区间数据是无序的,如果对数据的顺序有要求,应将在创建主题时将分区数partitions设置为1)。
支持 Hadoop 并行数据加载。
通常用于大数据场合,传递单条消息比较大,而Rabbitmq 消息主要是传输业务的指令数据,单条数据较小。

单点部署kafka

1.下载kafka

1
[root@elk01 ~]#wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz

2.解压kafka

1
[root@elk01 ~]# tar xf kafka_2.13-3.8.0.tgz -C /app/

3.修改kafka的配置文件

1
2
3
4
5
6
7
8
9
10
11
12
[root@elk01:3 ~]# vim /app/kafka_2.13-3.8.0/config/server.properties 
···
# 修改kafka的broker的ID信息
broker.id=211
# 修改数据目录
log.dirs=/app/data/kafka
# 修改元数据存储zookeeper集群地址
zookeeper.connect=10.0.0.211:2181,10.0.0.212:2181,10.0.0.213:2181/kafka380


'注释:'
/kafka380 是将以后的znode都放在kafka380下,不指定则默认都在zookeeper的 / 下

4.配置环境变量

1
2
3
4
5
6
7
[root@elk01 ~]# cat  /etc/profile.d/kafka.sh
#!/bin/bash
export KAFKA_HOME=/app/kafka_2.13-3.8.0
export PATH=$PATH:$KAFKA_HOME/bin

#加载环境变量
[root@elk01:4 ~]# source /etc/profile.d/kafka.sh

5.启动kafka节点

1
[root@elk01:4 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

image-20241029000747486

kafka配置文件详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0

############################# Socket Server Settings #############################

# 套接字服务器侦听的地址。如果未配置,主机名将等于的值
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# 侦听器名称、主机名和代理将向客户端公布的端口。
# 如果未设置,则使用“listeners”的值。
#advertised.listeners=PLAINTEXT://your.host.name:9092

# 将侦听器名称映射到安全协议,默认情况下它们是相同的。有关更多详细信息,请参阅配置文档
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# 处理网络请求的线程数量(服务器用于从网络接收请求并向网络发送响应的线程数)
num.network.threads=3

# 用来处理磁盘 IO 的线程数量
num.io.threads=8

# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区最大大小
socket.request.max.bytes=104857600


############################# Log Basics #############################

# kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/usr/kafka/kafka_2.13-3.6.1/datas

# 每个topic在当前 broker上的默认分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件。
num.partitions=1

# 启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数。(用来恢复和清理 data 下数据的线程数量)对于数据目录位于RAID阵列中的安装,建议增加此值。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# 每个 topic 创建时的副本数,默认时 1 个副本,对于开发测试以外的环境,建议使用大于1的值以确保可用性,如3
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# 强制将数据刷新到磁盘之前要接受的消息数
#log.flush.interval.messages=10000

# 在强制刷新之前,消息可以在日志中停留的最长时间
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168

# 基于大小的日志保留策略。除非剩余的
# segments下降到 log.retention.bytes 以下。独立于log.retention.hours的函数.
#log.retention.bytes=1073741824

# 每个 segment 文件的最大大小,默认最大 1G ,当达到此大小时,将创建一个新的segment。
log.segment.bytes=1073741824

# 检查日志段以查看是否可以根据保留策略删除它们的间隔(检查过期数据的时间,默认 5 分钟检查一次是否数据过期)
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper集群连接字符串,一个以逗号分隔的'主机:端口'对,每个对对应一个zk服务器。可以在url中附加一个可选的chroot字符串,以指定所有kafka-znode的根目录。
zookeeper.connect=10.0.0.211:2181,10.0.0.212:2181,10.0.0.213:2181

# 连接到zookeeper 的超时时间(毫秒)
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# 以下配置指定GroupCoordinator将延迟初始使用者重新平衡的时间(以毫秒为单位)。
# 随着新成员加入组,再平衡将进一步延迟group.initial.rebalance.delay.ms的值,最大值为max.poll.interval.ms。
# 默认值为3秒。
# 我们在这里将其覆盖为0,因为它为开发和测试提供了更好的开箱即用体验。
# 但是,在生产环境中,默认值3秒更合适,因为这将有助于避免在应用程序启动期间进行不必要的、可能代价高昂的重新平衡。
group.initial.rebalance.delay.ms=3

部署kafka集群

1
重复单点部署的1-6步骤,不过需要注意的是,配置文件的broker.id=211/212/213  根据你的ip后尾自行修改(三个节点不相同,且为正整数就行)

温馨提示:三个 节点做hosts解析,将主机名解析到你的ip上,巨坑,一开始忘记反向解析这个问题了

1
2
3
4
[root@elk211 ~]# cat >> /etc/hosts <<EOF
10.0.0.211 elk01
10.0.0.212 elk02
10.0.0.213 elk03

image-20241029185857776

以此为戒。。。。。。巨坑

image-20241029002841827

我们配置文件里没有指定每个kafka的地址,那它们是如何找到彼此的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@elk01:4 ~]# grep -Ev '^#|^$' /app/kafka_2.13-3.8.0/config/server.properties 
broker.id=211
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.211:2181,10.0.0.212:2181,10.0.0.213:2181/kafka380
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

kafka通过连接zookeeper,去找到各个kafka