logstash基础入门教程

logstash基础入门教程

Posted by yishuifengxiao on 2019-09-24

一 常用命令

1.1 常用命令

  • —node.name
  • -f —path.conf pipeline 路径,可以是文件或文件夹
  • —path.settings logstash 配置文件夹路径,其中要包含logstash.yml
  • -e config.string 指明 pipeline 内容,多用于测试
  • -w pipeline.workers
  • -b pipeline.batch.size
  • —path.data
  • —debug
  • -t —config.test_and_exit

具体更多配置参见 官方文档

示例 1

1
bin/logstash -e 'input{stdin{}} output{stdout{}}'

1.2 多实例运行

1
2
bin/logstash --path.settings instance1
bin/logstash --path.settings instance2

不同的 instance 修改其中的logstash.yml,自定义path.data,确保其不相同即可。

二 配置详解

logstash 的配置主要有三部分

1
2
3
4
5
6
7
8
9
10
11
input { #输入
stdin {} # 标准输入
}

filter { # 过滤,对数据进行分割、截取等操作

}

output {
stdout {} #标准输出
}

2.1 pipeline 语法

2.1.1 数据类型

pipeline 主要有如下数据类型

  1. 布尔类型 Boolean
1
issuc => true
  1. 数值类型 Number
1
port => 22
  1. 字符串类型 String
1
name = "hello world"
  1. 数组 Array/List
1
2
users => [{id=>1,name=>tom},{id=>2,name=>yishui}]
path => ["/var/lib/*.log","/var/logs/message"]
  1. 哈希类型
1
2
3
4
match => {
"k1":"v1"
"k2":"v2"
}
2.1.2 数值引用

在配置中,可以引用 logstash event 的属性(字段),主要有一下两种方式

  • 直接引用字段值
  • 在字符串中以 sprintf 方式引用

直接引用字段值

使用[]即可,嵌套字段写多层[]即可

例如

1
2
3
4
"ip":"192.168.1.165"
"os":{
"version":"win10"
}

引用方法如下

1
2
3
4
5
6
7
if[ip] =~ "index" {
...
}

if[os][version] =~ "windows" {
...
}

在字符串中以 sprintf 方式引用

例如

1
2
3
4
"ip":"192.168.1.165"
"os":{
"version":"win10"
}

引用方法如下

1
2
ip => "ip is %{ip}"
ver => "version is %{[os][version]}"
2.1.3 条件判断

主要格式如下

1
2
3
4
5
6
7
if 表达式 {

}else if 表达式 {

}else{

}

表达式可以使用一下操作符

  • 比较 :==、 != 、>、 < 、>= 、<=
  • 正则是否配置 : =~ 、!~
  • 包含(字符串或数组) :in not in
  • 布尔操作符 : and 、or 、nand 、xor 、!
  • 分组操作符 :()

插件配置

2.2.1 输入

采集各种样式、大小和来源的数据. 数据往往以各种各样的形式,或分散或集中地存在于很多系统中

Logstash 支持各种输入选择,可以在同时间从众多常用来源捕捉事件。能够以连续的流式传输方式, 轻松地
从日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

input 插件执行输入数据源,一个 pipeline 可以有多个 input 插件

主要的 input 插件有

  • stdin
  • file
  • kafka

最简单的输入,从标准输入读取数据,通用配置为:

  • codec 类型为 codec
  • type 类型为 string ,自定义该事件的类型,可用于后续判断
  • tags 类型为 array,自定义该事件的 tag ,可用于后续判断
  • add_field 类型为 hash,为该事件添加字段

例如

在 input-sdtin.conf 配置文件中

1
2
3
4
5
6
7
8
9
10
11
12
input {
codec => "plain"
tags => ["test"]
type => "std"
add_field => {"key" => "value"}
}

output {
stdout =>{
codec => "rubydebug"
}
}

此时,在 output 中可以这样使用

1
2
3
4
5
6
7
8
output{
"type" => "std", //在input-sdtin.conf中定义的
"@version" => 1,
"key" => "value"
"tages" => {
[0]"test" //在input-sdtin.conf中定义的
}
}
2.2.1.1 codec 插件

