Bulk API 的最佳实践与避坑

22 May 2026 – wusfe · 3 min read

Bulk API 的最佳实践与避坑

有个数据同步服务,从 MySQL 把订单数据推到 ES。单条插入,每秒钟 200 个请求,TPS 稳如老狗——直到某天运营做了个活动,瞬时同步量翻了 10 倍,ES 集群直接被打爆,429 Too Many Requests 刷满监控。

根因:犯了 ES 写入的第一个大忌——逐条写入

本篇讲透 Bulk API:为什么快、怎么用不踩坑、以及生产环境调优参数怎么设。

为什么 Bulk 能快几十倍

ES 的写入流程大致分为三段:

客户端请求 → 协调节点路由 → 主分片写入 → 副本同步 → 返回

逐条写入时,每个文档都要独立完成这个完整链路,大头开销在网络往返索引刷新。Bulk 把多条写入打包到一个 HTTP 请求里,整个链路只走一次。

逐条写入:200 req × (网络RTT + 索引刷新) ≈ 200 × 10ms = 2000ms
Bulk 写入:1 req × (网络RTT + 一次刷新) ≈ 50ms(200条)

实测:1000 条文档逐条写入约 8-10 秒,同样数据用 Bulk 约 0.3-0.5 秒,20-30 倍差距

Bulk API 的格式

POST /_bulk
{ "index": { "_index": "orders", "_id": "001" } }
{ "order_id": "001", "amount": 8999, "status": "paid" }
{ "index": { "_index": "orders", "_id": "002" } }
{ "order_id": "002", "amount": 5999, "status": "pending" }
{ "delete": { "_index": "orders", "_id": "003" } }
{ "update": { "_index": "orders", "_id": "004" } }
{ "doc": { "status": "canceled" } }

格式要点:

  • 每行用换行符 \n 分隔,最后一行也要以 \n 结尾
  • Content-Type 用 application/x-ndjson(Newline Delimited JSON)
  • 一个 Bulk 里可以混用 index、create、update、delete

批量多大合适

不是越大越好。 Bulk 体量过大会导致:

  • 单次请求耗时过长,客户端超时
  • 占用 ES HTTP 线程,其他请求排队
  • 内存峰值过高

选一个数据量窗口而不是固定的文档数。推荐标准:

指标 推荐值
单次文档数 500 - 5,000 条
单次数据量 5MB - 15MB
单次耗时 < 5 秒

自适应批量发送(Go 示例):

var batch []Doc
var batchBytes int64

for _, doc := range docs {
    data, _ := json.Marshal(doc)
    batch = append(batch, doc)
    batchBytes += int64(len(data))

    if batchBytes >= 10*1024*1024 || len(batch) >= 5000 {
        esClient.Bulk(batch)
        batch = nil
        batchBytes = 0
    }
}
// 最后一批
if len(batch) > 0 {
    esClient.Bulk(batch)
}

用数据量(5-15MB)而不是文档数来控制批量大小更稳定——小文档一次可以几千条,大文档可能几百条就到阈值了。

Bulk 的响应处理

Bulk 返回 HTTP 200 不代表每条都成功了:

{
  "took": 350,
  "errors": true,
  "items": [
    { "index": { "_index": "orders", "_id": "001", "status": 201 } },
    { "index": { "_index": "orders", "_id": "002", "status": 409, "error": { "type": "version_conflict_engine_exception", ... } } },
    { "index": { "_index": "orders", "_id": "003", "status": 429, "error": { "type": "es_rejected_execution_exception", ... } } }
  ]
}

errors: true 告诉你这批里有失败。你必须逐条检查 items[].index.status

resp, err := esClient.Bulk(bulkRequest)
if err != nil {
    log.Fatal(err)
}
if resp.HasFailures() {
    for _, item := range resp.Items {
        if item.IsFailed() {
            id := item.ID
            errMsg := item.FailureMessage
            // 429 → 重试队列
            // 409 → 版本冲突,记录
            // 400 → 数据格式问题,死信队列
            handleFailure(id, item.Status, errMsg)
        }
    }
}

三条关键优化参数

1. refresh_interval:控制可见延迟

默认 1s,每次 refresh 生成一个新的 segment 文件,文档变为可搜索。大批量导入时关闭以减少 segment 碎片:

PUT /orders/_settings
{ "index": { "refresh_interval": "-1" } }

// 导完恢复
PUT /orders/_settings
{ "index": { "refresh_interval": "30s" } }

设为 -1 意味着停用自动刷新,导入完成后再手动 POST /orders/_refresh

2. number_of_replicas:先关副本再导入

写入要同步到副本,副本越多写入越慢。大批量导入时先把副本数设 0:

PUT /orders/_settings
{ "index": { "number_of_replicas": 0 } }

// 导完恢复
PUT /orders/_settings
{ "index": { "number_of_replicas": 1 } }

这两项配合大流量导入,写入速度能翻 3-5 倍。

3. translog 异步化:牺牲少量安全性换速度

PUT /orders/_settings
{
  "index": {
    "translog.durability": "async",
    "translog.sync_interval": "30s"
  }
}

默认 request(每次请求都 fsync translog),改为 async 后 ES 每 30 秒把 translog 刷到磁盘。缺点是 ES 崩溃可能丢最后 30 秒的数据。适合数据可以从上游重放的场景。

坑 1:Bulk 请求过大导致超时

Bulk 默认没有超时配置,ES 会把整个请求读完才开始处理。如果客户端 50MB 数据慢速发包,HTTP 线程一直被占用。

解决:客户端设置 HTTP 超时(connect timeout 2s, read timeout 30s),单批控制在 15MB 以内。

坑 2:429 不重试

高并发下 ES 线程池满了会返回 429。不加重试逻辑,数据就丢了。

maxRetries := 3
for i := 0; i < maxRetries; i++ {
    rsp, _ := esClient.Bulk(batch)
    if !rsp.HasFailures() {
        break
    }

    var retry []Doc
    for _, item := range rsp.Items {
        if item.Status == 429 {
            retry = append(retry, batch[item.ItemID])
        }
    }
    if len(retry) == 0 {
        break
    }

    time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // 1s, 2s, 4s 退避
    batch = retry
}

总结

实践 要点
用 Bulk,不逐条写 性能差 20-30 倍
按数据量 5-15MB 控制批次 不要只按文档数
批量导入时关 refresh、关副本 速度翻 3-5 倍
处理 429,指数退避重试 别丢数据
逐条检查 Bulk 响应 200 不等于全部成功