Bulk API 的最佳实践与避坑
Bulk API 的最佳实践与避坑
有个数据同步服务,从 MySQL 把订单数据推到 ES。单条插入,每秒钟 200 个请求,TPS 稳如老狗——直到某天运营做了个活动,瞬时同步量翻了 10 倍,ES 集群直接被打爆,429 Too Many Requests 刷满监控。
根因:犯了 ES 写入的第一个大忌——逐条写入。
本篇讲透 Bulk API:为什么快、怎么用不踩坑、以及生产环境调优参数怎么设。
为什么 Bulk 能快几十倍
ES 的写入流程大致分为三段:
客户端请求 → 协调节点路由 → 主分片写入 → 副本同步 → 返回
逐条写入时,每个文档都要独立完成这个完整链路,大头开销在网络往返和索引刷新。Bulk 把多条写入打包到一个 HTTP 请求里,整个链路只走一次。
逐条写入:200 req × (网络RTT + 索引刷新) ≈ 200 × 10ms = 2000ms
Bulk 写入:1 req × (网络RTT + 一次刷新) ≈ 50ms(200条)
实测:1000 条文档逐条写入约 8-10 秒,同样数据用 Bulk 约 0.3-0.5 秒,20-30 倍差距。
Bulk API 的格式
POST /_bulk
{ "index": { "_index": "orders", "_id": "001" } }
{ "order_id": "001", "amount": 8999, "status": "paid" }
{ "index": { "_index": "orders", "_id": "002" } }
{ "order_id": "002", "amount": 5999, "status": "pending" }
{ "delete": { "_index": "orders", "_id": "003" } }
{ "update": { "_index": "orders", "_id": "004" } }
{ "doc": { "status": "canceled" } }
格式要点:
- 每行用换行符
\n分隔,最后一行也要以\n结尾 - Content-Type 用
application/x-ndjson(Newline Delimited JSON) - 一个 Bulk 里可以混用 index、create、update、delete
批量多大合适
不是越大越好。 Bulk 体量过大会导致:
- 单次请求耗时过长,客户端超时
- 占用 ES HTTP 线程,其他请求排队
- 内存峰值过高
选一个数据量窗口而不是固定的文档数。推荐标准:
| 指标 | 推荐值 |
|---|---|
| 单次文档数 | 500 - 5,000 条 |
| 单次数据量 | 5MB - 15MB |
| 单次耗时 | < 5 秒 |
自适应批量发送(Go 示例):
var batch []Doc
var batchBytes int64
for _, doc := range docs {
data, _ := json.Marshal(doc)
batch = append(batch, doc)
batchBytes += int64(len(data))
if batchBytes >= 10*1024*1024 || len(batch) >= 5000 {
esClient.Bulk(batch)
batch = nil
batchBytes = 0
}
}
// 最后一批
if len(batch) > 0 {
esClient.Bulk(batch)
}
用数据量(5-15MB)而不是文档数来控制批量大小更稳定——小文档一次可以几千条,大文档可能几百条就到阈值了。
Bulk 的响应处理
Bulk 返回 HTTP 200 不代表每条都成功了:
{
"took": 350,
"errors": true,
"items": [
{ "index": { "_index": "orders", "_id": "001", "status": 201 } },
{ "index": { "_index": "orders", "_id": "002", "status": 409, "error": { "type": "version_conflict_engine_exception", ... } } },
{ "index": { "_index": "orders", "_id": "003", "status": 429, "error": { "type": "es_rejected_execution_exception", ... } } }
]
}
errors: true 告诉你这批里有失败。你必须逐条检查 items[].index.status:
resp, err := esClient.Bulk(bulkRequest)
if err != nil {
log.Fatal(err)
}
if resp.HasFailures() {
for _, item := range resp.Items {
if item.IsFailed() {
id := item.ID
errMsg := item.FailureMessage
// 429 → 重试队列
// 409 → 版本冲突,记录
// 400 → 数据格式问题,死信队列
handleFailure(id, item.Status, errMsg)
}
}
}
三条关键优化参数
1. refresh_interval:控制可见延迟
默认 1s,每次 refresh 生成一个新的 segment 文件,文档变为可搜索。大批量导入时关闭以减少 segment 碎片:
PUT /orders/_settings
{ "index": { "refresh_interval": "-1" } }
// 导完恢复
PUT /orders/_settings
{ "index": { "refresh_interval": "30s" } }
设为 -1 意味着停用自动刷新,导入完成后再手动 POST /orders/_refresh。
2. number_of_replicas:先关副本再导入
写入要同步到副本,副本越多写入越慢。大批量导入时先把副本数设 0:
PUT /orders/_settings
{ "index": { "number_of_replicas": 0 } }
// 导完恢复
PUT /orders/_settings
{ "index": { "number_of_replicas": 1 } }
这两项配合大流量导入,写入速度能翻 3-5 倍。
3. translog 异步化:牺牲少量安全性换速度
PUT /orders/_settings
{
"index": {
"translog.durability": "async",
"translog.sync_interval": "30s"
}
}
默认 request(每次请求都 fsync translog),改为 async 后 ES 每 30 秒把 translog 刷到磁盘。缺点是 ES 崩溃可能丢最后 30 秒的数据。适合数据可以从上游重放的场景。
坑 1:Bulk 请求过大导致超时
Bulk 默认没有超时配置,ES 会把整个请求读完才开始处理。如果客户端 50MB 数据慢速发包,HTTP 线程一直被占用。
解决:客户端设置 HTTP 超时(connect timeout 2s, read timeout 30s),单批控制在 15MB 以内。
坑 2:429 不重试
高并发下 ES 线程池满了会返回 429。不加重试逻辑,数据就丢了。
maxRetries := 3
for i := 0; i < maxRetries; i++ {
rsp, _ := esClient.Bulk(batch)
if !rsp.HasFailures() {
break
}
var retry []Doc
for _, item := range rsp.Items {
if item.Status == 429 {
retry = append(retry, batch[item.ItemID])
}
}
if len(retry) == 0 {
break
}
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // 1s, 2s, 4s 退避
batch = retry
}
总结
| 实践 | 要点 |
|---|---|
| 用 Bulk,不逐条写 | 性能差 20-30 倍 |
| 按数据量 5-15MB 控制批次 | 不要只按文档数 |
| 批量导入时关 refresh、关副本 | 速度翻 3-5 倍 |
| 处理 429,指数退避重试 | 别丢数据 |
| 逐条检查 Bulk 响应 | 200 不等于全部成功 |