. Codec Plugin 作用于 input 和 output plugin,负责将数据在原始与 Logstash Event 之间转换,常见的 codec 有:

  • plain 读取原始内容
  • dots 将内容简化为点进行输出
  • rubydebug 将 Logstash Events 按照 ruby 格式输出,方便调试
  • line 处理带有换行符的内容
  • json 处理 json 格式的内容
  • multiline 处理多行数据的内容

示例如下

1
2
3
4
5
bin/logstash -e "inoput{stdin{code=>line}} output{stdout=>{codec=>rubydebug}}"

bin/logstash -e "inoput{stdin{code=>line}} output{stdout=>{codec=>dots}}"

bin/logstash -e "inoput{stdin{code=>json}} output{stdout=>{codec=>rubydebug}}"

主要设置参数如下:

  • pattern 设置行匹配的正则表达式,可以使用 grok
  • what previous|next,如果匹配成功,那么匹配行是归属上一个事件还是下一个事件
  • negate true or false 是否对 pattern 的结果取反

示例

1
2
3
4
5
6
7
8
input {
stdin {
codec => multiline {
pattern => "^\s" #表示以空格开头
what => "previous"
}
}
}
2.2.2 过滤

实时解析和转换数据

数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件.识别已命名的字段以构建结构,并将它
们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Fiter 是 Logstash 功能强大的主爱原因。它可以对 Logstash Event 进行车的处理。比如解析数据。删除字段。类型转换等等,常见的有如下几个、

  • date 日期解析
  • grok 正则匹配解析
  • dissect 分割符解析
  • mutate 对字段作处理,比如重命名、删除, 替换等
  • json 按照 json 解析字段内容到定字段中
  • geoip 增加地理位置数据
  • ruby 利用 ruby 代码来动志修改 Logstash Event
2.2.2.1 date 插件

用于将日期字符串解析为日期类型,然后替换 @timestamp 字段或指定的其他字段

示例

1
2
3
4
5
filter {
date {
match => ["logdate" ,"MM dd yyyy HH:mm:ss"]
}
}

. match

  • 类型为数组,用于指定日期匹配的格式,可以一-次指定多种日期格式

  • match => [ “logdate”, “MMM dd yyy HH:mmss”MMM d yy

    HH:mm:ss”, “ISO8601” ]

. target

-类型为字符串,用于指定赋值的字段名,默认是@timestamp

. timezone

-类型为字符串,用于指定时区

2.2.2.2 grok 插件

例如要解析这样一段文本信息

1
2
3
192.168.0.105 - - [25/Mar/2019:09:24:26 -0342] "GET /fancy.html HTTP/1.1 200
5386 "-" "Mozilla/6.0 (X11; Linux x86_ 64; rv:51.0) Gecko/ 20100101 Firefox/
51.0"

使用 grok 方式的解析表达式如下

1
2
3
%{IPORHOST:clientip} %{USER:ident} %{USER:auth}\[%{HTTPDATE:timestamp}
\] %{WORD:verb} %{DATA:request) HTTP/%{NUMBER:httpversion)}"%
{INUMBER:response:int} (?:-|%NUMBER:bytes:int)%{QS:referrer) %{QS:agent}

IPORHOST等其实带有名字的正则表达式集合

具体的参见 https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

.Grok 语法如下:

  • %(SYNTAX:SEMANTIQ}
  • SYNTAX 为 grok pattern 的名称,SEMANTIC 为赋值字段名称
  • %{NUMBER:duration}可以匹配数值类型,但是 grok 匹配出的内容都是字符串类型,可以通过在最后指定为 int 或者 float 来强制转换类型。
  • %{NUMBER:duration:float)
2.2.2.3 dissect 插件

dissect 基于分隔符进行分割 ,解决了基于 grok 解析时消耗太多 cpu 资源的问题

对于文本信息

1
2
3
192.168.0.105 - - [25/Mar/2019:09:24:26 -0342] "GET /fancy.html HTTP/1.1 200
5386 "-" "Mozilla/6.0 (X11; Linux x86_ 64; rv:51.0) Gecko/ 20100101 Firefox/
51.0"

使用 dissect 解析的命令如下

1
%{clientip} %{ident} %{auth} [%{timestamp}] "%{request}" %{response} %{bytes} "%{referrer}" "%{agent}"

dissect 语法比较简单,有一系列 字段 和 分隔符组成
%{} 是字段,两个 %{} 之间的是分隔符

