美文网首页
爬虫中 阻塞队列和 线程池 造成的类死锁问题

爬虫中 阻塞队列和 线程池 造成的类死锁问题

作者: shu2man | 来源:发表于2018-11-18 15:15 被阅读0次

    很早就想尝试一下爬虫,相关的博文已经很多,这里记下几个困扰了我挺久的问题。
    既然说的是死锁,我们来复习一下死锁的四个条件:

    • 循环等待
    • 占有且请求(请求与持有)
    • 互斥(资源有限,每次只能被一个或一类线程使用)
    • 不可抢占(不可剥夺,无优先级)
      四个条件中不可被破坏的是互斥条件,即多进程同时访问会有数据的不一致性。

    言归正传,首先在我的实现中自定义了线程池

        public ThreadPoolExecutor getFixedThreadPool(int corePoolSize,int maxPoolSize,int waitingQueuesize) {
            return new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<Runnable>(waitingQueuesize),new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable run, ThreadPoolExecutor executor) {
                            if(!executor.isTerminated())
                                try {
                                    executor.getQueue().put(run);
                                } catch (Exception e) {
                                    LOG.error("Inner Exception in workQueue, putting task  error",e);
                                }
                        }
                    });
            
            //return Executors.newFixedThreadPool(2*nThread);
        }
    

    这是一个类newFixedThreadPool的操作,自定义了等待队列的大小,同时队列满时阻塞了入队操作,避免Int.maxValue造成的溢出或是过多的任务堆积,兼具fixedThreadPool和cachedThreadPool的优点,可这里阻塞的拒绝策略让我在后续的实现中饱受折磨...

    然后为了避免任务都被阻塞在线程池,我又额外开了一个阻塞队列存储爬出来的待爬URL,然后用一个监控线程监控这个队列,爬虫任务相当于生产者生产待爬URL,监控线程相当于消费者消费产生的URL,而这个队列就是仓库了,完美的设计。。。

    public class WatchTask implements Runnable{
            @Override
            public void run() {
                while(isCrawl) {
                    try{
                        String url=urlQueue.poll(1000, TimeUnit.MILLISECONDS);
                        while(StringUtils.isNotBlank(url) && isCrawled(url)) url=urlQueue.poll(100, TimeUnit.MILLISECONDS);
                        if(StringUtils.isNotBlank(url)) executor.execute(new WorkTask(url));
                    }catch(Exception e) {
                        LOG.error("Taking url from blocking queue error, urlQueue size:"+urlQueue.size(),e);
                    }
                    lastCur=System.currentTimeMillis();
                    LOG.info("WatchTask running, urlQueue:"+urlQueue.size());
                }
            }
            
        }
        
        public class WorkTask implements Runnable{
            private String seedUrl=null;
            
            public WorkTask(String seedUrl) {
                this.seedUrl=seedUrl;
            }
            
            @Override
            public void run() {
                List<String> urls;
                try {
                    urls=crawler.doCrawl(seedUrl);
                    if(urls==null || urls.size()==0) return;
                    for(String url:urls) {
                        if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
                            urlQueue.put(url);
                        }
                    }
                }catch(Exception e) {
                    LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
                }
            }
        }
    

    在程序中URL队列的大小要远大于线程池等待队列,明眼的朋友到这里应该看出我的操作问题在哪里了:


    死锁示意图

    于是,将额外的阻塞队列和监控任务去掉,工作线程改成这样,颇有种自给自足的感觉:

        
        public class WorkTask implements Runnable{
            private String seedUrl=null;
            
            public WorkTask(String seedUrl) {
                this.seedUrl=seedUrl;
            }
            
            @Override
            public void run() {
                List<String> urls;
                try {
                    urls=crawler.doCrawl(seedUrl);
                    if(urls==null || urls.size()==0) return;
                    for(String url:urls) {
                        if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
                            executor.execute(new WorkTask(url));
                        }
                    }
                }catch(Exception e) {
                    LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
                }
            }
        }
    

    然而,实际运行中发现能爬取的数据条数在线程池最大线程数左右,往后程序就像挂掉一样虽然在跑但什么输出都没有,肯定又是阻塞了!
    经过一番分析,发现问题回到了线程池本身的等待队列,圆圈代表线程池,黑点表示线程非空闲:


    线程池死锁示意图

    就这样,又一个死锁创造出来了,其原因归根到底还是一个种子url能爬取出来的子URL太多了——几百甚至几千上万个(没错我在爬某网用户信息,子url是用户的粉丝或其关注的人,因为一些需求不能进行部分舍弃),既然如此那就把等待队列设至大一点,对子url太多的,全部舍弃,至于何为多大家自有判断,我用子url数和等待队列大小关系来决定,当等待队列中url数量超过等待队列容量的一半,或子url数量超过队列数量一半退出:

        public class WorkTask implements Runnable{
            private String seedUrl=null;
            
            public WorkTask(String seedUrl) {
                this.seedUrl=seedUrl;
            }
            
            @Override
            public void run() {
                List<String> urls;
                try {
                    urls=crawler.doCrawl(seedUrl);
                    int size=tpe.getQueue().size();
                    //无子url,或队列中任务数量超过容量一半,或url数量超过队列数量一半,避免崩掉故退出
                    if(urls==null || urls.size()==0 || size>halfQueueSize || urls.size()>halfQueueSize) return;
                    System.out.println("****************add to queue with size"+urls.size());
                    for(String url:urls) {
                        if(StringUtils.isNotBlank(url) && !isCrawled(url)) {
                            executor.execute(new WorkTask(url));
                        }
                    }
                }catch(Exception e) {
                    LOG.error("Puting url to blocking queue error, size:"+urlQueue.size(),e);
                }
            }
        }
    

    至此,终于把死锁的问题解决了,但是爬虫跑了一会ip就被封了,下一步是使用代理。

    本文为本人解决实际问题的记录,有任何高见欢迎留言。

    相关文章

      网友评论

          本文标题:爬虫中 阻塞队列和 线程池 造成的类死锁问题

          本文链接:https://www.haomeiwen.com/subject/ufwefqtx.html