LogStash配置文件介绍及常用插件的配置

发布时间 2023-04-12 16:50:15作者: 金天黑日

  转https://blog.51cto.com/u_2225052/5873195

1 主要配置

  主要配置包含输入、过滤器、输出

# 输入
input {
  ...
}
 
# 过滤器
filter {
  ...
}
 
# 输出
output {
  ...
}

 

2 简要配置示例

2.1 示例

input {
    # 从文件读取日志信息
    file {
        path => "/var/log/error.log"
        type => "error"//type是给结果增加一个type属性,值为"error"的条目
        start_position => "beginning"//从开始位置开始读取
        # 使用 multiline 插件,传说中的多行合并
        codec => multiline {
            # 通过正则表达式匹配,具体配置根据自身实际情况而定
            pattern => "^\d"
            negate => true
            what => "previous"
        }
    }
}
 
#可配置多种处理规则,他是有顺序,所以通用的配置写下面
# filter {
#    grok {
#       match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
     }
# }
 
output {
    # 输出到 elasticsearch
    elasticsearch {
        hosts => ["192.168.22.41:9200"]
        index => "error-%{+YYYY.MM.dd}"//索引名称
    }
}

 

2.2 filter多个配置

  上面filter可以配置多个

file {  
        type => "tms_inbound.log"  
        path => "/JavaWeb/tms2.wltest.com/logs/tms_inbound.es.*.log"  
        codec => json {  
                charset => "UTF-8"  
            }  
    }  
 
  file {  
        type => "tms_outbound.log"  
        path => "/JavaWeb/tms2.wltest.com/logs/tms_outbound.es.*.log"  
        codec => json {  
                charset => "UTF-8"  
            }  
    }

 

3 配置文件结构及语法

3.1 结构{}

  Logstash通过{}来定义区域,区域内可以定义插件,一个区域内可以定义多个插件

  如下:一个input定义两个不同类型的input插件

input {
    stdin {
    }
    beats {
        port => 5044
    }
}

 

3.2 数据类型

  Logstash仅支持少量的数据类型

Boolean:ssl_enable => true

Number:port => 33

String:name => “Hello world”

Commonts:# this is a commen

 

3.3 字段引用   

  Logstash数据流中的数据被称之为Event对象,Event以JSON结构构成,Event的属性被称之为字段,如果你像在配置文件中引用这些字段,只需要把字段的名字写在中括号[]里就行了

  如[type],对于嵌套字段每层字段名称都写在[]里就可以了,比如:[tags][type]

  除此之外,对于Logstash的arrag类型支持下标与倒序下表,如:[tags][type][0],[tags][type][-1]

 

3.4 条件判断   

  Logstash支持下面的操作符

equality:==, !=, <, >, <=, >=

regexp:=~, !~

inclusion:in, not in

boolean:and, or, nand, xor

unary:!

 

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

 

3.5 环境变量引用   

  Logstash支持引用系统环境变量${},环境变量不存在时可以设置默认值

  示例

export TCP_PORT=12345
 
input {
  tcp {
    port => "${TCP_PORT:54321}"
  }
}

 

4 常用输入插件(Input Plug)

4.1 File读取插件

4.1.1 简介

  主要用来抓取文件的变化信息,将变化信息封装成Event进程处理或者传递。

 

4.1.2 配置示例

input
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    type => "system"
    start_position => "beginning"
  }
}

 

4.1.3 常用参数

 

4.2 Beats监听插件

4.2.1 简介

  Beats插件用于建立监听服务,接收Filebeat或者其他beat发送的Events

 

4.2.2 配置示例

input {
    beats {
        port => 5044
    }
}

 

4.2.3 常用参数

  (空的同上面的)

 

 

4.3 TCP监听插件

4.3.1 简介

  TCP插件有两种工作模式,“Client”和“Server”,分别用于发送网络数据和监听网络数据

 

4.3.2 配置示例

tcp {
    port => 41414
}

 

4.3.3 常用参数

  (空的同上面的)

 

4.4 Redis读取插件

4.4.1 简介

  用于读取Redis中缓存的数据信息

 

4.4.2 配置示例

1)简要配置

input {
 
redis {
 
data_type => "list" #logstash redis插件工作方式
 
key => "logstash-test-list" #监听的键值
 
host => "127.0.0.1" #redis地址
 
port => 6379 #redis端口号
 
}
 
}
 
output {
 
stdout{}
 
}

 

2)详细配置

input {
 
redis {
 
batch_count => 1 #EVAL命令返回的事件数目
 
data_type => "list" #logstash redis插件工作方式
 
key => "logstash-test-list" #监听的键值
 
host => "127.0.0.1" #redis地址
 
port => 6379 #redis端口号
 
password => "123qwe" #如果有安全认证,此项为密码
 
db => 0 #redis数据库的编号
 
threads => 1 #启用线程数量
 
}
 
}
 
output {
 
stdout{}
 
}

 

4.5 kafka读取插件

4.5.1 简介

  用于读取kafka中缓存的数据信息

 

4.5.2 配置示例

