# -*- coding: utf-8 -*-
"""
@Time : 2020/7/10 17:22
@Athor : LinXiao
@功能 :
"""
# ------------------------------
import asyncio
import json
import multiprocessing
import os
import platform
import timeit
from copy import deepcopy
from pprint import pprint
from loguru import logger
from motor.motor_asyncio import AsyncIOMotorClient
from redis import Redis
# Destination directory for the cleaned transaction JSON-lines files (Linux path).
SAVE_PATH=r"/backup/btc/btc_outputList_tx"
# Inter-process queue; only referenced by the commented-out error-dump code at the bottom.
que=multiprocessing.Queue()
# Shared Redis connection (db 2) holding the "json_path" work queue of files to process.
redis=Redis(db=2)
def file_path(dirname):
    """Walk *dirname* recursively and push every file path onto the Redis
    list ``"json_path"``, which the worker processes later drain.

    The list is cleared first so stale entries from a previous run are not
    reprocessed.
    """
    # DEL is the idiomatic way to clear the key (the old EXPIRE with ttl=0
    # also deleted it, but DEL states the intent directly).
    redis.delete("json_path")
    for root, _, files in os.walk(dirname):
        for filename in files:
            # Renamed local (was ``file_path``): it shadowed this function.
            path=os.path.join(root, filename)
            # 将每一个json path存入redis
            redis.lpush("json_path", path)
            logger.info(f"{path} to redis!")
    length=redis.llen("json_path")
    logger.info(f"总共有{length} 个文件")
# Clean the new btc transactions
async def get_outputList(tx):
    """Enrich a raw btc transaction dict *tx* in place with aggregate fields
    (``outputList``, ``voutValues``, ``vinValues``, ``outputValue``, ``fee``)
    and return it.

    NOTE(review): every completing path returns at the end of the FIRST
    iteration of the outer ``for vi in vin`` loop, so effectively only the
    first vin entry decides coinbase vs. regular handling — confirm this is
    intended.
    """
    voutValues=0    # running sum of all vout values
    vinValues=0     # running sum of all vin values
    outputValue=0   # sum of vout values not paid back to an input address
    vout=tx["vout"]  # list
    vin=tx["vin"]  # list
    for vi in vin:
        outputList=[]
        if vi['id'] == 'coinbase':
            # Coinbase tx: no real inputs. Take the first vout with a
            # non-empty address; its value doubles as outputValue and fee.
            for vout_data in vout:
                try:
                    if vout_data["address"] != "":
                        # assumes "address" is a non-empty list here — TODO confirm
                        dic={
                            "address": vout_data.get("address")[0],
                            "value": vout_data.get("value")
                        }
                        outputValue=dic.get("value")
                        outputList.append(dic)
                        btc_data={
                            "outputList": outputList,
                            "voutValues": outputValue,
                            "vinValues": 0,
                            "outputValue": dic.get("value"),
                            "fee": dic.get("value"),
                        }
                        tx.update(btc_data)
                        return tx
                except Exception as e:
                    logger.error(e)
                    continue
        else:
            # Regular tx: if a vin address also appears in vout, drop that
            # vout entry (treated as change), then aggregate what is left.
            try:
                vout1=deepcopy(vout)
                # NOTE(review): this inner loop reuses the name ``vi`` and
                # shadows the outer loop variable; it also removes items from
                # ``vout1`` while iterating it, which can skip elements —
                # both look like latent bugs worth confirming.
                for vi in vin:
                    vinValues+=vi.get("value")
                    for vo in vout1:
                        if vi["address"] == vo.get("address"):
                            vout1.remove(vo)
                for vo1 in vout1:
                    # "address" is usually a list, but block 612001 showed it
                    # can be an empty string "" — hence the type guard.
                    output_address=str(vo1.get("address")[0]) if isinstance(vo1.get("address"), list) else ""
                    output={
                        "address": output_address,
                        "value": vo1.get("value")
                    }
                    outputList.append(output)
                    if vo1.get("value"):
                        outputValue+=float(vo1.get("value"))
            except Exception as e:
                # Log the whole tx for diagnosis, then try the next vin entry.
                logger.error(tx)
                continue
        for vot in vout:
            voutValues+=vot.get("value")
        # Miner fee = total inputs minus total outputs.
        fee=float(vinValues - voutValues)
        btc_data={
            "outputList": outputList,
            "voutValues": voutValues,
            "vinValues": vinValues,
            "outputValue": outputValue,
            "fee": fee,
        }
        tx.update(btc_data)
        return tx
