美文网首页
python读文件保存json保存mongo,多进程,协程,计时

python读文件保存json保存mongo,多进程,协程,计时

作者: 程序里的小仙女 | 来源:发表于2020-07-14 20:00 被阅读0次
    # -*- coding: utf-8 -*-
    """
     @Time   : 2020/7/10 17:22
     @Author  : LinXiao
     @功能   :
    """
    # ------------------------------
    import asyncio
    import json
    import multiprocessing
    import os
    import platform
    import timeit
    from copy import deepcopy
    from pprint import pprint
    
    from loguru import logger
    from motor.motor_asyncio import AsyncIOMotorClient
    
    from redis import Redis
    
    # Output directory for the cleaned JSON files; used when running on Linux/macOS.
    SAVE_PATH=r"/backup/btc/btc_outputList_tx"
    # Multiprocessing queue; in the visible code it is only referenced by the
    # commented-out error-dump section at the bottom of the script.
    que=multiprocessing.Queue()
    # Redis client on logical database 2; holds the "json_path" list of files to process.
    redis=Redis(db=2)
    
    
    def file_path(dirname):
        """Fill the Redis ``json_path`` list with every file found under *dirname*.

        Walks *dirname* recursively, pushes each file's full path onto the left
        of the ``json_path`` list, logs each push and finally logs the list
        length.

        Args:
            dirname: root directory to scan.
        """
        queue_key = "json_path"

        # Expire the key with a 0-second TTL before refilling it
        # (presumably to discard entries left over from an earlier run).
        redis.expire(queue_key, 0)

        for folder, _subdirs, names in os.walk(dirname):
            for name in names:
                full_path = os.path.join(folder, name)
                # Queue this file for the worker processes.
                redis.lpush(queue_key, full_path)
                logger.info(f"{full_path}  to  redis!")

        total = redis.llen(queue_key)
        logger.info(f"总共有{total} 个文件")
    
    
    # 清洗新的btc
    async def get_outputList(tx):
        """Add output and fee summary fields to a transaction dict.

        Reads ``tx["vin"]`` and ``tx["vout"]`` and updates *tx* in place with the
        keys ``outputList``, ``voutValues``, ``vinValues``, ``outputValue`` and
        ``fee``.

        * Coinbase transaction (the first input has ``id == 'coinbase'``): the
          first output with a non-empty address becomes the single
          ``outputList`` entry and its value is used for ``voutValues``,
          ``outputValue`` and ``fee``; ``vinValues`` is 0. If no output
          qualifies, the generic totals below are used with an empty
          ``outputList``.
        * Any other transaction: every output whose address equals an input
          address is dropped; the remaining outputs form ``outputList`` and
          their values are summed into ``outputValue``. ``vinValues`` is the sum
          of all input values, ``voutValues`` the sum of all output values and
          ``fee`` their difference.

        Args:
            tx: transaction dict with ``vin`` and ``vout`` lists of dicts.

        Returns:
            The updated *tx*, or ``None`` when ``vin`` is empty or when a
            non-coinbase transaction cannot be processed (the transaction is
            logged in that case).

        Raises:
            KeyError: if *tx* lacks ``vin``/``vout`` or the first input lacks ``id``.
            TypeError: if an output value cannot be summed into ``voutValues``.
        """
        vout = tx["vout"]  # list of output dicts
        vin = tx["vin"]  # list of input dicts
        if not vin:
            # No input to classify the transaction by.
            return None

        outputList = []
        vinValues = 0
        outputValue = 0

        if vin[0]['id'] == 'coinbase':
            for vout_data in vout:
                try:
                    if vout_data["address"] == "":
                        continue
                    dic = {
                        "address": vout_data.get("address")[0],
                        "value": vout_data.get("value"),
                    }
                except Exception as e:
                    # Malformed output: report it and try the next one.
                    logger.error(e)
                    continue

                tx.update({
                    "outputList": [dic],
                    "voutValues": dic["value"],
                    "vinValues": 0,
                    "outputValue": dic["value"],
                    "fee": dic["value"],
                })
                return tx
        else:
            try:
                # Drop every output whose address also appears as an input
                # address. A new list is built on each pass instead of removing
                # from the list being iterated, which skipped the element
                # following each removal.
                remaining = list(vout)
                for vi in vin:
                    vinValues += vi.get("value")
                    remaining = [
                        vo for vo in remaining
                        if not (vi["address"] == vo.get("address"))
                    ]

                for vo1 in remaining:
                    address = vo1.get("address")
                    # The address is usually a list, but block 612001 contains
                    # an empty string instead.
                    output_address = str(address[0]) if isinstance(address, list) else ""
                    value = vo1.get("value")
                    outputList.append({"address": output_address, "value": value})
                    if value:
                        outputValue += float(value)
            except Exception:
                # Retrying cannot succeed on the same data, so log once and give up.
                logger.error(tx)
                return None

        voutValues = 0
        for vot in vout:
            voutValues += vot.get("value")

        fee = float(vinValues - voutValues)

        tx.update({
            "outputList": outputList,
            "voutValues": voutValues,
            "vinValues": vinValues,
            "outputValue": outputValue,
            "fee": fee,
        })
        return tx
    
    
    # 读文件
    async def read_in_line(file_path):
        """Asynchronously yield the lines of a text file one at a time.

        Args:
            file_path: path of the file to read (opened in text mode).

        Yields:
            Each line exactly as ``readline`` returns it, including its
            trailing newline when present. Iteration stops at end of file.
        """
        with open(file_path, "r") as handle:
            # readline() returns "" only at end of file, which is the sentinel.
            for text in iter(handle.readline, ""):
                yield text
    
    
    # 存为json文件
    async def save_json(tx, path):
        """Append *tx* to *path* as one JSON document per line.

        Args:
            tx: JSON-serialisable object to store.
            path: destination file; created if it does not exist yet.

        Raises:
            TypeError: if *tx* cannot be serialised by ``json.dumps``.

        Errors raised while opening or writing the file are logged and
        swallowed, so a failed write does not stop the caller.
        """
        # Serialise the object to a writable string first.
        item = json.dumps(tx)
        try:
            # Append mode creates a missing file, so no exists()-then-"w" branch
            # is needed; that branch could truncate a file another process had
            # just created.
            with open(path, "a", encoding='utf-8') as f:
                # No trailing "," after the record: it makes mongo import fail.
                f.write(item + "\n")
        except Exception as e:
            logger.error(e)
    
    
    # 存入mongo库
    def save_to_mongo(data):
        """Insert one document into the ``transaction`` collection of ``btc_tx_new``.

        Args:
            data: the document to insert.

        Returns:
            The awaitable produced by Motor's ``insert_one``. Motor is an
            asynchronous driver, so a caller running inside a coroutine should
            ``await`` the returned object to wait for the write and to observe
            any error. Previously the object was discarded, which made failures
            invisible.
        """
        db_url = '39.99.160.162'
        db_port = 27017
        db_name = "btc_tx_new"
        db_collection = "transaction"

        # NOTE(review): a new client is created on every call; consider reusing
        # one client per process if this path is enabled for bulk inserts.
        client = AsyncIOMotorClient(db_url, db_port)

        # Select the target collection inside the target database.
        collection = client[db_name][db_collection]

        return collection.insert_one(data)
    
    
    async def get_btc_data():
        """Process every file queued in the Redis ``json_path`` list.

        Repeatedly pops one source path from the right of the list, reads the
        file line by line as JSON, passes each record through
        ``get_outputList`` and appends the result to ``new_<basename>`` in the
        platform-specific output directory via ``save_json``. Returns when the
        list is empty. Start and completion of each file are logged together
        with the process id and the elapsed time.

        Raises:
            json.JSONDecodeError: if a non-blank line is not valid JSON.
            OSError: if a source file cannot be opened.
        """
        process = os.getpid()

        # The output directory depends only on the platform, so resolve it once
        # rather than for every line.
        if platform.system() in ('Linux', 'Darwin'):
            save_path = SAVE_PATH
        else:
            save_path = r"D:\Project\etc\Chain-Storage\src\core\test"

        while True:
            json_path = redis.rpop("json_path")
            if json_path is None:
                # Queue drained: nothing left for this worker.
                break
            path = json_path.decode()

            # Derive the output name before reading, so the final log line has
            # it even when the source file contains no records.
            name = os.path.basename(path)
            new_path = f"{save_path}/new_{name}"

            logger.info(f"process_{process} start handle | {path} ")
            start = timeit.default_timer()

            with open(path, 'r') as f:
                for data in f:
                    if not data.strip():
                        # Skip blank lines instead of failing in json.loads.
                        continue
                    tx = json.loads(data)
                    btc_data = await get_outputList(tx)

                    # Store the cleaned record in the new JSON-lines file.
                    logger.info(f"save {btc_data}")
                    await save_json(btc_data, new_path)

                    # TODO: inserting straight into MongoDB was too slow
                    # (about 200 rows/s; 500 million rows would take ~722 hours).
                    # save_to_mongo(btc_data)

            elapsed = timeit.default_timer() - start
            logger.info(f"process_{process} write {name} success! | used {elapsed} s")
    
    
    def main():
        """Run ``get_btc_data`` to completion on a fresh event loop.

        This is the synchronous entry point handed to the worker pool.
        """
        # A pool worker can execute main() more than once. get_event_loop()
        # would then return the loop closed by the previous call and
        # run_until_complete() would fail, so create a new loop every time.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(get_btc_data())
        finally:
            # Close the loop even when the coroutine raises.
            loop.close()
    
    
    if __name__ == '__main__':
        # Pick the source directory for the current platform.
        if platform.system() in ('Linux', 'Darwin'):
            dirname = r"/backup/btc/btc_new_tx11"
        else:
            dirname = r"D:\Project\etc\Chain-Storage\src\core\test"

        # Queue every source file path in Redis before the workers start popping.
        file_path(dirname)

        # Run the same queue-draining task in several worker processes.
        process_count = 3
        with multiprocessing.Pool(process_count) as pool:
            results = [pool.apply_async(main) for _ in range(process_count)]

            pool.close()
            pool.join()

            # apply_async only reports a worker exception when its result is
            # fetched; without this, failures disappear silently.
            for index, result in enumerate(results):
                try:
                    result.get()
                except Exception as e:
                    logger.error(f"worker task {index} failed: {e}")
    
        # 进程队列
        # que.put("1")
        #     # que.put("255")
        # que.put("1wqert")
        # if platform.system() == 'Linux' or platform.system() == 'Darwin':
        #     with open(r"/backup/btc/btc_erro/erro_message.json", "w", encoding='utf-8') as f:
        #         while not que.empty():
        #             res = que.get()
        #             if res:
        #                 f.write(res+"\n")
        #             else:
        #                 break
        # else:
        #     with open(r"err.json", "w", encoding='utf-8') as f:
        #         while not que.empty():
        #             res = que.get()
        #             if res:
        #                 f.write(res+"\n")
        #             else:
        #                 break
    
    

    相关文章

      网友评论

          本文标题:python读文件保存json保存mongo,多进程,协程,计时

          本文链接:https://www.haomeiwen.com/subject/htzfhktx.html