linux-logstash

发布时间 2023-06-25 21:35:32作者: wh459086748

logstash

一、部署

1.基于rpm方式安装logstash

#下载软件包
[root@elk101.com ~]# ll
-rw-r--r--  1 root root  34965920 Apr  6 11:19 filebeat-7.17.5-x86_64.rpm

#安装logstash
[root@elk103.com ~]# rpm -ivh logstash-7.17.5-x86_64.rpm

#验证logstash版本
[root@elk103.com ~]# ln -svf /usr/share/logstash/bin/logstash /usr/local/sbin
‘/usr/local/sbin/logstash’ -> ‘/usr/share/logstash/bin/logstash’
[root@elk103.com ~]# logstash -V
Using JAVA_HOME defined java: /es/softwares/jdk1.8.0_291
WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.
logstash 7.17.5

#基于命令行启动logstash实例
[root@elk103.com ~]# logstash -e "input { stdin { type => stdin } } output { stdout { codec => rubydebug } }"

2.基于二进制安装

#下载软件包
[root@elk101.com ~]# ll
-rw-r--r--  1 root root 363609474 Apr  7 14:42 logstash-7.17.5-linux-x86_64.tar.gz

#解压软件包
[root@elk101.com ~]# tar xf logstash-7.17.5-linux-x86_64.tar.gz -C /es/softwares/

#验证logstash版本
[root@elk101.com ~]# ln -svf /es/softwares/logstash-7.17.5/bin/logstash /usr/local/sbin/
‘/usr/local/sbin/logstash’ -> ‘/es/softwares/logstash-7.17.5/bin/logstash’
[root@elk101.com ~]# logstash -V
Using JAVA_HOME defined java: /es/softwares/jdk1.8.0_291
WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.

#基于命令行启动logstash实例
[root@elk101.com ~]# logstash -e "input { stdin { type => stdin } } output { stdout {} }"

二、编写logstash配置文件

1.标准输入输出

[root@elk101.com ~]# mkdir config
[root@elk101.com ~/config]# cat stdin-to-stdout.conf
input { 
  stdin { type => stdin } 
} 

output { 
  stdout {} 
}