input {
  kafka {
     bootstrap_servers => "192.168.1.252:9092" #kafka服务器地址
     topics => "252nginx-accesslog"
     batch_size => 5
     codec => "json" #写入的时候使用json编码,因为logstash收集后会转换成json格式
     group_id => "252nginx-access-log" 
     consumer_threads => 1
     decorate_events => true 
  }
        
}

 

4.6 Syslog监听插件

4.6.1 简介

  监听操作系统syslog信息

 

4.6.2 配置示例

  修改rsyslog配置文件

[root@node1 ~]# vim /etc/rsyslog.conf
*.* @@192.168.79.103:514
[root@node1 ~]# systemctl restart rsyslog

 

1)输出到屏幕

[root@node1 conf.d]# cat syslog.conf
input{
    syslog{
        type => "system-syslog"
    port => 514
    }
}
 
filter{
 
}
 
output{
    stdout{
    codec => rubydebug
    }
}
[root@node1 conf.d]# /opt/logstash/bin/logstash -f syslog.conf

 

2)输出到es

[root@node1 conf.d]# cat syslog.conf
input{
    syslog{
        type => "system-syslog"
    port => 514
    }
}
 
filter{
 
}
 
output{
    elasticsearch{
    hosts => ["192.168.79.103:9200"]
    index => "system-syslog-%{+YYYY.MM}"
    }
}
[root@node1 conf.d]# /opt/logstash/bin/logstash -f syslog.conf

 

5 常用过滤插件(Filter plugin)

  丰富的过滤器插件是logstash威力如此强大的重要因素,过滤器插件主要处理流经当前Logstash的事件信息,可以添加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也可以根据条件判断来进行不同的数据处理方式

 

5.1 grok正则捕获

5.1.1 简介

  grok是Logstash中将非结构化数据解析成结构化数据以便于查询的最好工具,非常适合解析syslog logs,apache log, mysql log,以及一些其他的web log

 

5.1.2 预定义表达式调用

1)简介

  Logstash提供120个常用正则表达式可供安装使用,安装之后你可以通过名称调用它们

  

2)语法

%{SYNTAX:SEMANTIC}

SYNTAX:表示已经安装的正则表达式的名称

SEMANTIC:表示从Event中匹配到的内容的名称

 

3)简要示例

  Event的内容为“[debug] 127.0.0.1 - test log content”

  匹配%{IP:client}将获得“client: 127.0.0.1”的结果,前提是安装了IP表达式;

  如果你在捕获数据时想进行数据类型转换可以使用%{NUMBER:num:int}这种语法,默认情况下,所有的返回结果都是string类型,当前Logstash所支持的转换类型仅有“int”和“float”;

 

4)稍微完整点的示例

  日志文件http.log内容:55.3.244.1 GET /index.html 15824 0.043

  表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

  配置文件内容

input {
  file {
    path => "/var/log/http.log"
  }
}
filter {
  grok {
    match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
  }
}

  输出结果

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

 

5.1.3 自定义表达式调用     

1)语法

  (?<field_name>the pattern here)     

 

2)简要示例

  捕获长度为10或11的十六进制queue_id

  使用表达式(?<queue_id>[0-9A-F]{10,11})

 

3)安装自定义表达式     

  与预定义表达式相同,你也可以将自定义的表达式配置到Logstash中,然后就可以像预定义的表达式一样使用;

  以下是操作步骤说明:

  (1)在Logstash根目录下创建文件夹“patterns”,在“patterns”文件夹中创建文件“extra”(文件名称无所谓,可自己选择有意义的文件名称);

  (2)在文件“extra”中添加表达式,格式:patternName regexp,名称与表达式之间用空格隔开即可,如下

# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}

 

4)使用自定义的表达式

  需要指定“patterns_dir”变量,变量内容指向表达式文件所在的目录

  举例如下:

  日志内容如下

## 日志内容 ##
Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

  LogStash配置如下

## Logstash配置 ##
filter {
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
  }
}

  运行结果如下

## 运行结果 ##
timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965

 

5.1.4 常用配置参数

  (空的同上面的)

 

5.1.5 其它

  一般的正则表达式只能匹配单行文本,如果一个Event的内容为多行,可以在pattern前加“(?m)”
  对于Hash和Array类型,Hash表示键值对,Array表示数组
  Grok表达式在线debug地址​ ​
  预定义正则表达式参考地址

5.2 date时间处理插件   

5.2.1 简介

  该插件用于时间字段的格式转换

  比如将“Apr 17 09:32:01”(MMM dd HH:mm:ss)转换为“MM-dd HH:mm:ss”。

  而且通常情况下,Logstash会为自动给Event打上时间戳,但是这个时间戳是Event的处理时间(主要是input接收数据的时间),和日志记录时间会存在偏差(主要原因是buffer),我们可以使用此插件用日志发生时间替换掉默认是时间戳的值。

 

5.2.2常用配置参数

  (空的同上)

 

5.3 mutate数据修改插件

5.3.1 简介

  mutate 插件是 Logstash另一个重要插件。它提供了丰富的基础类型数据处理能力。可以重命名,删除,替换和修改事件中的字段

 

