pyspider source code: scheduler

Author: comboo | Published 2017-10-12 00:40

    Approach

        def run(self):
            '''Start scheduler loop'''
            logger.info("scheduler starting...")

            while not self._quit:
                try:
                    time.sleep(self.LOOP_INTERVAL)
                    self.run_once()
                    # a clean pass resets the consecutive-exception counter
                    self._exceptions = 0
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    self._exceptions += 1
                    # give up after too many consecutive failing passes
                    if self._exceptions > self.EXCEPTION_LIMIT:
                        break
                    continue

            logger.info("scheduler exiting...")
            self._dump_cnt()
    

    From run() we can see that the scheduler is a plain loop: it sleeps LOOP_INTERVAL seconds, calls run_once(), and only exits on Ctrl-C or after EXCEPTION_LIMIT consecutive failing passes. run_once() in turn executes the following functions:

        self._update_projects()        # reload projects changed in projectdb
        self._check_task_done()        # drain status_queue (task status reported back)
        self._check_request()          # drain newtask_queue (new/updated tasks)
        while self._check_cronjob():   # fire cron (@every) tasks that are due
            pass
        self._check_select()           # pick ready tasks and send them to the fetcher
        self._check_delete()           # clean up projects marked for deletion
        self._try_dump_cnt()           # periodically persist the counters
    

    Let's go through these functions one by one.

    _update_projects

        def _update_projects(self):
            '''Check project update'''
            now = time.time()
            if (
                    not self._force_update_project
                    and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
            ):
                return
            for project in self.projectdb.check_update(self._last_update_project):
                self._update_project(project)
                logger.debug("project: %s updated.", project['name'])
            self._force_update_project = False
            self._last_update_project = now


    _update_projects first rate-limits itself: unless an update is forced, it returns early if less than UPDATE_PROJECT_INTERVAL has passed since the last check. Otherwise it asks projectdb for every project changed since then and calls _update_project on each one to refresh its in-memory state.

    _update_project

    pyspider likes to delegate the concrete work to dedicated methods, marking them with a leading underscore in the name. _update_project is the method that actually applies the update for a single project.
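
    A simplified sketch of its shape (paraphrased, not the verbatim pyspider source; details vary between versions) also shows where the special '_on_get_info' task we will meet again in _check_task_done originates:

        # paraphrased sketch of _update_project, not the verbatim source
        def _update_project(self, project):
            if project['name'] not in self.projects:
                # first time we see this project: wrap the projectdb record
                self.projects[project['name']] = Project(self, project)
            else:
                # the script/status/rate changed: refresh the existing wrapper
                self.projects[project['name']].update(project)
            # when the project's runtime info is stale, schedule the special
            # '_on_get_info' task; the processor answers through status_queue,
            # which is handled below in _check_task_done
            ...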

    _check_task_done

        def _check_task_done(self):
            '''Check status queue'''
            cnt = 0
            try:
                while True:
                    task = self.status_queue.get_nowait()
                    # check _on_get_info result here
                    if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                        if task['project'] not in self.projects:
                            continue
                        project = self.projects[task['project']]
                        project.on_get_info(task['track'].get('save') or {})
                        logger.info(
                            '%s on_get_info %r', task['project'], task['track'].get('save', {})
                        )
                        continue
                    elif not self.task_verify(task):
                        continue
                    self.on_task_status(task)
                    cnt += 1
            except Queue.Empty:
                pass
            return cnt
    

    _check_task_done drains status_queue without blocking. The special '_on_get_info' answer is routed to the project's on_get_info(); every other task is verified and passed to on_task_status(). A task pulled from the queue looks like this:

        {
            'taskid': '_on_get_info',
            'project': 'baidu',
            'url': 'data:,_on_get_info',
            'track': {
                'process': {'time': 0.022366762161254883, 'ok': True, 'exception': None,
                            'result': None, 'follows': 0, 'logs': ''},
                'fetch': {'error': None, 'redirect_url': None, 'ok': True, 'time': 0,
                          'encoding': None, 'status_code': 200, 'headers': {}, 'content': None},
                'save': {'retry_delay': {}, 'min_tick': 86400, 'crawl_config': {}},
            },
        }
    

    Field descriptions

    • project, taskid, and url mean what they say. Note that the scheduler keeps its own
      self.projects dict, keyed by project name, that incoming tasks are checked against.
    • track contains save, fetch, and process. save holds parameters stored by the spider
      script; fetch is presumably the result from the fetcher module, and process the result
      from the processor module. The project's state is then looked up via
      self.projects[task['project']].
    • on_get_info() extracts the project's runtime info from that save dict (see the sketch
      after this list).
    • on_task_status() checks whether the task's process step succeeded and updates the
      task's status accordingly.
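
    For reference, this is roughly what Project.on_get_info does with that save dict (paraphrased from the pyspider source; the exact fields may differ between versions):

        class Project(object):
            # paraphrased sketch of the relevant part of the scheduler's Project
            def on_get_info(self, info):
                # store the runtime info the processor reported back via track.save
                self.waiting_get_info = False
                self.min_tick = info.get('min_tick', 0)           # shortest @every interval
                self.retry_delay = info.get('retry_delay', {})    # per-retry wait times
                self.crawl_config = info.get('crawl_config', {})  # default crawl options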

    _check_request

        def _check_request(self):
            '''Check new task queue'''
            # check _postpone_request first
            todo = []
            for task in self._postpone_request:
                if task['project'] not in self.projects:
                    continue
                if self.projects[task['project']].task_queue.is_processing(task['taskid']):
                    todo.append(task)
                else:
                    self.on_request(task)
            self._postpone_request = todo
    
            tasks = {}
            while len(tasks) < self.LOOP_LIMIT:
                try:
                    task = self.newtask_queue.get_nowait()
                except Queue.Empty:
                    break
    
                if isinstance(task, list):
                    _tasks = task
                else:
                    _tasks = (task, )
    
                for task in _tasks:
                    if not self.task_verify(task):
                        continue
    
                    if task['taskid'] in self.projects[task['project']].task_queue:
                        if not task.get('schedule', {}).get('force_update', False):
                            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                            continue
    
                    if task['taskid'] in tasks:
                        if not task.get('schedule', {}).get('force_update', False):
                            continue
    
                    tasks[task['taskid']] = task
    
            for task in itervalues(tasks):
                self.on_request(task)
    
            return len(tasks)
    

    _check_request first revisits _postpone_request: tasks whose taskid is still being processed stay postponed, the rest are handed to on_request(). It then drains newtask_queue, up to LOOP_LIMIT tasks per pass, verifying each task and skipping duplicates already in the project's task_queue (or in the current batch) unless schedule.force_update is set. Whatever survives is collected into tasks and finally dispatched:

        for task in itervalues(tasks):
            self.on_request(task)
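
    The force_update flag consulted above comes from the task's schedule and is set from the spider script via self.crawl, pyspider's documented handler API. For example (the URL and handler names here are just placeholders):

        from pyspider.libs.base_handler import BaseHandler

        class Handler(BaseHandler):
            def on_start(self):
                # force_update asks the scheduler to accept this task even if the
                # same taskid is already known to the project's task_queue
                self.crawl('http://example.com/', callback=self.index_page,
                           force_update=True)

            def index_page(self, response):
                pass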
    
    on_request

        def on_request(self, task):
            if self.INQUEUE_LIMIT and len(self.projects[task['project']].task_queue) >= self.INQUEUE_LIMIT:
                logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
                return
    
            oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                           fields=self.merge_task_fields)
            if oldtask:
                return self.on_old_request(task, oldtask)
            else:
                return self.on_new_request(task)
    

    Depending on whether the task already exists in taskdb, it is dispatched to on_old_request or on_new_request.

    on_old_request
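
    on_old_request decides whether an already-known task should be re-crawled. A simplified sketch of that decision, based on pyspider's documented age/itag/force_update semantics rather than the verbatim source (should_restart is a hypothetical helper; the real check is inline in the method):

        import time

        def should_restart(task, oldtask, now=None):
            # hypothetical helper illustrating the restart decision
            now = now or time.time()
            _schedule = task.get('schedule', {})
            old_schedule = oldtask.get('schedule', {})

            if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
                return True   # the page version tag changed -> re-crawl
            if _schedule.get('age', -1) >= 0 and \
                    oldtask.get('lastcrawltime', 0) + _schedule['age'] < now:
                return True   # the last crawl is older than `age` -> re-crawl
            if _schedule.get('force_update'):
                return True   # the caller explicitly forced an update
            return False      # otherwise the duplicate request is ignored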