#启动logstash
[root@elk101.com ~]# logstash -f config/stdin-to-stdout.conf 
The stdin plugin is now waiting for input:
[2023-04-07T21:12:00,721][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
q111
{
    "@timestamp" => 2023-04-07T13:12:21.968Z,
       "message" => "q111",
      "@version" => "1",
          "host" => "elk101.com",
          "type" => "stdin"
}

2.logstash搭配filebeat实战案例

[root@elk101.com ~/config]# cat beats-to-stdout.conf
input { 
  # 指定输入的类型是一个beats
  beats {
    # 指定监听的端口号
    port => 8888
  }
} 

output { 
  # 将数据在标准输出显示
  stdout {} 
  
  # 将数据写入ES集群
  elasticsearch {
    # 指定ES主机地址
    hosts => ["http://localhost:9200"]
    # 指定索引名称
    index => "linux-es-logstash"
  }
}

[root@elk101.com ~]# logstash -rf config/beats-to-stdout.conf


#启动filebeat
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat nginx-to-logstash.yaml
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log*

# 将数据输出到logstash中
output.logstash:
  # 指定logstash的主机和端口
  hosts: ["10.0.0.101:8888"]
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/nginx-to-logstash.yaml

温馨提示

logstash:

-r 热加载,修改完配置文件,自动重新加载

-f 指定配置文件

image-20230407211825876

3.logstash的过滤插件之geoip实战案例

[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat /var/log/nginx/access.log
{"@timestamp":"2023-04-06T16:17:43+08:00","host":"10.0.0.103","clientip":"110.110.110.110","SendBytes":615,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"curl/7.29.0","status":"200"}
{"@timestamp":"2023-04-06T18:18:18+08:00","host":"10.0.0.103","clientip":"101.231.54.100","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (iPad; CPU OS 13_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/87.0.4280.77 Mobile/15E148 Safari/604.1","status":"304"}
{"@timestamp":"2023-04-07T08:18:32+08:00","host":"10.0.0.103","clientip":"219.141.136.10","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1","status":"304"}
{"@timestamp":"2023-04-07T10:18:52+08:00","host":"10.0.0.103","clientip":"221.118.208.184","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1","status":"304"}
{"@timestamp":"2023-04-07T12:19:07+08:00","host":"10.0.0.103","clientip":"21.118.208.84","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Linux; Android 10; SM-G981B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Mobile Safari/537.36","status":"404"}


#logstash配置文件
[root@elk101.com ~/config]# cat beats-geoip-es.conf
input { 
  # 指定输入的类型是一个beats
  beats {
    # 指定监听的端口号
    port => 8888
  }
} 


filter {
  # 根据IP地址分析客户端的经纬度,国家,城市信息等。
  geoip {
     source => "clientip"
     remove_field => [ "agent","log","input","host","ecs","tags" ]
  }

}

output { 
  # 将数据在标准输出显示
  stdout {} 
  
  # 将数据写入ES集群
  elasticsearch {
    # 指定ES主机地址
    hosts => ["http://localhost:9200"]
    # 指定索引名称
    index => "linux-es-logstash-ip"
  }
}


#启动logstash
[root@elk101.com ~]# logstash -rf config/beats-geoip-es.conf


#filebeat采集数据到logstash
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat nginx-to-logstash.yaml
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log*
  json.keys_under_root: true
  json.add_error_key: true

# 将数据输出到logstash中
output.logstash:
  # 指定logstash的主机和端口
  hosts: ["10.0.0.101:8888"]

#启动filebeat
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/nginx-to-logstash.yaml


{
          "http_host" => "10.0.0.103",
       "upstreamhost" => "-",
       #geoip解析的内容
              "geoip" => {
          "country_name" => "United States",
         "country_code3" => "US",
        "continent_code" => "NA",
             "longitude" => -97.822,
              "latitude" => 37.751,
         "country_code2" => "US",
                    "ip" => "21.118.208.84",
              "timezone" => "America/Chicago",
              "location" => {
            "lon" => -97.822,
            "lat" => 37.751
        }
    },
           "clientip" => "21.118.208.84",
    "http_user_agent" => "Mozilla/5.0 (Linux; Android 10; SM-G981B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Mobile Safari/537.36",
         "@timestamp" => 2023-04-07T13:29:44.170Z,
                "xff" => "-",
             "domain" => "10.0.0.103",
            "tcp_xff" => "-",
                "uri" => "/index.html",
       "upstreamtime" => "-",
       "responsetime" => 0,
             "status" => "404",
            "referer" => "-",
           "@version" => "1",
          "SendBytes" => 0
}

image-20230407213053848

4.logstash解析nginx原生日志并分析IP地址实战

#原生日志,没有修改过配置文件的,不是json的
[root@elk101.com ~/config]# cat beats-grok_geoip-es.conf
input { 
  beats {
    port => 8888
  }
} 


filter {
   grok {
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }

  geoip {
     source => "clientip"
  }

}

output { 
 #  stdout {} 
  
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "linux-es-logstash-nginx"
  }
}
#启动logstash
[root@elk101.com ~]# logstash -rf config/beats-grok_geoip-es.conf


#修改filebeat配置文件
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat nginx-to-logstash.yaml
filebeat.inputs:
- type: log
  paths:
    - /tmp/test-filebeat/access.log

# 将数据输出到logstash中
output.logstash:
  # 指定logstash的主机和端口
  hosts: ["10.0.0.101:8888"]

[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/nginx-to-logstash.yaml 

image-20230407213828572

5.logstash解析将实际写入时间更正案例

[root@elk101.com ~/config]# cat beats-grok_geoip_date-es.conf
input { 
  beats {
    port => 8888
  }
} 


filter {
   grok {
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }

  geoip {
     source => "clientip"
  }

  date {
      # 匹配时间字符串字段并格式化
      # "22/Nov/2015:11:57:34 +0800"
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      # 匹配时区
      timezone => "Asia/Shanghai"
      # 将转换后的日期写入指定字段,若不指定,则默认写入"@timestamp"
      target => "linux-es-date"
  }

}

output { 
 #stdout {} 
  
 elasticsearch {
   hosts => ["http://localhost:9200"]
   index => "linux-es-logstash-nginx-date"
 }
}
#启动logstash
[root@elk101.com ~]# logstash -rf config/beats-grok_geoip_date-es.conf

#filebeat配置
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat nginx-to-logstash.yaml
filebeat.inputs:
- type: log
  paths:
    - /tmp/test-filebeat/access.log

# 将数据输出到logstash中
output.logstash:
  # 指定logstash的主机和端口
  hosts: ["10.0.0.101:8888"]
  
  
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/nginx-to-logstash.yaml 


#logstash输出
{
          "message" => "27.159.21.216 - - [22/Nov/2015:11:03:35 +0800] \"GET /online/oldboyonline/images/ad/20151111/L2.png HTTP/1.1\" 200 44624 \"http://www.papaonline.com.cn/online/index.jsp?lang=zh_CN\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36\"",
         "clientip" => "27.159.21.216",
        "timestamp" => "22/Nov/2015:11:03:35 +0800",
         "response" => "200",
             "verb" => "GET",
         "@version" => "1",
         "referrer" => "\"http://www.papaonline.com.cn/online/index.jsp?lang=zh_CN\"",
      "httpversion" => "1.1",
            "geoip" => {
              "location" => {
            "lat" => 34.7732,
            "lon" => 113.722
        },
        "continent_code" => "AS",
          "country_name" => "China",
         "country_code2" => "CN",
                    "ip" => "27.159.21.216",
         "country_code3" => "CN",
              "timezone" => "Asia/Shanghai",
              "latitude" => 34.7732,
             "longitude" => 113.722
    },
            "bytes" => "44624",
             "auth" => "-",
       "@timestamp" => 2023-04-07T13:47:27.888Z,
            "ident" => "-",
            #转换之后的日期
    "linux-es-date" => 2015-11-22T03:03:35.000Z,
          "request" => "/online/oldboyonline/images/ad/20151111/L2.png"
}


补充

[root@elk101.com ~/config]# cat beats-grok_geoip_date-es.conf
input { 
beats {
 port => 8888
}
} 


filter {
grok {
   match => { "message" => "%{HTTPD_COMBINEDLOG}" }
   remove_field => [ "agent","log","input","host","ecs","tags" ]
}

geoip {
  source => "clientip"
}

date {
   # 匹配时间字符串字段并格式化
   # "22/Nov/2015:11:57:34 +0800"
   match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
   # 匹配时区
   timezone => "Asia/Shanghai"
   # 将转换后的日期写入指定字段,若不指定,则默认写入"@timestamp"
   #补充:如果不设置target的话,@timestamp会变成日志的访问时间,
   #在建立索引模式的时候就可以选择@timestamp作为时间字段
   #target => "linux-es-date"
}

}

output { 
#stdout {} 

elasticsearch {
hosts => ["http://localhost:9200"]
index => "linux-es-logstash-nginx-date"
}
}

image-20230407215325960

6. grok自定义正则模式

#先定义正则的格式
#进入logstash的配置文件目录
[root@elk101.com ~]# cd config/
#创建一个目录专门放正则的文件
[root@elk101.com ~/config]# mkdir -p patterns
#创建正则的文件
[root@elk101.com ~/config]# cat patterns/test
YEAR [\d]{4}
CLASSROOMNUMBER [0-9]{2}
TEACHER [A-Z]+


#创建logstash配置文件
[root@elk101.com ~/config]# cat tcp-grok_custom_pattern-es.conf
input { 
  tcp {
    port => 9999
    type => "tcp"
  }
} 


filter {
 
  
     grok {
         # 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
         #使用相对路径时,是相对于执行logstash命令时所在的目录
         patterns_dir => ["/root/config/patterns"]
         # 基于指定字段进行匹配
         # match => { "message" => "%{TEACHER:teacher}school%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         match => { "message" => "%{TEACHER:teacher}.{6}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         add_field => {"custom-type" => "jiaoshi07-tcp"}
    }
  
}

output { 
 stdout {} 
}

#启动logstash
[root@elk101.com ~]# logstash -rf config/tcp-grok_custom_pattern-es.conf


#测试tcp
[root@elk103.com ~]# echo "LINUXschool2003 教室07" |nc 10.0.0.101 9999

#logstash输出
{
                "host" => "elk103.com",
          "@timestamp" => 2023-04-09T12:44:28.345Z,
             "message" => "LINUXschool2003 教室07",
            "@version" => "1",
                "type" => "tcp",
         "custom-type" => "jiaoshi07-tcp",
                "port" => 41786,
             "teacher" => "LINUX",
    "classroom_number" => "07",
                "year" => "2003"
}


7.logstash单分支和双分支

#logstash配置文件
[root@elk101.com ~/config]# cat tcp-grok_custom_pattern_if-es.conf
input { 
  beats {
    port => 8888
    type => "beats"
  }

  tcp {
    port => 9999
    type => "tcp"
  }

  http {
  #默认8080端口
    type => "http"
  }
} 


filter {
  if [type] == "beats" {
      grok {
         match => { "message" => "%{HTTPD_COMBINEDLOG}" }
         remove_field => [ "agent","log","input","host","ecs","tags" ]
      }

      geoip {
         source => "clientip"
         add_field => {"custom-type" => "jiaoshi07-beats"}
      }
      
      date {
          match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
          timezone => "Asia/Shanghai"
          target => "linux-es-date"
      }
  }

  if [type] == "tcp" {
     grok {
         # 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
         patterns_dir => ["/root/config/patterns"]
         # 基于指定字段进行匹配
         # match => { "message" => "%{TEACHER:teacher}school%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         match => { "message" => "%{TEACHER:teacher}.{6}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         add_field => {"custom-type" => "jiaoshi07-tcp"}
    }
  }else {
    mutate {
       add_field => { 
           "school" => "linuxschool" 
           "class" => "linux85"
           "custom-type" => "jiaoshi07-http"
       } 
    }
  }

}

output { 
 stdout {} 
}

#启动logstash
[root@elk101.com ~]# logstash -rf config/tcp-grok_custom_pattern_if-es.conf

#测试
#tcp测试
[root@elk103.com ~]# echo "LINUXschool2003 教室07" |nc 10.0.0.101 9999
#logstash输出
{
          "@timestamp" => 2023-04-09T12:52:11.314Z,
             "teacher" => "LINUX",
                "port" => 41790,
                "year" => "2003",
    "classroom_number" => "07",
            "@version" => "1",
                "host" => "elk103.com",
                "type" => "tcp",
             "message" => "LINUXschool2003 教室07",
         "custom-type" => "jiaoshi07-tcp"
}


#http测试
POST 10.0.0.101:8080
{
    "name":"stu01",
    "age": 18
}

#logstash输出
{
          "class" => "linux85",
     "@timestamp" => 2023-04-09T12:53:25.784Z,
        "headers" => {
         "content_length" => "40",
          "postman_token" => "d8590dde-8ddb-4b86-8a03-00aa0efe5698",
        "accept_encoding" => "gzip, deflate, br",
             "connection" => "keep-alive",
        "http_user_agent" => "PostmanRuntime/7.26.10",
           "content_type" => "application/json",
           "http_version" => "HTTP/1.1",
              "http_host" => "10.0.0.101:8080",
            "http_accept" => "*/*",
         "request_method" => "POST",
           "request_path" => "/"
    },
           "name" => "stu01",
       "@version" => "1",
            "age" => 18,
           "host" => "10.0.0.1",
           "type" => "http",
         "school" => "linuxschool",
    "custom-type" => "jiaoshi07-http"
}


#filebeat测试
[root@elk103.com /tmp/test-filebeat]# cat access.log 
101.226.61.184 - - [22/Nov/2015:11:02:00 +0800] "GET /mobile/sea-modules/gallery/zepto/1.1.3/zepto.js HTTP/1.1" 200 24662 "http://m.linuxyuan.com.cn/mobile/theme/oldboy/home/index.html" "Mozilla/5.0 (Linux; U; Android 5.1.1; zh-cn; HUAWEI CRR-UL00 Build/HUAWEICRR-UL00) AppleWebKit/533.1 (KHTML, like Gecko)Version/4.0 MQQBrowser/5.4 TBS/025478 Mobile Safari/533.1 MicroMessenger/6.3.7.51_rbb7fa12.660 NetType/3gnet Language/zh_CN"


#filebeat配置文件
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat nginx-to-logstash-if.yaml
filebeat.inputs:
- type: log
  paths:
    - /tmp/test-filebeat/access.log  

output.logstash:
  hosts: ["10.0.0.101:8888"]


#启动filebeat
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/nginx-to-logstash-if.yaml

#logstash输出
{
            "ident" => "-",
            "bytes" => "24662",
             "verb" => "GET",
        "timestamp" => "22/Nov/2015:11:02:00 +0800",
         "referrer" => "\"http://m.linuxyuan.com.cn/mobile/theme/oldboy/home/index.html\"",
      "httpversion" => "1.1",
            "geoip" => {
         "country_code3" => "CN",
          "country_name" => "China",
        "continent_code" => "AS",
                    "ip" => "101.226.61.184",
              "timezone" => "Asia/Shanghai",
         "country_code2" => "CN",
              "location" => {
            "lat" => 34.7732,
            "lon" => 113.722
        },
             "longitude" => 113.722,
              "latitude" => 34.7732
    },
         "clientip" => "101.226.61.184",
          "message" => "101.226.61.184 - - [22/Nov/2015:11:02:00 +0800] \"GET /mobile/sea-modules/gallery/zepto/1.1.3/zepto.js HTTP/1.1\" 200 24662 \"http://m.linuxyuan.com.cn/mobile/theme/oldboy/home/index.html\" \"Mozilla/5.0 (Linux; U; Android 5.1.1; zh-cn; HUAWEI CRR-UL00 Build/HUAWEICRR-UL00) AppleWebKit/533.1 (KHTML, like Gecko)Version/4.0 MQQBrowser/5.4 TBS/025478 Mobile Safari/533.1 MicroMessenger/6.3.7.51_rbb7fa12.660 NetType/3gnet Language/zh_CN\"",
      "custom-type" => [
        [0] "jiaoshi07-beats",
        [1] "jiaoshi07-http"
    ],
           "school" => "linuxschool",
       "@timestamp" => 2023-04-09T13:03:50.679Z,
         "response" => "200",
    "linux-es-date" => 2015-11-22T03:02:00.000Z,
         "@version" => "1",
             "type" => "beats",
          "request" => "/mobile/sea-modules/gallery/zepto/1.1.3/zepto.js",
            "class" => "linux85",
             "auth" => "-"
}



#从custom-type字段可以看出,它不仅执行了第一个单独的if,还执行了下边的else分支(else只和第二个if配对,所以type为beats的事件也会进入else)
"custom-type" => [
        [0] "jiaoshi07-beats",
        [1] "jiaoshi07-http"
    ]

8.logstash的多分支

#logstash配置文件
[root@elk101.com ~/config]# cat tcp-grok_custom_pattern_eles_if-es.conf
input { 
  beats {
    port => 8888
    type => "beats"
  }

  tcp {
    port => 9999
    type => "tcp"
  }

  http {
  #默认8080端口
    type => "http"
  }
} 


filter {
  if [type] == "beats" {
      grok {
         match => { "message" => "%{HTTPD_COMBINEDLOG}" }
         remove_field => [ "agent","log","input","host","ecs","tags" ]
      }

      geoip {
         source => "clientip"
         add_field => {"custom-type" => "jiaoshi07-beats"}
      }
      
      date {
          match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
          timezone => "Asia/Shanghai"
          target => "linux-es-date"
      }
      #####################修改为else if#######################
  }else if [type] == "tcp" {
     grok {
         # 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
         patterns_dir => ["/root/config/patterns"]
         # 基于指定字段进行匹配
         # match => { "message" => "%{TEACHER:teacher}school%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         match => { "message" => "%{TEACHER:teacher}.{6}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         add_field => {"custom-type" => "jiaoshi07-tcp"}
    }
  }else {
    mutate {
       add_field => { 
           "school" => "linuxschool" 
           "class" => "linux85"
           "custom-type" => "jiaoshi07-http"
       } 
    }
  }

}

output { 
 stdout {} 
}

#启动logstash
[root@elk101.com ~]# logstash -rf config/tcp-grok_custom_pattern_eles_if-es.conf

#测试
#tcp测试
[root@elk103.com ~]# echo "LINUXschool2003 教室07" |nc 10.0.0.101 9999
#logstash输出
{
                "type" => "tcp",
             "teacher" => "LINUX",
                "host" => "elk103.com",
                "year" => "2003",
          "@timestamp" => 2023-04-09T13:08:47.716Z,
             "message" => "LINUXschool2003 教室07",
    "classroom_number" => "07",
            "@version" => "1",
                "port" => 41798,
         "custom-type" => "jiaoshi07-tcp"
}


#http测试
POST 10.0.0.101:8080
{
    "name":"stu01",
    "age": 18
}

#logstash输出
{
           "type" => "http",
           "name" => "stu01",
            "age" => 18,
        "headers" => {
        "accept_encoding" => "gzip, deflate, br",
              "http_host" => "10.0.0.101:8080",
             "connection" => "keep-alive",
            "http_accept" => "*/*",
         "content_length" => "40",
           "http_version" => "HTTP/1.1",
        "http_user_agent" => "PostmanRuntime/7.26.10",
         "request_method" => "POST",
           "content_type" => "application/json",
          "postman_token" => "b84fb99a-d4d1-4627-a011-39923767265a",
           "request_path" => "/"
    },
           "host" => "10.0.0.1",
     "@timestamp" => 2023-04-09T13:09:00.354Z,
       "@version" => "1",
          "class" => "linux85",
    "custom-type" => "jiaoshi07-http",
         "school" => "linuxschool"
}


#filebeat测试
[root@elk103.com /tmp/test-filebeat]# cat access.log 
101.226.61.184 - - [22/Nov/2015:11:02:00 +0800] "GET /mobile/sea-modules/gallery/zepto/1.1.3/zepto.js HTTP/1.1" 200 24662 "http://m.linuxyuan.com.cn/mobile/theme/oldboy/home/index.html" "Mozilla/5.0 (Linux; U; Android 5.1.1; zh-cn; HUAWEI CRR-UL00 Build/HUAWEICRR-UL00) AppleWebKit/533.1 (KHTML, like Gecko)Version/4.0 MQQBrowser/5.4 TBS/025478 Mobile Safari/533.1 MicroMessenger/6.3.7.51_rbb7fa12.660 NetType/3gnet Language/zh_CN"


#filebeat配置文件
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat nginx-to-logstash-else-if.yaml 
filebeat.inputs:
- type: log
  paths:
    - /tmp/test-filebeat/access.log  

output.logstash:
  hosts: ["10.0.0.101:8888"]


#启动filebeat
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/nginx-to-logstash-else-if.yaml

#logstash输出
{
      "custom-type" => "jiaoshi07-beats",
             "type" => "beats",
      "httpversion" => "1.1",
            "geoip" => {
                    "ip" => "101.226.61.184",
              "latitude" => 34.7732,
             "longitude" => 113.722,
              "location" => {
            "lat" => 34.7732,
            "lon" => 113.722
        },
         "country_code2" => "CN",
         "country_code3" => "CN",
          "country_name" => "China",
              "timezone" => "Asia/Shanghai",
        "continent_code" => "AS"
    },
    "linux-es-date" => 2015-11-22T03:02:00.000Z,
             "auth" => "-",
          "request" => "/mobile/sea-modules/gallery/zepto/1.1.3/zepto.js",
            "bytes" => "24662",
          "message" => "101.226.61.184 - - [22/Nov/2015:11:02:00 +0800] \"GET /mobile/sea-modules/gallery/zepto/1.1.3/zepto.js HTTP/1.1\" 200 24662 \"http://m.linuxyuan.com.cn/mobile/theme/oldboy/home/index.html\" \"Mozilla/5.0 (Linux; U; Android 5.1.1; zh-cn; HUAWEI CRR-UL00 Build/HUAWEICRR-UL00) AppleWebKit/533.1 (KHTML, like Gecko)Version/4.0 MQQBrowser/5.4 TBS/025478 Mobile Safari/533.1 MicroMessenger/6.3.7.51_rbb7fa12.660 NetType/3gnet Language/zh_CN\"",
       "@timestamp" => 2023-04-09T13:10:01.230Z,
         "clientip" => "101.226.61.184",
            "ident" => "-",
         "@version" => "1",
             "verb" => "GET",
        "timestamp" => "22/Nov/2015:11:02:00 +0800",
         "response" => "200",
         "referrer" => "\"http://m.linuxyuan.com.cn/mobile/theme/oldboy/home/index.html\""
}




#从custom-type字段可以看出,它这次只执行了第一个if分支
"custom-type" => "jiaoshi07-beats",

补充:使用多分支语法分别将"beats,tcp,http"这3个输入类型写入ES集群中各自对应的索引:

output { 
# stdout {} 

if [type] == "beats" {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "linux-es-beats"
  }
} else if  [type] == "tcp" { 
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "linux-es-tcp"
  }
} else {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "linux-es-http"
  }
}
}

