Filebeat轻量型日志采集器及output插件开发
无论您是从安全设备、云、容器、主机还是 OT 进行数据收集,Filebeat 都将为您提供一种轻量型方法,用于转发和汇总日志与文件,让简单的事情不再繁杂。
介绍
Filebeat 是使用 Golang 实现的轻量型日志采集器。本质上是一个 agent ,可以安装在各个节点上,根据配置读取对应位置的日志,并上报到相应的地方去。
概要
Filebeat 并不依赖于 ElasticSearch,可以单独存在。我们可以单独使用Filebeat进行日志的上报和搜集。filebeat 内置了常用的 Output 组件, 例如 kafka、ElasticSearch、redis 等,出于调试考虑,也可以输出到 console 和 file 。我们可以利用现有的 Output 组件,将日志进行上报。
整体工作原理
Filebeat 由两个主要组件组成:harvester 和 prospector。
- harvester (采集器) 的主要职责是读取单个文件的内容。读取每个文件,并将内容发送到 the output。 每个文件启动一个 harvester,harvester 负责打开和关闭文件,这意味着在运行时文件描述符保持打开状态。如果文件在读取时被删除或重命名,Filebeat 将继续读取文件。
- prospector (查找器) 的主要职责是管理 harvester 并找到所有要读取的文件来源。如果输入类型为日志,则查找器将查找路径匹配的所有文件,并为每个文件启动一个 harvester。每个 prospector 都在自己的 Go 协程中运行。
注:Filebeat prospector只能读取本地文件, 没有功能可以连接到远程主机来读取存储的文件或日志。
由以上两个组件一起工作来读取文件(tail file)并将事件数据发送到指定的输出。 下图是 Filebeat 官方提供的架构图:
Filebeat背后的“老大”
说到Filebeat,它其实只是beats家族众多成员中的一个。除了Filebeat, 还有很多其他的beat小伙伴:
beat | 功能 |
---|---|
Filebeat | 收集日志文件 |
Metricbeat | 收集各种指标数据 |
Packetbeat | 收集网络数据包 |
Auditbeat | 收集审计数据 |
Heartbeat | 收集服务运行状态监测数据 |
… | … |
如果你愿意的话,你也可以按照beat的规范来写自己的beat。 能实现以上这些beat,都离不开beats家族真正的“老大”—— libbeat, 它是beat体系的核心库。我们接下来看一下libbeat到底都作了些什么
- libbeat提供了publisher组件,用于对接input;
- 收集到的数据在进入到libbeat后,首先会经过各种 processor的加工处理,比如过滤添加字段,多行合并等等;
- input组件通过publisher组件将收集到的数据推送到publisher内部的队列;
- libbeat本身实现了前面介绍过的多种output, 因此它负责将处理好的数据通过output组件发送出去;
- libbeat本身封装了retry的逻辑;
- libbeat负责将ACK反馈通过到input组件 ;
由此可见,大部分活儿都是libbeat来作,当“老大”不容易啊~。 input仅需要作两件事:
- 从不同的介质中收集数据后投递给libbeat;
- 接收libbeat反馈回来的ACK, 作相应的持久化;
安装与使用
Filebeat 基于 Go 语言开发无其他依赖,它最大的特点是性能稳定、配置简单、占用系统资源很少,安装使用也非常简单,可访问 Elastic-Beats 官网获取各版本 Filebeat。因为 Filebeat 各版本之间的差异较大,这里推荐7以上的新版,首先进行下载解压: tar -zxvf filebeat-7.tar.gz mv filebeat-7 filebeat cd filebeat
FileBeat启停指令:
- 调试模式下采用:终端启动(退出终端或 ctrl+c 会退出运行)
./filebeat -e -c filebeat.yml
- 线上环境配合 error 级别使用:以后台守护进程启动启动 filebeats
nohup ./filebeat -e -c filebeat.yml &
- 零输出启动(不推荐):将所有标准输出及标准错误输出到/dev/null空设备,即没有任何输出信息。
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
- 停止运行 FileBeat 进程
ps -ef | grep filebeat Kill -9 线程号
FileBeat配置文件
FileBeat 的配置文件定义了在读取文件的位置,输出流的位置以及相应的性能参数,本实例是以 Kafka 消息中间件作为缓冲,所有的日志收集器都向 Kafka 输送日志流,相应的配置项如下,并附配置说明:
filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /wls/applogs/rtlog/app.log
fields:
log_topic: appName
multiline:
# pattern for error log, if start with space or cause by
pattern: '^[[:space:]]+(at|\.{3})\b|^Caused by:'
negate: false
match: after
output.kafka:
enabled: true
hosts: ["kafka-1:9092","kafka-2:9092"]
topic: applog
version: "0.10.2.0"
compression: gzip
processors:
- drop_fields:
fields: ["beat", "input", "source", "offset"]
logging.level: error
name: app-server-ip
- paths:定义了日志文件路径,可以采用模糊匹配模式,如*.log
- fields:topic 对应的消息字段或自定义增加的字段。
- output.kafka:filebeat 支持多种输出,支持向 kafka,logstash,elasticsearch 输出数据,此处设置数据输出到 kafka。
- enabled:这个启动这个模块。
- topic:指定要发送数据给 kafka 集群的哪个 topic,若指定的 topic 不存在,则会自动创建此 topic。
- version:指定 kafka 的版本。
- drop_fields:舍弃字段,filebeat 会 json 日志信息,适当舍弃无用字段节省空间资源。
- name:收集日志中对应主机的名字,建议 name 这里设置为 IP,便于区分多台主机的日志信息。
以上参数信息,需要用户个性化修改的主要是:paths,hosts,topic,version 和 name。
Filebeat input 配置热加载
filebeat.yml
filebeat.config.inputs:
enabled: true
path: ../configs/input*.yml
reload.enabled: true
reload.period: 5s
output.syslog:
network: tcp
addr: 127.0.0.1:5140
tag: tag
fields:
prober_id: 1
input.yml
- type: log
paths:
- /Users/chentong/project/log-prober-go/prober-filebeat/cmd/filebeat/example.log
Filebeat 写入 kafka 示例
filebeat.inputs:
- type: log
paths:
- /var/log/*.log
- /Users/chentong/project/log-prober-go/logs/app.log
output.kafka:
hosts:
- 127.0.0.1:9092
topic: test_filebeat
keep_alive: 10s
fields:
prober_id: 999
Filebeat output syslog插件开发
背景
filebeat 自带只支持 es、logstash、kafka、redis、file、console 数据输出. 需要自定义输出 syslog
工程搭建
引入对beat
的依赖
go get github.com/elastic/beats/v7
定义在filebeat中的配置文件
filebeat
通常以配置文件的方式加载插件。让我们定义一下必须的配置,就像elasticsearch
中的连接地址等等一样。
output.syslog:
# 接收方式
network: tcp
# syslog 地址
addr: 127.0.0.1:5140
# 自定义消息tag
tag: tag
go文件中的配置
type syslogConfig struct {
Network string `config:"network"`
Addr string `config:"addr"`
Tag string `config:"tag"`
// 发布日志事件的 worker goroutines 数量
Workers int `config:"workers" validate:"min=1"`
// Max number of events in a batch to send to a single client
BatchSize int `config:"batch_size" validate:"min=1"`
// Max number of retries for single batch of events
RetryLimit int `config:"retry_limit" validate:"min=1"`
}
初始化加载插件
加载插件
在某个init函数中注册插件
func init() {
outputs.RegisterType("syslog", newSyslogOutPut)
}
在newSyslogOutPut
中卸载配置,并提供配置给Syslog
客户端
func newSyslogOutPut(_ outputs.IndexManager, _ beat.Info, stats outputs.Observer, cfg *common.Config) (outputs.Group, error) {
config := syslogConfig{Workers: 1, BatchSize: 1, RetryLimit: 1}
if err := cfg.Unpack(&config); err != nil {
return outputs.Fail(err)
}
clients := make([]outputs.NetworkClient, config.Workers)
for i := 0; i < config.Workers; i++ {
clients[i] = &SyslogClient{
network: config.Network,
addr: config.Addr,
tag: config.Tag,
stats: stats,
}
}
return outputs.SuccessNet(true, config.BatchSize, config.RetryLimit, clients)
}
初始化syslog
客户端
syslog
客户端不仅仅是一个syslog
客户端,而且还需要实现filebeat
中的NetworkClient
接口,接下来,让我们来关注接口中的每一个方法的作用及实现
String()接口
String
作为客户端的名字,用来标识日志以及指标。是最简单的一个接口
func (c *SyslogClient) String() string {
return "syslog"
}
Connect()接口
Connect
用来初始化客户端
func (c *SyslogClient) Connect() error {
dial, err := gsyslog.DialLogger(c.network, c.addr, gsyslog.LOG_NOTICE, "LOCAL0", c.tag)
if err != nil {
return err
}
c.client = dial
return nil
}
注意,这里初始化失败,需要Sleep
一段时间,否则,filebeat会一直重试。这绝非是你想要的。或许对于场景来说,退避重试可能会更好
Close()接口
关闭客户端,也是很简单的接口
func (c *SyslogClient) Close() error {
return c.client.Close()
}
Publish()接口
func (c *SyslogClient) Publish(_ context.Context, batch publisher.Batch) error {
events := batch.Events()
// 记录这批日志
c.stats.NewBatch(len(events))
retryEvents, err := c.PublishEvents(events)
if err != nil {
// 发送失败,则重试 受RetryLimit的限制
batch.RetryEvents(retryEvents)
return err
} else {
c.stats.Acked(len(events))
batch.ACK()
}
return nil
}
发布 syslog 数据
func (c *SyslogClient) PublishEvents(events []publisher.Event) ([]publisher.Event, error) {
for i, event := range events {
message, err := event.Content.Fields.GetValue("message")
if err != nil {
return events[i:], err
}
m, ok := message.(string)
if !ok {
return events[i:], errors.New("cast log to str failed")
}
// syslog 发送数据
_, err = c.client.Write([]byte(m))
if err != nil {
return events[i:], err
}
}
return nil, nil
}
最后是SyslogClient
type SyslogClient struct {
network string
addr string
tag string
stats outputs.Observer
client gsyslog.Syslogger
}