对于文本信息

1
Apr 20 12:11:10 localhost systemd[1]:this is log info

解析命令如下

1
2
3
4
5
filter {
dissect {
mapping =>{ "message" => %{ts} %{+ts} %{+ts} } %{src} %{rog}[%{pid}] :%{msg} }
}
}

得到的解析结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"msg" => "this is log info",
"@timestamp" => "2019-09-22T20:20:20.248Z",
"src" => "localhost" ,
"@version" => "localhost" ,
"host" => "localhost.localdomain",
"pid" => "1"
"message" => "Apr 20 12:11:10 localhost systemd[1]:this is log info",
"type" => "stdin" ,
"rog" => "systemd" ,
"ts" => "Apr 20 12:11:10"

}

使用 dissect 时还可以指定拼接顺序,例如

1
2
3
4
5
filter {
dissect {
mapping =>{ "message" => %{+ts/2} %{+ts/1} %{+ts/3} } %{src} %{rog}[%{pid}] :%{msg}
}
}

此外,对于形为

1
a=1&b=2

的信息,解析命令可为

1
%{?key1}=%{&key1}&%{?key2}=%{&key2}

其中
%{?key1}表示忽略匹配值,但是赋予字段名,用于后续匹配

%{&key1}表示将匹配值赋予 key1 的匹配值

另外,dissect 分割后的值都是字符串,可以使用 convert_datatype 属性进行类型转换

1
2
3
4
5
6
7
8
filter {

dissect {
convert_datatype {
age => "int"
}
}
}
2.2.2.4 mutate 插件

mutate 是使用最频繁的操作,可以对字段进行各种操作,比如重命名、删除、替换、更新
等,主要操作如下:

  • convert 类型转换
  • gsub 字符串替换
  • split/join/merge 字符串切割、数组合并为字符串、数组合并为数组
  • rename 字段重命名
  • update/replace 字段内容更新或替换
  • remove_field 删除字段

convert 类型转换
实现字段类型转换时类型为 hash,仅支持 Integer 、float 、string 和 boolean

例如

1
2
3
4
5
filter {
mutate {
convert => { "age" => "integer" }
}
}

gsub 字符串替换

1
2
3
4
5
6
7
8
filter {
mutate {
gsub => {
"path" ,"/","_",
"urlparams","[\\?#-]","."
}
}
}

表示对字段内容进行替换,类型为数组,每三项为一个替换匹配

字符串切割

1
2
3
4
5
filter {
mutate {
split => { "works" => "," }
}
}

更新字段内容

update 只在字段存在时生效,而 replace 在字段不存在时会执行新增字段操作

1
2
3
4
5
filter {
mutate {
replace => { "message" => "%{source_host}:my demo message" }
}
}

其中 source_host 是 logstash event 中的字段

2.2.3 输出

logstash 提供众多的输出,可以将数据发送到指定的地方

常见的插件有

  • stdout
  • file
  • elasticsearch

stdout

输出到标准输出,多用于调试

1
2
3
4
5
output {
stdout {
codec => rubydebug
}
}

输出到文件

实现将分散到多地的文件统一到一处的要求,

1
2
3
4
5
6
output {
file {
path => "/var/logs/demo.log"
codec => line { format => "%{meaasge}"}
}
}

在上述配置,由于默认是输出 json 格式的数据,因此使用 format 可以输出原始格式的数据

输出到 elasticsearch