9.多实例案例:拆分logstash配置文件

[root@elk101.com ~/config]# cat multiple_instance-http.conf 
input { 
  http {
    type => "http"
  }
} 


filter {
    mutate {
       add_field => { 
           "school" => "linuxschool" 
           "class" => "linux85"
           "custom-type" => "jiaoshi07-http"
       } 
    }
}

output { 
	stdout {} 
}

[root@elk101.com ~/config]# cat multiple_instance-tcp.conf 
input { 
  tcp {
    port => 9999
    type => "tcp"
  }
} 


filter {
   grok {
		# 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
         patterns_dir => ["/root/config/patterns"]
         # 基于指定字段进行匹配
         # match => { "message" => "%{TEACHER:teacher}school%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         match => { "message" => "%{TEACHER:teacher}.{6}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         add_field => {"custom-type" => "jiaoshi07-tcp"}
  }

}

output { 
   stdout {} 
}

[root@elk101.com ~/config]# cat multiple_instance-beats.conf 
input { 
  beats {
    port => 8888
    type => "beats"
  }
} 


filter {
   grok {
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }

   geoip {
      source => "clientip"
      add_field => {"custom-type" => "jiaoshi07-beats"}
   }
   
   date {
       match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
       timezone => "Asia/Shanghai"
       target => "linux-es-date"
   }
}