# Read a file line by line
async def read_in_line(file_path):
    """Async generator yielding *file_path* line by line (newlines kept)."""
    with open(file_path, "r") as f:
        # File objects iterate lazily line-by-line; iteration stops at EOF.
        for line in f:
            yield line
# Save as a JSON-lines file
async def save_json(tx, path):
    """Append *tx* as one JSON document per line to *path* (JSON-lines).

    No trailing comma is written — a comma would break ``mongoimport`` of
    the resulting file.
    """
    # 先将字典对象转化为可写入文本的字符串
    item=json.dumps(tx)
    try:
        # Mode "a" creates the file when it does not exist, so the old
        # os.path.exists() branch was redundant.
        with open(path, "a", encoding='utf-8') as f:
            f.write(item + "\n")
    except Exception as e:
        # Use the module's loguru logger (the rest of the file does), not print().
        logger.error(e)
# Save into the mongo database
async def save_to_mongo(data):
    """Insert one cleaned transaction document into MongoDB.

    Bug fix: Motor's ``insert_one`` returns a coroutine; the previous
    synchronous version never awaited it, so nothing was ever written.
    The function is now a coroutine and must be awaited by callers
    (its only call site is currently commented out in get_btc_data).
    """
    db_url='39.99.160.162'
    db_port=27017
    db_name="btc_tx_new"
    db_collection="transaction"
    # NOTE(review): creating a client per call is wasteful — consider one
    # AsyncIOMotorClient per worker process, reused across inserts.
    client=AsyncIOMotorClient(db_url, db_port)
    # Select database and collection.
    collection=client[db_name][db_collection]
    await collection.insert_one(data)
async def get_btc_data():
    """Worker loop: pop file paths off the Redis ``"json_path"`` list, clean
    every transaction in each file with :func:`get_outputList`, and append
    the results to ``new_<name>`` under the platform-specific output
    directory. Stops when the Redis list is empty.
    """
    process=os.getpid()
    # Hoisted out of the loops: the platform cannot change mid-run.
    if platform.system() == 'Linux' or platform.system() == 'Darwin':
        save_path=SAVE_PATH
        sep="/"    # Linux下
    else:
        save_path=r"D:\Project\etc\Chain-Storage\src\core\test"
        sep="\\"   # windows下
    while True:
        json_path=redis.rpop("json_path")
        if json_path is None:
            break  # queue drained — this worker is done
        path=bytes.decode(json_path)
        # Computed once per file (the old code recomputed this per line and
        # left ``name`` unbound for empty files, crashing the final log line).
        name=path.split(sep)[-1]
        new_path=f"{save_path}/new_{name}"
        logger.info(f"process_{process} start handle | {path} ")
        start=timeit.default_timer()
        with open(path, 'r') as f:
            # One JSON document per line; iterate the file lazily.
            for data in f:
                tx=json.loads(data)
                btc_data=await get_outputList(tx)
                logger.info(f"save {btc_data}")
                await save_json(btc_data, new_path)
                # todo 直接存入mongo数据库 太慢了,每秒200条,共5亿条,则需要722小时
                # save_to_mongo(btc_data)
        elapsed=(timeit.default_timer() - start)
        logger.info(f"process_{process} write {name} success! | used {elapsed} s")
def main():
    """Entry point for one worker process: drive the async queue-drain
    loop to completion, then dispose of the event loop."""
    event_loop=asyncio.get_event_loop()
    event_loop.run_until_complete(get_btc_data())
    event_loop.close()
if __name__ == '__main__':
    # Source directory holding the raw per-block transaction files.
    if platform.system() == 'Linux' or platform.system() == 'Darwin':
        dirname=r"/backup/btc/btc_new_tx11"
    else:
        dirname=r"D:\Project\etc\Chain-Storage\src\core\test"
    # Fill the Redis work queue with every file found under dirname.
    file_path(dirname)
    # 多进程: fan out — each worker process drains the shared Redis queue
    # independently, so no work is duplicated.
    process_count=3
    with multiprocessing.Pool(process_count) as pool:
        for _ in range(process_count):
            pool.apply_async(main)  # 多进程调用普通函数
        pool.close()
        pool.join()
# (stray scraped footer text "网友评论" removed — it was a bare identifier and would raise NameError on import)