
Exporting a CSV File with Python

Author: Lupino | Published 2021-09-29 22:23

There are two tables: one stores the items, the other stores the field names.
The table structures are as follows:

kgrs=# \d items
                                       Table "public.items"
     Column     |         Type          | Collation | Nullable |              Default
----------------+-----------------------+-----------+----------+-----------------------------------
 id             | integer               |           | not null | nextval('items_id_seq'::regclass)
 hash           | character varying(64) |           |          |
 product_number | integer               |           |          |
 section        | integer               |           |          |
 field          | integer               |           |          |
 value          | text                  |           |          |
Indexes:
    "items_pkey" PRIMARY KEY, btree (id)
    "items_hash" UNIQUE, btree (hash)
    "items_product_number_section" btree (product_number, section)

kgrs=# \d item_fields
                                    Table "public.item_fields"
 Column |          Type          | Collation | Nullable |                 Default
--------+------------------------+-----------+----------+-----------------------------------------
 id     | integer                |           | not null | nextval('item_fields_id_seq'::regclass)
 hash   | character varying(64)  |           |          |
 type   | character varying(20)  |           |          |
 field  | character varying(256) |           |          |
Indexes:
    "item_fields_pkey" PRIMARY KEY, btree (id)
    "item_fields_hash" UNIQUE, btree (hash)
    "item_fields_type" btree (type)

kgrs=#
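In other words, this is an entity-attribute-value layout: item_fields maps a numeric field id to a human-readable name, and each items row stores one (product_number, field, value) cell. Below is a minimal sketch of the pivot the export has to perform, using hypothetical sample rows:

# Hypothetical sample rows, shaped like the tables above.
item_fields = [
    {'id': 1, 'type': 'field', 'field': 'category_l1'},
    {'id': 2, 'type': 'field', 'field': 'capacitance'},
]
items = [
    {'product_number': 100, 'field': 1, 'value': 'Capacitors'},
    {'product_number': 100, 'field': 2, 'value': '10uF'},
]

# Resolve field ids to names, then pivot one product into a flat row.
names = {f['id']: f['field'] for f in item_fields}
row = {names[it['field']]: it['value']
       for it in items if it['product_number'] == 100}
# row == {'category_l1': 'Capacitors', 'capacitance': '10uF'}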

The data needs to be analyzed by hand, so it has to be organized into something readable. CSV is a good fit: we export to CSV and open it in Excel.

The export code is as follows:

import csv
import logging

from asyncio_pool import AioPool

from kgrs.store import db
from kgrs.store import pg_utils
from kgrs.store.pg_utils import cs

use_db = True

logger = logging.getLogger(__name__)


async def get_cates():
    # Fetch every named field (rows of type 'field') as {'id': ..., 'field': ...}.
    return await pg_utils.select(db.get_pg_pool(), db.item_fields,
                                 cs(['id', 'field']),
                                 "type = %s", ('field', ))


async def merge_stage1(pn, cates, writer):
    # cates maps field name -> field id; keep only values for known fields.
    catids = list(cates.values())
    items = await db.get_list(db.items,
                              pn['id'],
                              field='product_number',
                              size=1000)

    # Pivot this product's EAV rows into a field-id -> value dict.
    product = {}
    for item in items:
        if item['field'] in catids:
            product['f%s' % item['field']] = item['value']

    # Re-key by field name; fields the product lacks become ''.
    out = {k: product.get('f%s' % i, '') for k, i in cates.items()}

    # This export only keeps capacitor products.
    if out['category_l1'] == 'Capacitors':
        writer.writerow(out)


async def main():
    cates = await get_cates()
    # field name -> field id, skipping all-uppercase field names.
    cates = {cate['field']: cate['id']
             for cate in cates if not cate['field'].isupper()}
    offset = 0
    size = 100

    with open('kgrs.csv', 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=list(cates.keys()))
        writer.writeheader()

        # Page through the product numbers and merge them concurrently.
        async with AioPool(size=10) as pool:
            while True:
                pns = await db.get_list(db.item_fields,
                                        'product_number',
                                        field='type',
                                        offset=offset,
                                        size=size)

                logger.info('Exporting %s product numbers at offset %s (batch size %s)',
                            len(pns), offset, size)
                if len(pns) == 0:
                    break

                for pn in pns:
                    await pool.spawn(merge_stage1(pn, cates, writer))

                offset += size
        # The pool joins here, so every row is written before the file closes.

We export rows as dicts via csv.DictWriter, which protects against misalignment: not every item carries every field, and keying each row by field name guarantees every value lands in the right column of the CSV.
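As an aside, here is a minimal standalone sketch of why the dict-based writer keeps columns aligned. DictWriter's restval can fill in missing keys, which the code above does by hand with product.get(..., ''):

import csv
import io

# Rows may omit fields; restval fills the gaps so every value
# still lands under the right header.
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['category_l1', 'capacitance'],
                        restval='')
writer.writeheader()
writer.writerow({'category_l1': 'Capacitors'})  # 'capacitance' missing -> ''
print(buf.getvalue())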

To speed up the export, we use AioPool to process items concurrently.
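For reference, a minimal standalone sketch of the AioPool pattern used above (the work coroutine is a placeholder): spawn() waits only until a slot is free, so at most size coroutines run at once, and the async with block joins every spawned task on exit.

import asyncio
from asyncio_pool import AioPool

async def work(i):
    # Stand-in for merge_stage1: pretend to do some I/O.
    await asyncio.sleep(0.1)
    return i

async def demo():
    async with AioPool(size=3) as pool:   # at most 3 coroutines at a time
        for i in range(10):
            await pool.spawn(work(i))     # blocks only until a slot frees up

asyncio.run(demo())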

This gives us the CSV file we want; note that merge_stage1 only writes rows whose category_l1 is 'Capacitors'.

Counting the exported records with wc -l gives 52,356 records in total.