output { 
	stdout {} 
}


#多实例运行,需要指定不同的path.data,要不就会报错
[root@elk101.com ~]# logstash -rf config/multiple_instance-tcp.conf --path.data /tmp/linux-es-tcp
[root@elk101.com ~]# logstash -rf config/multiple_instance-http.conf --path.data /tmp/linux-es-http
[root@elk101.com ~]# logstash -rf config/multiple_instance-beats.conf --path.data /tmp/linux-es-beats

10.logstash的pipeline案例

[root@elk101.com ~/config]# cat pipeline-http.conf 
input { 
  http {
    type => "http"
  }
} 


filter {
    mutate {
       add_field => { 
           "school" => "linuxschool" 
           "class" => "linux85"
           "custom-type" => "jiaoshi07-http"
       } 
    }
}

output { 
	stdout {} 
}

[root@elk101.com ~/config]# cat pipeline-tcp.conf 
input { 
  tcp {
    port => 9999
    type => "tcp"
  }
} 


filter {
   grok {
		# 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
         patterns_dir => ["/root/config/patterns"]
         # 基于指定字段进行匹配
         # match => { "message" => "%{TEACHER:teacher}school%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         match => { "message" => "%{TEACHER:teacher}.{6}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
         add_field => {"custom-type" => "jiaoshi07-tcp"}
  }

}

