代价较小的批量操作 | Elasticsearch: 权威指南 | Elastic
2025-01-26
与 mget
可以使我们一次取回多个文档同样的方式,
bulk
API 允许在单个步骤中进行多次 create
、 index
、 update
或 delete
请求。
如果你需要索引一个数据流比如日志事件,它可以排队和索引数百或数千批次。
bulk
与其他的请求体格式稍有不同,如下所示:
{ action: { metadata }}\n { request body }\n { action: { metadata }}\n { request body }\n ...
这种格式类似一个有效的单行 JSON 文档 流 ,它通过换行符(\n
)连接到一起。注意两个要点:
\n
)结尾, 包括最后一行 。这些换行符被用作一个标记,可以有效分隔行。
在 为什么是有趣的格式? 中, 我们解释为什么 bulk
API 使用这种格式。
action/metadata
行指定 哪一个文档 做 什么操作 。
metadata
应该
指定被索引、创建、更新或者删除的文档的 _index
、 _type
和 _id
。
例如,一个 delete
请求看起来是这样的:
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
request body
行由文档的 _source
本身组成--文档包含的字段和值。它是 index
和 create
操作所必需的,这是有道理的:你必须提供文档以索引。
它也是 update
操作所必需的,并且应该包含你传递给 update
API 的相同请求体: doc
、 upsert
、 script
等等。
删除操作不需要 request body
行。
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "My first blog post" }
如果不指定 _id
,将会自动生成一个 ID :
{ "index": { "_index": "website", "_type": "blog" }} { "title": "My second blog post" }
为了把所有的操作组合在一起,一个完整的 bulk
请求
有以下形式:
POST /_bulk { "delete": { "_index": "website", "_type": "blog", "_id": "123" }} { "create": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "My first blog post" } { "index": { "_index": "website", "_type": "blog" }} { "title": "My second blog post" } { "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} } { "doc" : {"title" : "My updated blog post"} }
这个 Elasticsearch 响应包含 items
数组,
这个数组的内容是以请求的顺序列出来的每个请求的结果。
{ "took": 4, "errors": false, "items": [ { "delete": { "_index": "website", "_type": "blog", "_id": "123", "_version": 2, "status": 200, "found": true }}, { "create": { "_index": "website", "_type": "blog", "_id": "123", "_version": 3, "status": 201 }}, { "create": { "_index": "website", "_type": "blog", "_id": "EiwfApScQiiy7TIKFxRCTw", "_version": 1, "status": 201 }}, { "update": { "_index": "website", "_type": "blog", "_id": "123", "_version": 4, "status": 200 }} ] }
每个子请求都是独立执行,因此某个子请求的失败不会对其他子请求的成功与否造成影响。
如果其中任何子请求失败,最顶层的 error
标志被设置为 true
,并且在相应的请求报告出错误明细:
POST /_bulk { "create": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "Cannot create - it already exists" } { "index": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "But we can update it" }
在响应中,我们看到 create
文档 123
失败,因为它已经存在。但是随后的 index
请求,也是对文档 123
操作,就成功了:
{ "took": 3, "errors": true, "items": [ { "create": { "_index": "website", "_type": "blog", "_id": "123", "status": 409, "error": "DocumentAlreadyExistsException [[website][4] [blog][123]: document already exists]" }}, { "index": { "_index": "website", "_type": "blog", "_id": "123", "_version": 5, "status": 200 }} ] }
这也意味着
bulk
请求不是原子的: 不能用它来实现事务控制。每个请求是单独处理的,因此一个请求的成功或失败不会影响其他的请求。
也许你正在批量索引日志数据到相同的 index
和 type
中。
但为每一个文档指定相同的元数据是一种浪费。相反,可以像 mget
API 一样,在 bulk
请求的 URL 中接收默认的 /_index
或者 /_index/_type
:
POST /website/_bulk { "index": { "_type": "log" }} { "event": "User logged in" }
你仍然可以覆盖元数据行中的 _index
和 _type
, 但是它将使用 URL 中的这些元数据值作为默认值:
POST /website/log/_bulk { "index": {}} { "event": "User logged in" } { "index": { "_type": "blog" }} { "title": "Overriding the default type" }
整个批量请求都需要由接收到请求的节点加载到内存中,因此该请求越大,其他请求所能获得的内存就越少。 批量请求的大小有一个最佳值,大于这个值,性能将不再提升,甚至会下降。 但是最佳值不是一个固定的值。它完全取决于硬件、文档的大小和复杂度、索引和搜索的负载的整体情况。
幸运的是,很容易找到这个 最佳点 :通过批量索引典型文档,并不断增加批量大小进行尝试。 当性能开始下降,那么你的批量大小就太大了。一个好的办法是开始时将 1,000 到 5,000 个文档作为一个批次, 如果你的文档非常大,那么就减少批量的文档个数。
密切关注你的批量请求的物理大小往往非常有用,一千个 1KB 的文档是完全不同于一千个 1MB 文档所占的物理大小。 一个好的批量大小在开始处理后所占用的物理大小约为 5-15 MB。
官方地址:https://www.elastic.co/guide/cn/elasticsearch/guide/current/bulk.html