ELK 管道全流程
Filebeat -> Logstash -> ES -> Kibana 管道全流程
ELK(Elasticsearch、Logstash、Kibana)加上 Filebeat(或统称 ELK Stack)是目前最主流的日志中心化解决方案。本文从零解析 Filebeat -> Logstash -> Elasticsearch -> Kibana 的完整数据管道,给出可落地的配置示例。
1. 各组件职责
| 组件 | 职责 | 特点 |
|---|---|---|
| Filebeat | 日志采集 | 轻量级,占用资源极低,负责 tail 日志文件并发送 |
| Logstash | 解析与过滤 | 强大的 pipeline 处理:grok 解析、字段转换、geoip 丰富 |
| Elasticsearch | 存储与搜索 | 存储结构化日志,提供全文搜索和聚合 |
| Kibana | 可视化与查询 | Web UI 搜索日志、创建仪表盘 |
应用服务器 (Nginx/Java/MySQL)
│
▼
Filebeat 采集日志文件
│
▼
Logstash 解析、过滤、丰富
│
▼
Elasticsearch 存储索引
│
▼
Kibana 查询 & 可视化
2. Filebeat 配置
场景一:Filebeat 直连 ES(小规模,无需复杂过滤)
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
tags: ["nginx", "access"]
fields:
service: nginx
environment: production
fields_under_root: false
# 多行日志合并(如 Java 异常堆栈)
multiline:
pattern: '^\d{4}-\d{2}-\d{2}'
negate: true
match: after
output.elasticsearch:
hosts: ["https://es-node:9200"]
username: "elastic"
password: "changeme"
ssl.verification_mode: none
index: "nginx-%{+yyyy.MM.dd}"
场景二:Filebeat 输出到 Logstash(生产推荐)
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
output.logstash:
hosts: ["logstash:5044"]
loadbalance: true
worker: 2
3. Logstash Pipeline 配置
Logstash 的 pipeline 三个核心阶段:Input -> Filter -> Output。
解析 Nginx 访问日志的完整 Pipeline
# /etc/logstash/conf.d/nginx.conf
input {
beats {
port => 5044
# 可以设置 SSL
# ssl => true
# ssl_certificate => "/etc/logstash/certs/logstash.crt"
# ssl_key => "/etc/logstash/certs/logstash.key"
}
}
filter {
# 1. 如果 Filebeat 采集的是 JSON 格式日志,可以直接解析
if [fields][service] == "nginx-json" {
json {
source => "message"
}
}
# 2. Grok 解析 Nginx 标准 access log
if [fields][service] == "nginx" {
grok {
match => {
"message" => '%{IPORHOST:client_ip} - %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{DATA:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:body_bytes_sent} "%{DATA:referer}" "%{DATA:user_agent}"'
}
# 解析失败时打 tag
tag_on_failure => ["_grokparsefailure"]
}
# 3. 字段类型转换
mutate {
convert => {
"response_code" => "integer"
"body_bytes_sent" => "integer"
}
# 移除不需要的字段
remove_field => ["@version", "host", "input"]
# 重命名字段
rename => { "@timestamp" => "event_time" }
}
# 4. 时间戳解析
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
# 5. GeoIP 丰富(根据 client_ip 解析地理位置)
geoip {
source => "client_ip"
target => "geo"
add_tag => ["geoip"]
}
# 6. User Agent 解析
useragent {
source => "user_agent"
target => "ua"
}
}
# 7. 删除 grok 解析失败的日志(或者单独路由到错误队列)
if "_grokparsefailure" in [tags] {
drop { }
}
}
output {
# 输出到 ES
elasticsearch {
hosts => ["https://es-node:9200"]
user => "elastic"
password => "changeme"
ssl => true
cacert => "/etc/logstash/certs/ca.crt"
index => "nginx-%{+yyyy.MM.dd}"
# 使用 ILM 管理索引生命周期
ilm_enabled => true
ilm_rollover_alias => "nginx"
ilm_pattern => "000001"
}
# stdout 调试用(生产注释掉)
stdout { codec => rubydebug }
}
Grok 常用模式速查
| 模式 | 匹配内容 | 示例 |
|---|---|---|
%{IPORHOST:client} |
IP 或主机名 | 192.168.1.100 |
%{HTTPDATE:ts} |
HTTP 标准日期 | 10/Jun/2024:08:55:36 +0800 |
%{WORD:method} |
HTTP 方法 | GET/POST/PUT |
%{NUMBER:code} |
数字 | 200/404/500 |
%{GREEDYDATA:ua} |
剩余所有字符(贪婪) | User-Agent 字符串 |
%{LOGLEVEL:level} |
日志级别 | INFO/WARN/ERROR |
4. ILM + Rollover 配合日志索引自动管理
PUT _ilm/policy/nginx_ilm_policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_primary_shard_size": "30gb",
"max_age": "1d"
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "3d",
"actions": {
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "7d",
"actions": {
"set_priority": {
"priority": 0
}
}
},
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
}
Rollover 的好处:索引大小可控,热数据在 SSD 上,冷数据自动迁移到慢存储,过期数据自动删除。
5. 常见瓶颈与排查
| 瓶颈位置 | 症状 | 原因 | 解决 |
|---|---|---|---|
| Filebeat 积压 | Kafka/Logstash 消费不过来 | 下游处理慢 | 增加 Filebeat worker、调大 harvester_limit |
| Logstash 内存不足 | pipeline 频繁 GC、事件堆积 | Filter 过多或 Grok 正则复杂度高 | 增加 JVM 内存(-Xmx)、拆分 pipeline |
| Logstash CPU 打满 | input 队列积压 |
Grok 解析太多 | 用 dissect 替代 grok(性能高 10 倍+)、增加 Logstash 节点 |
| ES 写入瓶颈 | 429 rejected |
refresh 间隔太短、shard 太多 | 调大 refresh_interval、减少 shard 数 |
| 网络带宽打满 | Filebeat -> Logstash 或 Logstash -> ES 延迟高 | 网络带宽不足 | 压缩传输(compression_level: 3)、使用持久队列 |
Logstash 性能优化配置
# logstash.yml
pipeline.workers: 8 # 每个 pipeline 的 worker 数,设 CPU 核数
pipeline.batch.size: 500 # 每个批次处理的 event 数
pipeline.batch.delay: 50 # 批次等待时间(ms)
pipeline.ordered: auto # 是否保证顺序
queue.type: persisted # 持久化队列,防止积压丢数据
queue.max_bytes: 4gb
6. 踩坑案例
案例 1:时区问题导致 Kibana 时间错乱
现象:Kibana Discover 里日志时间比实际时间晚了 8 小时。
原因:Nginx 日志中的时间戳没有时区信息,Logstash 默认按 UTC 解析,Kibana 又按 UTC+8 显示。
解决:
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai" # 加上时区
}
案例 2:Grok 解析失败率过高
现象:大量日志带有 _grokparsefailure tag。
原因:Nginx 日志格式是自定义的,默认 NGINXACCESS 模式不匹配。
解决:用 Grok Debugger 在线调试 Grok 表达式,确认匹配成功后再写入配置。
案例 3:Filebeat 重复发送日志
现象:ES 中同一条日志出现多次。
原因:Filebeat 的 registry 文件(记录读取位置)损坏或被删除,导致 Filebeat 重新读取。
解决:检查 data/registry/filebeat/ 目录权限,确保 Filebeat 进程有读写权限。
总结
| 阶段 | 关键配置 | 关键参数 |
|---|---|---|
| Filebeat 采集 | inputs、multiline |
harvester_limit、loadbalance |
| Logstash 处理 | grok、mutate、date、geoip |
pipeline.workers、batch.size、queue.type |
| ES 存储 | ILM Policy、Index Template | number_of_shards、refresh_interval |
| Kibana 展示 | Index Pattern、Discover | 时区设置 |
ELK 管道不是搭好就完事的。随着日志量增长,你会先后遇到 Logstash 内存不足、ES 写入 rejection、Filebeat 文件句柄耗尽等问题。理解每层的瓶颈特征并提前做好容量规划,才是长期稳定运行的保障。