output { 
   stdout {} 
}

[root@elk101.com ~/config]# cat pipeline-beats.conf 
input { 
  beats {
    port => 8888
    type => "beats"
  }
} 


filter {
   grok {
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }

   geoip {
      source => "clientip"
      add_field => {"custom-type" => "jiaoshi07-beats"}
   }
   
   date {
       match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
       timezone => "Asia/Shanghai"
       target => "linux-es-date"
   }
}

output { 
	stdout {} 
}


#修改pipeline的配置文件
#id名字只要不重复即可
[root@elk101.com ~]# cat /es/softwares/logstash-7.17.5/config/pipelines.yml
- pipeline.id: linux-es-pipeline-beats
  path.config: "/root/config/pipeline-beats.conf"
- pipeline.id: linux-es-pipeline-tcp
  path.config: "/root/config/pipeline-tcp.conf"
- pipeline.id: linux-es-pipeline-http
  path.config: "/root/config/pipeline-http.conf"


#启动logstash
[root@elk101.com ~]# logstash

补充:

logstash启动时如果不使用-f指定配置文件,会默认加载logstash-7.17.5/config/pipelines.yml

#使用-f指定配置文件时会有如下提示(pipelines.yml会被忽略)
[2023-04-09T21:23:36,494][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified

11.logstash的useragent过滤器

#logstash配置文件
[root@elk101.com ~/config]# cat beats-grok_geoip_date_useragent-es.conf
input { 
  beats {
    port => 8888
  }
} 


filter {
   mutate {
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }

  geoip {
     source => "clientip"
  }

  date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      timezone => "Asia/Shanghai"
  }

  # 用于分析客户端设备类型的插件
  useragent {
    source => "http_user_agent"
	#指定将解析的数据放在哪个字段,若不指定,则默认放在顶级字段中
    #target => "linux-es-agent"
  }

}

