一、logstasg的grok自定义正则模式及单分支判断案例¶
官方文档:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html#plugins-filters-grok-match
1、在elk121节点上编写logstash配置文件
[root@elk121 ~]# cat /logstash/config/06-tcp-grok_custom_pattern-es.conf
input {
beats {
port => 8888
type => "beats"
}
tcp {
port => 9999
type => "tcp"
}
http {
type => "http"
}
}
filter {
if [type] == "beats" {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
}
if [type] == "tcp" {
grok {
# 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
patterns_dir => ["/root/patterns"]
# 基于指定字段进行匹配
# match => { "message" => "%{TEACHER:teacher}edu%{YEAR:year} 教室%{OOMNUMBER:classroom_number}"}
match => { "message" => "%{TEACHER:teacher}.{3}%{YEAR:year} 教室%{COMNUMBER:classroom_number}"}
}
}else{
}
}
output {
stdout {}
# elasticsearch {
# hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
# index => "linux85-logstash-nginx-date"
# }
}
2、在elk121节点上创建/root/patterns/jiaoshi07文件,进行grok自定义正则模式
[root@elk121 ~]# mkdir -p /root/patterns/
[root@elk121 ~]# cat /root/patterns/jiaoshi07
YEAR [\d]{4}
CLASSROOMNUMBER [0-9]{2}
TEACHER [A-Z]+
3、在elk121节点上指定配置文件启动logstash
| [root@elk121 ~]# logstash -rf /logstash/config/06-tcp-grok_custom_pattern-es.conf |
|---|
4、测试tcp类型
在elk121主机上新开一个窗口
| [root@elk121 ~]# echo "OLDBOYedu2023 教室07" | nc 192.168.1.121 9999 |
|---|
在elk121主机上观察结果,已成功匹配

