最近的爬虫项目里涉及往ES中大量写入数据,因此做了一些调研。
总而言之,py-elasticsearch库推荐使用helper.bulk相关函数进行批量写入(实际是批量执行,不仅限于写入),而bulk有三个相关函数:
- parallel_bulk():并发批量执行;
- streaming_bulk():流式批量执行;
- bulk():在源码里可以看到,本质上是对streaming_bulk()的封装,返回了统计结果,方便处理。
* 先附上三个函数的代码范例
官方文档:https://elasticsearch-py.readthedocs.io/en/master/helpers.html#example
from elasticsearch.helpers import streaming_bulk, parallel_bulk, bulk, scan
def generate_actions():
for doc in doc_list:
yield doc
# 1. parallel_bulk(还可以用类似streaming_bulk的for循环)
deque(parallel_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), chunk_size=3000, thread_count=32), maxlen=0)
# 2. streaming_bulk
for ok, action in streaming_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), max_retries=5):
pass
# 3. bulk
bulk(client=self.es, doc_type="doc", index=index, actions=generate_actions())
一次性写入5w条数据的耗时测试(多次实验)
设备 | parallel_bulk | streaming_bulk | bulk | 逐条写入 |
---|---|---|---|---|
i5-7200u的笔记本 | 4-5秒 | 15秒 | 15秒+ | 2000秒+ |
8核32线程64G的服务器 | 2秒 | 4秒 | 4秒 | 50秒 |
结论:如果对写入耗时要求不高,用bulk()即可;如果需要判断每一条写入的返回状态,用streaming_bulk();追求速度的话,用parallel_bulk()。
网友评论