output { 
	stdout {} 
  
}
#启动logstash
[root@elk101.com ~]# logstash -rf config/beats-grok_geoip_date_useragent-es.conf


#filebeat配置文件
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat nginx_useragent-to-logstash.yaml
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log*
  json:
    keys_under_root: true
    add_error_key: true
    overwrite_keys: true 

output.logstash:
  hosts: ["10.0.0.101:8888"]
#启动filebeat
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/nginx_useragent-to-logstash.yaml

#测试
#filebeat的nginx文件
[root@elk103.com /tmp/test-filebeat]# cat /var/log/nginx/access.log-20230408
{"@timestamp":"2023-04-08T15:19:24+08:00","host":"10.0.0.103","clientip":"221.218.208.84","SendBytes":555,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/favicon.ico","domain":"10.0.0.103","xff":"-","referer":"http://10.0.0.103/","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36","status":"404"}


#logstash输出
{
            "tcp_xff" => "-",
       "responsetime" => 0,
                "uri" => "/favicon.ico",
         "@timestamp" => 2023-04-08T07:19:24.000Z,
              "patch" => "0",
            "os_full" => "Windows 10",
            "os_name" => "Windows",
                 "os" => "Windows",
             "status" => "404",
             "device" => "Other",
              "geoip" => {
              "latitude" => 39.9143,
           "region_name" => "Beijing",
                    "ip" => "221.218.208.84",
              "timezone" => "Asia/Shanghai",
           "region_code" => "BJ",
             "city_name" => "Beijing",
         "country_code3" => "CN",
          "country_name" => "China",
             "longitude" => 116.3861,
              "location" => {
            "lat" => 39.9143,
            "lon" => 116.3861
        },
        "continent_code" => "AS",
         "country_code2" => "CN"
    },
    "http_user_agent" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
            "referer" => "http://10.0.0.103/",
           "os_major" => "10",
           "@version" => "1",
       "upstreamtime" => "-",
           "clientip" => "221.218.208.84",
               "name" => "Chrome",
         "os_version" => "10",
          "SendBytes" => 555,
                "xff" => "-",
              "major" => "112",
              "minor" => "0",
            "version" => "112.0.0.0",
          "http_host" => "10.0.0.103",
       "upstreamhost" => "-",
             "domain" => "10.0.0.103"
}

12.logstash的mutate

#mutate组件数据准备-python脚本
# Write the Python log generator to generate_log.py in the current directory.
# The heredoc delimiter is quoted ('EOF') so the shell copies the script body
# verbatim: no parameter, command, or arithmetic expansion is performed on it,
# which keeps the embedded Python intact even if it later gains a '$' or a
# backtick.
cat > generate_log.py <<'EOF'
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : wh

import datetime
import random
import logging
import time
import sys

LOG_FORMAT = "%(levelname)s %(asctime)s [com.yuanlinux.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# 配置root的logging.Logger实例的基本配置
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1]
, filemode='a',)
actions = ["浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券",
 "搜索", "查看订单", "付款", "清空购物车"]

