We have two tables: one records the items themselves, the other records the field names. Their structure is as follows:
kgrs=# \d items
                                   Table "public.items"
     Column     |         Type          | Collation | Nullable |              Default
----------------+-----------------------+-----------+----------+-----------------------------------
 id             | integer               |           | not null | nextval('items_id_seq'::regclass)
 hash           | character varying(64) |           |          |
 product_number | integer               |           |          |
 section        | integer               |           |          |
 field          | integer               |           |          |
 value          | text                  |           |          |
Indexes:
    "items_pkey" PRIMARY KEY, btree (id)
    "items_hash" UNIQUE, btree (hash)
    "items_product_number_section" btree (product_number, section)

kgrs=# \d item_fields
                                 Table "public.item_fields"
 Column |          Type          | Collation | Nullable |                 Default
--------+------------------------+-----------+----------+-----------------------------------------
 id     | integer                |           | not null | nextval('item_fields_id_seq'::regclass)
 hash   | character varying(64)  |           |          |
 type   | character varying(20)  |           |          |
 field  | character varying(256) |           |          |
Indexes:
    "item_fields_pkey" PRIMARY KEY, btree (id)
    "item_fields_hash" UNIQUE, btree (hash)
    "item_fields_type" btree (type)
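A note on how the two tables relate, as the export code below implies: items.field holds the integer id of an item_fields row, and item_fields.type separates field-name definitions (type = 'field') from product numbers (type = 'product_number'). Here is a minimal sketch of the same lookup written against plain asyncpg; the asyncpg connection and this helper are assumptions for illustration, since the post itself goes through the project's kgrs.store helpers:

import asyncpg

async def named_values(conn: asyncpg.Connection, product_number: int) -> dict:
    # Resolve each item's integer field id to its human-readable name.
    rows = await conn.fetch(
        """
        SELECT f.field AS name, i.value
        FROM items i
        JOIN item_fields f ON f.id = i.field
        WHERE i.product_number = $1
          AND f.type = 'field'
        """,
        product_number)
    return {r['name']: r['value'] for r in rows}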
We need to analyze the data manually, so it has to be arranged in a readable form. CSV is a good fit for this: we export to CSV and open the file in Excel.
The export code is as follows:
from asyncio_pool import AioPool
from kgrs.store import db
from kgrs.store import pg_utils
from kgrs.store.pg_utils import cs
import asyncio
import csv
import logging

use_db = True
logger = logging.getLogger(__name__)


async def get_cates():
    # Field-name definitions: item_fields rows whose type is 'field'.
    return await pg_utils.select(db.get_pg_pool(), db.item_fields,
                                 cs(['id', 'field']),
                                 "type = %s", ('field', ))


async def merge_stage1(pn, cates, writer):
    catids = list(cates.values())
    # All item rows belonging to this product number.
    items = await db.get_list(db.items,
                              pn['id'],
                              field='product_number',
                              size=1000)
    # Index this product's values by field id.
    product = {}
    for item in items:
        if item['field'] in catids:
            product['f%s' % item['field']] = item['value']
    # Re-key by field name; a missing field becomes an empty cell,
    # so every row carries the same columns.
    out = {}
    for k, i in cates.items():
        out[k] = product.get('f%s' % i, '')
    if out['category_l1'] == 'Capacitors':
        writer.writerow(out)


async def main():
    cates = await get_cates()
    # Map field name -> field id, skipping all-uppercase internal fields.
    cates = {cate['field']: cate['id']
             for cate in cates if not cate['field'].isupper()}
    offset = 0
    size = 100
    with open('kgrs.csv', 'w', newline='') as csvfile:
        fieldnames = list(cates.keys())
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        async with AioPool(size=10) as pool:
            while True:
                # Page through the product numbers (item_fields rows
                # whose type is 'product_number').
                pns = await db.get_list(db.item_fields,
                                        'product_number',
                                        field='type',
                                        offset=offset,
                                        size=size)
                logger.info('Process export items of %s on [%s, %s]',
                            len(pns), offset, size)
                if len(pns) == 0:
                    break
                for pn in pns:
                    await pool.spawn(merge_stage1(pn, cates, writer))
                offset += size


if __name__ == '__main__':
    asyncio.run(main())
We export in dict mode, which guards against misaligned output: not every item carries every field, and exporting through DictWriter guarantees that every column in the CSV lines up correctly.
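For reference, here is that guarantee in isolation. DictWriter itself can fill absent keys via restval; the script above pre-fills them with .get(..., '') instead, which achieves the same alignment. The field names below are invented for illustration:

import csv
import io

buf = io.StringIO()
# restval='' is the default: any key absent from a row becomes an empty
# cell instead of shifting the remaining columns over.
writer = csv.DictWriter(buf, fieldnames=['capacitance', 'voltage', 'tolerance'],
                        restval='')
writer.writeheader()
writer.writerow({'capacitance': '10uF', 'voltage': '16V'})  # no 'tolerance'
print(buf.getvalue())
# capacitance,voltage,tolerance
# 10uF,16V,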
To speed up the export, we use AioPool to process the products concurrently.
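AioPool gives bounded concurrency: await pool.spawn(...) only waits until a worker slot is free, the coroutine then runs in the background, and leaving the async with block waits for everything spawned to finish. A minimal self-contained sketch of that behavior (the work() coroutine is a stand-in for one merge_stage1 call):

import asyncio
from asyncio_pool import AioPool

async def work(n):
    await asyncio.sleep(0.1)  # stand-in for one merge_stage1 call
    return n * n

async def demo():
    async with AioPool(size=10) as pool:  # at most 10 tasks in flight
        futures = [await pool.spawn(work(n)) for n in range(100)]
    # Exiting the context manager joins all spawned tasks.
    print(sum(f.result() for f in futures))

asyncio.run(demo())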
In the end we get the CSV file we need.
We check the record count with wc -l: there are 52,356 records in total.