美文网首页
py-elasticsearch的stream_bulk、par

py-elasticsearch的stream_bulk、par

作者: RedB | 来源:发表于2021-04-29 17:13 被阅读0次

    最近的爬虫项目里涉及往ES中大量写入数据,因此做了一些调研。
    总而言之,py-elasticsearch库推荐使用helper.bulk相关函数进行批量写入(实际是批量执行,不仅限于写入),而bulk有三个相关函数:

    • parallel_bulk():并发批量执行;
    • streaming_bulk():流式批量执行;
    • bulk():在源码里可以看到,本质上是对streaming_bulk()的封装,返回了统计结果,方便处理。

    * 先附上三个函数的代码范例

    官方文档:https://elasticsearch-py.readthedocs.io/en/master/helpers.html#example

    from elasticsearch.helpers import streaming_bulk, parallel_bulk, bulk, scan
    def generate_actions():
        for doc in doc_list:
            yield doc
    
    # 1. parallel_bulk(还可以用类似streaming_bulk的for循环)
    deque(parallel_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), chunk_size=3000, thread_count=32), maxlen=0)
    
    # 2. streaming_bulk
    for ok, action in streaming_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), max_retries=5):
        pass
    
    # 3. bulk
    bulk(client=self.es, doc_type="doc", index=index, actions=generate_actions())
    

    一次性写入5w条数据的耗时测试(多次实验)

    设备 parallel_bulk streaming_bulk bulk 逐条写入
    i5-7200u的笔记本 4-5秒 15秒 15秒+ 2000秒+
    8核32线程64G的服务器 2秒 4秒 4秒 50秒

    结论:如果对写入耗时要求不高,用bulk()即可;如果需要判断每一条写入的返回状态,用streaming_bulk();追求速度的话,用parallel_bulk()。

    相关文章

      网友评论

          本文标题:py-elasticsearch的stream_bulk、par

          本文链接:https://www.haomeiwen.com/subject/uzrwrltx.html