一、案例分享¶
1.1 场景一¶
- 将 /root/my.txt文件中的日志数据按照指定的格式解析后,通过HTTP POST方法 发送到http://localhost:9090/地址。
- 发送的数据格式为JSON,每3秒发送一次。
fluentd的配置文件如下:
<source>
@type tail
path /root/my.txt
pos_file /root/my.txt.pos
tag mylog
<parse>
@type regexp
expression /(?<key>\w+)\s(?<value>\w+)/
</parse>
</source>
<match mylog>
@type http
endpoint http://localhost:9090/
open_timeout 2
http_method post
<format>
@type json
</format>
<buffer>
flush_interval 3s
</buffer>
</match>
1.2 场景二¶
- 读取指定路径(可以是环境变量或默认路径)的日志文件,不对源数据进行解析, 然后向每条记录中添加主机名信息,并将处理后的数据输出到标准输出。
<source>
@type tail
# 这里使用HISTFILE环境变量,如果没有设置,使用默认值/root/.bash_history
path "#{ENV["HISTFILE"] || /root/.bash_history}"
pos_file /root/.bash_history.pos
tag history
<parse>
@type none
</parse>
</source>
<filter history>
@type record_transformer
<record>
hostname ${hostname}
</record>
</filter>
<match history>
@type stdout
</match>
1.3 场景三¶
-
收集用户操作记录转发到另一个fluentd节点,同时将数据发送到Kafka和存入HDFS。
-
数据流为:fluentd采集端 -> fluentd收集端 -> kafka和HDFS
-
示例用户操作记录数据为:
root pts/1 2023-06-26 10:59 (10.180.206.1):root 2023-06-26 11:00:09 130 tail -f /var/log/command.his.log
采集节点的配置:
<source>
@type tail
path /var/log/command.his.log
pos_file /var/log/command.his.log.pos
tag history
<parse>
@type regexp
# 使用正则解析日志文件
expression /^(?<who_user>\w+)\s(?<pts>\S+)\s(?<who_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2})\s\((?<remote_ip>\d+\.\d+\.\d+\.\d+)\):(?<user>\w+)\s(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s(?<res>\d+)\s(?<command>.+)$/
time_key time
</parse>
</source>
<filter history>
@type record_transformer
<record>
# event内容增加hostname这一行
hostname ${hostname}
</record>
</filter>
<match history>
@type forward
send_timeout 60s
recover_wait 10s
hard_timeout 60s
<buffer>
# 1秒钟向另一个fluentd节点转发一次
flush_interval 1s
</buffer>
<server>
name myserver1
host 10.180.xxx.xxx
port 24225
weight 60
</server>
</match>
fluentd收集节点的配置:
<source>
@type forward
port 24225
bind 0.0.0.0
tag remote
</source>
<match remote>
# 使用copy方式,分两路输出
@type copy
<store>
@type kafka2
brokers 10.180.xxx.xxx:9092
use_event_time true
<buffer topic>
@type file
path /var/log/td-agent/buffer/td
flush_interval 3s
</buffer>
<format>
@type json
</format>
default_topic history
required_acks -1
</store>
<store>
@type webhdfs
host 10.180.xxx.xxx
port 50070
path "/history/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"
<buffer>
flush_interval 60s
</buffer>
</store>
</match>