..

Filebeat轻量型日志采集器及output插件开发

无论您是从安全设备、云、容器、主机还是 OT 进行数据收集,Filebeat 都将为您提供一种轻量型方法,用于转发和汇总日志与文件,让简单的事情不再繁杂。

介绍

Filebeat 是使用 Golang 实现的轻量型日志采集器。本质上是一个 agent ,可以安装在各个节点上,根据配置读取对应位置的日志,并上报到相应的地方去。

概要

Filebeat 并不依赖于 ElasticSearch,可以单独存在。我们可以单独使用Filebeat进行日志的上报和搜集。filebeat 内置了常用的 Output 组件, 例如 kafka、ElasticSearch、redis 等,出于调试考虑,也可以输出到 console 和 file 。我们可以利用现有的 Output 组件,将日志进行上报。

整体工作原理

Filebeat 由两个主要组件组成:harvester 和 prospector。

  • harvester (采集器) 的主要职责是读取单个文件的内容。读取每个文件,并将内容发送到 the output。 每个文件启动一个 harvester,harvester 负责打开和关闭文件,这意味着在运行时文件描述符保持打开状态。如果文件在读取时被删除或重命名,Filebeat 将继续读取文件。
  • prospector (查找器) 的主要职责是管理 harvester 并找到所有要读取的文件来源。如果输入类型为日志,则查找器将查找路径匹配的所有文件,并为每个文件启动一个 harvester。每个 prospector 都在自己的 Go 协程中运行。

注:Filebeat prospector只能读取本地文件, 没有功能可以连接到远程主机来读取存储的文件或日志。

由以上两个组件一起工作来读取文件(tail file)并将事件数据发送到指定的输出。 下图是 Filebeat 官方提供的架构图:

Filebeat背后的“老大”

说到Filebeat,它其实只是beats家族众多成员中的一个。除了Filebeat, 还有很多其他的beat小伙伴:

beat功能
Filebeat收集日志文件
Metricbeat收集各种指标数据
Packetbeat收集网络数据包
Auditbeat收集审计数据
Heartbeat收集服务运行状态监测数据

如果你愿意的话,你也可以按照beat的规范来写自己的beat。 能实现以上这些beat,都离不开beats家族真正的“老大”—— libbeat, 它是beat体系的核心库。我们接下来看一下libbeat到底都作了些什么

  • libbeat提供了publisher组件,用于对接input;
  • 收集到的数据在进入到libbeat后,首先会经过各种 processor的加工处理,比如过滤添加字段,多行合并等等;
  • input组件通过publisher组件将收集到的数据推送到publisher内部的队列;
  • libbeat本身实现了前面介绍过的多种output, 因此它负责将处理好的数据通过output组件发送出去;
  • libbeat本身封装了retry的逻辑;
  • libbeat负责将ACK反馈通过到input组件 ;

由此可见,大部分活儿都是libbeat来作,当“老大”不容易啊~。 input仅需要作两件事:

  • 从不同的介质中收集数据后投递给libbeat;
  • 接收libbeat反馈回来的ACK, 作相应的持久化;

安装与使用

Filebeat 基于 Go 语言开发无其他依赖,它最大的特点是性能稳定、配置简单、占用系统资源很少,安装使用也非常简单,可访问 Elastic-Beats 官网获取各版本 Filebeat。因为 Filebeat 各版本之间的差异较大,这里推荐7以上的新版,首先进行下载解压: tar -zxvf filebeat-7.tar.gz mv filebeat-7 filebeat cd filebeat

FileBeat启停指令:

  • 调试模式下采用:终端启动(退出终端或 ctrl+c 会退出运行)

./filebeat -e -c filebeat.yml

  • 线上环境配合 error 级别使用:以后台守护进程启动启动 filebeats

nohup ./filebeat -e -c filebeat.yml &

  • 零输出启动(不推荐):将所有标准输出及标准错误输出到/dev/null空设备,即没有任何输出信息。

nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

  • 停止运行 FileBeat 进程

ps -ef | grep filebeat Kill -9 线程号

FileBeat配置文件

FileBeat 的配置文件定义了在读取文件的位置,输出流的位置以及相应的性能参数,本实例是以 Kafka 消息中间件作为缓冲,所有的日志收集器都向 Kafka 输送日志流,相应的配置项如下,并附配置说明:

filebeat.yml

filebeat.inputs: 
- type: log
  enabled: true
  paths:
    - /wls/applogs/rtlog/app.log
  fields: 
    log_topic: appName
  multiline:
        # pattern for error log, if start with space or cause by 
        pattern: '^[[:space:]]+(at|\.{3})\b|^Caused by:'
        negate:  false
        match:   after

output.kafka:
   enabled: true
   hosts: ["kafka-1:9092","kafka-2:9092"]
   topic: applog
   version: "0.10.2.0"
   compression: gzip

processors:
- drop_fields: 
   fields: ["beat", "input", "source", "offset"]

logging.level: error
name: app-server-ip
  • paths:定义了日志文件路径,可以采用模糊匹配模式,如*.log
  • fields:topic 对应的消息字段或自定义增加的字段。
  • output.kafka:filebeat 支持多种输出,支持向 kafka,logstash,elasticsearch 输出数据,此处设置数据输出到 kafka。
  • enabled:这个启动这个模块。
  • topic:指定要发送数据给 kafka 集群的哪个 topic,若指定的 topic 不存在,则会自动创建此 topic。
  • version:指定 kafka 的版本。
  • drop_fields:舍弃字段,filebeat 会 json 日志信息,适当舍弃无用字段节省空间资源。
  • name:收集日志中对应主机的名字,建议 name 这里设置为 IP,便于区分多台主机的日志信息。

