ELK 管道全流程

22 May 2026 – wusfe · 4 min read

Filebeat -> Logstash -> ES -> Kibana 管道全流程

ELK(Elasticsearch、Logstash、Kibana)加上 Filebeat(或统称 ELK Stack)是目前最主流的日志中心化解决方案。本文从零解析 Filebeat -> Logstash -> Elasticsearch -> Kibana 的完整数据管道,给出可落地的配置示例。


1. 各组件职责

组件 职责 特点
Filebeat 日志采集 轻量级,占用资源极低,负责 tail 日志文件并发送
Logstash 解析与过滤 强大的 pipeline 处理:grok 解析、字段转换、geoip 丰富
Elasticsearch 存储与搜索 存储结构化日志,提供全文搜索和聚合
Kibana 可视化与查询 Web UI 搜索日志、创建仪表盘
应用服务器 (Nginx/Java/MySQL)
    │
    ▼
Filebeat 采集日志文件
    │
    ▼
Logstash 解析、过滤、丰富
    │
    ▼
Elasticsearch 存储索引
    │
    ▼
Kibana 查询 & 可视化

2. Filebeat 配置

场景一:Filebeat 直连 ES(小规模,无需复杂过滤)

# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    tags: ["nginx", "access"]
    fields:
      service: nginx
      environment: production
    fields_under_root: false
    # 多行日志合并(如 Java 异常堆栈)
    multiline:
      pattern: '^\d{4}-\d{2}-\d{2}'
      negate: true
      match: after

output.elasticsearch:
  hosts: ["https://es-node:9200"]
  username: "elastic"
  password: "changeme"
  ssl.verification_mode: none
  index: "nginx-%{+yyyy.MM.dd}"

场景二:Filebeat 输出到 Logstash(生产推荐)

# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log

output.logstash:
  hosts: ["logstash:5044"]
  loadbalance: true
  worker: 2

3. Logstash Pipeline 配置

Logstash 的 pipeline 三个核心阶段:Input -> Filter -> Output

解析 Nginx 访问日志的完整 Pipeline

# /etc/logstash/conf.d/nginx.conf
input {
  beats {
    port => 5044
    # 可以设置 SSL
    # ssl => true
    # ssl_certificate => "/etc/logstash/certs/logstash.crt"
    # ssl_key => "/etc/logstash/certs/logstash.key"
  }
}

filter {
  # 1. 如果 Filebeat 采集的是 JSON 格式日志,可以直接解析
  if [fields][service] == "nginx-json" {
    json {
      source => "message"
    }
  }

  # 2. Grok 解析 Nginx 标准 access log
  if [fields][service] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{DATA:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:body_bytes_sent} "%{DATA:referer}" "%{DATA:user_agent}"'
      }
      # 解析失败时打 tag
      tag_on_failure => ["_grokparsefailure"]
    }

    # 3. 字段类型转换
    mutate {
      convert => {
        "response_code" => "integer"
        "body_bytes_sent" => "integer"
      }
      # 移除不需要的字段
      remove_field => ["@version", "host", "input"]
      # 重命名字段
      rename => { "@timestamp" => "event_time" }
    }

    # 4. 时间戳解析
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }

    # 5. GeoIP 丰富(根据 client_ip 解析地理位置)
    geoip {
      source => "client_ip"
      target => "geo"
      add_tag => ["geoip"]
    }

    # 6. User Agent 解析
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }

  # 7. 删除 grok 解析失败的日志(或者单独路由到错误队列)
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}

output {
  # 输出到 ES
  elasticsearch {
    hosts => ["https://es-node:9200"]
    user => "elastic"
    password => "changeme"
    ssl => true
    cacert => "/etc/logstash/certs/ca.crt"
    index => "nginx-%{+yyyy.MM.dd}"
    # 使用 ILM 管理索引生命周期
    ilm_enabled => true
    ilm_rollover_alias => "nginx"
    ilm_pattern => "000001"
  }

  # stdout 调试用(生产注释掉)
  stdout { codec => rubydebug }
}

