Filebeat writes data to the Kafka cluster

1. Create the topic

[root@elk01:5 ~]# kafka-topics.sh --bootstrap-server 10.0.0.212:9092 --topic linux-study --partitions 3 --replication-factor 2 --create
Created topic linux-study.


[root@elk01:5 ~]# kafka-topics.sh --bootstrap-server 10.0.0.212:9092 --topic linux-study --describe

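A quick way to confirm the topic exists before pointing Filebeat at it (a minimal sketch, assuming the same broker address as above):

# List all topics known to the cluster; linux-study should appear
kafka-topics.sh --bootstrap-server 10.0.0.212:9092 --list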

2. Configure Filebeat to write to Kafka

[root@elk01:0 ~]# cat /etc/filebeat/20-filebeat-to-kafka.yaml 
filebeat.inputs:
- type: filestream
  paths:
    - /opt/access.log*

output.kafka:
  # Kafka cluster addresses
  hosts: ["10.0.0.211:9092", "10.0.0.212:9092", "10.0.0.213:9092"]

  # Topic to write to
  topic: 'linux-study'

# Start the filebeat instance
[root@elk01:0 ~]# filebeat -e -c /etc/filebeat/20-filebeat-to-kafka.yaml
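Before leaving Filebeat running in the foreground with -e, the configuration can be checked first. A minimal sketch using the standard filebeat test subcommands against the file above (depending on the Filebeat version, test output may report that the kafka output does not support connectivity testing, in which case the console-consumer check in the next step is the practical verification):

# Validate the syntax of the config file
filebeat test config -c /etc/filebeat/20-filebeat-to-kafka.yaml

# Try to connect to the configured output (the Kafka brokers)
filebeat test output -c /etc/filebeat/20-filebeat-to-kafka.yaml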

3. Test consumption from a Kafka node

[root@elk01:0 ~]#  kafka-console-consumer.sh --bootstrap-server 10.0.0.212:9092 --topic linux-study --from-beginning --group filebeat01

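Because the consumer above joined the group filebeat01, the group's committed offsets and lag can also be inspected per partition. A minimal sketch, assuming the same broker address as above:

# Show current offset, log-end offset and lag for each partition consumed by the group
kafka-consumer-groups.sh --bootstrap-server 10.0.0.212:9092 --describe --group filebeat01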

Logstash reads data from the Kafka cluster

1. Write the Logstash pipeline configuration

[root@elk01:0 ~]# cat /etc/logstash/conf.d/17-kafka-to-logstash.conf
input {
  kafka {
    # Kafka cluster addresses
    bootstrap_servers => "10.0.0.211:9092,10.0.0.212:9092,10.0.0.213:9092"
    # List of topics to subscribe to
    topics => ["linux-study"]
    # Consumer group. Offsets already committed by this group are not consumed again,
    # so switch to a new group name if you want to re-read the topic on a restart;
    # this setting can also be omitted.
    group_id => "filebeat01"
    # Where to start reading when the group has no committed offset:
    # "earliest" reads from the oldest data, "latest" only reads new data.
    auto_offset_reset => "earliest"
  }
}

output {
  stdout {
  }

  # elasticsearch {
  #   hosts    => ["10.0.0.91:9200","10.0.0.92:9200","10.0.0.93:9200"]
  #   index    => "oldboyedu-kafka-elk-%{+yyyy.MM.dd}"
  #   user     => "elastic"
  #   password => "123456"
  # }
}
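Before starting the pipeline, its syntax can be validated without consuming anything. A minimal sketch using Logstash's standard config-test flag:

# Parse the pipeline file and exit; reports success or the exact syntax error
logstash -t -f /etc/logstash/conf.d/17-kafka-to-logstash.conf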

2. Start the Logstash instance

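# -r reloads the pipeline automatically when the config file changes; -f points at the pipeline file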
[root@elk01:0 ~]# logstash -rf /etc/logstash/conf.d/17-kafka-to-logstash.conf 

"@version" => "1",
"@timestamp" => 2024-10-29T17:05:27.448Z,
"message" => "{\"@timestamp\":\"2024-10-29T16:39:24.279Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"7.17.23\"},\"log\":{\"offset\":20478,\"file\":{\"path\":\"/opt/access.log\"}},\"message\":\"109.110.162.51 - - [04/Oct/2024:10:00:41 +0800] \\\"GET / HTTP/1.1\\\" 304 0 \\\"-\\\" \\\"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1 Edg/129.0.0.0\\\"\",\"input\":{\"type\":\"filestream\"},\"ecs\":{\"version\":\"1.12.0\"},\"host\":{\"name\":\"elk01\"},\"agent\":{\"id\":\"0f020f85-7619-4e85-83ae-346d91b5ca57\",\"name\":\"elk01\",\"type\":\"filebeat\",\"version\":\"7.17.23\",\"hostname\":\"elk01\",\"ephemeral_id\":\"ed96638d-32ee-4ee0-afbb-504643c3d52a\"}}"
}

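The entire Filebeat event arrives as a JSON string inside the message field, which is why the next step adds a json filter. To eyeball a single raw record straight from the topic (a minimal sketch, assuming jq is installed on the host):

# Read one message from the topic and pretty-print the Filebeat JSON envelope
kafka-console-consumer.sh --bootstrap-server 10.0.0.212:9092 --topic linux-study --from-beginning --max-messages 1 | jq .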

3. Optimize Logstash: parse the message field as JSON

[root@elk01:0 ~]# cat /etc/logstash/conf.d/17-kafka-to-logstash.conf
input {
  kafka {
    # Kafka cluster addresses
    bootstrap_servers => "10.0.0.211:9092,10.0.0.212:9092,10.0.0.213:9092"
    # List of topics to subscribe to
    topics => ["linux-study"]
    # Consumer group. Offsets already committed by this group are not consumed again,
    # so switch to a new group name if you want to re-read the topic on a restart;
    # this setting can also be omitted.
    group_id => "filebeat01"
    # Where to start reading when the group has no committed offset:
    # "earliest" reads from the oldest data, "latest" only reads new data.
    auto_offset_reset => "earliest"
  }
}

# Parse the message field as JSON
filter {
  json {
    source => "message"
    # Remove fields we don't need; otherwise the output is cluttered with
    # low-value fields, as shown below.
    # remove_field => [ "input","host","agent","@version","log", "ecs" ]
  }
}

output {
  stdout {
  }
}

# Sample output

"agent" => {
"name" => "elk01",
"type" => "filebeat",
"hostname" => "elk01",
"version" => "7.17.23",
"ephemeral_id" => "9a1e6795-8fe4-476b-ba6b-9d91b41f282c",
"id" => "1b86f9d4-c41a-4c30-8b3c-c921ed07d026"
},
"input" => {
"type" => "filestream"
},
"@version" => "1",
"@timestamp" => 2024-10-29T16:40:45.156Z,
"host" => {
"name" => "elk01"
},
"ecs" => {
"version" => "1.12.0"
},
"log" => {
"offset" => 27088,
"file" => {
"path" => "/opt/access.log"
}
},



# Output after the fields are removed
"@timestamp" => 2024-10-29T16:40:45.156Z,
"message" => "150.109.253.34 - - [04/Oct/2024:10:01:27 +0800] \"GET /app HTTP/1.1\" 404 197 \"-\" \"Mozilla/5.0 (Linux; Android 8.0.0; SM-G955U Build/R16NW) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Mobile Safari/537.36 Edg/129.0.0.0\""

4. The usual parsing optimizations, with output to the ES cluster

[root@elk01:0 ~]# cat /etc/logstash/conf.d/17-kafka-to-logstash.conf
input {
  kafka {
    # Kafka cluster addresses
    bootstrap_servers => "10.0.0.211:9092,10.0.0.212:9092,10.0.0.213:9092"
    # List of topics to subscribe to
    topics => ["linux-study"]
    # Consumer group. Offsets already committed by this group are not consumed again,
    # so switch to a new group name if you want to re-read the topic on a restart;
    # this setting can also be omitted.
    group_id => "filebeat01"
    # Where to start reading when the group has no committed offset:
    # "earliest" reads from the oldest data, "latest" only reads new data.
    auto_offset_reset => "earliest"
  }
}

# Parse the message field as JSON
filter {
  json {
    source => "message"
    # Remove fields we don't need
    remove_field => [ "input","host","agent","@version","log", "ecs" ]
  }
  # Match arbitrary text with regular expressions; grok ships with about 120 built-in patterns
  grok {
    match => {
      "message" => "%{HTTPD_COMBINEDLOG}"
    }
  }
  # Parse the user-agent string captured by grok into structured browser/OS fields
  useragent {
    source => "agent"
    target => "agent-kind"
  }
  # Resolve geo information for the client IP captured by grok
  geoip {
    source => "clientip"
  }
  # Use the access-log timestamp as the event @timestamp
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  stdout {
  }

  elasticsearch {
    hosts    => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index    => "kafka-elk-%{+yyyy.MM.dd}"
    user     => "elastic"
    password => "123456"
  }
}

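After Logstash has been running for a moment, a quick check that documents are actually landing in Elasticsearch (a minimal sketch, assuming the elastic/123456 credentials and node addresses from the output block above):

# List indices matching the pipeline's index pattern
curl -u elastic:123456 'http://10.0.0.211:9200/_cat/indices/kafka-elk-*?v'

# Count the documents indexed so far
curl -u elastic:123456 'http://10.0.0.211:9200/kafka-elk-*/_count?pretty'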