企业级logstash简单使用(ELK)

发布时间 2023-07-05 14:51:59作者: Comfortable

企业级logstash简单使用(ELK)

要使用logstash收集到Elasticsearch的方式,需确保logstash版本与es版本一致。

由于我也是刚刚研究使用,所以本文暂不会出现原理性的东西。

Logstash

介绍

Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。

inputs(输入阶段)

会生成事件。包括:file、kafka、beats等

filters(过滤器阶段)

可以将过滤器和条件语句结合使用对事件进行处理。包括:grok、mutate等

outputs(输出阶段)

将事件数据发送到特定的目的地,完成了所以输出处理,改事件就完成了执行。如:elasticsearch、file等

使用方式

下载地址 :https://www.elastic.co/fr/downloads/logstash

下载之后随便解压到某个目录,会得到以下这些目录和文件,我们需要注意的就三个目录,bin、config、logs,下面一个一个说。

先来看config文件夹,进入后会有这几个文件:

查看logstash-sample.conf配置文件

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  #输入插件beats,轻量化
  beats { 
    #监听端口
    port => 5044
  }
}

output {
  #es连接地址及索引配置
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}

下来稍微修改一下,我们启动试试。

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
  #添加file插件
  file {
    #测试环境中我们一般是nohup后台启动jar包,默认日志追加到nohup文件中,这里我们用插件读取这个日志发送到es上试试
    path => "/home/mm/mmm/nohup.out"
    mode => "read"
  }
}

output {
  elasticsearch {
    #配置自己的es连接,这里是使用es默认模板
    hosts => ["http://localhost:9200"]
    index => "ceshi"
    #user => "elastic"
    #password => "changeme"
  }
}

退回到bin目录下,启动

./logstash -f /自己路径下的配置文件/logstash/config/logstash-sample.conf

这样子就是启动成功了。

下面就是我们收集到的日志,大家可以看看默认都有什么字段。

     {        
        "_index" : "console-analysis",
        "_type" : "_doc",
        "_id" : "_DDNH4kBVvgVIOGHRiop",
        "_score" : 1.0,
        "_source" : {
          "port" : 57910,
          "thread_name" : "main",
          "host" : "172.17.0.5",
          "logger_name" : "com.alibaba.nacos.client.naming",
          "@version" : "1",
          "level_value" : 20000,
          "message" : "[BEAT] adding beat: BeatInfo{port=17007, ip='192.168.1.59', weight=1.0, serviceName='DEFAULT_GROUP@@amcp-analysis', cluster='DEFAULT', metadata={preserved.register.source=SPRING_CLOUD}, scheduled=false, period=5000, stopped=false} to beat map.",
          "level" : "INFO",
          "logHost" : "192.168.1.59:5044",
          "appname" : "analysis"
        }

Springboot集成logstash+elasticsearch

加入依赖

        <!-- https://mvnrepository.com/artifact/net.logstash.logback/logstash-logback-encoder -->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>7.1.1</version>
        </dependency>

配置

在resources目录下创建一个logback-spring.xml的xml文件。

如果是使用nacos来获取配置的话,文件名字不能是logback-spring.xml,因为会导致logback-spring.xml文件被加载两次,这样在logback-spring.xml文件中如果想读取nacos上的配置的话是拿不到的。

在yml或者properties文件中添加配置

logging:
 config: classpath:logback-nacos.xml

logstash:
 host: localhost:5044
logback-spring.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
	<!-- 获取配置中的值 -->
    <springProperty scope="context" name="logHost" source="logstash.host"/>

    <!-- 添加logstash连接配置 -->
    <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${logHost}</destination> <!-- Logstash的主机和端口,可以配置多个 -->
        <!-- 其他属性 -->
        <connectionTimeout>5000</connectionTimeout>
        <reconnectionDelay>5000</reconnectionDelay>
        <encoder charset="UTF-8"
                 class="net.logstash.logback.encoder.LogstashEncoder">
            <!-- 自定义字段,这里我是用来区分收集的是哪个程序的日志,后面logstash配置我们可以看下它的作用 -->
            <customFields>{"appname":"analysis"}</customFields>
        </encoder>

    </appender>

    <!-- 控制台输出配置,不添加的话日志不会在控制台输出 -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}  %-5level --- [%thread] %logger{36} : %msg%n</pattern>
        </encoder>
    </appender>
	
    <!-- level设置收集日志级别,可以配置多个appender,这个配置文件中也可以直接添加filter过滤标签,这个大家自己尝试一下 -->
    <root level="info">
        <appender-ref ref="console" /> <!-- 添加控制台appender -->
        <appender-ref ref="logstash" />
    </root>

</configuration>

logstash配置

logstash-sample.conf配置文件
input {
  #配置监听端口,也就是destination
  tcp {
    mode => "server"
    host => "localhost"
    port => 5044
    codec => json_lines
  }
}

filter {
  #判断appname,在配置中声明一个变量,不同的appname赋予不同的值,这里其实就是根据我自定义的字段来给不同的es索引名称
  if [appname] == "analysis"{
    mutate {
      add_field => {
        "[@metadata][index]" => "console-analysis"
      }
    }
  }

  #判断日志级别,收集到的日志默认字段level会记录日志级别,这个时候我们可以根据需要对日志进行操作。下面这个操作是将日志级别,日志记录到自定义字段,以及将记录的时间进行转换记录到time字段,默认记录的时间是带时区的
  if [level] =~ /DEBUG/ {
    mutate {
      add_field => {
        "type" => "DEBUG"
        "details" => "%{message}"
      }
    }
    ruby {
      code => "
        event.set('time', event.timestamp.time.localtime.strftime('%Y-%m-%d %H:%M:%S'))
      "
    }
    mutate {
     remove_field => ["[@timestamp]"]
    }
  }
}

output {
 #这里判断message字段中如果不包含HiddenHorzOCR就记录,只是演示一下这里面也可以进行逻辑判断
 if !([message] =~ /HiddenHorzOCR/) {
   if [@metadata][index] {
     elasticsearch {
       hosts => ["http://192.168.1.59:9200"]
       #这个索引就是我们在filter中判断appname时赋的值
       index => "%{[@metadata][index]}"
       #指定es要使用的模板,也可以使用默认的
       template => "/home/collect.json"
       #模板名称
       template_name => "collect"
       #加载模板是否覆盖之前的模板
       template_overwrite => true
     }
   }
 }
}
es模板
{
  "index_patterns": ["console*"],
  "settings": {
    "number_of_shards": 5,
    "max_result_window": "500000000"
  },
  "mappings": {
    //自定义几个字段
    "properties": {
      "type": { "type": "keyword" },
      "details": { "type": "text" },
      "time": { 
                "type": "keyword" 
      }
    }
  }
}

所有配置添加完成之后,启动logstash和自己的应用程序,这个时候就可以上es或者kibana上查看创建出的索引以及收集到的日志。

这是指定模板后收集到的日志。

      {
        "_index" : "console-analysis",
        "_type" : "_doc",
        "_id" : "_jDNH4kBVvgVIOGHRiop",
        "_score" : 1.0,
        "_source" : {
          "port" : 57910,
          "details" : "Scanning for api listing references",
          "type" : "INFO",
          "time" : "2023-07-04 15:28:13",
          "thread_name" : "main",
          "host" : "172.17.0.5",
          "logger_name" : "springfox.documentation.spring.web.scanners.ApiListingReferenceScanner",
          "@version" : "1",
          "level_value" : 20000,
          "message" : "Scanning for api listing references",
          "level" : "INFO",
          "logHost" : "192.168.1.59:5044",
          "appname" : "analysis"
        }
      }