Grok 常用模式速查

模式 匹配内容 示例
%{IPORHOST:client} IP 或主机名 192.168.1.100
%{HTTPDATE:ts} HTTP 标准日期 10/Jun/2024:08:55:36 +0800
%{WORD:method} HTTP 方法 GET/POST/PUT
%{NUMBER:code} 数字 200/404/500
%{GREEDYDATA:ua} 剩余所有字符(贪婪) User-Agent 字符串
%{LOGLEVEL:level} 日志级别 INFO/WARN/ERROR

4. ILM + Rollover 配合日志索引自动管理

PUT _ilm/policy/nginx_ilm_policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_primary_shard_size": "30gb",
            "max_age": "1d"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "3d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

Rollover 的好处:索引大小可控,热数据在 SSD 上,冷数据自动迁移到慢存储,过期数据自动删除。


5. 常见瓶颈与排查

瓶颈位置 症状 原因 解决
Filebeat 积压 Kafka/Logstash 消费不过来 下游处理慢 增加 Filebeat worker、调大 harvester_limit
Logstash 内存不足 pipeline 频繁 GC、事件堆积 Filter 过多或 Grok 正则复杂度高 增加 JVM 内存(-Xmx)、拆分 pipeline
Logstash CPU 打满 input 队列积压 Grok 解析太多 dissect 替代 grok(性能高 10 倍+)、增加 Logstash 节点
ES 写入瓶颈 429 rejected refresh 间隔太短、shard 太多 调大 refresh_interval、减少 shard 数
网络带宽打满 Filebeat -> Logstash 或 Logstash -> ES 延迟高 网络带宽不足 压缩传输(compression_level: 3)、使用持久队列

Logstash 性能优化配置

# logstash.yml
pipeline.workers: 8          # 每个 pipeline 的 worker 数,设 CPU 核数
pipeline.batch.size: 500     # 每个批次处理的 event 数
pipeline.batch.delay: 50     # 批次等待时间(ms)
pipeline.ordered: auto       # 是否保证顺序
queue.type: persisted        # 持久化队列,防止积压丢数据
queue.max_bytes: 4gb

6. 踩坑案例

案例 1:时区问题导致 Kibana 时间错乱

现象:Kibana Discover 里日志时间比实际时间晚了 8 小时。

原因:Nginx 日志中的时间戳没有时区信息,Logstash 默认按 UTC 解析,Kibana 又按 UTC+8 显示。

解决

date {
  match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  target => "@timestamp"
  timezone => "Asia/Shanghai"   # 加上时区
}

案例 2:Grok 解析失败率过高

现象:大量日志带有 _grokparsefailure tag。

原因:Nginx 日志格式是自定义的,默认 NGINXACCESS 模式不匹配。

解决:用 Grok Debugger 在线调试 Grok 表达式,确认匹配成功后再写入配置。

案例 3:Filebeat 重复发送日志

现象:ES 中同一条日志出现多次。

原因:Filebeat 的 registry 文件(记录读取位置)损坏或被删除,导致 Filebeat 重新读取。

解决:检查 data/registry/filebeat/ 目录权限,确保 Filebeat 进程有读写权限。


总结

阶段 关键配置 关键参数
Filebeat 采集 inputsmultiline harvester_limitloadbalance
Logstash 处理 grokmutatedategeoip pipeline.workersbatch.sizequeue.type
ES 存储 ILM Policy、Index Template number_of_shardsrefresh_interval
Kibana 展示 Index Pattern、Discover 时区设置

ELK 管道不是搭好就完事的。随着日志量增长,你会先后遇到 Logstash 内存不足、ES 写入 rejection、Filebeat 文件句柄耗尽等问题。理解每层的瓶颈特征并提前做好容量规划,才是长期稳定运行的保障。