1
2
3
4
5
6
7
8
output {
elasticsearch {
hosts => ["http://192.168.19.128:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}

其中@metadata是特殊字段,其内容不会输出在 output 中,适合用来存储做条件判断或临时存储的字段

例如

1
2
3
4
5
6
7
8
9
10
11
input {stdin {} }

filter {
mutate {add_field =>{"[@metadata][debug]" => true}}
}

output {
if[@metadata][debug]{
stdout {codec => rubydebug}
}
}

三 读取自定义日志

前面我们通过 Filebeat 读取了 nginx 的日志.如果是自定义结构的日志.就需要读取处理后才能使用,所以.这个时
候就需要使用 logstash 了.因为 logstash 有着强大的处理能力,可以应对各种各样的场景。

3.1 日志结构

1
2019-06-06 18:35:06|ERROR|读取数据出错 | 参数: id= 1002

可以看到,日志中的内容是使用 | 进行分割的。我们在处理的时候。也需要对数据做分割处理。

3.2 编写配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input {
file {
path => "/var/logs/demo.log"
start_position => "beginning"
}
}

filter {
mutate {
split => {"message"=>"|"}
}
}

output {
stdout { codec => rubydebug }
}

也可以输出到 elasticsearch 和对数据进行转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
input {
file {
path => "/var/logs/demo.log"
start_position => "beginning"
}
}

filter {
mutate {
split => {"message"=>"|"}
}
mutate {
add_field => { #分割后的数据的索引从0开始
"userId"=>"%{message[1]}" #获取分割后的第1个数据
"visit"=>"%{message[2]}" #获取分割后的第1个数据
"date"=>"%{message[3]}" #获取分割后的第1个数据
}
}
mutate {
convert => {
"userId"=>"integer"
"visit"=>"string"
"date"=>"string"
}
}
}

output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}

3.3 实例

解析下面的数据

1
2
3
4
5
6
7
8
9
10
2019-09-24 17:20:21.680 [main] INFO  com.yishuifengxiao.webmagic.WebmagicApplication - No active profile set, falling back to default profiles: default
2019-09-24 17:20:22.152 [main] INFO com.yishuifengxiao.webmagic.WebmagicApplication - Started WebmagicApplication in 1.035 seconds (JVM running for 2.97)
2019-09-24 17:20:26.657 [main] INFO com.yishuifengxiao.webmagic.LogCommander - 用户名为 TrshKKT ,密码为 NjxEWoD ,年龄为 30
2019-09-24 17:20:29.159 [main] INFO com.yishuifengxiao.webmagic.LogCommander - 用户名为 Dudve ,密码为 Q3dMPT8 ,年龄为 32
2019-09-24 17:20:33.159 [main] INFO com.yishuifengxiao.webmagic.LogCommander - 用户名为 Ulpdut ,密码为 RZGAK2f ,年龄为 34
2019-09-24 17:20:39.160 [main] INFO com.yishuifengxiao.webmagic.LogCommander - 用户名为 ZWWA ,密码为 lSgSKEw ,年龄为 53
2019-09-24 17:20:47.661 [main] INFO com.yishuifengxiao.webmagic.LogCommander - 用户名为 tAYciw ,密码为 AbSQP98 ,年龄为 42
2019-09-24 17:20:56.662 [main] INFO com.yishuifengxiao.webmagic.LogCommander - 用户名为 WvTUBHC ,密码为 kyiknPQ ,年龄为 49
2019-09-24 17:21:02.663 [main] INFO com.yishuifengxiao.webmagic.LogCommander - 用户名为 lMTwwBUzr ,密码为 SQK3sex ,年龄为 33
2019-09-24 17:21:10.663 [main] INFO com.yishuifengxiao.webmagic.LogCommander - 用户名为 gNgedb ,密码为 MJpSi1I ,年龄为 11

配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
input {
file {
path => "/usr/local/demo/yishui_log/yishui.log"
start_position => "beginning"
}
}

filter {
### 去掉非法的数据
if([message]=~ "^\d+"){
### 丢弃
drop{}
}

### 去掉多余的空格
mutate{
gsub => [ "message", "\s+", "" ]
}



### 字符串切割
mutate {
split => {"message"=>","}
add_field => {
"username" => "%{[message][0]}"
}
add_field => {
"pwd" => "%{[message][1]}"
}
add_field => {
"age" => "%{[message][2]}"
}
}


### 原始数据还原
mutate {
join => ["message", ","]
}

### 去掉中文
mutate{
gsub => [ "username", "[\u4e00-\u9fa5]+", "" ]
gsub => [ "pwd", "[\u4e00-\u9fa5]+", "" ]
gsub => [ "age", "[\u4e00-\u9fa5]+", "" ]
}

# 数据类型转换
mutate {
convert => { "age" => "integer" }
}
}


# output {
# stdout {
# codec => rubydebug
# }
# }

output {
elasticsearch {
hosts => ["http://192.168.19.128:9200"]
index => "logs-v0.1.1-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}

安装仪表盘到 kibana

1
./metricbeat setup --dashboards






参考链接

logstash 官方文档

filter-mutate 过滤插件 - dance_man - 博客园

Grok 正则表达式调试地址