以上参数信息,需要用户个性化修改的主要是:paths,hosts,topic,version 和 name。

Filebeat input 配置热加载

filebeat.yml

filebeat.config.inputs:
  enabled: true
  path: ../configs/input*.yml
  reload.enabled: true
  reload.period: 5s

output.syslog:
  network: tcp
  addr: 127.0.0.1:5140
  tag: tag

fields:
  prober_id: 1

input.yml

- type: log
  paths:
    - /Users/chentong/project/log-prober-go/prober-filebeat/cmd/filebeat/example.log

Filebeat 写入 kafka 示例

filebeat.inputs:
    - type: log
      paths:
        - /var/log/*.log
        - /Users/chentong/project/log-prober-go/logs/app.log
output.kafka:
    hosts:
        - 127.0.0.1:9092
    topic: test_filebeat
    keep_alive: 10s
fields:
    prober_id: 999

Filebeat output syslog插件开发

背景

filebeat 自带只支持 es、logstash、kafka、redis、file、console 数据输出. 需要自定义输出 syslog

工程搭建

引入对beat的依赖

go get github.com/elastic/beats/v7

定义在filebeat中的配置文件 filebeat通常以配置文件的方式加载插件。让我们定义一下必须的配置,就像elasticsearch中的连接地址等等一样。

output.syslog:
  # 接收方式
  network: tcp   
  # syslog 地址
  addr: 127.0.0.1:5140
  # 自定义消息tag
  tag: tag

go文件中的配置

type syslogConfig struct {
	Network string `config:"network"`
	Addr    string `config:"addr"`
	Tag     string `config:"tag"`
	// 发布日志事件的 worker goroutines 数量
	Workers int `config:"workers" validate:"min=1"`
	// Max number of events in a batch to send to a single client
	BatchSize int `config:"batch_size" validate:"min=1"`
	// Max number of retries for single batch of events
	RetryLimit int `config:"retry_limit" validate:"min=1"`
}

初始化加载插件

加载插件

在某个init函数中注册插件

func init() {
	outputs.RegisterType("syslog", newSyslogOutPut)
}

newSyslogOutPut中卸载配置,并提供配置给Syslog客户端

func newSyslogOutPut(_ outputs.IndexManager, _ beat.Info, stats outputs.Observer, cfg *common.Config) (outputs.Group, error) {
	config := syslogConfig{Workers: 1, BatchSize: 1, RetryLimit: 1}
	if err := cfg.Unpack(&config); err != nil {
		return outputs.Fail(err)
	}
	clients := make([]outputs.NetworkClient, config.Workers)
	for i := 0; i < config.Workers; i++ {
		clients[i] = &SyslogClient{
			network: config.Network,
			addr:    config.Addr,
			tag:     config.Tag,
			stats:   stats,
		}
	}
	return outputs.SuccessNet(true, config.BatchSize, config.RetryLimit, clients)
}

初始化syslog客户端

syslog客户端不仅仅是一个syslog客户端,而且还需要实现filebeat中的NetworkClient接口,接下来,让我们来关注接口中的每一个方法的作用及实现

String()接口

String作为客户端的名字,用来标识日志以及指标。是最简单的一个接口

func (c *SyslogClient) String() string {
	return "syslog"
}

Connect()接口

Connect用来初始化客户端

func (c *SyslogClient) Connect() error {
	dial, err := gsyslog.DialLogger(c.network, c.addr, gsyslog.LOG_NOTICE, "LOCAL0", c.tag)
	if err != nil {
		return err
	}
	c.client = dial
	return nil
}

注意,这里初始化失败,需要Sleep一段时间,否则,filebeat会一直重试。这绝非是你想要的。或许对于场景来说,退避重试可能会更好

Close()接口

关闭客户端,也是很简单的接口

func (c *SyslogClient) Close() error {
	return c.client.Close()
}

Publish()接口

func (c *SyslogClient) Publish(_ context.Context, batch publisher.Batch) error {
	events := batch.Events()
	// 记录这批日志
	c.stats.NewBatch(len(events))
	retryEvents, err := c.PublishEvents(events)
	if err != nil {
		// 发送失败,则重试 受RetryLimit的限制
		batch.RetryEvents(retryEvents)
		return err
	} else {
		c.stats.Acked(len(events))
		batch.ACK()
	}
	return nil
}

发布 syslog 数据

func (c *SyslogClient) PublishEvents(events []publisher.Event) ([]publisher.Event, error) {
	for i, event := range events {
		message, err := event.Content.Fields.GetValue("message")
		if err != nil {
			return events[i:], err
		}
		m, ok := message.(string)
		if !ok {
			return events[i:], errors.New("cast log to str failed")
		}
		// syslog 发送数据
		_, err = c.client.Write([]byte(m))
		if err != nil {
			return events[i:], err
		}
	}
	return nil, nil
}

最后是SyslogClient

type SyslogClient struct {
	network string
	addr    string
	tag     string

	stats  outputs.Observer
	client gsyslog.Syslogger
}