elk日志采集系统

发布时间 2023-07-27 17:31:39作者: 咿呀哒喏

下载地址:https://elasticsearch.cn/download/

安装包:

mkdir elk
cd elk

 

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.5.1-x86_64.rpm
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.5.1-x86_64.rpm
wget https://artifacts.elastic.co/downloads/kibana/kibana-8.5.1-x86_64.rpm
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.1-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.1-x86_64.rpm
#wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.5.1/elasticsearch-analysis-ik-8.5.1.zip


 

安装中文分词器插件:

yum -y install ./*.rpm
git clone https://gitee.com/dev-chen/elasticsearch-analysis-ik.git
cp -r elasticsearch-analysis-ik/ /etc/elasticsearch/plugins/ik

 

elasticsearch下载记录

--------------------------- Security autoconfiguration information ------------------------------

Authentication and authorization are enabled.
TLS for the transport and HTTP layers is enabled and configured.

The generated password for the elastic built-in superuser is : Ows7ypw76KVJLXnrsR2r

If this node should join an existing cluster, you can reconfigure this with
'/usr/share/elasticsearch/bin/elasticsearch-reconfigure-node --enrollment-token <token-here>'
after creating an enrollment token on your existing cluster.

You can complete the following actions at any time:

Reset the password of the elastic built-in superuser with
'/usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic'.

Generate an enrollment token for Kibana instances with
 '/usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana'.

Generate an enrollment token for Elasticsearch nodes with
'/usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s node'.

-------------------------------------------------------------------------------------------------
### NOT starting on installation, please execute the following statements to configure elasticsearch service to start automatically using systemd
 sudo systemctl daemon-reload
 sudo systemctl enable elasticsearch.service
### You can start elasticsearch service by executing
 sudo systemctl start elasticsearch.service

 

[root@test-linux ~]# ls /etc/elasticsearch/
certs                   elasticsearch-plugins.example.yml  jvm.options    log4j2.properties  role_mapping.yml  users
elasticsearch.keystore  elasticsearch.yml                  jvm.options.d  plugins            roles.yml         users_roles
[root@test-linux ~]# ls /etc/logstash/
conf.d  jvm.options  log4j2.properties  logstash-sample.conf  logstash.yml  pipelines.yml  startup.options
[root@test-linux ~]# ls /etc/filebeat/
fields.yml  filebeat.reference.yml  filebeat.yml  modules.d
[root@test-linux ~]# ls /etc/kibana/
kibana.keystore  kibana.yml  node.options
[root@test-linux ~]#

 elasticserach配置文件,注意此处的ssl配置,一旦启用ssl后面logstash和kibana的对接也得用https

# more  /etc/elasticsearch/elasticsearch.yml 
#::::::::::::::
#/etc/elasticsearch/elasticsearch.yml
#::::::::::::::
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
http.port: 9200
xpack.security.enabled: false
xpack.security.enrollment.enabled: true
xpack.security.http.ssl:
  enabled: false
  keystore.path: certs/http.p12
xpack.security.transport.ssl:
  enabled: false
  verification_mode: certificate
  keystore.path: certs/transport.p12
  truststore.path: certs/transport.p12
cluster.initial_master_nodes: ["ELK"]
http.host: 0.0.0.0
#::::::::::::::

#::::::::::::::


server.host为本机实际IP,ss -ntulp|grep 5601可以看到,此地址为对外通信的地址,如果写成回环地址无法继续访问

#more/etc/kibana/kibana.yml
::::::::::::::
server.port: 5601
server.host: "192.168.202.11"
server.name: "ELK"

elasticsearch.hosts: ["http://127.0.0.1:9200"]
elasticsearch.username: "kibana"
elasticsearch.password: "I3ZK+81kPWaEuJwVy=2k"

logging:
  appenders:
    file:
      type: file
      fileName: /var/log/kibana/kibana.log
      layout:
        type: json
  root:
    appenders:
      - default
      - file

i18n.locale: "zh-CN"

pid.file: /run/kibana/kibana.pid

filebeat配置文件,对接logstash

#more /etc/filebeat/filebeat.yml
#::::::::::::::
filebeat.inputs:
- type: log
  id: 1
  enabled: true
  paths:
    - /var/log/*.log
    - /var/log/*/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
#output.elasticsearch:
#  hosts: ["192.168.202.11:9200"]
output.logstash:
  hosts: ["192.168.202.11:5044"]
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
#::::::::::::::

 

logstash配置文件,t对接filebeat作为input

#more /etc/logstash/conf.d/logstash-localhost.conf
#::::::::::::::
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "Vg0Xd-s9XHokOfIqlxMe"
  }
}
#::::::::::::::

 

