Vector Aggregate

发布时间 2023-09-01 11:02:22作者: XSWClevo

官网地址:https://vector.dev/

聚合文档地址:https://vector.dev/docs/reference/configuration/transforms/aggregate/

解释:

[transforms.my_transform_id]
type = "aggregate"    # 本次的transforms类型
inputs = [ "my-source-or-transform-id" ]    // 输入类型
interval_ms = 10_000    // 聚合时间

如果单独使用,看不到聚合效果,并且vector的聚合后会得到Metrics事件日志。如需其他日志类型,需要自己在TOML内写转换逻辑

Metrics:https://vector.dev/docs/reference/configuration/transforms/aggregate/#telemetry-metrics

 

官网例子(可以参考):

前提:你的日志为Metrics类型格式

[{"metric":{"counter":{"value":1.1},"kind":"incremental","name":"counter.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:44.223543Z"}},{"metric":{"counter":{"value":2.2},"kind":"incremental","name":"counter.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}},{"metric":{"counter":{"value":1.1},"kind":"incremental","name":"counter.1","tags":{"host":"different.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}},{"metric":{"counter":{"value":22.33},"kind":"absolute","name":"gauge.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:47.223543Z"}},{"metric":{"counter":{"value":44.55},"kind":"absolute","name":"gauge.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}}]

通过聚合函数后:

[sources.my_sources_id]
type = "file" # 其他均可,参考官方的sources
include = ["D:\\az\\Vector\\json\\srcInfo.json"] # 地址需要使用 \\ 转义
read_from = "beginning"

[transforms.my_transform_id]
type = "aggregate"
inputs = [ "my_sources_id" ]
interval_ms = 5_000

[sinks.my_sink_id]
type = "console"
input = ["my_transform_id"]
encoding.codec = "json"

得到的结果:

[{"metric":{"counter":{"value":3.3},"kind":"incremental","name":"counter.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}},{"metric":{"counter":{"value":1.1},"kind":"incremental","name":"counter.1","tags":{"host":"different.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}},{"metric":{"counter":{"value":44.55},"kind":"absolute","name":"gauge.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}}]

 

如果是其他类型的日志就不能使用以上的方式,这里提供一个例子:

data_dir = "D:\\az\\Vector\\json" # 定义的工作目录,但是没有用上
[log_schema]
timestamp_key = "@appname"    # 聚合的主键,大多数采用时间

[sources.file_input]
  type = "file" # required
  include = ["D:\\az\\Vector\\json\\srcInfo.json"] # required
  read_from = "beginning"

[transforms.parse_log_json]
  type = "remap"
  inputs = ["file_input"]
  source = '''
  . = parse_json!(string!(.message)) # 将消息转为json
'''

[transforms.log_to_met]
  type = "log_to_metric"    # 日志转为 metric 类型
  inputs = [ "parse_log_json" ]
  
  [[transforms.log_to_met.metrics]]
  type = "counter"
  field = "procid" # 必须指定一个字段,生成后是name = "procid"
  #procid = "procid"
  #name = "procid"
    [transforms.log_to_met.metrics.tags]
	tags = "{{.}}"    # 将所有属性放入tags对象里面 如果没有这行,下面几行注释需要放开
	#procid = "{{procid}}"
	#appname = "{{appname}}"
 	#facility = "{{facility}}"
 	#hostname = "{{hostname}}"
 	#message = "{{message}}"
 	#msgid = "{{msgid}}"
 	#severity = "{{severity}}"
 	#version = "{{version}}"

[transforms.my_transform_agg]
type = "aggregate"
inputs = [ "log_to_met" ]
interval_ms = 2000

# 这行主要是做数据结果的处理 参数 vrl函数
[transforms.my_transform_convert]
type = "remap"
inputs = [ "my_transform_agg" ]
source = '''
appname= del(.tags.appname)
facility = del(.tags.facility)
hostname = del(.tags.hostname)
message = del(.tags.message)
msgid = del(.tags.msgid)
'''


[sinks.console]
type = "console"
inputs = [ "my_transform_convert" ]
target = "stdout"

  [sinks.console.encoding]
  codec = "json"