美文网首页
btc tx 多进程脚本

btc tx 多进程脚本

作者: 程序里的小仙女 | 来源:发表于2020-07-13 20:13 被阅读0次

多进程脚本实例:注意多进程调用函数时没有()


# -*- coding: utf-8 -*-
"""
 @Time   : 2020/7/10 17:22
 @Athor   : LinXiao
 @功能   :
"""
# ------------------------------
import asyncio
import json
import multiprocessing
import os
import platform
import timeit
from copy import deepcopy
from pprint import pprint

from loguru import logger

from redis import Redis

SAVE_PATH=r"/backup/btc/btc_outputList_tx"
que=multiprocessing.Queue()
redis=Redis(db=1)


def file_path(dirname):
    for root, _, files in os.walk(dirname):
        for f in files:
            file_path=os.path.join(root, f)
            # 将每一个json path存入redis
            redis.lpush("json_path", file_path)
            logger.info(f"{file_path}  to  redis!")
    length=redis.llen("json_path")
    logger.info(f"总共有{length} 个文件")


# 清洗新的btc
def get_outputList(tx):
    voutValues=0
    vinValues=0

    outputValue=0
    vout=tx["vout"]  # list
    vin=tx["vin"]  # list
    for vi in vin:
        outputList=[]
        if vi['id'] == 'coinbase':
            for vout_data in vout:
                try:
                    if vout_data["address"] != "":
                        dic={
                            "address": vout_data.get("address")[0],
                            "value": vout_data.get("value")
                        }
                        outputValue=dic.get("value")
                        outputList.append(dic)

                        btc_data={

                            "outputList": outputList,
                            "voutValues": outputValue,
                            "vinValues": 0,
                            "outputValue": dic.get("value"),
                            "fee": dic.get("value"),
                        }
                        tx.update(btc_data)
                        return tx
                except Exception as e:
                    logger.error(e)
                    continue

        else:
            # 判断vin中的地址是否在vout中出现,若出现在vout中,则删除掉vout中的这笔交易
            try:
                vout1=deepcopy(vout)
                for vi in vin:
                    vinValues+=vi.get("value")
                    for vo in vout1:
                        if vi["address"] == vo.get("address"):
                            vout1.remove(vo)

                for vo1 in vout1:
                    output={
                        "address": vo1.get("address")[0],
                        "value": vo1.get("value")
                    }
                    if vo1.get("value"):
                        outputValue+= float(vo1.get("value"))
                    outputList.append(output)
            except Exception as e:
                logger.error(tx)
                continue

        for vot in vout:
            voutValues+=vot.get("value")

        fee=float(vinValues - voutValues)

        btc_data={

            "outputList": outputList,
            "voutValues": voutValues,
            "vinValues": vinValues,
            "outputValue": outputValue,
            "fee": fee,
        }
        tx.update(btc_data)
        return tx


# 读文件
def read_in_line(file_path):
    with open(file_path, "r") as f:
        while True:
            line=f.readline()
            if line:
                yield line
            else:
                return  # 如果读取到文件末尾,则退出


# 存为json文件
def save_json(tx, path):
    # 先将字典对象转化为可写入文本的字符串
    item=json.dumps(tx)
    try:
        if not os.path.exists(path):
            with open(path, "w", encoding='utf-8') as f:
                f.write(item + "\n")

        else:
            with open(path, "a", encoding='utf-8') as f:
                # f.write(item + ",\n")   # 这里不能有",",不然会导致mongo import的时候报错:
                f.write(item + "\n")
    except Exception as e:
        print(e)


def get_btc_data():
    # async def get_btc_data(dirname):
    # file_list=await file_path(dirname)
    # for path in file_list:  # todo 开多进程的时候,每个进程从redis中取一个path
    process=os.getpid()

    while True:

        json_path=redis.rpop("json_path")

        if json_path is None:
            break
        path=bytes.decode(json_path)

        logger.info(f"process_{process} start handle | {path} ")
        start=timeit.default_timer()

        with open(path, 'r') as f:

            try:
                while True:
                    data=f.readline()
                    if not data:
                        break
                    tx=json.loads(data)
                    btc_data=get_outputList(tx)

                    if platform.system() == 'Linux' or platform.system() == 'Darwin':
                        save_path=SAVE_PATH
                        name=path.split("/")[-1]  # Linux下
                    else:
                        save_path=r"D:\Project\etc\Chain-Storage\src\core\test"
                        name=path.split("\\")[-1]  # windows下

                    # 存为新json
                    new_path=f"{save_path}/new_{name}"
                    save_json(btc_data, new_path)
                    elapsed=(timeit.default_timer() - start)
            except Exception as e:
                logger.error(e)
                que.put(json.dumps(tx))
                continue
        logger.info(f"process_{process} write {name} success! | used {elapsed} s")


if __name__ == '__main__':
    if platform.system() == 'Linux' or platform.system() == 'Darwin':
        dirname=r"/backup/btc/btc_new_tx11"
    else:
        dirname=r"D:\Project\etc\Chain-Storage\src\core\test"

    file_path(dirname)

    # 多进程
    process_count=30
    # pool=multiprocessing.Pool(process_count)
    with multiprocessing.Pool(process_count) as  pool:
        for i in range(process_count):
            # pool.apply_async(asyncio.get_event_loop().run_until_complete(Url().save_url_to_redis()), (i,))  # 多进程调用协程 ,将协程函数重复执行三次,
            pool.apply_async(get_btc_data)  # 多进程调用普通函数

        pool.close()
        pool.join()

    # que.put("1")
    #     # que.put("255")
    # que.put("1wqert")
    # if platform.system() == 'Linux' or platform.system() == 'Darwin':
    #     with open(r"/backup/btc/btc_erro/erro_message.json", "w", encoding='utf-8') as f:
    #         while not que.empty():
    #             res = que.get()
    #             if res:
    #                 f.write(res+"\n")
    #             else:
    #                 break
    # else:
    #     with open(r"err.json", "w", encoding='utf-8') as f:
    #         while not que.empty():
    #             res = que.get()
    #             if res:
    #                 f.write(res+"\n")
    #             else:
    #                 break

相关文章

网友评论

      本文标题:btc tx 多进程脚本

      本文链接:https://www.haomeiwen.com/subject/abpzcktx.html