while True:
    time.sleep(random.randint(1, 5))
    user_id = random.randint(1, 10000)
    # 对生成的浮点数保留2位有效数字.
    price = round(random.uniform(15000, 30000),2)
    action = random.choice(actions)
    svip = random.choice([0,1])
    logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price))
EOF

[root@elk103.com /tmp/test-filebeat]# cat  generate_log.py 
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : wh
#
# Usage: python generate_log.py <log-file>
# Appends one pipe-delimited "DAU|user_id|action|svip|price" record to
# <log-file> every 1-5 seconds, forever.

# NOTE(review): datetime is imported but never used in this script.
import datetime
import random
import logging
import time
import sys

LOG_FORMAT = "%(levelname)s %(asctime)s [com.yuanlinux.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Basic configuration of the root logging.Logger: INFO level, appending to
# the file named by the first command-line argument.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1]
, filemode='a',)
actions = ["浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券",
 "搜索", "查看订单", "付款", "清空购物车"]

while True:
    time.sleep(random.randint(1, 5))
    user_id = random.randint(1, 10000)
    # Round the generated float to 2 decimal places.
    price = round(random.uniform(15000, 30000),2)
    action = random.choice(actions)
    svip = random.choice([0,1])
    logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price))

#运行脚本
[root@elk103.com /tmp/test-filebeat]# python generate_log.py  /tmp/app.log

#后台运行
nohup python generate_log.py /tmp/app.log &>/dev/null &

#作用就是生成日志
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat /tmp/app.log
INFO 2023-04-09 21:50:00 [com.yuanlinux.generate_log] - DAU|3235|领取优惠券|1|28507.86 
INFO 2023-04-09 21:50:04 [com.yuanlinux.generate_log] - DAU|3011|付款|0|20234.68 
INFO 2023-04-09 21:50:06 [com.yuanlinux.generate_log] - DAU|5890|加入收藏|1|23764.09 
INFO 2023-04-09 21:50:08 [com.yuanlinux.generate_log] - DAU|4794|评论商品|1|22223.06 
INFO 2023-04-09 21:50:10 [com.yuanlinux.generate_log] - DAU|5464|浏览页面|1|23107.91 
INFO 2023-04-09 21:50:12 [com.yuanlinux.generate_log] - DAU|638|查看订单|1|27998.74
#logstash配置文件
[root@elk101.com ~/config]# cat beats-mutate-es.conf
# Pipeline: receive events from Beats on port 9999, split the pipe-delimited
# "message" into named, typed fields, and print each event to stdout.
input { 
  beats {
    port => 9999
  }
} 


# NOTE(review): each step lives in its own mutate block, presumably so the
# steps run in the order written here - confirm against the mutate docs.
filter {
   # Drop the listed metadata fields from each event.
   mutate {
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }
  
   mutate {
      # Split the message field on "|"; message becomes an array.
      split => { "message" => "|" }
   }

   # Copy array elements 1-4 into named fields (element 0 is the log prefix).
   mutate {
     add_field => {
        userid => "%{[message][1]}"
        verb => "%{[message][2]}"
        svip => "%{[message][3]}"
        price => "%{[message][4]}"
     }
   }
  
   mutate {
     # Rename the field.
     rename => {
        "verb" => "action"
     }
   }

   mutate {
     # Convert field types.
     convert => {
       "userid" => "integer"
       # svip is generated as 0 or 1 and is converted to false / true.
       "svip" => "boolean"
       "price" => "float"
     }
   }


}

output { 
  stdout {} 
}
#运行logstash
[root@elk101.com ~]# logstash -rf config/beats-mutate-es.conf


#filebeat配置文件
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat apps-to-logstash.yaml
# Filebeat: read the generated application log and ship it to Logstash.
filebeat.inputs:
- type: log
  paths:
    # Log file written by generate_log.py.
    - /tmp/app.log 

