ELK+kafka集群部署

发布时间 2023-04-04 13:51:47作者: Jerry·

前言

业务层可以直接写入到kafka队列中,不用担心elasticsearch的写入效率问题。

消息系统主要功能

1、解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束 2、冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 3、扩展性 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。 4、灵活性 & 峰值处理能力 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。 如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 5、可恢复性  系统的一部分组件失效时,不会影响到整个系统。 消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 6、顺序保证 在大多使用场景下,数据处理的顺序都很重要。 大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性) 7、缓冲 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。 8、异步通信 很多时候,用户不想也不需要立即处理消息。 消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

Redis与Kafka

我们都知道Redis是以key的hash方式来分散对列存储数据的,且Redis作为集群使用时,对应的应用对应一个Redis,在某种程度上会造成数据的倾斜性,从而导致数据的丢失。 而从之前我们部署Kafka集群来看,kafka的一个topic(主题),可以有多个partition(副本),而且是均匀的分布在Kafka集群上,这就不会出现redis那样的数据倾斜性。Kafka同时也具备Redis的冗余机制,像Redis集群如果有一台机器宕掉是很有可能造成数据丢失,而Kafka因为是均匀的分布在集群主机上,即使宕掉一台机器,是不会影响使用。同时Kafka作为一个订阅消息系统,还具备每秒百万级别的高吞吐量,持久性的、分布式的特点等。

架构图

说明

说明 1、可以使用一台Nginx代理访问kibana的请求; 2、三台es组成es集群,并且在三台es上面都安装kibana;( 以下对elasticsearch简称es ),两台logstash; 3、中间三台服务器就是我的kafka(zookeeper)集群啦; 上面写的 消费者/生产者 这是kafka(zookeeper)中的概念; 4、使用filebeat收集日志(windows linux等)

角色

1.es1+zookeeper+kafka+logstash: 192.168.11.156

2.es2+zookeeper+kafka+logstash: 192.168.11.157

3.es3+zookeeper+kafka: 192.168.11.159

4.kibana: 192.168.11.156

5.filebeat:客户端安装

软件说明

1.es : 7.14.0

2.logstash : 7.14.0

3.kibana : 7.14.0

4.filebeat : 7.14.0

5.zookeeper : 3.4.14

6.kafka : 2.13

7.jdk : 1.8

安装步骤

1、ES集群安装配置;

2、Logstash客户端配置(直接写入数据到ES集群,写入系统messages日志);

3、Kafka+zookeeper集群配置;(Logstash写入数据到Kafka消息系统);

4、Kibana部署;

5、filebeat安装;

详细安装步骤

一.安装es集群

1.上传elasticsearch-7.14.0-linux-x86_64.tar.gz安装包到服务器/data下,并安装jdk;

2.解压安装包;

cd /data
tar -zxvf elasticsearch-7.14.0-linux-x86_64.tar.gz

3.修改服务器配置;

# 创建elk用户
useradd elk elk
# 修改elk用户拥有的内存权限至少需要262144;
sysctl -w vm.max_map_count=262144
# 修改 /etc/sysctl.conf文件
vm.max_map_count=262144
# 修改/etc/security/limits.conf
# 在文件末尾添加下面的参数值
* soft nofile 65536
* hard nofile 131072

4.修改es配置文件;

# 修改相应的选项
vi /data/elasticsearch-7.14.0/config/elasticsearch.yml


# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
# 集群名称
cluster.name: elk-boer
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
# 节点名称,不能重复
node.name: elk01
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# 是否能当master
node.master: true
#
# 是否能储存数据
node.data: true
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /data/elasticsearch-7.14.0/data
# 
#
# Path to log files:
#
#path.logs: /data/elasticsearch-7.14.0/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# By default Elasticsearch is only accessible on localhost. Set a different
# address here to expose this node on the network:
#
#network.host: 192.168.0.1
# 配置可以访问的IP,无其他要求不修改
network.host: 0.0.0.0
#
# By default Elasticsearch listens for HTTP traffic on the first free port it
# finds starting at 9200. Set a specific HTTP port here:
#
# es服务端口号
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
# 集群配置
discovery.seed_hosts: ["elk01", "elk02","elk03"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
# 集群配置
cluster.initial_master_nodes: ["elk01", "elk02","elk03"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

5.启动es;

#!/bin/sh
# 用elk用户 -d参数后台启动,ES_JAVA_OPTS参数为内存的一半
sudo -u elk ES_JAVA_OPTS="-Xms8192m -Xmx8192m"  /data/elasticsearch-7.14.0/bin/elasticsearch  -d

6.访问IP:9200 ;

[root@elk01 elasticsearch-7.14.0]# curl http://192.168.11.157:9200
{
  "name" : "elk02",
  "cluster_name" : "elk-boer",
  "cluster_uuid" : "QrvFk9tSQT2qPD7kBHPBLw",
  "version" : {
    "number" : "7.14.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
    "build_date" : "2021-07-29T20:49:32.864135063Z",
    "build_snapshot" : false,
    "lucene_version" : "8.9.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

二.安装zoopeeker

1.安装jdk1.8,并解压zookeeper压缩包;

2.修改/etc/profile;

vim  /etc/profile

JAVA_HOME=/java/jdk1.8.0_161
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME
export PATH
export CLASSPATH

3.使用source命令使profile文件内容生效 ;

source  /etc/profile
java -version

4.修改配置文件;

cd /data/zookeeper/conf
mv  zoo_sample.cfg  zoo.cfg
vim  zoo.cfg

5.将 zookeeper集群的三台服务器地址填入,端口默认 ;

# zookeeper1 配置文件实例
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=192.168.11.156:2888:3888
server.2=192.168.11.157:2888:3888
server.3=192.168.11.159:2888:3888

6.创建tmp文件夹以及创建myid文件 ;

cd  /data/zookeeper/
echo 1 > myid   #zookeeper1创建
echo 2 > myid   #zookeeper2创建
echo 3 > myid   #zookeeper3创建

7.启动所有Zookeeper ;

cd  /data/zookeeper/bin
./zkServer.sh  start

8.注册服务

三.kafka

1.解压kafka_2.13-2.6.0.tgz压缩包;

2.修改配置文件;

cd  /data/kafka/config
vim  server.properties  
broker.id=1                 #三台服务器id分别为1、2、3

listeners=PLAINTEXT://192.168.11.156:9092     #本服务器地址(端口号默认)

advertised.listeners=PLAINTEXT://192.168.11.156:9092     #本服务器地址(端口号默认)

log.dirs=/data/kafka/logs      #log文件位置

zookeeper.connect=192.168.11.156:2181,192.168.11.157:2181,192.168.11.159:2181    #zookeeper服务器集群IP(端口号默认)

3.启动kafka

cd  /data/kafka
./bin/kafka-server-start.sh  -daemon  ./config/server.properties     #-daemon为后台启动参数

四.安装Logstash

1.上传并解压logstash-7.14.0-linux-x86_64.tar.gz;

2.修改配置文件(没有可以创建);

input{
      kafka {
        bootstrap_servers => "192.168.11.156:9092,192.168.11.157:9092,192.168.11.159:9092" #kafka服务器地址
        topics => "monitoring-log"
        group_id => "elk-boer"
        decorate_events => true #kafka标记
        consumer_threads => 1
        codec => "json" #写入的时候使用json编码,因为logstash收集后会转换成json格式
    }
}
output{
    elasticsearch {
        hosts => ["192.168.11.156:9200",
                  "192.168.11.157:9200",
                  "192.168.11.159:9200"]
        index => "monitoring-log-%{+YYYY.MM}"
    }
}

3.启动logstash;

#!/bin/sh
nohup ./bin/logstash -f config/logstash.conf & >./nohup.out

五.安装kibana

1.上传并解压kibana-7.14.0-linux-x86_64.tar.gz压缩包到/data;

2.修改配置文件;

# Kibana is served by a back end server. This setting specifies the port to use.
#server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
#server.host: "localhost"
server.host: "0.0.0.0"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false

# Specifies the public URL at which Kibana is available for end users. If
# `server.basePath` is configured this URL should end with the same basePath.
#server.publicBaseUrl: ""

# The maximum payload size in bytes for incoming server requests.
#server.maxPayload: 1048576

# The Kibana server's name.  This is used for display purposes.
#server.name: "your-hostname"

# The URLs of the Elasticsearch instances to use for all your queries.

3.启动kibana;

#!/bin/sh
sudo -u elk /data/kibana-7.14.0/bin/kibana -c config/kibana.yml & >/data/kibana-7.14.0/kibana.log

六.安装filebeat

1.上传并解压filebeat-7.14.0-linux-x86_64.tar.gz文件到/data;

2.修改配置文件;

cd /data/filebeat-7.14.0
vi filebeat.yml
###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  # enabled: false

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /data/log/*.log
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ['^DBG']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ['^ERR', '^WARN']

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: ['.gz$']

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Multiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
  #multiline.match: after

# filestream is an input for collecting log messages from files. It is going to replace log input in the future.
- type: filestream

  # Change to true to enable this input configuration.
  #enabled: false

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /data/log/*.log
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ['^DBG']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ['^ERR', '^WARN']

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #prospector.scanner.exclude_files: ['.gz$']

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

# ============================== Filebeat modules ==============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

# ======================= Elasticsearch template setting =======================

setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false


# ================================== General ===================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

# ================================= Dashboards =================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here or by using the `setup` command.
#setup.dashboards.enabled: false

# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:

# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:

  # Kibana Host
  # Scheme and port can be left out and will be set to the default (http and 5601)
  # In case you specify and additional path, the scheme is required: http://localhost:5601/path
  # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
  host: "192.168.11.156:5601"

  # Kibana Space ID
  # ID of the Kibana Space into which the dashboards should be loaded. By default,
  # the Default Space will be used.
  #space.id:

# =============================== Elastic Cloud ================================

# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).

# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:

# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
#cloud.auth:

# ================================== Outputs ===================================

# Configure what output to use when sending the data collected by the beat.

# ----------------------------     Kafka   Output   ----------------------------
output.kafka:
  enabled: true
  hosts: ["192.168.11.156:9092","192.168.11.157:9092","192.168.11.159:9092"]
  topic: "monitoring-log"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["192.168.11.157:9200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# ------------------------------ Logstash Output -------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

# ================================== Logging ===================================

# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publisher", "service".
#logging.selectors: ["*"]

# ============================= X-Pack Monitoring ==============================
# Filebeat can export internal metrics to a central Elasticsearch monitoring
# cluster.  This requires xpack monitoring to be enabled in Elasticsearch.  The
# reporting is disabled by default.

# Set to true to enable the monitoring reporter.
#monitoring.enabled: false

# Sets the UUID of the Elasticsearch cluster under which monitoring data for this
# Filebeat instance will appear in the Stack Monitoring UI. If output.elasticsearch
# is enabled, the UUID is derived from the Elasticsearch cluster referenced by output.elasticsearch.
#monitoring.cluster_uuid:

# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well.
# Note that the settings should point to your Elasticsearch *monitoring* cluster.
# Any setting that is not set is automatically inherited from the Elasticsearch
# output configuration, so if you have the Elasticsearch output configured such
# that it is pointing to your Elasticsearch monitoring cluster, you can simply
# uncomment the following line.
#monitoring.elasticsearch:

# ============================== Instrumentation ===============================

# Instrumentation support for the filebeat.
#instrumentation:
    # Set to true to enable instrumentation of filebeat.
    #enabled: false

    # Environment in which filebeat is running on (eg: staging, production, etc.)
    #environment: ""

    # APM Server hosts to report instrumentation results to.
    #hosts:
    #  - http://localhost:8200

    # API Key for the APM Server(s).
    # If api_key is set then secret_token will be ignored.
    #api_key:

    # Secret token for the APM Server(s).
    #secret_token:


# ================================= Migration ==================================

# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true

  </code></pre>
</details>

3.启动filebeat;

#!/bin/sh
nohup ./filebeat -e -c filebeat.yml & >./nohup.out

5.安装windows版本filebeat,上传filebeat-7.14.0-windows-x86.zip,并解压;

6.修改配置文件同上;

7.注册成windows服务(注意安装路径修改)

sc create filebeat-7.14.0 binpath= "D:\filebeat-7.14.0\filebeat.exe -e -c D:\filebeat-7.14.0\filebeat.yml" type= own start= auto displayname= filebeat-7.14.0

8.启动windows服务;

7.访问kibana查看数据

默认没有开启密码验证登录。

1.http://192.168.11.156:5601/

查看用户为forlook/forlook

完(持续更新)