#more /etc/logstash/conf.d/logstash-switch.conf
#::::::::::::::
input{
    tcp { port => 5002
    type => "Cisco"}
    udp { port => 514
    type => "HUAWEI"}
    udp { port => 5002
    type => "Cisco"}
    udp { port => 5003
    type => "H3C"}
}
filter {
    if [type] == "Cisco" {
    grok {
    match => { "message" => "<%{BASE10NUM:syslog_pri}>%{NUMBER:log_sequence}: .%{SYSLOGTIMESTAMP:timestamp}: %%{DATA:facility}-%{P
OSINT:severity}-%{CISCO_REASON:mnemonic}: %{GREEDYDATA:message}" }
    match => { "message" => "<%{BASE10NUM:syslog_pri}>%{NUMBER:log_sequence}: %{SYSLOGTIMESTAMP:timestamp}: %%{DATA:facility}-%{PO
SINT:severity}-%{CISCO_REASON:mnemonic}: %{GREEDYDATA:message}" }
    add_field => {"severity_code" => "%{severity}"}
    overwrite => ["message"]
    }
}
    elseif [type] == "H3C" {
    grok {
    match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{YEAR:year} %{DATA:hostname} %%%{DATA:vvmodule
}/%{POSINT:severity}/%{DATA:digest}: %{GREEDYDATA:message}" }
    remove_field => [ "year" ]
    add_field => {"severity_code" => "%{severity}"}
    overwrite => ["message"]
    }
}
        elseif [type] == "HUAWEI" {
    grok {
       match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:hostname} %%%{DATA:ddModuleName}/%{PO
SINT:severity}/%{DATA:Brief}:%{GREEDYDATA:message}"}
       match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:hostname} %{DATA:ddModuleName}/%{POSI
NT:severity}/%{DATA:Brief}:%{GREEDYDATA:message}"}
       remove_field => [ "timestamp" ]
    add_field => {"severity_code" => "%{severity}"}
    overwrite => ["message"]
    }
}
#mutate {
#        gsub => [
#        "severity", "0", "Emergency",
#        "severity", "1", "Alert",
#        "severity", "2", "Critical",
#        "severity", "3", "Error",
#        "severity", "4", "Warning",
#        "severity", "5", "Notice",
#        "severity", "6", "Informational",
#        "severity", "7", "Debug"
#        ]
#    }
}
output{
    stdout {
       codec => rubydebug
}
    elasticsearch {
        index =>
        "syslog-%{+YYYY.MM.dd}"
        hosts => ["http://127.0.0.1:9200"]
        user => "elastic"
        password => "Vg0Xd-s9XHokOfIqlxMe"
    }
}
[root@ELK ~]#
#more /etc/logstash/conf.d/logstash-snmp.conf.template
#::::::::::::::
input {
 snmp {
   interval => 60
   hosts => [{host => "udp:192.168.0.249/161" community => "public"},
             {host => "udp:192.168.20.253/161" community => "public"},
             {host => "udp:192.168.12.253/161" community => "public"},
             {host => "udp:192.168.0.241/161" community => "public"}
             ]
   tables => [{"name"=> "mac-address" "columns"=> ["1.3.6.1.2.1.17.4.3.1.1","1.3.6.1.2.1.17.4.3.1.2"] },
              {"name"=> "arp-address" "columns"=>["1.3.6.1.2.1.4.22.1.1","1.3.6.1.2.1.4.22.1.2","1.3.6.1.2.1.4.22.1.3"]}]
     }
}
filter {
clone {
clones => [event]
add_field => { "clone" => "true" }
}

if [clone] { mutate {remove_field => ["mac-address"] }}
else { mutate { remove_field => ["arp-address"]}}

if [mac-address] {
split { field => "mac-address" }
mutate {
rename => { "[mac-address][iso.org.dod.internet.mgmt.mib-2.dot1dBridge.dot1dTp.dot1dTpFdbTable.dot1dTpFdbEntry.dot1dTpFdbAddress]"
 => "MACaddress"}
rename => { "[mac-address][iso.org.dod.internet.mgmt.mib-2.dot1dBridge.dot1dTp.dot1dTpFdbTable.dot1dTpFdbEntry.dot1dTpFdbPort]" =>
 "FDBPort"}
remove_field => ["mac-address"]
add_field => {"cmdbtype" => "MACtable"}
}
elasticsearch {
hosts =>["192.168.202.11:9200"]
index => "nhserear-snmpfdbtable-2021.01.20"
query =>"fdbport:%{[FDBPort]} AND host:%{[host]}"
fields => { "ifDescr" => "ifDescr" }
}

}
if [arp-address] {
split { field => "arp-address" }
mutate {
rename => { "[arp-address][iso.org.dod.internet.mgmt.mib-2.ip.ipNetToMediaTable.ipNetToMediaEntry.ipNetToMediaIfIndex]" => "ifInde
x"}
rename => { "[arp-address][iso.org.dod.internet.mgmt.mib-2.ip.ipNetToMediaTable.ipNetToMediaEntry.ipNetToMediaNetAddress]" => "IPa
ddress"}
rename => { "[arp-address][iso.org.dod.internet.mgmt.mib-2.ip.ipNetToMediaTable.ipNetToMediaEntry.ipNetToMediaPhysAddress]" => "MA
Caddress"}
remove_field => ["arp-address"]
add_field => {"cmdbtype" => "ARPtable"}
}
elasticsearch {
hosts =>["192.168.202.11:9200"]
index => "nhserear-snmpiftable-2021.01.20"
query =>"ifIndex:%{[ifIndex]} AND host:%{[host]}"
fields => { "ifDescr" => "ifDescr" }
}

}
}

output {
 elasticsearch{
 hosts=> ["192.168.202.11:9200"]
 index=> "nhserear-snmp-%{+YYYY.MM.dd}"
}
}
#::::::::::::::