5.3.2 字符串转换为float型


 
[elk@Vsftp logstash]$ cat t2.conf 
input {
   stdin {
     }
 }
 
filter {
  grok {
   match =>{
   "message" =>"(?<request_time>\d+(?:\.\d+)?)"
     }
    }
     mutate {
        convert => ["request_time", "float"]
}
}
output {
   stdout {
   codec =>rubydebug
   }
}
 
[elk@Vsftp logstash]$ logstash -f t2.conf 
Settings: Default pipeline workers: 4
Pipeline main started
23.45
{
         "message" => "23.45",
        "@version" => "1",
      "@timestamp" => "2017-01-11T02:10:07.045Z",
            "host" => "Vsftp",
 
 

 

5.3.3 字符串转换成数值型

 
[elk@Vsftp logstash]$ cat t2.conf 
input {
   stdin {
     }
 }
 
filter {
  grok {
   match =>{
   "message" =>"(?<request_time>\d+(?:\.\d+)?)"
     }
    }
     mutate {
        convert => ["request_time", "integer"]
}
}
output {
   stdout {
   codec =>rubydebug
   }
}
 
[elk@Vsftp logstash]$ logstash -f t2.conf 
Settings: Default pipeline workers: 4
Pipeline main started
23.45
{
         "message" => "23.45",
        "@version" => "1",
      "@timestamp" => "2017-01-11T02:11:21.071Z",
            "host" => "Vsftp",
    "request_time" => 23
}

 

5.4 JSON插件

5.4.1 简介

  JSON插件用于解码JSON格式的字符串,一般是一堆日志信息中,部分是JSON格式,部分不是的情况下。

 

5.4.2 配置示例

## 事例配置,message是JSON格式的字符串:"{\"uid\":3081609001,\"type\":\"signal\"}" ##
filter {
    json {
        source => "message"
        target => "jsoncontent"
    }
}

输出结果

## 输出结果 ##
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "jsoncontent": {
        "uid": 3081609001,
        "type": "signal"
    }
}
## 如果从事例配置中删除`target`,输出结果如下 ##
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "uid": 3081609001,
    "type": "signal"
}

 

5.4.3 常用参数

  (空的同上)

 

 

 

5.5 elasticsearch查询过滤插件

5.5.1 简介

  用于查询Elasticsearch中的事件,可将查询结果应用于当前事件中

 

5.5.2 常用参数

  (空的同上)

 

6 常用输出插件(Output plugin)

6.1 ElasticSearch输出插件

6.1.1 简介

  用于将事件信息写入到Elasticsearch中,官方推荐插件,ELK必备插件

 

6.1.2 配置示例

output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{+yyyy.MM.dd}"
        template_overwrite => true
    }
}

 

6.1.3 常用参数

  (空的同上)

 

6.2 Redis输出插件   

6.2.1 简介

  用于将Event写入Redis中进行缓存,通常情况下Logstash的Filter处理比较吃系统资源,复杂的Filter处理会非常耗时,如果Event产生速度比较快,可以使用Redis作为buffer使用

 

6.2.2 配置示例

output {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}

 

6.2.3 常用配置

  (空的同上)

 

6.3 File输出插件

6.3.1 简介

  用于将Event输出到文件内

 

6.3.2 配置示例

output {
    file {
        path => ...
        codec => line { format => "custom format: %{message}"}
    }
}

 

6.3.3 常用参数

  (空的同上)

 

6.4 TCP插件   

6.4.1 简介

  Write events over a TCP socket.Each event json is separated by a newline.Can either accept connections from clients or connect to a server, depending on mode

 

6.4.2 配置示例

tcp {
    host => ...
    port => ...
}

 

6.2.3 常用参数

  (空的同上)

 

6.5 kafka插件

6.5.1 简介

  用于将事件信息写入到kafka中

 

6.5.2 配置示例

output {
    if [type] == "nginx-access" {
        kafka {
            bootstrap_servers => "192.168.1.252:9092" #kafka服务器地址
            topic_id => "252nginx-accesslog"
            batch_size => 5
            codec => "json" #写入的时候使用json编码,因为logstash收集后会转换成json格式
        }
    } 
}

 

7 Logstash实例

7.1 接收Filebeat事件,输出到Redis

input {
    beats {
        port => 5044
    }
}
 
output {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}

 

7.2 读取Redis数据,根据“type”判断,分别处理,输出到ES

input {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}
 
filter {
    if [type] == "application" {
        grok {
            match => ["message", "(?m)-(?<systemName>.+?):(?<logTime>(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) \[(?<level>(\b\w+\b)) *\] (?<thread>(\b\w+\b)) \((?<point>.*?)\) - (?<content>.*)"]
        }
        date {
            match => ["logTime", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
        json {
            source => "message"
        }
        date {
            match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
    }
    if [type] == "application_bizz" {
        json {
            source => "message"
        }
        date {
            match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
    }
    mutate {
        remove_field => ["@version", "beat", "logTime"]
    }
}
 
output {
    stdout{
    }
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{+yyyy.MM.dd}"
        document_type => "%{documentType}"
        template_overwrite => true
    }
}