5、测试beats类型
在elk123主机上编写filebeat配置文件
[root@elk123 ~]# cd /es/softwares/filebeat-7.17.5-linux-x86_64
[root@elk123 filebeat-7.17.5-linux-x86_64]# mkdir config
[root@elk123 filebeat-7.17.5-linux-x86_64]# vim config/19-nginx-to-logstash.yaml
filebeat.inputs:
- type: log
paths:
- /tmp/es/access.log
# 将数据输出到logstash中
output.logstash:
# 指定logstash的主机和端口
hosts: ["192.168.1.121:8888"]
在elk123主机上启动filebeat实例
[root@elk123 filebeat-7.17.5-linux-x86_64]# rm -rf data/
[root@elk123.oldboyedu.com filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/19-nginx-to-logstash.yaml
在elk121主机上查看输出结果,已去除"agent","log","input","host","ecs","tags"相关字段

二、logstash的多分支案例¶
1、在elk121节点上编写logstash配置文件
[root@elk121 ~]# cat /logstash/config/07-tcp-grok_custom_pattern_if-es.conf
input {
beats {
port => 8888
type => "beats"
}
tcp {
port => 9999
type => "tcp"
}
http {
type => "http"
}
}
filter {
if [type] == "beats" {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
geoip {
source => "clientip"
add_field => {"custom-type" => "jiaoshi07-beats"}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
timezone => "Asia/Shanghai"
target => "linux85-date"
}
} else if [type] == "tcp" {
grok {
# 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
patterns_dir => ["/root/patterns"]
# 基于指定字段进行匹配
# match => { "message" => "%{TEACHER:teacher}edu%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
match => { "message" => "%{TEACHER:teacher}.{3}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
add_field => {"custom-type" => "jiaoshi07-tcp"}
}
}else {
mutate {
add_field => {
"school" => "oldboyedu"
"class" => "linux85"
"custom-type" => "jiaoshi07-http"
}
}
}
}
output {
stdout {}
# elasticsearch {
# hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
# index => "linux85-logstash-nginx-date"
# }
}
2、在elk121节点上创建/root/patterns/jiaoshi07文件,进行grok自定义正则模式
[root@elk121 ~]# mkdir -p /root/patterns/
[root@elk121 ~]# cat /root/patterns/jiaoshi07
YEAR [\d]{4}
CLASSROOMNUMBER [0-9]{2}
TEACHER [A-Z]+
3、在elk121节点上指定配置文件启动logstash
| [root@elk121 ~]# logstash -rf /logstash/config/07-tcp-grok_custom_pattern_if-es.conf |
|---|
4、测试tcp类型
在elk121主机上新开一个窗口
| [root@elk121 ~]# echo "OLDBOYedu2023 教室07" | nc 192.168.1.121 9999 |
|---|
在elk121主机上观察结果,已成功匹配

5、测试beats类型
在elk123主机上编写filebeat配置文件
[root@elk123 ~]# cd /es/softwares/filebeat-7.17.5-linux-x86_64
[root@elk123 filebeat-7.17.5-linux-x86_64]# mkdir config
[root@elk123 filebeat-7.17.5-linux-x86_64]# vim config/19-nginx-to-logstash.yaml
filebeat.inputs:
- type: log
paths:
- /tmp/es/access.log
# 将数据输出到logstash中
output.logstash:
# 指定logstash的主机和端口
hosts: ["192.168.1.121:8888"]
在elk123主机上启动filebeat实例
[root@elk123 filebeat-7.17.5-linux-x86_64]# rm -rf data/
[root@elk123.oldboyedu.com filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/19-nginx-to-logstash.yaml
在elk121主机上查看输出结果,已去除"agent","log","input","host","ecs","tags"相关字段,并且设置geoip及date

6、测试http类型
使用postman填写POST请求192.168.1.121:8080,输入JSON内容
{
"name": "zq"
"hobby": ["黑丝","gril"]
}

在elk121主机上进行验证查看

7、如果将"beat,tcp,http"这3个输入类型写入ES集群对应不同的索引,可以在上面的基础进行修改
重新定义/logstash/config/08-ketanglianxi.conf文件
[root@elk121 ~]# cp /logstash/config/07-tcp-grok_custom_pattern_if-es.conf /logstash/config/08-ketanglianxi.conf
[root@elk121 ~]# vim /logstash/config/08-ketanglianxi.conf
input {
beats {
port => 8888
type => "beats"
}
tcp {
port => 9999
type => "tcp"
}
http {
type => "http"
}
}
filter {
if [type] == "beats" {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
geoip {
source => "clientip"
add_field => {"custom-type" => "jiaoshi07-beats"}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
timezone => "Asia/Shanghai"
target => "linux85-date"
}
} else if [type] == "tcp" {
grok {
# 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
patterns_dir => ["/root/patterns"]
# 基于指定字段进行匹配
# match => { "message" => "%{TEACHER:teacher}edu%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
match => { "message" => "%{TEACHER:teacher}.{3}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
add_field => {"custom-type" => "jiaoshi07-tcp"}
}
}else {
mutate {
add_field => {
"school" => "oldboyedu"
"class" => "linux85"
"custom-type" => "jiaoshi07-http"
}
}
}
}
output {
#stdout {}
if [type] == "beats" {
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-logstash-beats"
}
} else if [type] == "tcp" {
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-logstash-tcp"
}
} else {
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-logstash-http"
}
}
}
在elk121节点上指定配置文件启动logstash
| [root@elk121 ~]# logstash -rf /logstash/config/08-ketanglianxi.conf |
|---|
重新进行http、tcp、beats测试后,查看kibana索引

创建索引模式


三、logstash的多实例案例¶
1、在elk121节点上编写logstash配置文件
编写/logstash/config/09-multiple_instance-beats.conf文件
[root@elk121 ~]# vim /logstash/config/09-multiple_instance-beats.conf
input {
beats {
port => 8888
type => "beats"
}
}
filter {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
geoip {
source => "clientip"
add_field => {"custom-type" => "jiaoshi07-beats"}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
timezone => "Asia/Shanghai"
target => "linux85-date"
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-multiple_instance-beats"
}
}
编写/logstash/config/09-multiple_instance-http.conf文件
[root@elk121 ~]# vim /logstash/config/09-multiple_instance-http.conf
input {
http {
type => "http"
}
}
filter {
mutate {
add_field => {
"school" => "oldboyedu"
"class" => "linux85"
"custom-type" => "jiaoshi07-http"
}
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-multiple_instance-http"
}
}
编写/logstash/config/09-multiple_instance-tcp.conf文件
[root@elk121 ~]# vim /logstash/config/09-multiple_instance-tcp.conf
input {
tcp {
port => 9999
type => "tcp"
}
}
filter {
grok {
# 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
patterns_dir => ["/root/patterns"]
# 基于指定字段进行匹配
# match => { "message" => "%{TEACHER:teacher}edu%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
match => { "message" => "%{TEACHER:teacher}.{3}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
add_field => {"custom-type" => "jiaoshi07-tcp"}
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-multiple_instance-tcp"
}
}
2、在elk121节点上多实例启动
[root@elk121 ~]# logstash -rf config/09-multiple_instance-beats.conf
[root@elk121 ~]# logstash -rf config/09-multiple_instance-http.conf --path.data /tmp/http/
[root@elk121 ~]# logstash -rf config/09-multiple_instance-tcp.conf --path.data /tmp/tcp/
说明:如果不指定--path.data ,默认路径为/es/softwares/logstash-7.17.5/data/
四、logstash的多pipline案例¶
特别注意:当logstash指定-f选项时,会自动忽略pipeline相关配置文件

1、在elk121主机上编写配置文件
编写/logstash/config/10-pipeline-beats.conf文件
[root@elk121 ~]# vim / logstash /config/10-pipeline-beats.conf
input {
beats {
port => 8888
type => "beats"
}
}
filter {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
geoip {
source => "clientip"
add_field => {"custom-type" => "jiaoshi07-beats"}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
timezone => "Asia/Shanghai"
target => "linux85-date"
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-multiple_pipeline-beats"
}
}
编写/logstash/config/10-pipeline-http.conf文件
[root@elk121 ~]# vim /logstash/config/10-pipeline-http.conf
input {
http {
type => "http"
}
}
filter {
mutate {
add_field => {
"school" => "oldboyedu"
"class" => "linux85"
"custom-type" => "jiaoshi07-http"
}
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-multiple_pipeline-http"
}
}
编写/logstash/config/10-pipeline-tcp.conf文件
[root@elk121 ~]# vim /logstash/config/10-pipeline-tcp.conf
input {
tcp {
port => 9999
type => "tcp"
}
}
filter {
grok {
# 指定加载pattern匹配模式的目录,可以是相对路径,也可以是绝对路径
patterns_dir => ["/root/patterns"]
# 基于指定字段进行匹配
# match => { "message" => "%{TEACHER:teacher}edu%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
match => { "message" => "%{TEACHER:teacher}.{3}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
add_field => {"custom-type" => "jiaoshi07-tcp"}
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-multiple_pipeline-tcp"
}
}
2、在elk121主机上修改pipline的配置文件
在/es/softwares/logstash-7.17.5/config/pipelines.yml文件末尾添加如下内容
[root@elk121 ~]# vim /es/softwares/logstash-7.17.5/config/pipelines.yml
- pipeline.id: linux85-pipeline-beats
path.config: "/logstash/config/10-pipeline-beats.conf"
- pipeline.id: linux85-pipeline-tcp
path.config: "/logstash/config/10-pipeline-tcp.conf"
- pipeline.id: linux85-pipeline-http
path.config: "/logstash/config/10-pipeline-http.conf"
修改后的配置文件内容为如下
[root@elk121 ~]# egrep -v '^#|^$' /es/softwares/logstash-7.17.5/config/pipelines.yml
- pipeline.id: linux85-pipeline-beats
path.config: "/logstash/config/10-pipeline-beats.conf"
- pipeline.id: linux85-pipeline-tcp
path.config: "/logstash/config/10-pipeline-tcp.conf"
- pipeline.id: linux85-pipeline-http
path.config: "/logstash/config/10-pipeline-http.conf"
3、在elk121主机上启动logstash实例
| [root@elk121 ~]# logstash |
|---|
4、测试tcp类型
在elk121主机上新开一个窗口
| [root@elk121 ~]# echo "OLDBOYedu2023 教室07" | nc 192.168.1.121 9999 |
|---|
在kibana上观察结果,已成功匹配

5、测试beats类型
在elk123主机上编写filebeat配置文件
[root@elk123 ~]# cd /es/softwares/filebeat-7.17.5-linux-x86_64
[root@elk123 filebeat-7.17.5-linux-x86_64]# mkdir config
[root@elk123 filebeat-7.17.5-linux-x86_64]# vim config/19-nginx-to-logstash.yaml
filebeat.inputs:
- type: log
paths:
- /tmp/es/access.log
# 将数据输出到logstash中
output.logstash:
# 指定logstash的主机和端口
hosts: ["192.168.1.121:8888"]
在elk123主机上启动filebeat实例
[root@elk123 filebeat-7.17.5-linux-x86_64]# rm -rf data/
[root@elk123.oldboyedu.com filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/19-nginx-to-logstash.yaml
在kibana上观察结果,已成功匹配

6、测试http类型
使用postman填写POST请求192.168.1.121:8080,输入JSON内容
{
"name": "zq"
"hobby": ["黑丝","gril"]
}

在kibana上观察结果,已成功匹配

五、logstash的useragent过滤器及kibana出图展示案例¶
1、在elk123主机上创建日志文件进行分析
[root@elk123 filebeat-7.17.5-linux-x86_64]# cat /var/log/nginx/access.log
{"@timestamp":"2023-04-06T16:17:43+08:00","host":"10.0.0.103","clientip":"110.110.110.110","SendBytes":615,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"curl/7.29.0","status":"200"}
{"@timestamp":"2023-04-06T18:18:18+08:00","host":"10.0.0.103","clientip":"101.231.54.100","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (iPad; CPU OS 13_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/87.0.4280.77 Mobile/15E148 Safari/604.1","status":"304"}
{"@timestamp":"2023-04-07T08:18:32+08:00","host":"10.0.0.103","clientip":"219.141.136.10","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1","status":"304"}
{"@timestamp":"2023-04-07T10:18:52+08:00","host":"10.0.0.103","clientip":"221.118.208.184","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1","status":"304"}
{"@timestamp":"2023-04-07T12:19:07+08:00","host":"10.0.0.103","clientip":"21.118.208.84","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Linux; Android 10; SM-G981B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Mobile Safari/537.36","status":"404"}
2、在elk123主机上配置filebeat配置文件
[root@elk123 ~]# cd /es/softwares/filebeat-7.17.5-linux-x86_64
[root@elk123 filebeat-7.17.5-linux-x86_64]# mkdir config
[root@elk123 filebeat-7.17.5-linux-x86_64]# vim config/20-nginx-to-logstash.yaml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log*
json:
keys_under_root: true
add_error_key: true
overwrite_keys: true
# 将数据输出到logstash中
output.logstash:
# 指定logstash的主机和端口
hosts: ["192.168.1.121:8888"]
3、在elk123主机上启动filebeat实例
| [root@elk103.oldboyedu.com filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/20-nginx-to-logstash.yaml |
|---|
4、在elk121主机上编写logstash配置文件
[root@elk121 ~]# cat /logstash/config/11-beats-grok_geoip_date_useragent-es.conf
input {
beats {
port => 8888
}
}
filter {
mutate {
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
geoip {
source => "clientip"
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
timezone => "Asia/Shanghai"
}
# 用于分析客户端设备类型的插件
useragent {
# 指定基于哪个字段分析设备
source => "http_user_agent"
# 指定将解析的数据放在哪个字段,若不指定,则默认放在顶级字段中
target => "linux85-agent"
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-logstash-nginx-useragent"
}
}
5、在elk121主机上启动logstash实例
| [root@elk121 ~]# logstash -rf /logstash/config/11-beats-grok_geoip_date_useragent-es.conf |
|---|
6、登录kibana点击【索引管理】,观察到成功创建索引

点击【索引模式】创建索引模式

点击【菜单栏】-【Discover】


7、设置kibana出图展示
添加【linux85-agent.device】字段

添加【linux85-agent.os】字段和【linux85-agent.os_full】字段

保存模板

创建可视化库



搜索【device】字段进行拖拽

8、在elk123主机上日志文件末尾再添加日志信息
[root@elk123 filebeat-7.17.5-linux-x86_64]# vim /var/log/nginx/access.log
…
…
{"@timestamp":"2023-04-07T10:18:52+08:00","host":"10.0.0.103","clientip":"221.118.208.184","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1","status":"304"}
{"@timestamp":"2023-04-07T12:19:07+08:00","host":"10.0.0.103","clientip":"21.118.208.84","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Linux; Android 10; SM-G981B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Mobile Safari/537.36","status":"404"}
{"@timestamp":"2023-04-07T10:18:52+08:00","host":"10.0.0.103","clientip":"221.118.208.184","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1","status":"304"}
{"@timestamp":"2023-04-07T12:19:07+08:00","host":"10.0.0.103","clientip":"21.118.208.84","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.103","uri":"/index.html","domain":"10.0.0.103","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Linux; Android 10; SM-G981B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Mobile Safari/537.36","status":"404"}
9、在kibana上设置每秒刷新1次

页面即时刷新

10、在kibana上保存Len可视化,名称为:客户操作系统占比

六、logstash的mutate过滤插件及kibana的dashboard案例¶
1、在elk123主机上准备生成python脚本
[root@elk123 ~]# cat > generate_log.py <<EOF
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : Jason Yin
import datetime
import random
import logging
import time
import sys
LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# 配置root的logging.Logger实例的基本配置
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1]
, filemode='a',)
actions = ["浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券",
"搜索", "查看订单", "付款", "清空购物车"]
while True:
time.sleep(random.randint(1, 5))
user_id = random.randint(1, 10000)
# 对生成的浮点数保留2位有效数字.
price = round(random.uniform(15000, 30000),2)
action = random.choice(actions)
svip = random.choice([0,1])
logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price))
EOF
2、后台运行,生成/tmp/app.log文件
| [root@elk123 ~]# nohup python generate_log.py /tmp/app.log &>/dev/null & |
|---|
3、在elk121主机上编写logstash配置文件
[root@elk121 ~]# vim /logstash/config/12-beats-mutate-es.conf
input {
beats {
port => 9999
}
}
filter {
mutate {
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
mutate {
# 将message字段使用"|"进行切分
split => { "message" => "|" }
}
mutate {
add_field => {
userid => "%{[message][1]}"
verb => "%{[message][2]}"
svip => "%{[message][3]}"
price => "%{[message][4]}"
}
}
mutate {
rename => {
"verb" => "action"
}
}
mutate {
convert => {
"userid" => "integer"
"svip" => "boolean"
"price" => "float"
}
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-logstash-nginx-mutate"
}
}
4、在elk121主机上启动logstash实例
| [root@elk121 ~]# logstash -rf /logstash/config/12-beats-mutate-es.conf |
|---|
5、在elk123主机上编写logstash配置文件
[root@elk123 ~]# cd /es/softwares/filebeat-7.17.5-linux-x86_64/
[root@elk123 filebeat-7.17.5-linux-x86_64]# vim config/21-apps-to-logstash.yaml
filebeat.inputs:
- type: log
paths:
- /tmp/app.log
output.logstash:
hosts: ["192.168.1.121:9999"]
6、在elk123主机上启动filebeat实例
| [root@elk123 filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/21-apps-to-logstash.yaml |
|---|
7、打开kibana界面,点击【索引管理】查看到索引已成功创建

8、创建【索引模式】

9、保存新的日志模板,模板名称为app日志
添加【userid】、【app】、【svip】、【price】字段

保存为app日志模板

10、定做可视化库
点击【菜单栏】-【Visualize Library】

点击【创建可视化】-【Lens】


搜索【svip】字段,拖拽到右边界面,保存名为app-svip用户统计的可视化库


搜索【userid】字段,拖拽到右边界面,保存名为app-总用户量的可视化库


搜索【action】字段,拖拽到右边界面,保存名为app-用户行为的可视化库


11、创建dashboard




保存名为app的仪表板

七、ELFK架构分析nginx日志并出图展示¶
将nginx日志分析,通过kibana展示数据,pv,带宽总量,公网IP的Top10统计等信息。
1、在elk121节点上配置logstash配置文件
[root@elk121 ~]# vim /logstash/config/13-procect.conf
input {
beats {
port => 7777
}
}
filter {
mutate {
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
geoip {
source => "clientip"
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
timezone => "Asia/Shanghai"
}
useragent {
source => "http_user_agent"
# target => "linux85-agent"
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://192.168.1.121:9200","http://192.168.1.122:9200","http://192.168.1.123:9200"]
index => "linux85-logstash-nginx-project-%{+yyyy.MM.dd}"
}
}
2、在elk121节点上启动logstash实例
| [root@elk121 ~]# logstash -rf /logstash/config/13-procect.conf |
|---|
3、在elk123节点上配置filebeat配置文件
[root@elk123 filebeat-7.17.5-linux-x86_64]# vim config/22-project.yaml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log*
json:
keys_under_root: true
add_error_key: true
overwrite_keys: true
output.logstash:
hosts: ["192.168.1.121:7777"]
4、在elk123节点上启动filebeat实例
| [root@elk123 filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/22-project.yaml |
|---|
5、在kibana上查看索引,观察到成功创建

6、在kibana上创建索引模式

点击【菜单栏】-【Discover】

7、添加clientip字段、device字段、geoip.city_name字段、geoip.country_name字段、os字段、SendBytes字段、status字段、uri字段

8、保存项目

9、创建可视化
(1)创建PV可视化库
点击【Visualize Library】

点击【创建可视化】

点击【基于聚合】

点击【指标】

点击【linux85-logstash-nginx-project-2024.01*】

定制标签PV

点击【保存】

(2)创建IP统计可视化库
点击【创建可视化】

点击【基于聚合】

点击【指标】


聚合选择【唯一计数】,字段选择【clientip.keyword】,定制标签填写【IP统计】

保存可视化

(3)创建带宽统计可视化库
选择【Lens】

拖拽【SendBytes】并设置函数【求和】、名称为【带宽统计】、值格式为【字节(1024)】

保存模板

10、创建仪表板
点击【Dashboard】

点击【创建仪表板】

点击【从库中添加】


调整仪表板,完成后保存即可

11、如果想添加地图信息,需要执行下面操作:
(1) 创建索引映射
填写PUT 请求http://192.168.1.123:9200/linux85-map
{
"mappings": {
"properties": {
"location": {
"type": "geo_point"
}
}
}
}

(2) 写入地理位置-lat代表纬度,lon代表经度
填写POST 请求http://192.168.1.123:9200/linux85-map/_doc
{
"location": {
"lat": 39.914,
"lon": 116.386
}
}
(3)创建索引模式

(4)添加Maps
点击【Maps】后选择索引模式linux85-map*后,点击【添加图层】

保存到地图到已有的仪表板(nginx日志大盘)


八、logstash从kafka拉取数据并解析json格式实战案例¶
说明:本实验是在完成3.2.13 filebeat将数据写入到Kafka案例的基础上进行的
1、在elk121节点上配置logstash配置文件
[root@elk121 ~]# cat /logstash/config/17-kafka-to-stdout.conf
input {
kafka {
# 指定kafka集群地址
bootstrap_servers => "192.168.1.121:9092,192.168.1.122:9092,192.168.1.123:9092"
# 指定消费的topic
topics => ["test03"]
# 指定消费者组
group_id => "group-id01"
# 指定消费的偏移量,"earliest"表示从头读取数据,"latest"表示从最新的位置读取数据.
auto_offset_reset => "earliest"
}
}
filter {
json {
# 对指定字段进行json格式解析。
source => "message"
}
mutate {
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
}
output {
stdout {}
}
2、在elk121节点上启动logstash,观察到成功从kafka拿取数据
| [root@elk121 ~]# logstash -rf /logstash/config/17-kafka-to-stdout.conf |
|---|
