Connecting Logstash to Filebeat

# 1. Write the Logstash config
[root@elk01:1 ~]# cat /etc/logstash/conf.d/03_filebeat_input.conf
input {
  beats {
    port => 9999
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

# Start the Logstash instance
[root@elk01:4 ~]# logstash -rf /etc/logstash/conf.d/03_filebeat_input.conf

# Check whether port 9999 is listening
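One quick way to verify (a minimal check, assuming the iproute2 ss utility is installed):

[root@elk01:4 ~]# ss -ntl | grep 9999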
[root@elk01:4 ~]# cat /etc/filebeat/14-logstash.yaml
filebeat.inputs:
- type: filestream
  paths:
    - /tmp/test.conf
  parsers:
    - ndjson:
        target: ""
        message_key: message

output.logstash:
  hosts: ["10.0.0.211:9999"]

# 2. Start the Filebeat instance
[root@elk01:4 ~]# filebeat -e -c /etc/filebeat/14-logstash.yaml

# Check Logstash's stdout (the message field contains "new", the content of /tmp/test.conf, which proves the Filebeat hookup works)
{
    "host" => {
        "name" => "elk01"
    },
    "agent" => {
        "ephemeral_id" => "b923603a-d451-4c18-b9d4-f7aa0e034000",
        "hostname" => "elk01",
        "name" => "elk01",
        "version" => "7.17.23",
        "id" => "4f19d0b0-5ed5-4c21-82ef-6f001fb850b3",
        "type" => "filebeat"
    },
    "@timestamp" => 2024-10-05T05:29:42.250Z,
    "@version" => "1",
    "ecs" => {
        "version" => "1.12.0"
    },
    "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
    "message" => "new",
    "input" => {
        "type" => "filestream"
    },
    "log" => {
        "file" => {
            "path" => "/tmp/test.conf"
        },
        "offset" => 14
    }
}
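The "new" event above can be produced by appending a JSON line to the watched file (an assumption about how the sample was generated; any NDJSON line with a message key works):

[root@elk01:0 ~]# echo '{"message":"new"}' >> /tmp/test.conf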

Processing fields with filter

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs"]
  }
}
# remove_field drops the top-level fields you don't need

Example:
[root@elk01:1 ~]# cat /etc/logstash/conf.d/04_filebeat_input.conf
input {
  beats {
    port => 9999
  }
}
filter {
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

# Result (to promote each field inside message to a top-level field, see the nginx config change in the previous post)
{
    "timestamp" => "2024-10-05T16:16:29+08:00",
    "uri" => "/index.nginx-debian.html",
    "status" => "304",
    "SendBytes" => 0,
    "referer" => "-",
    "tcp_xff" => "-",
    "@timestamp" => 2024-10-05T08:16:30.048Z,
    "vhost" => "10.0.0.211",
    "responsetime" => 0,
    "domain" => "10.0.0.211",
    "upstreamtime" => "-",
    "http_host" => "10.0.0.211",
    "clientip" => "10.0.0.1",
    "upstreamhost" => "-",
    "http_user_agent" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/129.0.0.0",
    "xff" => "-"
}

The useragent filter plugin

Analyzes the client's device type and operating system.

Write the Logstash config:

[root@elk01:3 ~]# cat /etc/logstash/conf.d/05_filebeat_filter_es.yaml
input {
  beats {
    port => "9999"
  }
}
filter {
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs"]
  }
  # Analyze the user's device type
  useragent {
    # Which field to analyze
    source => "http_user_agent"
    # Field to store the parsed device info in; if unset, results land in top-level fields
    target => "agent-kind"
  }
}

output {
  stdout {
    codec => rubydebug
  }
  # Write to the ES cluster (you can start with only the stdout output above and add this once the output looks right)
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-nginx-%{+yyyy.MM.dd}"
  }
}

# Start the Logstash instance
[root@elk01:4 ~]# logstash -rf /etc/logstash/conf.d/05_filebeat_filter_es.yaml

Filebeat config:

[root@elk01:4 ~]# cat /etc/filebeat/15-nginx_logstash.yaml
filebeat.inputs:
- type: filestream
  paths:
    - /var/log/nginx/access.log
  parsers:
    - ndjson:
        target: ""
        message_key: message

output.logstash:
  hosts: ["10.0.0.211:9999"]

# Start the Filebeat instance
[root@elk01:4 ~]# filebeat -e -c /etc/filebeat/15-nginx_logstash.yaml
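Once events flow, you can confirm the index exists before opening Kibana (a quick check via Elasticsearch's _cat API):

[root@elk01:4 ~]# curl -s '10.0.0.211:9200/_cat/indices?v' | grep logstash-nginx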

Open 10.0.0.211:5601 in a browser.


When creating the index pattern, an extra timestamp field shows up alongside @timestamp.


Handling time formats

Per the official date filter documentation:

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs"]
  }

  # Analyze the user's device type
  useragent {
    # Which field to analyze
    source => "http_user_agent"
    # Field to store the parsed device info in; if unset, results land in top-level fields
    target => "linux_agent"
  }

  # Date-handling plugin; AccessTime is the user's actual access time
  date {
    # "[29/Aug/2024:08:46:59 +0000]"
    match => [ "AccessTime", "[dd/MMM/yyyy:HH:mm:ss Z]" ]
    # Field to store the parsed date in; if unset, it overwrites "@timestamp"
    target => "accesstime"
  }
}
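For the JSON nginx logs shown earlier, whose timestamp field is already ISO 8601, a matching stanza might look like this (a sketch; the field name timestamp comes from the sample output above):

date {
  # ISO8601 is a built-in match keyword of the date filter
  match => [ "timestamp", "ISO8601" ]
  # With no target, the parsed value overwrites @timestamp, removing the duplicate time field
}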

The geoip filter plugin

Analyzes the user's geographic location. The IPs in the log must be public addresses; you can edit the log by hand to simulate this, as shown in the steps below.
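A minimal sketch of the stanza, matching how it is used later in this post (clientip is the client address field from the nginx JSON log):

geoip {
  source => ["clientip"]
}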

1. Get your own public address
[root@elk01:1 ~]# curl ifconfig.io
101.82.161.6

2. Edit the nginx log file to simulate access from a public address (after the substitution you can tweak the IP digits freely; 10.0.0.1 is the client IP in the log)
[root@elk01:1 ~]# sed -i s#10.0.0.1#101.82.161.6#g /var/log/nginx/access.log

3. Restart nginx
[root@elk01:1 ~]# systemctl restart nginx

4. Delete Filebeat's registry cache
[root@elk01:1 ~]# rm -fr /var/lib/filebeat/*

5. Restart the Logstash and Filebeat instances


Adding the map layer reports that no fields are available? By default, the geo coordinates in a Logstash-created index are mapped as float; geoip.location must be mapped as geo_point instead.

Create an index template that maps location to the correct type.

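From Kibana Dev Tools this can be done with the legacy template API (a sketch for ES 7.x; the template name and index pattern are illustrative and must match your own index names):

PUT _template/logstash-nginx-geoip
{
  "index_patterns": ["logstash-nginx*"],
  "mappings": {
    "properties": {
      "geoip": {
        "properties": {
          "location": {
            "type": "geo_point"
          }
        }
      }
    }
  }
}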

The grok plugin

Official grok guide (Logstash reference documentation)

Supported grok patterns (logstash-patterns-core on GitHub)

# Find the built-in pattern files (taking httpd as the example)
[root@elk01:0 conf.d]# find / -name httpd
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.4/patterns/legacy/httpd

# Grep the httpd pattern file for a clientip field; this turns up the HTTPD_COMMONLOG pattern
[root@elk01:0 conf.d]# grep clientip /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.4/patterns/legacy/httpd
HTTPD_COMMONLOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" (?:-|%{NUMBER:response}) (?:-|%{NUMBER:bytes})

# Below, clientip is used as the example to demonstrate grok matching; the config uses HTTPD_COMBINEDLOG, which extends HTTPD_COMMONLOG with the referrer and agent fields
# 1. Edit the Logstash config
[root@elk01:2 ~]# cat /etc/logstash/conf.d/07_filter_grok.conf
input {
  beats {
    port => "7777"
  }
}
filter {
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs"]
  }
  # Match arbitrary text against regexes; grok ships with about 120 built-in patterns
  grok {
    match => {
      "message" => "%{HTTPD_COMBINEDLOG}"
    }
  }
  # Analyze the device type
  useragent {
    source => "agent"
    target => "agent-kind"
  }
  # Analyze the user's coordinates and location info (only works for public IPs)
  geoip {
    source => ["clientip"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-nginx-%{+yyyy.MM.dd}"
  }
}

# Start the instance
[root@elk01:0 conf.d]# logstash -rf /etc/logstash/conf.d/07_filter_grok.conf


# 2. Edit the Filebeat config
cat /etc/filebeat/16-nginx_logstash_grok.yaml
filebeat:
  inputs:
    - type: filestream
      paths:
        - /var/log/nginx/access.log*

output.logstash:
  hosts: ["10.0.0.211:7777"]

# Start the instance
[root@elk01:1 ~]# filebeat -e -c /etc/filebeat/16-nginx_logstash_grok.yaml

Output:

# As the result shows, the user IP is extracted into the top-level clientip field
"timestamp" => "20/Oct/2024:16:36:12 +0800",
"clientip" => "87.120.127.103",
"verb" => "GET",
"geoip" => {
    "timezone" => "Europe/Sofia",
    "longitude" => 23.332,
    "continent_code" => "EU",
    "latitude" => 42.696,
    "ip" => "87.120.127.103",
    "country_name" => "Bulgaria",
    "country_code3" => "BG",
    "location" => {
        "lon" => 23.332,
        "lat" => 42.696
    }
}

# The geoip stanza analyzes the user's coordinates and location, and also surfaces details such as the time zone and city name
geoip {
  source => ["clientip"]
}

Extracting fields with custom regex patterns

# Custom patterns
[root@elk01:1 ~]# cat /tmp/pattern/xixi.re
YEAR \d{4}
school [A-Za-z]{5}
Class [A-Za-z]{5}\d{2}

# Write the Logstash config
[root@elk01:1 ~]# cat /etc/logstash/conf.d/08_pattern.conf
input {
  tcp {
    port => 6666
  }
}

filter {
  grok {
    # Load custom patterns (the directory holding the custom pattern files)
    patterns_dir => ["/tmp/pattern/"]
    match => {
      "message" => "xixi %{YEAR:year}-%{school:school}-%{Class:class} haha"
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Class [A-Za-z]{5}\d{2}

# If the input does not satisfy the pattern, you get the following (the rule Class [A-Za-z]{5}\d{2} expects exactly 2 digits after the letters)
{
    "@timestamp" => 2024-10-20T12:33:19.642Z,
    "port" => 49454,
    "host" => "elk01",
    "@version" => "1",
    "tags" => [
        [0] "_grokparsefailure"
    ],
    "message" => "xixi 2024-BJedu-Linux666 haha"
}


# Only input matching the rule xixi %{YEAR:year}-%{school:school}-%{Class:class} haha parses like this
{
    "@timestamp" => 2024-10-20T12:31:16.265Z,
    "message" => "xixi 2024-BJedu-Linux01 haha",
    "school" => "BJedu",
    "host" => "elk01",
    "year" => "2024",
    "port" => 34950,
    "@version" => "1",
    "class" => "Linux01"
}

# Emit the years 2024-2030
[root@elk01:2 ~]# for i in `seq 2024 2030`; do echo "xixi $i-BJedu-Linux01 haha" | nc 10.0.0.211 6666 & done

The mutate plugin

Extracting fields from a custom log format

1. Generate custom logs with a Python script

[root@elk01:0 ~]# cat generate_log.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : Jason Yin

import datetime
import random
import logging
import time
import sys

LOG_FORMAT = "%(levelname)s %(asctime)s [com.linux.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Basic configuration for the root logging.Logger instance
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT,
                    filename=sys.argv[1], filemode='a')
actions = ["浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券",
           "搜索", "查看订单", "付款", "清空购物车"]

while True:
    time.sleep(random.randint(1, 5))
    user_id = random.randint(1, 10000)
    # Round the generated float to 2 decimal places
    price = round(random.uniform(15000, 30000), 2)
    action = random.choice(actions)
    svip = random.choice([0, 1, 2])
    logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action, svip, price))

2. Run the script to generate logs

[root@elk01:0 ~]# python3 generate_log.py /tmp/apps.log

# It produces a log file in this format
[root@elk01:1 ~]# tail -f /tmp/apps.log
INFO 2024-10-27 09:23:50 [com.linux.generate_log] - DAU|5164|领取优惠券|2|18464.33
INFO 2024-10-27 09:23:51 [com.linux.generate_log] - DAU|2615|搜索|1|21874.88
INFO 2024-10-27 09:23:56 [com.linux.generate_log] - DAU|8097|查看订单|1|23759.47
INFO 2024-10-27 09:23:57 [com.linux.generate_log] - DAU|795|加入购物车|2|21097.98
INFO 2024-10-27 09:24:00 [com.linux.generate_log] - DAU|1577|清空购物车|0|16636.14

3. Write the Logstash config

[root@elk01:2 ~]# cat /etc/logstash/conf.d/10.beats-mutate.conf
input {
  beats {
    port => 5555
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs"]
  }
  mutate {
    # Split the field
    split => { "message" => "|" }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

# Start the Logstash instance
[root@elk01:2 ~]# logstash -rf /etc/logstash/conf.d/10.beats-mutate.conf

4. Write the Filebeat config

[root@elk01:1 ~]# cat /etc/filebeat/18-apps-to-logstash.yaml
filebeat:
  inputs:
    - type: filestream
      paths:
        - /tmp/apps.log

output.logstash:
  hosts: ["10.0.0.211:5555"]

# Start the instance
[root@elk01:0 ~]# filebeat -e -c /etc/filebeat/18-apps-to-logstash.yaml

5. Output

{
    "message" => [
        [0] "INFO 2024-10-27 09:46:03 [com.linux.generate_log] - DAU",
        [1] "3013",
        [2] "评论商品",
        [3] "2",
        [4] "25853.85 "
    ],
    "@timestamp" => 2024-10-27T01:58:28.229Z
}

Adding fields

# Promote elements [0]-[4] to top-level fields
{
    "message" => [
        [0] "INFO 2024-10-27 09:46:03 [com.linux.generate_log] - DAU",
        [1] "3013",
        [2] "评论商品",
        [3] "2",
        [4] "25853.85 "
    ],
    "@timestamp" => 2024-10-27T01:58:28.229Z
}

Modify the Logstash config

[root@elk01:0 ~]# cat /etc/logstash/conf.d/10.beats-mutate.conf
input {
  beats {
    port => 5555
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs"]
  }
  mutate {
    # Split the message field on the | delimiter
    split => { "message" => "|" }
    # Add fields
    add_field => {
      "other" => "%{[message][0]}"
      "userId" => "%{[message][1]}"
      "action" => "%{[message][2]}"
      "svip" => "%{[message][3]}"
      "price" => "%{[message][4]}"
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Output:

      "svip" => "2",
"other" => "INFO 2024-10-27 09:46:03 [com.linux.generate_log] - DAU",
"@timestamp" => 2024-10-27T02:17:55.199Z,
"userId" => "3013",
"action" => "评论商品"

Splitting multiple times

When one field still holds a lot of content (the other field in this case), we can split again to extract exactly the fields we need:
"svip" => "2",
"other" => "INFO 2024-10-27 09:46:03 [com.linux.generate_log] - DAU",
"@timestamp" => 2024-10-27T02:17:55.199Z,
"userId" => "3013",
"action" => "评论商品"

Splitting the other field on spaces yields:
"other" => [
    [0] "INFO",
    [1] "2024-10-27",
    [2] "10:46:33",
    [3] "[com.linux.generate_log]",
    [4] "-",
    [5] "DAU"
]

# Modify the Logstash config: split again inside filter, promoting the timestamp and the site info to top-level fields
mutate {
  # Split the other field on spaces
  split => { "other" => " " }
  add_field => {
    # Put the date and time into the datetime field
    "datetime" => "%{[other][1]} %{[other][2]}"
    # Put the accessed site into the www field
    "www" => "%{[other][3]}"
  }
}

Output:

        "userId" => "9928",
"www" => "[com.linux.generate_log]",
"datetime" => "2024-10-27 10:58:19",
"action" => "领取优惠券",
"svip" => "2",
"other" => [
[0] "INFO",
[1] "2024-10-27",
[2] "10:58:19",
[3] "[com.linux.generate_log]",
[4] "-",
[5] "DAU"

#再次多次切割实现网页字段没有 [ ]
mutate {
split => {"www" => "["}
add_field => {
"www2" => "%{[www][1]}"
}
}

mutate {
split => {"www2" =>"]"}
add_field => {
website => "%{[www2][0]}"
}
}


#输出展示
"userId" => "3837",
"www2" => [
[0] "com.linux.generate_log"
],
"action" => "浏览页面",
"svip" => "0",
"website" => "com.linux.generate_log",
"@timestamp" => 2024-10-27T03:03:39.620Z
}
{
"www" => [
[0] "",
[1] "com.linux.generate_log]"
],
"datetime" => "2024-10-27 11:03:40",
"other" => [
[0] "INFO",
[1] "2024-10-27",
[2] "11:03:40",
[3] "[com.linux.generate_log]",
[4] "-",
[5] "DAU"
],
"price" => "16570.26 ",
"message" => [
[0] "INFO 2024-10-27 11:03:40 [com.linux.generate_log] - DAU",
[1] "8129",
[2] "使用优惠券",
[3] "1",
[4] "16570.26 "
],

Removing leftover fields

Remove fields such as message, other, and www (everything has been extracted, so they no longer need to be shown).

# Modify the Logstash config
mutate {
  remove_field => [ "message","other","www","www2"]
}

# Final result
{
    "datetime" => "2024-10-27 11:09:12",
    "price" => "18005.57 ",
    "userId" => "5897",
    "action" => "提交订单",
    "svip" => "1",
    "website" => "com.linux.generate_log",
    "@timestamp" => 2024-10-27T03:09:13.875Z
}

Final Logstash config, writing to the ES cluster

[root@elk01:2 ~]# cat /etc/logstash/conf.d/10.beats-mutate.conf
input {
  beats {
    port => 5555
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs"]
  }
  mutate {
    # Split the message field (on the | delimiter)
    split => { "message" => "|" }

    # Add fields
    add_field => {
      "other" => "%{[message][0]}"
      "userId" => "%{[message][1]}"
      "action" => "%{[message][2]}"
      "svip" => "%{[message][3]}"
      "price" => "%{[message][4]}"
    }
  }
  mutate {
    # Split the other field on spaces
    split => { "other" => " " }
    add_field => {
      "datetime" => "%{[other][1]} %{[other][2]}"
      "www" => "%{[other][3]}"
    }
  }
  mutate {
    split => { "www" => "[" }
    add_field => {
      "www2" => "%{[www][1]}"
    }
  }

  mutate {
    split => { "www2" => "]" }
    add_field => {
      "website" => "%{[www2][0]}"
    }
  }
  mutate {
    # Convert types
    convert => {
      # Convert price to float (it defaults to string, and Kibana cannot sum or aggregate strings)
      "price" => "float"
    }
  }
  # Remove leftover fields
  mutate {
    remove_field => [ "message","other","www","www2"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "auto_logs-apps-%{+yyyy.MM.dd}"
  }
}
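Before restarting, the file can be syntax-checked without running it (the -t flag is Logstash's --config.test_and_exit):

[root@elk01:2 ~]# logstash -tf /etc/logstash/conf.d/10.beats-mutate.conf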

Logstash multi-branch (conditional) statements

1. Multi-branch statements: basic types

[root@elk01:2 ~]# cat /etc/logstash/conf.d/11-multiple_input.conf
input {
  beats {
    port => 5555
    type => beats
  }

  tcp {
    port => 6666
    type => tcp
  }

  file {
    path => ["/tmp/luay.log"]
    start_position => "beginning"
    type => file
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

# Test
[root@elk01:0 ~]# echo 999 |nc 10.0.0.211 6666
[root@elk01:2 ~]# echo 7989 >/tmp/luay.log

# Output
{
    "@version" => "1",
    "type" => "tcp",
    "port" => 56998,
    "@timestamp" => 2024-10-27T08:53:35.394Z,
    "host" => "elk01",
    "message" => "999"
}
{
    "@version" => "1",
    "type" => "file",
    "path" => "/tmp/luay.log",
    "@timestamp" => 2024-10-27T08:54:44.088Z,
    "host" => "elk01",
    "message" => "7989"
}

2. Multi-branch statements: complex types

Official example

Add if conditionals

Multi-branch input

'Requirements'
1. For the tcp type, delete the version and port fields
2. For the file type, delete the version and host fields, and rename path to filepath
3. The beats type is handled the same as in 10.beats-mutate.conf (scroll up)

[root@elk01:2 ~]# cat /etc/logstash/conf.d/11-multiple_input.conf
input {
  beats {
    port => 5555
    type => beats
  }

  tcp {
    port => 6666
    type => tcp
  }

  file {
    path => ["/tmp/luay.log"]
    start_position => "beginning"
    type => file
  }
}
filter {
  if [type] == "tcp" {
    mutate {
      remove_field => [ "@version","port" ]
    }
  } else if [type] == "file" {
    mutate {
      rename => { "path" => "filepath" }
      remove_field => [ "@version","host" ]
    }
  } else {
    mutate {
      split => { "message" => "|" }
      add_field => {
        "userId" => "%{[message][1]}"
        "action" => "%{[message][2]}"
        "svip" => "%{[message][3]}"
        "price" => "%{[message][4]}"
      }
    }
  }
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs","message"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

# Test
[root@elk01:0 ~]# echo 999999999 >>/tmp/luay.log
[root@elk01:0 ~]# echo 11111111 |nc 10.0.0.211 6666
[root@elk01:0 ~]# filebeat -e -c /etc/filebeat/18-apps-to-logstash.yaml

# Output
{
    "filepath" => "/tmp/luay.log",
    "type" => "file",
    "@timestamp" => 2024-10-27T09:22:11.843Z
}
{
    "type" => "tcp",
    "@timestamp" => 2024-10-27T09:22:28.519Z
}
{
    "svip" => "0",
    "action" => "加入收藏",
    "type" => "beats",
    "@timestamp" => 2024-10-27T09:23:57.599Z,
    "price" => "27975.99 ",
    "userId" => "6505"
}

Multi-branch output

Writing to the ES cluster with a separate index per branch

[root@elk01:2 ~]# cat /etc/logstash/conf.d/11-multiple_input.conf
input {
  beats {
    port => 5555
    type => beats
  }

  tcp {
    port => 6666
    type => tcp
  }

  file {
    path => ["/tmp/luay.log"]
    start_position => "beginning"
    type => file
  }
}
filter {
  if [type] == "tcp" {
    mutate {
      remove_field => [ "@version","port" ]
    }
  } else if [type] == "file" {
    mutate {
      rename => { "path" => "filepath" }
      remove_field => [ "@version","host" ]
    }
  } else {
    mutate {
      split => { "message" => "|" }
      add_field => {
        "userId" => "%{[message][1]}"
        "action" => "%{[message][2]}"
        "svip" => "%{[message][3]}"
        "price" => "%{[message][4]}"
      }
    }
  }
  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs","message"]
  }
}

output {
  #stdout {
  #  codec => rubydebug
  #}

  if [type] == "tcp" {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "logstash-if-tcp-%{+yyyy.MM.dd}"
    }
  } else if [type] == "file" {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "logstash-if-file-%{+yyyy.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "logstash-if-apps-%{+yyyy.MM.dd}"
    }
  }
}


Hands-on exercise

Write /var/log/syslog, /var/log/auth.log, and /var/log/vmware-vmsvc*.log into three different ES indices, using if multi-branch statements.

  1. The index names must contain the keywords syslog, auth, and vmware respectively
  2. The index containing syslog must have 3 shards and 0 replicas
  3. The index containing auth must have 5 shards and 0 replicas
  4. The index containing vmware must have 8 shards and 0 replicas

1. Write the Logstash config

[root@elk01:1 ~]# cat /etc/logstash/conf.d/12-if_test.conf
input {
  file {
    path => ["/var/log/syslog"]
    start_position => "beginning"
    type => syslog
  }

  file {
    path => ["/var/log/auth.log"]
    start_position => "beginning"
    type => auth
  }

  file {
    path => ["/var/log/vmware-vmsvc*.log"]
    start_position => "beginning"
    type => vmware
  }
}

output {
  stdout {
    codec => rubydebug
  }

  if [type] == "syslog" {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "linux-syslog-%{+yyyy.MM.dd}"
    }
  } else if [type] == "auth" {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "linux-auth-%{+yyyy.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "linux-vmware-%{+yyyy.MM.dd}"
    }
  }
}

2. Set shard and replica counts per index

In Kibana: Index Management -> Index Templates -> Create template.

3 shards, 0 replicas:

"number_of_replicas": 0,
"number_of_shards": 3

Repeat the same steps for the other two indices.
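The same template can also be created from Kibana Dev Tools or curl instead of the UI (a sketch using the ES 7.x legacy template API; the template name and index pattern are illustrative and must match your index names):

PUT _template/linux-syslog
{
  "index_patterns": ["linux-syslog*"],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 0
  }
}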

3. Start the Logstash instance

[root@elk01:1 ~]# logstash -rf /etc/logstash/conf.d/12-if_test.conf

# Note: Logstash also keeps a cache (sincedb) of what it has read from files; to re-collect from scratch, delete the cache first
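When Logstash is launched from the command line as above, the file input's sincedb normally sits under the data directory (an assumption based on the default path.data of a package install; adjust the path if yours differs):

[root@elk01:1 ~]# rm -f /usr/share/logstash/data/plugins/inputs/file/.sincedb_*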


Logstash pipelines

'Comparing Logstash pipelines, multiple instances, and multi-branch statements'
1. Multi-branch statements
Pros:
    Multiple inputs live in one config file and one Logstash instance, which is more lightweight.
Cons:
    Even with hot reloading, a mistake in one part of the config stops the whole Logstash instance and takes the other inputs down with it.
2. Multiple Logstash instances
Pros:
    Instances don't affect each other, there is little cross-cutting logic, and the setup is easy to reason about.
Cons:
    You must manage multiple data directories, which is comparatively heavyweight.
3. Pipelines
Pros:
    One lightweight Logstash instance with relatively simple configuration: the config is split across multiple files,
    each with its own input, filter, and output, forming separate pipelines.
    No -f option is needed; start Logstash directly and it loads the pipeline files automatically.
Cons:
    Nearly perfect, except that if Logstash dies, all the pipelines go down with it.


Prepare the config files

1. tcp

[root@elk01:1 ~]# cat /etc/logstash/conf.d/13-pipeline_tcp.conf
input {
  tcp {
    port => 6666
  }
}

filter {
  mutate {
    add_field => {
      "address" => "%{host}:%{port}"
    }
    remove_field => [ "@version", "host","port"]
  }
}

output {
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-pipeline-tcp-%{+yyyy.MM.dd}"
  }
}

2. file

[root@elk01:1 ~]# cat /etc/logstash/conf.d/14-pipeline_file.conf
input {
  file {
    path => ["/tmp/luay.log"]
    start_position => "beginning"
  }
}

filter {
  mutate {
    rename => { "path" => "filepath" }
  }
}

output {
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-pipeline-file-%{+yyyy.MM.dd}"
  }
}

3. beats

[root@elk01:1 ~]# cat /etc/logstash/conf.d/15-pipeline_beat.conf
input {
  beats {
    port => 5555
  }
}

filter {
  mutate {
    split => { "message" => "|" }

    add_field => {
      "userId" => "%{[message][1]}"
      "action" => "%{[message][2]}"
      "svip" => "%{[message][3]}"
      "price" => "%{[message][4]}"
    }
  }

  mutate {
    remove_field => [ "@version","input","agent","tags", "log" , "host","ecs","message"]
  }
}

output {
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-pipeline-apps-%{+yyyy.MM.dd}"
  }
}

# Filebeat config (bring your own log file)
[root@elk01:0 ~]# cat /etc/filebeat/18-apps-to-logstash.yaml
filebeat:
  inputs:
    - type: filestream
      paths:
        - /tmp/apps.log

output.logstash:
  hosts: ["10.0.0.211:5555"]

Edit the pipelines file

[root@elk01:1 ~]# cat /etc/logstash/pipelines.yml
···
- pipeline.id: tcp-pipeline
  path.config: "/etc/logstash/conf.d/13-pipeline_tcp.conf"
- pipeline.id: file-pipeline
  path.config: "/etc/logstash/conf.d/14-pipeline_file.conf"
- pipeline.id: beat-pipeline
  path.config: "/etc/logstash/conf.d/15-pipeline_beat.conf"

# The pipeline.id names can be anything you like

Create a symlink (when launched from the command line, Logstash reads pipelines.yml from /usr/share/logstash/config rather than /etc/logstash):

[root@elk01:1 ~]# mkdir /usr/share/logstash/config

[root@elk01:1 ~]# ln -svf /etc/logstash/pipelines.yml /usr/share/logstash/config/pipelines.yml
'/usr/share/logstash/config/pipelines.yml' -> '/etc/logstash/pipelines.yml'

Start Logstash

[root@elk01:1 ~]# logstash -r

# Note: with the pipelines setup, no config file is passed at startup

# Write some data
[root@elk01:0 ~]# echo 898989 |nc 10.0.0.211 6666
[root@elk01:0 ~]# echo 9900 >/tmp/luay.log
[root@elk01:0 ~]# filebeat -e -c /etc/filebeat/18-apps-to-logstash.yaml
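To confirm that all three pipelines loaded, query Logstash's monitoring API (a quick check, assuming the default API port 9600):

[root@elk01:1 ~]# curl -s 'localhost:9600/_node/pipelines?pretty'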