# Send events to the Logstash beats input listening on port 9999.
output.logstash:
  hosts: ["10.0.0.101:9999"]
#运行filebeat
[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/apps-to-logstash.yaml
  
#logstash输出数据
{
       "message" => [
        [0] "INFO 2023-04-09 22:00:20 [com.yuanlinux.generate_log] - DAU",
        [1] "6064",
        [2] "使用优惠券",
        [3] "0",
        [4] "20900.52 "
    ],
          "svip" => false,
         "price" => 20900.52,
      "@version" => "1",
        "action" => "使用优惠券",
    "@timestamp" => 2023-04-09T14:00:21.028Z,
        "userid" => 6064
}

#输出数据到es
[root@elk101.com ~/config]# cat beats-mutate-es.conf
# Pipeline: receive events from Beats on port 9999, split the pipe-delimited
# "message" into named, typed fields, and index each event into Elasticsearch.
input { 
  beats {
    port => 9999
  }
} 


# NOTE(review): each step lives in its own mutate block, presumably so the
# steps run in the order written here - confirm against the mutate docs.
filter {
   # Drop the listed metadata fields from each event.
   mutate {
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }
  
   mutate {
      # Split the message field on "|"; message becomes an array.
      split => { "message" => "|" }
   }

   # Copy array elements 1-4 into named fields (element 0 is the log prefix).
   mutate {
     add_field => {
        userid => "%{[message][1]}"
        verb => "%{[message][2]}"
        svip => "%{[message][3]}"
        price => "%{[message][4]}"
     }
   }
  
   # Rename "verb" to "action".
   mutate {
     rename => {
        "verb" => "action"
     }
   }

   # Convert the extracted string values to integer / boolean / float.
   mutate {
     convert => {
       "userid" => "integer"
       "svip" => "boolean"
       "price" => "float"
     }
   }


}

# Write the events to the local Elasticsearch node under a fixed index name.
output { 
	elasticsearch {
   		hosts => ["http://localhost:9200"]
		index => "linux-es-logstash-mutate"
	 }
}

13. mutate案例和kibana结合

13.1 查看索引

image-20230409220624061

13.2 创建索引模式

image-20230409220653659

13.3 创建discover

image-20230409220735120

13.4 设置自动刷新时间

image-20230409220758956

13.5 保存discover

image-20230409220835558

13.6 创建可视图

image-20230409221103974

image-20230409221115217

image-20230409221153549

image-20230409221221416

image-20230409221306817

13.7 创建仪表盘

image-20230409221357711

image-20230409221406970

image-20230409221507606

14. 地理位置案例

#测试
#01-创建索引映射
PUT	http://10.0.0.103:9200/linux-es-map
{
  "mappings": {
    "properties": {
      "location": { 
        "type": "geo_point"
      }
    }
  }
}


#02-写入地理位置-lat代表纬度,lon代表经度
POST http://10.0.0.103:9200/linux-es-map/_doc
{
  "location": { 
    "lat": 39.914,
    "lon": 116.386
  }
}

15. 修复nginx日志解析经纬度问题故障演练

image-20230409221658422

image-20230409221738779

image-20230409221810347

16. 分析nginx日志,并通过kibana展示数据,例如pv、带宽总量、公网IP的Top10统计等信息。

[root@elk101.com ~/config]# cat beats-grok_geoip_date_useragent_100wdata-es.conf
# Pipeline: receive nginx access-log lines from Beats on port 8888, parse and
# enrich them, then index the events into Elasticsearch.
input { 
  beats {
    port => 8888
  }
} 


filter {
    # Parse the raw line in "message" with the HTTPD_COMBINEDLOG grok pattern.
    grok {
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
   }

   # Drop the listed metadata fields from each event.
   # NOTE(review): "agent" is removed here, after grok has run. If
   # HTTPD_COMBINEDLOG stores the user-agent string in a field named "agent"
   # (its legacy, non-ECS naming), that value is discarded too - confirm.
   mutate {
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }

  # Look up geographic information for the address in "clientip".
  geoip {
     source => "clientip"
  }

  # Parse the "timestamp" field using the access-log date layout.
  date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      timezone => "Asia/Shanghai"
  }

  # Plugin used to analyse the client device type.
  # NOTE(review): no filter visible in this pipeline creates a field named
  # "http_user_agent"; verify that the source field exists for these events.
  useragent {
    source => "http_user_agent"
    # When enabled, the parsed data is placed under this field.
    #target => "linux-es-agent"
  }

}

output { 
   elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "linux-es-nginx-project"
    # Alternative: the VM cannot cope with the load, so split the index by date.
    #index => "linux-es-nginx-project-%{+yyyy-MM-dd}"
    # Location data is needed here, so the index name must match the index
    # template that defines the mapping for the IP location.
  } 
}
#启动logstash
[root@elk101.com ~]# logstash -rf config/beats-grok_geoip_date_useragent_100wdata-es.conf

[root@elk103.com /es/softwares/filebeat-7.17.5-linux-x86_64/config]# cat nginx-100wdata-logstash.yaml
# Filebeat: read the nginx access-log sample file and ship it to Logstash.
filebeat.inputs:
- type: log
  paths:
    - /tmp/test-filebeat/access100w.log

# Send the data to Logstash.
output.logstash:
  # Host and port of the Logstash beats input.
  hosts: ["10.0.0.101:8888"]