Integrating Logstash with Filebeat
[root@elk01:1 ~]# cat /etc/logstash/conf.d/03_filebeat_input.conf
input {
  beats {
    port => 9999
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
[root@elk01:4 ~]# logstash -rf /etc/logstash/conf.d/03_filebeat_input.conf
[root@elk01:4 ~]# cat /etc/filebeat/14-logstash.yaml
filebeat.inputs:
- type: filestream
  paths:
    - /tmp/test.conf
  parsers:
    - ndjson:
        target: ""
        message_key: message

output.logstash:
  hosts: ["10.0.0.211:9999"]
[root@elk01:4 ~]# filebeat -e -c /etc/filebeat/14-logstash.yaml
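With both processes running, appending a line to the watched file produces an event; this is an assumed way to generate the sample output shown next:

[root@elk01:0 ~]# echo new >> /tmp/test.conf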
{
          "host" => {
        "name" => "elk01"
    },
         "agent" => {
        "ephemeral_id" => "b923603a-d451-4c18-b9d4-f7aa0e034000",
            "hostname" => "elk01",
                "name" => "elk01",
             "version" => "7.17.23",
                  "id" => "4f19d0b0-5ed5-4c21-82ef-6f001fb850b3",
                "type" => "filebeat"
    },
    "@timestamp" => 2024-10-05T05:29:42.250Z,
      "@version" => "1",
           "ecs" => {
        "version" => "1.12.0"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "message" => "new",
         "input" => {
        "type" => "filestream"
    },
           "log" => {
        "file" => {
            "path" => "/tmp/test.conf"
        },
        "offset" => 14
    }
}
Processing fields with filter
filter {
  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs" ]
  }
}

Example:
[root@elk01:1 ~]# cat /etc/logstash/conf.d/04_filebeat_input.conf
input {
  beats {
    port => 9999
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
{ "timestamp" => "2024-10-05T16:16:29+08:00", "uri" => "/index.nginx-debian.html", "status" => "304", "SendBytes" => 0, "referer" => "-", "tcp_xff" => "-", "@timestamp" => 2024-10-05T08:16:30.048Z, "vhost" => "10.0.0.211", "responsetime" => 0, "domain" => "10.0.0.211", "upstreamtime" => "-", "http_host" => "10.0.0.211", "clientip" => "10.0.0.1", "upstreamhost" => "-", "http_user_agent" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/129.0.0.0", "xff" => "-" }
The useragent filter plugin
Parses the User-Agent string to identify the client's device type and operating system.
Write the Logstash instance file
[root@elk01:3 ~]# cat /etc/logstash/conf.d/05_filebeat_filter_es.yaml
input {
  beats {
    port => "9999"
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs" ]
  }

  useragent {
    source => "http_user_agent"
    target => "agent-kind"
  }
}

output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-nginx-%{+yyyy.MM.dd}"
  }
}
[root@elk01:4 ~]# logstash -rf /etc/logstash/conf.d/05_filebeat_filter_es.yaml
Filebeat instance
[root@elk01:4 ~]# cat /etc/filebeat/15-nginx_logstash.yaml
filebeat.inputs:
- type: filestream
  paths:
    - /var/log/nginx/access.log
  parsers:
    - ndjson:
        target: ""
        message_key: message

output.logstash:
  hosts: ["10.0.0.211:9999"]
[root@elk01:4 ~]# filebeat -e -c /etc/filebeat/15-nginx_logstash.yaml
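Before moving to Kibana, you can confirm the index was created with a quick check against the ES _cat API (the index name comes from the Logstash output section above):

[root@elk01:0 ~]# curl -s '10.0.0.211:9200/_cat/indices/logstash-nginx-*?v'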
Open 10.0.0.211:5601 in a browser.
When creating the index pattern, you will notice an extra timestamp field in addition to @timestamp.
Handling the time format
From the official documentation:
filter {
  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs" ]
  }

  useragent {
    source => "http_user_agent"
    target => "linux_agent"
  }

  date {
    match => [ "AccessTime", "[dd/MMM/yyyy:HH:mm:ss Z]" ]
    target => "accesstime"
  }
}
The geoip filter plugin
Resolves the client's geographic location. The IPs in the log must be public addresses, so you can edit the log by hand to simulate that (steps and a filter sketch below).
1. Get your own public IP address
[root@elk01:1 ~]# curl ifconfig.io
101.82.161.6

2. Rewrite the nginx log to simulate access from a public IP
   (10.0.0.1 is the client IP in the log; after replacing it you can tweak the digits freely)
[root@elk01:1 ~]# sed -i s#10.0.0.1#101.82.161.6#g /var/log/nginx/access.log

3. Restart nginx
[root@elk01:1 ~]# systemctl restart nginx

4. Delete the Filebeat registry cache
[root@elk01:1 ~]# rm -fr /var/lib/filebeat/*

5. Restart the Logstash and Filebeat instances
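The lookup itself is done by the geoip filter; a minimal sketch of the block to add to the Logstash filter section (the clientip source field follows the nginx log fields shown earlier):

filter {
  geoip {
    # Look up the client address in the bundled GeoLite2 database;
    # results land under the "geoip" field by default.
    source => "clientip"
  }
}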
Adding a map layer reports that the index does not contain any usable fields? Indices created by Logstash map the geo-coordinate values as float by default; geoip.location needs to be mapped as a geo_point instead.
Create an index template that maps the type of the location field.
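A minimal sketch of such a template, created from the Kibana Dev Tools console (the template name is illustrative; the index pattern must cover the logstash-nginx-* indices above):

PUT _index_template/logstash-nginx-geo
{
  "index_patterns": ["logstash-nginx-*"],
  "template": {
    "mappings": {
      "properties": {
        "geoip": {
          "properties": {
            "location": { "type": "geo_point" }
          }
        }
      }
    }
  }
}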
The grok plugin
Official grok guide (Logstash documentation)
Supported grok patterns (GitHub)
[root@elk01:0 conf.d]# find / -name httpd
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.4/patterns/legacy/httpd

[root@elk01:0 conf.d]# grep clientip /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.4/patterns/legacy/httpd
HTTPD_COMMONLOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" (?:-|%{NUMBER:response}) (?:-|%{NUMBER:bytes})
[root@elk01:2 ~]# cat /etc/logstash/conf.d/07_filter_grok.conf
input {
  beats {
    port => "7777"
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs" ]
  }

  grok {
    match => { "message" => "%{HTTPD_COMBINEDLOG}" }
  }

  useragent {
    source => "agent"
    target => "agent-kind"
  }

  geoip {
    source => ["clientip"]
  }
}

output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-nginx-%{+yyyy.MM.dd}"
  }
}
[root@elk01:0 conf.d]# logstash -rf /etc/logstash/conf.d/07_filter_grok.conf
cat /etc/filebeat/16-nginx_logstash_grok.yaml
filebeat:
  inputs:
  - type: filestream
    paths:
      - /var/log/nginx/access.log*

output.logstash:
  hosts: ["10.0.0.211:7777"]
[root@elk01:1 ~]# filebeat -e -c /etc/filebeat/16-nginx_logstash_grok.yaml
Output
| "timestamp" => "20/Oct/2024:16:36:12 +0800", "clientip" => "87.120.127.103", "verb" => "GET", "geoip" => { "timezone" => "Europe/Sofia", "longitude" => 23.332, "continent_code" => "EU", "latitude" => 42.696, "ip" => "87.120.127.103", "country_name" => "Bulgaria", "country_code3" => "BG", "location" => { "lon" => 23.332, "lat" => 42.696 geoip { source => ["clientip"] }
Extracting fields with custom patterns
[root@elk01:1 ~]# cat /tmp/pattern/xixi.re
YEAR \d{4}
school [A-z]{5}
Class [A-z]{5}\d{2}

[root@elk01:1 ~]# cat /etc/logstash/conf.d/08_pattern.conf
input {
  tcp {
    port => 6666
  }
}

filter {
  grok {
    patterns_dir => ["/tmp/pattern/"]
    match => { "message" => "xixi %{YEAR:year}-%{school:school}-%{Class:class} haha" }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
The Class pattern (Class [A-z]{5}\d{2}) allows exactly two trailing digits, so "Linux666" fails to parse while "Linux01" succeeds:

{
    "@timestamp" => 2024-10-20T12:33:19.642Z,
          "port" => 49454,
          "host" => "elk01",
      "@version" => "1",
          "tags" => [
        [0] "_grokparsefailure"
    ],
       "message" => "xixi 2024-BJedu-Linux666 haha"
}
{
    "@timestamp" => 2024-10-20T12:31:16.265Z,
       "message" => "xixi 2024-BJedu-Linux01 haha",
        "school" => "BJedu",
          "host" => "elk01",
          "year" => "2024",
          "port" => 34950,
      "@version" => "1",
         "class" => "Linux01"
}

[root@elk01:2 ~]# for i in `seq 2024 2030`; do echo "xixi $i-BJedu-Linux01 haha" | nc 10.0.0.211 6666 & done
The mutate plugin
Extracting a custom log format
1. Generate custom logs with a Python script
[root@elk01:0 ~]# cat generate_log.py
import datetime
import random
import logging
import time
import sys

LOG_FORMAT = "%(levelname)s %(asctime)s [com.linux.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT,
                    filename=sys.argv[1], filemode='a')

actions = ["浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券",
           "领取优惠券", "搜索", "查看订单", "付款", "清空购物车"]

while True:
    time.sleep(random.randint(1, 5))
    user_id = random.randint(1, 10000)
    price = round(random.uniform(15000, 30000), 2)
    action = random.choice(actions)
    svip = random.choice([0, 1, 2])
    logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action, svip, price))
2. Run the script to generate logs
[root@elk01:0 ~]# python3 generate_log.py /tmp/apps.log

[root@elk01:1 ~]# tail -f /tmp/apps.log
INFO 2024-10-27 09:23:50 [com.linux.generate_log] - DAU|5164|领取优惠券|2|18464.33
INFO 2024-10-27 09:23:51 [com.linux.generate_log] - DAU|2615|搜索|1|21874.88
INFO 2024-10-27 09:23:56 [com.linux.generate_log] - DAU|8097|查看订单|1|23759.47
INFO 2024-10-27 09:23:57 [com.linux.generate_log] - DAU|795|加入购物车|2|21097.98
INFO 2024-10-27 09:24:00 [com.linux.generate_log] - DAU|1577|清空购物车|0|16636.14
3. Write the Logstash instance
[root@elk01:2 ~]# cat /etc/logstash/conf.d/10.beats-mutate.conf
input {
  beats {
    port => 5555
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs" ]
  }

  mutate {
    split => { "message" => "|" }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

[root@elk01:2 ~]# logstash -rf /etc/logstash/conf.d/10.beats-mutate.conf
4. Write the Filebeat instance
[root@elk01:1 ~]# cat /etc/filebeat/18-apps-to-logstash.yaml
filebeat:
  inputs:
  - type: filestream
    paths:
      - /tmp/apps.log

output.logstash:
  hosts: ["10.0.0.211:5555"]

[root@elk01:0 ~]# filebeat -e -c /etc/filebeat/18-apps-to-logstash.yaml
5. Output
{
       "message" => [
        [0] "INFO 2024-10-27 09:46:03 [com.linux.generate_log] - DAU",
        [1] "3013",
        [2] "评论商品",
        [3] "2",
        [4] "25853.85 "
    ],
    "@timestamp" => 2024-10-27T01:58:28.229Z
}
Adding fields
Modify the Logstash instance
[root@elk01:0 ~]# cat /etc/logstash/conf.d/10.beats-mutate.conf
input {
  beats {
    port => 5555
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs" ]
  }

  mutate {
    split => { "message" => "|" }

    add_field => {
      "other" => "%{[message][0]}"
      "userId" => "%{[message][1]}"
      "action" => "%{[message][2]}"
      "svip" => "%{[message][3]}"
      "price" => "%{[message][4]}"
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
Output
| "svip" => "2", "other" => "INFO 2024-10-27 09:46:03 [com.linux.generate_log] - DAU", "@timestamp" => 2024-10-27T02:17:55.199Z, "userId" => "3013", "action" => "评论商品"
Splitting multiple times
When one field still packs several values (the other field in this case), you can split again to extract exactly what you need:

         "svip" => "2",
        "other" => "INFO 2024-10-27 09:46:03 [com.linux.generate_log] - DAU",
   "@timestamp" => 2024-10-27T02:17:55.199Z,
       "userId" => "3013",
       "action" => "评论商品"

Splitting the other field on a space delimiter produces:

    "other" => [
        [0] "INFO",
        [1] "2024-10-27",
        [2] "10:46:33",
        [3] "[com.linux.generate_log]",
        [4] "-",
        [5] "DAU"
    ]

mutate {
  split => { "other" => " " }

  add_field => {
    "datetime" => "%{[other][1]} %{[other][2]}"
    "www" => "%{[other][3]}"
  }
}
Output
| "userId" => "9928", "www" => "[com.linux.generate_log]", "datetime" => "2024-10-27 10:58:19", "action" => "领取优惠券", "svip" => "2", "other" => [ [0] "INFO", [1] "2024-10-27", [2] "10:58:19", [3] "[com.linux.generate_log]", [4] "-", [5] "DAU"
mutate { split => {"www" => "["} add_field => { "www2" => "%{[www][1]}" } }
mutate { split => {"www2" =>"]"} add_field => { website => "%{[www2][0]}" } }
"userId" => "3837", "www2" => [ [0] "com.linux.generate_log" ], "action" => "浏览页面", "svip" => "0", "website" => "com.linux.generate_log", "@timestamp" => 2024-10-27T03:03:39.620Z } { "www" => [ [0] "", [1] "com.linux.generate_log]" ], "datetime" => "2024-10-27 11:03:40", "other" => [ [0] "INFO", [1] "2024-10-27", [2] "11:03:40", [3] "[com.linux.generate_log]", [4] "-", [5] "DAU" ], "price" => "16570.26 ", "message" => [ [0] "INFO 2024-10-27 11:03:40 [com.linux.generate_log] - DAU", [1] "8129", [2] "使用优惠券", [3] "1", [4] "16570.26 " ],
Removing redundant fields
Remove fields such as message, other, and www (their contents have already been extracted, so they no longer need to be shown):
mutate {
  remove_field => [ "message","other","www","www2" ]
}

{
    "datetime" => "2024-10-27 11:09:12",
       "price" => "18005.57 ",
      "userId" => "5897",
      "action" => "提交订单",
        "svip" => "1",
     "website" => "com.linux.generate_log",
  "@timestamp" => 2024-10-27T03:09:13.875Z
}
Final Logstash instance, writing to the ES cluster
[root@elk01:2 ~]# cat /etc/logstash/conf.d/10.beats-mutate.conf
input {
  beats {
    port => 5555
  }
}

filter {
  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs" ]
  }

  mutate {
    split => { "message" => "|" }

    add_field => {
      "other" => "%{[message][0]}"
      "userId" => "%{[message][1]}"
      "action" => "%{[message][2]}"
      "svip" => "%{[message][3]}"
      "price" => "%{[message][4]}"
    }
  }

  mutate {
    split => { "other" => " " }

    add_field => {
      "datetime" => "%{[other][1]} %{[other][2]}"
      "www" => "%{[other][3]}"
    }
  }

  mutate {
    split => { "www" => "[" }

    add_field => {
      "www2" => "%{[www][1]}"
    }
  }

  mutate {
    split => { "www2" => "]" }

    add_field => {
      "website" => "%{[www2][0]}"
    }
  }

  mutate {
    convert => { "price" => "float" }
  }

  mutate {
    remove_field => [ "message","other","www","www2" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "auto_logs-apps-%{+yyyy.MM.dd}"
  }
}
Logstash multi-branch statements
1. Multi-branch statements: basic types
[root@elk01:2 ~]# cat /etc/logstash/conf.d/11-multiple_input.conf
input {
  beats {
    port => 5555
    type => beats
  }

  tcp {
    port => 6666
    type => tcp
  }

  file {
    path => ["/tmp/luay.log"]
    start_position => "beginning"
    type => file
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

[root@elk01:0 ~]# echo 999 | nc 10.0.0.211 6666
[root@elk01:2 ~]# echo 7989 > /tmp/luay.log

{
      "@version" => "1",
          "type" => "tcp",
          "port" => 56998,
    "@timestamp" => 2024-10-27T08:53:35.394Z,
          "host" => "elk01",
       "message" => "999"
}
{
      "@version" => "1",
          "type" => "file",
          "path" => "/tmp/luay.log",
    "@timestamp" => 2024-10-27T08:54:44.088Z,
          "host" => "elk01",
       "message" => "7989"
}
2. Multi-branch statements: a more complex case
Official example
Add if conditionals
Requirements:
1. For the tcp type, remove the @version and port fields.
2. For the file type, remove the @version and host fields, and rename path to filepath.
3. Handle the beats type the same way as in 10.beats-mutate.conf above.

[root@elk01:2 ~]# cat /etc/logstash/conf.d/11-multiple_input.conf
input {
  beats {
    port => 5555
    type => beats
  }

  tcp {
    port => 6666
    type => tcp
  }

  file {
    path => ["/tmp/luay.log"]
    start_position => "beginning"
    type => file
  }
}

filter {
  if [type] == "tcp" {
    mutate {
      remove_field => [ "@version","port" ]
    }
  } else if [type] == "file" {
    mutate {
      rename => { "path" => "filepath" }
      remove_field => [ "@version","host" ]
    }
  } else {
    mutate {
      split => { "message" => "|" }

      add_field => {
        "userId" => "%{[message][1]}"
        "action" => "%{[message][2]}"
        "svip" => "%{[message][3]}"
        "price" => "%{[message][4]}"
      }
    }
  }

  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs","message" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

[root@elk01:0 ~]# echo 999999999 >> /tmp/luay.log
[root@elk01:0 ~]# echo 11111111 | nc 10.0.0.211 6666
[root@elk01:0 ~]# filebeat -e -c /etc/filebeat/18-apps-to-logstash.yaml

{
      "filepath" => "/tmp/luay.log",
          "type" => "file",
    "@timestamp" => 2024-10-27T09:22:11.843Z
}
{
          "type" => "tcp",
    "@timestamp" => 2024-10-27T09:22:28.519Z
}
{
          "svip" => "0",
        "action" => "加入收藏",
          "type" => "beats",
    "@timestamp" => 2024-10-27T09:23:57.599Z,
         "price" => "27975.99 ",
        "userId" => "6505"
}
Multi-branch output
Write to the ES cluster with a different index per branch
[root@elk01:2 ~]# cat /etc/logstash/conf.d/11-multiple_input.conf
input {
  beats {
    port => 5555
    type => beats
  }

  tcp {
    port => 6666
    type => tcp
  }

  file {
    path => ["/tmp/luay.log"]
    start_position => "beginning"
    type => file
  }
}

filter {
  if [type] == "tcp" {
    mutate {
      remove_field => [ "@version","port" ]
    }
  } else if [type] == "file" {
    mutate {
      rename => { "path" => "filepath" }
      remove_field => [ "@version","host" ]
    }
  } else {
    mutate {
      split => { "message" => "|" }

      add_field => {
        "userId" => "%{[message][1]}"
        "action" => "%{[message][2]}"
        "svip" => "%{[message][3]}"
        "price" => "%{[message][4]}"
      }
    }
  }

  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs","message" ]
  }
}

output {
  if [type] == "tcp" {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "logstash-if-tcp-%{+yyyy.MM.dd}"
    }
  } else if [type] == "file" {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "logstash-if-file-%{+yyyy.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "logstash-if-apps-%{+yyyy.MM.dd}"
    }
  }
}
Hands-on exercise
Write /var/log/syslog, /var/log/auth.log, and /var/log/vmware-vmsvc*.log into different ES indices, using an if multi-branch statement.
- The ES index names must contain the keywords syslog, auth, and vmware respectively
- The index containing the syslog keyword must have 3 shards and 0 replicas
- The index containing the auth keyword must have 5 shards and 0 replicas
- The index containing the vmware keyword must have 8 shards and 0 replicas
1. Write the Logstash instance
[root@elk01:1 ~]# cat /etc/logstash/conf.d/12-if_test.conf
input {
  file {
    path => ["/var/log/syslog"]
    start_position => "beginning"
    type => syslog
  }

  file {
    path => ["/var/log/auth.log"]
    start_position => "beginning"
    type => auth
  }

  file {
    path => ["/var/log/vmware-vmsvc*.log"]
    start_position => "beginning"
    type => vmware
  }
}

output {
  stdout {
    codec => rubydebug
  }

  if [type] == "syslog" {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "linux-syslog-%{+yyyy.MM.dd}"
    }
  } else if [type] == "auth" {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "linux-auth-%{+yyyy.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
      index => "linux-vmware-%{+yyyy.MM.dd}"
    }
  }
}
2. Configure shards and replicas per index
In Kibana: Index Management -> Index Templates -> Create template
3 shards, 0 replicas:
"number_of_replicas": 0,
"number_of_shards": 3
Repeat the same steps for the other two templates (an API equivalent is sketched below).
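The same settings can also be applied from the Dev Tools console instead of the UI; a minimal sketch for the syslog template (the template name is illustrative, and the index pattern must match the linux-syslog-* indices above):

PUT _index_template/linux-syslog
{
  "index_patterns": ["linux-syslog-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 0
    }
  }
}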
3. Start the Logstash instance
[root@elk01:1 ~]# logstash -rf /etc/logstash/conf.d/12-if_test.conf
Logstash pipelines
Comparing Logstash pipelines, multiple instances, and multi-branch statements:

1. Multi-branch statements
Pros: multiple inputs share one config file and one Logstash instance, which is lightweight.
Cons: even with hot reload, a bad edit to the config stops the whole Logstash instance and takes every other input down with it.

2. Multiple Logstash instances
Pros: instances are isolated from one another; no heavy branching logic, easy to understand.
Cons: multiple data directories to manage, comparatively heavyweight.

3. Pipelines
Pros: a single lightweight Logstash instance with simpler configuration, achieved by splitting the config into multiple files.
Each file has its own input, filter, and output, and each becomes a separate pipeline.
No -f option is needed: start Logstash directly and it loads the pipeline files automatically.
Cons: close to ideal, except that if the Logstash process dies, every pipeline goes down with it.
Prepare the pipeline config files
1. tcp
[root@elk01:1 ~]# cat /etc/logstash/conf.d/13-pipeline_tcp.conf
input {
  tcp {
    port => 6666
  }
}

filter {
  mutate {
    add_field => {
      "address" => "%{host}:%{port}"
    }

    remove_field => [ "@version","host","port" ]
  }
}

output {
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-pipeline-tcp-%{+yyyy.MM.dd}"
  }
}
2. file
[root@elk01:1 ~]# cat /etc/logstash/conf.d/14-pipeline_file.conf
input {
  file {
    path => ["/tmp/luay.log"]
    start_position => "beginning"
  }
}

filter {
  mutate {
    rename => { "path" => "filepath" }
  }
}

output {
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-pipeline-file-%{+yyyy.MM.dd}"
  }
}
3. beats
[root@elk01:1 ~]# cat /etc/logstash/conf.d/15-pipeline_beat.conf
input {
  beats {
    port => 5555
  }
}

filter {
  mutate {
    split => { "message" => "|" }

    add_field => {
      "userId" => "%{[message][1]}"
      "action" => "%{[message][2]}"
      "svip" => "%{[message][3]}"
      "price" => "%{[message][4]}"
    }
  }

  mutate {
    remove_field => [ "@version","input","agent","tags","log","host","ecs","message" ]
  }
}

output {
  elasticsearch {
    hosts => ["10.0.0.211:9200","10.0.0.212:9200","10.0.0.213:9200"]
    index => "logstash-pipeline-apps-%{+yyyy.MM.dd}"
  }
}

[root@elk01:0 ~]# cat /etc/filebeat/18-apps-to-logstash.yaml
filebeat:
  inputs:
  - type: filestream
    paths:
      - /tmp/apps.log

output.logstash:
  hosts: ["10.0.0.211:5555"]
Edit the pipelines file
[root@elk01:1 ~]# cat /etc/logstash/pipelines.yml
···
- pipeline.id: tcp-pipeline
  path.config: "/etc/logstash/conf.d/13-pipeline_tcp.conf"
- pipeline.id: file-pipeline
  path.config: "/etc/logstash/conf.d/14-pipeline_file.conf"
- pipeline.id: beat-pipeline
  path.config: "/etc/logstash/conf.d/15-pipeline_beat.conf"
Create a symlink (when Logstash is launched by hand rather than via systemd, it reads pipelines.yml from /usr/share/logstash/config/ instead of /etc/logstash/)
[root@elk01:1 ~]# mkdir /usr/share/logstash/config

[root@elk01:1 ~]# ln -svf /etc/logstash/pipelines.yml /usr/share/logstash/config/pipelines.yml
'/usr/share/logstash/config/pipelines.yml' -> '/etc/logstash/pipelines.yml'
Start Logstash
[root@elk01:1 ~]# logstash -r

[root@elk01:0 ~]# echo 898989 | nc 10.0.0.211 6666
[root@elk01:0 ~]# echo 9900 > /tmp/luay.log
[root@elk01:0 ~]# filebeat -e -c /etc/filebeat/18-apps-to-logstash.yaml