美文网首页
pyspider源码-run.py

pyspider源码-run.py

作者: comboo | 来源:发表于2017-04-05 15:55 被阅读174次

    终于到了看,run.py的时候了.

    这篇文章的目的仅仅是初步的了解pyspider,具体作者的写法,为什么这样写,都不在讨论的范围之内.

    我想如果可以的话,以后我也要跟着作者的思路,重新写一遍pyspider.

    read_config(ctx, param, value)

    里面有一个underline_dict(d)方法.这个方法返回一个词典,词典key的'_'用'-'代替

    return dict((k.replace('-', '_'), underline_dict(v)) for k, v in six.iteritems(d))
    

    click会调用read_config,用来解析参数.其实这里就是读取了配置文件.

    ctx是一个对象,<click.core.Context object at 0x7f8dc6a5be50>,应该是只要通过click就会自动生成这样的对象,具体的我也不了解.

    connect_db(ctx, param, value)

    读取出来数据库连接.进行连接.这里作者的处理是直接在click接受参数的时候,直接创建连接

    具体代码暂时不分析.在libs文件夹下面

    load_cls(ctx, param, value)

    这个函数之前说过.

    从另一个包中导入对象的属性.这里的对象是广义的对象,比如,包算一种对象,包里的方法,可以看作属性.

    这个方法主要用在启动各个组件的时候.比如webui的flask对象,就是通过这个方法导入

    connect_rpc

    连接到rpc服务.

    fetcher和scheduler部分都有生成连接的代码.具体功能应该要到实现组件的代码里面看

    cli(ctx, **kwargs)

    这是run的第一个函数,也是入口函数.
    因为run的main函数为

    def main():
        cli()
    
    if __name__ == '__main__':
        main()
    

    ctx是click上下文对象
    **kwargs是click通过命令行接受的参数
    这里主要建立以下连接

    • taskdb
    • projectdb
    • resultdb
    • data_path
    • newtask_queue
    • status_queue
    • scheduler2fetcher
    • fetcher2processor
    • processor2result

    并且这些连接,全部存在kwargs之中,之后在把kwargs更新到ctx

      ctx.obj.update(kwargs)  #这里不确定click方法,其实查找click的api,也没有找到ctx对应的对象
    

    之后执行

    ctx.invoke(all)
    

    开始all方法.

    all(ctx,fetcher_num,processor_num,result_worker_num, run_in)

    all的代码很短.特别容易看.

    g = ctx.obj
    

    第一步把之前创建的连接和对象赋值给g

    之后讲各个组建通过线程或者进程的方式启动起来.

    #抽出一段fetcher的代码,其他组件差不多
      fetcher_config = g.config.get('fetcher', {})
            fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
            for i in range(fetcher_num):
                threads.append(run_in(ctx.invoke, fetcher, **fetcher_config))
    

    最后,当所有的组件关闭的时候一次性关闭.

    finally:
            # exit components run in threading
            for each in g.instances:
                each.quit()
    
            # exit components run in subprocess
            for each in threads:
                if not each.is_alive():
                    continue
                if hasattr(each, 'terminate'):
                    each.terminate()
                each.join()
    

    然后就是各个组建部分的启动了.

    def scheduler(ctx,...)

    参数太长了...
    赋值给g.实例化scheduler对象.启动调动器
    '''
    if xmlrpc:
    utils.run_in_thread(scheduler.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
    '''
    这里的一段代码现在还不知道怎么回事

    def fetcher(ctx, ...)

    def phantomjs(ctx,...)

    def bench(ctx,...)

    def one(ctx,...)

    def send_message(ctx, ...)

    都差不多

    相关文章

      网友评论

          本文标题:pyspider源码-run.py

          本文链接:https://www.haomeiwen.com/subject/uxmtattx.html