Filebeat fails to deliver data to Kafka

Published 2023-07-08 17:28:15  Author: wh459086748

I. Symptoms

Filebeat cannot deliver any data to Kafka, yet the topic is created successfully, which is puzzling.

kafka:    10.0.7.7
filebeat: 10.0.7.5

Running a consumer on the Kafka machine shows no data at all:
kafka-console-consumer.sh --bootstrap-server 10.0.7.7:9092 --topic filebeat --from-beginning
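Before blaming the consumer, it can help to confirm whether any message ever landed on the topic. A small check with the stock Kafka CLI (broker address and topic as above; this is a suggested sketch, not part of the original session):

```shell
# Print the end offset of every partition; all zeros means the topic was
# auto-created (e.g. by a metadata request) but never received a message.
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list 10.0.7.7:9092 --topic filebeat --time -1
```

An empty-but-existing topic is consistent with a client that can reach the broker for metadata (triggering auto-creation) but fails later when actually producing.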

II. Troubleshooting

1. Check whether a local producer and consumer can exchange data

# Producer
[root@manager /data/zookeeper/logs]$ kafka-console-producer.sh --bootstrap-server 10.0.7.7:9092 --topic linux-zk
1
2
3

# Consumer
[root@manager ~]$ kafka-console-consumer.sh --bootstrap-server 10.0.7.7:9092 --topic linux-zk --from-beginning
1
2
3

# No anomaly found: the local produce/consume loop works
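As an extra sanity check (a suggestion, not in the original session), the auto-created topic itself can be inspected to confirm each partition has a live leader:

```shell
# A topic with no leader for some partition would also make produces fail.
kafka-topics.sh --bootstrap-server 10.0.7.7:9092 --describe --topic filebeat
```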

2. Check whether Filebeat has any anomalies

# Filebeat config file (stdin-to-kafka.yaml): a very simple input and output
[root@b01 filebeat]# cat stdin-to-kafka.yaml 
filebeat.inputs:

- type: stdin
 
 
# Output data to Kafka
output.kafka:
  # Kafka broker list
  hosts:
  - 10.0.7.7:9092
  # Kafka topic
  topic: "filebeat"
 
# After running it, "retryer: send unwait signal to consumer" keeps repeating
[root@b01 filebeat]# ./filebeat -e -c stdin-to-kafka.yaml
2023-07-08T16:47:32.131+0800	INFO	[stdin.harvester]	log/harvester.go:309	Harvester started for paths: []	{"harvester_id": "934870c1-579f-4bda-ba6b-51f818d32ec8"}
1
2023-07-08T16:47:34.678+0800	INFO	[publisher]	pipeline/retry.go:219	retryer: send unwait signal to consumer
2023-07-08T16:47:34.678+0800	INFO	[publisher_pipeline_output]	pipeline/output.go:143	Connecting to kafka(10.0.7.7:9092)
2023-07-08T16:47:34.679+0800	INFO	[publisher_pipeline_output]	pipeline/output.go:151	Connection to kafka(10.0.7.7:9092) established
2023-07-08T16:47:34.678+0800	INFO	[publisher]	pipeline/retry.go:223	  done
2023-07-08T16:47:43.409+0800	INFO	[publisher]	pipeline/retry.go:219	retryer: send unwait signal to consumer
2023-07-08T16:47:43.409+0800	INFO	[publisher]	pipeline/retry.go:223	  done
2
2023-07-08T16:47:52.649+0800	INFO	[publisher]	pipeline/retry.go:219	retryer: send unwait signal to consumer
2023-07-08T16:47:52.649+0800	INFO	[publisher]	pipeline/retry.go:223	  done
2023-07-08T16:48:02.135+0800	INFO	[monitoring]	log/log.go:184	Non-zero metrics in the last 30s	{"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":60,"time":{"ms":68}},"total":{"ticks":200,"time":{"ms":213},"value":0},"user":{"ticks":140,"time":{"ms":145}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":11},"info":{"ephemeral_id":"ecd19ccd-ff8e-42cf-a298-a5251d383b8c","uptime":{"ms":30067},"version":"7.17.5"},"memstats":{"gc_next":20310552,"memory_alloc":11365760,"memory_sys":34423816,"memory_total":57314664,"rss":75546624},"runtime":{"goroutines":43}},"filebeat":{"events":{"active":2,"added":2},"harvester":{"open_files":0,"running":1,"started":1}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"active":2,"batches":4,"failed":2,"total":4},"type":"kafka"},"outputs":{"kafka":{"bytes_read":1110,"bytes_write":340}},"pipeline":{"clients":1,"events":{"active":2,"published":2,"retry":3,"total":2},"queue":{"max_events":4096}}},"registrar":{"states":{"current":0}},"system":{"cpu":{"cores":8},"load":{"1":0.15,"15":0.1,"5":0.08,"norm":{"1":0.0188,"15":0.0125,"5":0.01}}}}}}
2023-07-08T16:48:03.015+0800	INFO	[publisher]	pipeline/retry.go:219	retryer: send unwait signal to consumer
2023-07-08T16:48:03.015+0800	INFO	[publisher]	pipeline/retry.go:223	  done
2023-07-08T16:48:08.374+0800	INFO	[publisher]	pipeline/retry.go:219	retryer: send unwait signal to consumer
2023-07-08T16:48:08.374+0800	INFO	[publisher]	pipeline/retry.go:223	  done
2023-07-08T16:48:18.643+0800	INFO	[publisher]	pipeline/retry.go:219	retryer: send unwait signal to consumer

# At this point, debug logging was enabled with logging.level: debug
[root@b01 filebeat]# cat stdin-to-kafka.yaml 
filebeat.inputs:

- type: stdin
                        
logging.level: debug
 
# Output data to Kafka
output.kafka:
  # Kafka broker list
  hosts:
  - 10.0.7.7:9092
  # Kafka topic
  topic: "filebeat"
  
  
# Running it again now produces debug output
[root@b01 filebeat]# ./filebeat -e -c stdin-to-kafka.yaml
2023-07-08T16:53:29.267+0800	INFO	[publisher]	pipeline/retry.go:219	retryer: send unwait signal to consumer
2023-07-08T16:53:29.267+0800	INFO	[publisher]	pipeline/retry.go:223	  done
2023-07-08T16:53:29.267+0800	INFO	[publisher_pipeline_output]	pipeline/output.go:143	Connecting to kafka(10.0.7.7:9092)
2023-07-08T16:53:29.267+0800	DEBUG	[kafka]	kafka/client.go:100	connect: [10.0.7.7:9092]
2023-07-08T16:53:29.267+0800	INFO	[publisher_pipeline_output]	pipeline/output.go:151	Connection to kafka(10.0.7.7:9092) established
2023-07-08T16:53:30.909+0800	DEBUG	[input]	input/input.go:139	Run input
2023-07-08T16:53:40.131+0800	DEBUG	[kafka]	kafka/client.go:371	finished kafka batch
2023-07-08T16:53:40.131+0800	DEBUG	[kafka]	kafka/client.go:385	Kafka publish failed with: dial tcp: lookup manager on [::1]:53: read udp [::1]:34779->[::1]:53: read: connection refused

# The last line reveals the culprit: a connection-refused error during a DNS lookup.
# Filebeat received the hostname "manager" from the broker and tried to resolve it
# through the local resolver, which refused the connection.
https://blog.csdn.net/ucsheep/article/details/81567959
Following the blog above, the fix is to add the hostname shown after "lookup" to /etc/hosts.
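A minimal sketch of the fix on the Filebeat machine, assuming (as the error message indicates) that the broker advertises the hostname manager:

```shell
# Map the broker's advertised hostname to its IP so the producer can dial
# the address that comes back in the metadata response.
echo "10.0.7.7 manager" >> /etc/hosts
```

The underlying cause is that the broker hands back its advertised hostname in the metadata response and the client then connects to that name. A more durable fix is to advertise a resolvable address in Kafka's server.properties, for example advertised.listeners=PLAINTEXT://10.0.7.7:9092, and restart the broker.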
 
# After adding the entry, the connection works and debug shows no more errors
[root@b01 filebeat]# ./filebeat -e -c stdin-to-kafka.yaml
2023-07-08T16:57:19.766+0800	INFO	[publisher_pipeline_output]	pipeline/output.go:143	Connecting to kafka(10.0.7.7:9092)
2023-07-08T16:57:19.766+0800	DEBUG	[kafka]	kafka/client.go:100	connect: [10.0.7.7:9092]
2023-07-08T16:57:19.766+0800	INFO	[publisher]	pipeline/retry.go:219	retryer: send unwait signal to consumer
2023-07-08T16:57:19.766+0800	INFO	[publisher]	pipeline/retry.go:223	  done
2023-07-08T16:57:19.767+0800	INFO	[publisher_pipeline_output]	pipeline/output.go:151	Connection to kafka(10.0.7.7:9092) established
2023-07-08T16:57:19.779+0800	DEBUG	[kafka]	kafka/client.go:371	finished kafka batch
2023-07-08T16:57:19.779+0800	DEBUG	[publisher]	memqueue/ackloop.go:160	ackloop: receive ack [0: 0, 1]
2023-07-08T16:57:19.779+0800	DEBUG	[publisher]	memqueue/eventloop.go:535	broker ACK events: count=1, start-seq=1, end-seq=1

2023-07-08T16:57:19.779+0800	DEBUG	[acker]	beater/acker.go:59	stateful ack	{"count": 1}
2023-07-08T16:57:19.779+0800	DEBUG	[publisher]	memqueue/ackloop.go:128	ackloop: return ack to broker loop:1
2023-07-08T16:57:19.779+0800	DEBUG	[publisher]	memqueue/ackloop.go:131	ackloop:  done send ack
2023-07-08T16:57:19.779+0800	DEBUG	[registrar]	registrar/registrar.go:263	Processing 1 events
2023-07-08T16:57:19.779+0800	DEBUG	[input]	file/states.go:68	New state added for 
2023-07-08T16:57:19.779+0800	DEBUG	[registrar]	registrar/registrar.go:230	Registrar state updates processed. Count: 1
2023-07-08T16:57:19.779+0800	ERROR	file/states.go:125	State for  should have been dropped, but couldn't as state is not finished.
2023-07-08T16:57:19.779+0800	DEBUG	[registrar]	registrar/registrar.go:253	Registrar states cleaned up. Before: 2, After: 2, Pending: 0
2023-07-08T16:57:19.780+0800	DEBUG	[registrar]	registrar/registrar.go:205	Registry file updated. 2 active states.
2023-07-08T16:57:24.716+0800	DEBUG	[input]	input/input.go:139	Run input
2023-07-08T16:57:34.717+0800	DEBUG	[input]	input/input.go:139	Run input
^C2023-07-08T16:57:36.926+0800	DEBUG	[service]	service/service.go:54	Received sigterm/sigint, stopping

Checking the consumer again, the data is now there:
[root@manager ~]$ kafka-console-consumer.sh --bootstrap-server 10.0.7.7:9092 --topic filebeat --from-beginning
{"@timestamp":"2023-07-08T08:57:18.765Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.5"},"log":{"file":{"path":""},"offset":0},"message":"1111","input":{"type":"stdin"},"ecs":{"version":"1.12.0"},"host":{"name":"b01.pt"},"agent":{"id":"36644426-68b7-43c5-bf61-20fe7e842269","name":"b01.pt","type":"filebeat","version":"7.17.5","hostname":"b01.pt","ephemeral_id":"b125df50-5156-4f0a-9dde-1520f1b8a8c4"}}

What made this problem painful was that the Kafka instance had been deployed by another colleague and carried several pre-existing issues, which complicated the investigation:

1. The configuration file was garbled (mis-encoded).

2. Several Kafka processes were running at once.

If this kind of problem comes up again, switch to a clean machine or reset the environment early.
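The environment-hygiene advice above can be sketched as a quick pre-debugging checklist (the config path is a placeholder; adjust to the actual deployment):

```shell
# 1. Look for duplicate broker processes; more than one on the same machine
#    is exactly the "many kafka processes" symptom described above.
ps -ef | grep -v grep | grep -i kafka

# 2. Check the config file's encoding; a garbled file often reports an
#    unexpected charset instead of plain ASCII/UTF-8 text.
file /path/to/kafka/config/server.properties   # placeholder path

# 3. Confirm only one process is bound to the broker port.
ss -lntp | grep 9092
```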