pyspider workflow


Author: comboo | Published 2017-12-05 20:45

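    pyspider execution flow

    The processor, result and fetcher components

    These all work much the same way: each reads tasks from a queue and executes them.

    The scheduler and webui components

    The scheduler dispatches tasks to the fetcher queue (scheduler2fetcher).
    The webui communicates with the scheduler: tasks created through front-end interaction are handed to the scheduler for dispatching.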
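The "read a task from a queue, execute it, pass the result on" pattern shared by the processor, result and fetcher components can be sketched as a simple worker loop. This is a minimal single-threaded illustration, not pyspider's code: run_worker, fake_fetch and the sample tasks are hypothetical, and the queue names only echo the scheduler2fetcher naming style.

```python
import queue


def run_worker(in_queue, out_queue, handle):
    """Drain in_queue, apply handle() to each task and push results to out_queue.

    Single-threaded and non-blocking for illustration; a real component
    would block on its input queue and run until shut down.
    """
    handled = 0
    while True:
        try:
            task = in_queue.get_nowait()
        except queue.Empty:
            break
        out_queue.put(handle(task))
        handled += 1
    return handled


def fake_fetch(task):
    # Hypothetical fetch step: attach a dummy page instead of doing HTTP.
    return dict(task, content='<html></html>')


scheduler2fetcher = queue.Queue()
fetcher2processor = queue.Queue()
scheduler2fetcher.put({'taskid': 't1', 'url': 'http://example.com/'})
scheduler2fetcher.put({'taskid': 't2', 'url': 'http://example.org/'})
print(run_worker(scheduler2fetcher, fetcher2processor, fake_fetch))  # 2
```

Each component only needs to know its input and output queues, which is why the processor, result and fetcher loops look so alike.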

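    Scheduler flow

    The core is the run_once method. In summary:

    • _update_projects: updates projects and loads their tasks from the database into project.task_queue in the scheduler
    • _check_task_done: checks status_queue (fed by the processor) and puts tasks into project.task_queue
    • _check_request: checks _postpone_request and newtask_queue and puts tasks into project.task_queue
    • _check_select: takes tasks out of project.task_queue and puts them into the fetcher queue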
    def run_once(self):
        '''consume queues and feed tasks to fetcher, once'''
        self._update_projects()
        self._check_task_done()
        self._check_request()
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
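To make the order of these steps concrete, here is a toy model that keeps only the queue movements from the list above. ToyScheduler, its single deque standing in for project.task_queue, and the 'retry' flag on status tasks are illustrative assumptions; the real scheduler keeps one task queue per project and does much more bookkeeping.

```python
import queue
from collections import deque


class ToyScheduler:
    """Hypothetical model of run_once: only the queue movements are kept."""

    def __init__(self):
        self.status_queue = queue.Queue()   # status packs from the processor
        self.newtask_queue = queue.Queue()  # newly requested tasks
        self.task_queue = deque()           # stands in for project.task_queue
        self.out_queue = queue.Queue()      # stands in for scheduler2fetcher

    @staticmethod
    def _drain(q):
        """Yield items from q until it is empty, without blocking."""
        while True:
            try:
                item = q.get_nowait()
            except queue.Empty:
                return
            yield item

    def _check_task_done(self):
        for task in self._drain(self.status_queue):
            # assumed: a failed task that should be retried goes back in
            if task.get('retry'):
                self.task_queue.append(task)

    def _check_request(self):
        for task in self._drain(self.newtask_queue):
            self.task_queue.append(task)

    def _check_select(self):
        while self.task_queue:
            self.out_queue.put(self.task_queue.popleft())

    def run_once(self):
        self._check_task_done()
        self._check_request()
        self._check_select()


s = ToyScheduler()
s.newtask_queue.put({'taskid': 'a'})
s.status_queue.put({'taskid': 'b', 'retry': True})
s.status_queue.put({'taskid': 'c'})
s.run_once()
print([s.out_queue.get_nowait()['taskid'] for _ in range(s.out_queue.qsize())])  # ['b', 'a']
```

Because status packs are checked before new requests, a retried task reaches the fetcher queue ahead of new tasks collected in the same loop.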
    
    _update_projects()

    Updates project information: each changed project is instantiated and stored in the scheduler's self.projects.

    for project in self.projectdb.check_update(self._last_update_project):
        self._update_project(project)
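A minimal sketch of this polling step, assuming an in-memory projectdb whose check_update returns every project modified at or after the given timestamp. The class names and the 300-second interval are hypothetical, and force_update stands in for _force_update_project under the assumption that it makes the next poll happen even if the usual interval has not elapsed.

```python
class InMemoryProjectDB:
    """Hypothetical stand-in for projectdb, keyed by project name."""

    def __init__(self):
        self._rows = {}

    def save(self, name, updatetime, **fields):
        self._rows[name] = dict(fields, name=name, updatetime=updatetime)

    def check_update(self, timestamp):
        # Return only projects modified at or after `timestamp`.
        return [p for p in self._rows.values() if p['updatetime'] >= timestamp]


class ToyProjectUpdater:
    UPDATE_INTERVAL = 300  # hypothetical polling interval, in seconds

    def __init__(self, projectdb):
        self.projectdb = projectdb
        self.projects = {}
        self.last_update = 0
        self.force_update = False  # plays the role of _force_update_project

    def update_projects(self, now):
        """Refresh changed projects; return True if the database was polled."""
        if not self.force_update and self.last_update + self.UPDATE_INTERVAL > now:
            return False
        for project in self.projectdb.check_update(self.last_update):
            self.projects[project['name']] = project
        self.force_update = False
        self.last_update = now
        return True


db = InMemoryProjectDB()
db.save('a', updatetime=100)
db.save('b', updatetime=200)
updater = ToyProjectUpdater(db)
print(updater.update_projects(now=1000), sorted(updater.projects))  # True ['a', 'b']
db.save('c', updatetime=1100)
print(updater.update_projects(now=1200))  # False: interval not yet elapsed
updater.force_update = True
print(updater.update_projects(now=1200), sorted(updater.projects))  # True ['a', 'b', 'c']
```

Only projects changed since the last poll are re-read, so an idle scheduler does little work per loop.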
    
    _update_project()

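    Sends a task with taskid _on_get_info to the fetcher queue (used to refresh the project's runtime info), then reads the project's tasks from the database and inserts them into project.task_queue.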

    if project._send_on_get_info:
        # update project runtime info from processor by sending a _on_get_info
        # request, result is in status_page.track.save
        project._send_on_get_info = False

        self.on_select_task({
            'taskid': '_on_get_info',
            'project': project.name,
            'url': 'data:,_on_get_info',
            'status': self.taskdb.SUCCESS,
            'fetch': {
                'save': self.get_info_attributes,
            },
            'process': {
                'callback': '_on_get_info',
            },
        })

    if project.active:
        if not project.task_loaded:
            # _load_tasks reads the project's tasks from the database
            self._load_tasks(project)
            project.task_loaded = True
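The two one-shot flags in this method can be modelled as follows. ToyProject and update_project are hypothetical simplifications: a plain list stands in for the fetcher queue and a list of dicts for the task database.

```python
from collections import deque


class ToyProject:
    """Hypothetical project record with the two one-shot flags used above."""

    def __init__(self, name, active=True):
        self.name = name
        self.active = active
        self._send_on_get_info = True
        self.task_loaded = False
        self.task_queue = deque()


def update_project(project, fetcher_queue, taskdb_rows):
    # Ask the processor for the project's runtime info only once.
    if project._send_on_get_info:
        project._send_on_get_info = False
        fetcher_queue.append({'taskid': '_on_get_info',
                              'project': project.name,
                              'url': 'data:,_on_get_info'})
    # Load an active project's tasks from the "database" only once.
    if project.active and not project.task_loaded:
        project.task_queue.extend(
            t for t in taskdb_rows if t['project'] == project.name)
        project.task_loaded = True


demo = ToyProject('demo')
fetcher_queue = []
rows = [{'project': 'demo', 'taskid': '1'}, {'project': 'other', 'taskid': '2'}]
update_project(demo, fetcher_queue, rows)
update_project(demo, fetcher_queue, rows)  # second call is a no-op
print(len(fetcher_queue), [t['taskid'] for t in demo.task_queue])  # 1 ['1']
```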
    

    In addition, when a project is updated from the webui, the webui calls the scheduler over RPC; the registered function simply sets _force_update_project.

    # nested inside a scheduler method, so self is the scheduler instance
    def update_project():
        self._force_update_project = True
    application.register_function(update_project, 'update_project')
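The same RPC pattern can be reproduced with Python's standard xmlrpc modules. This is a sketch, not pyspider's server: FlagHolder and serve are hypothetical names, the server binds to a free local port, and the call through ServerProxy plays the part of the webui.

```python
import threading
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


class FlagHolder:
    """Hypothetical stand-in for the scheduler: only holds the update flag."""

    def __init__(self):
        self._force_update_project = False


def serve(holder):
    # Port 0 lets the OS choose a free port; allow_none lets the function return None.
    server = SimpleXMLRPCServer(('127.0.0.1', 0), allow_none=True, logRequests=False)

    def update_project():
        holder._force_update_project = True

    server.register_function(update_project, 'update_project')
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


holder = FlagHolder()
server = serve(holder)
host, port = server.server_address[:2]
with ServerProxy('http://%s:%d/' % (host, port), allow_none=True) as proxy:
    proxy.update_project()  # the "webui" side of the call
server.shutdown()
server.server_close()
print(holder._force_update_project)  # True
```

The RPC call only flips a flag; the actual work happens later, when the scheduler's own loop notices it.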
    
    _check_task_done()

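    Checks status_queue. It is presumably called _check_task_done because the tasks in this queue are produced by the processor after a task has been processed, and this step checks whether they are valid.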

    try:
        while True:
            task = self.status_queue.get_nowait()
            # the processor's reply to _on_get_info carries project runtime info
            if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                if task['project'] not in self.projects:
                    continue
                project = self.projects[task['project']]
                project.on_get_info(task['track'].get('save') or {})
                logger.info(
                    '%s on_get_info %r', task['project'], task['track'].get('save', {})
                )
                continue
            # skip status packs that fail verification
            elif not self.task_verify(task):
                continue
            # a normal status pack: hand it to on_task_status
            self.on_task_status(task)
    except Queue.Empty:
        pass
    
    on_task_status

    on_task_status calls on_task_done or on_task_failed, depending on whether processing succeeded, and then inserts the task into active_tasks.

    # True if the processor reported success for this task
    procesok = task['track']['process']['ok']
    if procesok:
        ret = self.on_task_done(task)
    else:
        ret = self.on_task_failed(task)
    self.projects[task['project']].active_tasks.appendleft((time.time(), task))
    

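    on_task_done puts the task into project.status_queue and updates the database.
    on_task_failed computes next_exetime: if it is less than 0, the task's status is written to the database as failed; otherwise the task is written to the database and put back into project.task_queue.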
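The retry decision in on_task_failed can be sketched as below. The RETRY_DELAY table, the default of 3 retries and the status strings are assumptions for illustration; only the rule "next_exetime < 0 means give up, otherwise store and requeue" comes from the description above.

```python
# Hypothetical retry delays in seconds, keyed by how many retries have
# already happened; '' is the fallback for any later retry.
RETRY_DELAY = {0: 30, 1: 60 * 60, '': 6 * 60 * 60}


def next_exetime(retried, retries, retry_delay=RETRY_DELAY):
    """Return the delay before the next attempt, or -1 to give up."""
    if retried >= retries:
        return -1
    return retry_delay.get(retried, retry_delay[''])


def on_task_failed(task, now, taskdb, task_queue, retries=3):
    """Simplified failure handling: mark FAILED or schedule a retry."""
    schedule = task.setdefault('schedule', {})
    retried = schedule.get('retried', 0)
    delay = next_exetime(retried, schedule.get('retries', retries))
    if delay < 0:
        task['status'] = 'FAILED'          # out of retries: persist only
        taskdb[task['taskid']] = task
    else:
        schedule['retried'] = retried + 1
        schedule['exetime'] = now + delay  # earliest time of the next attempt
        taskdb[task['taskid']] = task      # persist, then requeue
        task_queue.append(task)
    return task


taskdb, task_queue = {}, []
task = {'taskid': 't1'}
for attempt in range(4):
    on_task_failed(task, now=0, taskdb=taskdb, task_queue=task_queue)
print(task['status'], len(task_queue))  # FAILED 3
```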

    _check_request

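    Takes tasks from _postpone_request and newtask_queue and calls on_request for each. _postpone_request holds requests whose task is still in the processing state,
    presumably tasks that were modified while they were being executed.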

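    # requests postponed because their task was still being processed
    todo = []
    for task in self._postpone_request:
        if self.projects[task['project']].task_queue.is_processing(task['taskid']):
            # still processing: keep postponing
            todo.append(task)
        else:
            # processing has finished, so the postponed request can be handled now
            self.on_request(task)
    self._postpone_request = todo

    # collect up to LOOP_LIMIT new tasks, keyed by taskid
    tasks = {}
    while len(tasks) < self.LOOP_LIMIT:
        try:
            task = self.newtask_queue.get_nowait()
        except Queue.Empty:
            break
        # (verification and force_update checks omitted here)
        tasks[task['taskid']] = task
    for task in itervalues(tasks):
        self.on_request(task)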
    

    on_request reads the old task from the database: if it exists, on_old_request is called, otherwise on_new_request.

    on_old_request

    Decides whether the previously crawled task needs to be re-crawled or cancelled, updates the database, and inserts the task into project.task_queue.
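A simplified version of that decision, assuming three triggers for a re-crawl (a changed itag, an expired age, or force_update) and a cancel flag in the task's schedule. The field names are assumed to match pyspider's schedule options; the function names, the default age of -1 and the status strings are hypothetical.

```python
def should_restart(schedule, old_schedule, lastcrawltime, now, default_age=-1):
    """Return True if an already-crawled task should be crawled again."""
    # A changed itag signals that the task was modified since the last crawl.
    if schedule.get('itag') and schedule['itag'] != old_schedule.get('itag'):
        return True
    # age >= 0 means "re-crawl once the last crawl is older than age seconds".
    age = schedule.get('age', default_age)
    if age >= 0 and age + (lastcrawltime or 0) < now:
        return True
    # Otherwise only an explicit force_update triggers a re-crawl.
    return bool(schedule.get('force_update'))


def on_old_request(task, old_task, now, taskdb, task_queue):
    """Simplified on_old_request: ignore, cancel, or re-queue the task."""
    schedule = task.get('schedule', {})
    if not should_restart(schedule, old_task.get('schedule', {}),
                          old_task.get('lastcrawltime'), now):
        return None                    # nothing changed: ignore the request
    if schedule.get('cancel'):
        task['status'] = 'BAD'         # hypothetical "cancelled" status
        taskdb[task['taskid']] = task
        return task
    task['status'] = 'ACTIVE'
    taskdb[task['taskid']] = task
    task_queue.append(task)
    return task


print(should_restart({'age': 10}, {}, lastcrawltime=50, now=100))  # True
```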

    on_new_request

    Inserts the task into the database and into project.task_queue.
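The dispatch in on_request together with the new-task path can be sketched like this, with a dict keyed by (project, taskid) standing in for taskdb and a deque for project.task_queue. Both stand-ins and the 'ACTIVE' status label are assumptions.

```python
from collections import deque


def on_new_request(task, taskdb, task_queue):
    """Store a never-seen task and queue it for crawling."""
    task['status'] = 'ACTIVE'  # hypothetical status label
    taskdb[(task['project'], task['taskid'])] = task
    task_queue.append(task)
    return task


def on_request(task, taskdb, task_queue, on_old_request):
    """Take the old-task path if taskdb already has this task, else the new one."""
    old_task = taskdb.get((task['project'], task['taskid']))
    if old_task is not None:
        return on_old_request(task, old_task)
    return on_new_request(task, taskdb, task_queue)


taskdb, task_queue, old_calls = {}, deque(), []


def record_old(task, old_task):
    old_calls.append(task['taskid'])


on_request({'project': 'demo', 'taskid': 'a'}, taskdb, task_queue, record_old)
on_request({'project': 'demo', 'taskid': 'a'}, taskdb, task_queue, record_old)
print(len(task_queue), old_calls)  # 1 ['a']
```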

    _check_cronjob

    Sends a task with taskid _on_cronjob to the fetcher queue and inserts the task into project.active_tasks.

    def _check_cronjob(self):
        """Check projects cronjob tick, return True when a new tick is sent"""
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            return False
        self._last_tick += 1
        for project in itervalues(self.projects):
            if not project.active:
                continue
            if project.waiting_get_info:
                continue
            if int(project.min_tick) == 0:
                continue
            if self._last_tick % int(project.min_tick) != 0:
                continue
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project.name,
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
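Two details of _check_cronjob are easy to miss. A project fires only when its min_tick divides the current tick, and because _last_tick advances by exactly one per call, the `while self._check_cronjob(): pass` loop in run_once replays every missed second after a slow iteration. The helpers below (projects_due and catch_up, both hypothetical) isolate that arithmetic and ignore the active and waiting_get_info checks.

```python
def projects_due(tick, min_ticks):
    """Names of projects whose min_tick divides the current tick.

    min_ticks maps project name -> min_tick; 0 means "no cronjob",
    matching the int(project.min_tick) == 0 check in _check_cronjob.
    """
    return sorted(name for name, min_tick in min_ticks.items()
                  if int(min_tick) != 0 and tick % int(min_tick) == 0)


def catch_up(last_tick, now):
    """Replay the while-loop over _check_cronjob: one tick per elapsed second."""
    ticks = []
    last_tick = int(last_tick)
    while now - last_tick >= 1:
        last_tick += 1
        ticks.append(last_tick)
    return ticks, last_tick


min_ticks = {'every_second': 1, 'every_minute': 60, 'no_cron': 0}
print(projects_due(59, min_ticks))  # ['every_second']
print(projects_due(60, min_ticks))  # ['every_minute', 'every_second']
print(catch_up(100, 103.5))         # ([101, 102, 103], 103)
```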
    

    _check_select

    Takes tasks out of project.task_queue and inserts them into the fetcher queue.

    for project, taskid in taskids:
        self._load_put_task(project, taskid)    
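A rough sketch of the selection step, assuming each project's task_queue can be treated as a FIFO of taskids and that a load_task callback returns the full task for a taskid (roughly what _load_put_task appears to do before sending it). check_select and the per-project limit are hypothetical; the real method may apply further limits.

```python
from collections import deque


def check_select(projects, per_project_limit, load_task, fetcher_queue):
    """Move up to per_project_limit taskids per project to the fetcher queue.

    projects maps project name -> deque of taskids (standing in for
    project.task_queue); load_task(project, taskid) stands in for loading
    the full task before it is sent.
    """
    taskids = []
    for name, task_queue in projects.items():
        for _ in range(min(per_project_limit, len(task_queue))):
            taskids.append((name, task_queue.popleft()))
    for project, taskid in taskids:
        fetcher_queue.append(load_task(project, taskid))
    return len(taskids)


projects = {'a': deque(['1', '2']), 'b': deque(['3'])}
sent = []
check_select(projects, 1, lambda p, t: {'project': p, 'taskid': t}, sent)
print(sent)  # [{'project': 'a', 'taskid': '1'}, {'project': 'b', 'taskid': '3'}]
```

A per-project cap keeps one busy project from starving the others within a single loop.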
    

    The remaining two steps (_check_delete and _try_dump_cnt) handle deleting projects and monitoring queue counts.


          Article link: https://www.haomeiwen.com/subject/znchixtx.html