
Studying the Source Code of the Open-Source Crawler Framework crawler4j

Author: 浪里_个郎 | Published 2020-04-03 17:52

    Questions to keep in mind while reading the source:
    1. What HTTP client does crawler4j use?
    2. How does crawler4j keep crawling deeper automatically once seed pages are set?
    3. How does crawler4j resume a crawl after an interruption?
    4. How is the multi-threaded crawler implemented?

    1. What HTTP client does crawler4j use?

    It uses Apache HttpClient:

        compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.7'
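
    For reference, the snippet below is a minimal, standalone HttpClient 4.5 GET request (the URL is only an example). It is not crawler4j's own fetch code (crawler4j wraps HttpClient inside its PageFetcher), but it shows the kind of API the dependency provides:

        import org.apache.http.client.methods.CloseableHttpResponse;
        import org.apache.http.client.methods.HttpGet;
        import org.apache.http.impl.client.CloseableHttpClient;
        import org.apache.http.impl.client.HttpClients;
        import org.apache.http.util.EntityUtils;

        public class HttpClientDemo {
            public static void main(String[] args) throws Exception {
                // Build a default client and issue a plain GET request
                try (CloseableHttpClient client = HttpClients.createDefault();
                     CloseableHttpResponse response = client.execute(new HttpGet("https://example.com/"))) {
                    // Read the response body into a string
                    String body = EntityUtils.toString(response.getEntity());
                    System.out.println(response.getStatusLine().getStatusCode());
                    System.out.println(body.length());
                }
            }
        }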
    

    2. How does crawler4j keep crawling deeper automatically once seed pages are set?

    First, set the seed page:

    CrawlController controller = new CrawlController(config, pageFetcher, robotstxtServer);
    controller.addSeed("http://r.cnki.net/kns/brief/result.aspx?dbprefix=gwkt");
    

    The seed URL is wrapped in a WebURL object and handed to the Frontier class, which manages the pages waiting to be crawled. The Frontier stores the seed page in a database:

        //The CrawlController hands the seed page to the Frontier
        public void schedule(WebURL url) {
            int maxPagesToFetch = config.getMaxPagesToFetch();
            synchronized (mutex) {
                try {
                    if (maxPagesToFetch < 0 || scheduledPages < maxPagesToFetch) {
                        workQueues.put(url);
                        scheduledPages++;
                        counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES);
                    }
                } catch (DatabaseException e) {
                    logger.error("Error while putting the url in the work queue", e);
                }
            }
        }
    
        //The put method of WorkQueues: writes the URL into the Berkeley DB work queue
        public void put(WebURL url) {
            DatabaseEntry value = new DatabaseEntry();
            webURLBinding.objectToEntry(url, value);
            Transaction txn = beginTransaction();
            urlsDB.put(txn, getDatabaseEntryKey(url), value);
            commit(txn);
        }
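
    WorkQueues itself is backed by Berkeley DB Java Edition (the DatabaseEntry, Transaction and DatabaseException types above come from com.sleepycat.je). As a rough sketch of what opening such a transactional queue database involves (the environment path and database name below are illustrative, not copied from crawler4j):

        import java.io.File;

        import com.sleepycat.je.Database;
        import com.sleepycat.je.DatabaseConfig;
        import com.sleepycat.je.Environment;
        import com.sleepycat.je.EnvironmentConfig;

        public class JeQueueDemo {
            public static void main(String[] args) {
                // A transactional JE environment rooted in the crawl storage folder (path is illustrative)
                EnvironmentConfig envConfig = new EnvironmentConfig();
                envConfig.setAllowCreate(true);
                envConfig.setTransactional(true);
                File envHome = new File("/tmp/crawler4j/frontier");
                envHome.mkdirs();
                Environment env = new Environment(envHome, envConfig);

                // The database that plays the role of the pending-URLs queue
                DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setAllowCreate(true);
                dbConfig.setTransactional(true);
                Database urlsDB = env.openDatabase(null, "PendingURLsDB", dbConfig);

                // ...put/get DatabaseEntry records here, as WorkQueues does...

                urlsDB.close();
                env.close();
            }
        }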
    

    To use crawler4j we have to define our own class that extends WebCrawler, and WebCrawler implements Runnable:

    public class WebCrawler implements Runnable {
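
    A minimal subclass typically overrides shouldVisit and visit; the filter pattern and domain below are just an illustration, not part of crawler4j:

        import java.util.regex.Pattern;

        import edu.uci.ics.crawler4j.crawler.Page;
        import edu.uci.ics.crawler4j.crawler.WebCrawler;
        import edu.uci.ics.crawler4j.parser.HtmlParseData;
        import edu.uci.ics.crawler4j.url.WebURL;

        public class MyCrawler extends WebCrawler {
            // Skip common static resources (example filter)
            private static final Pattern FILTERS = Pattern.compile(".*(\\.(css|js|gif|jpe?g|png|ico))$");

            @Override
            public boolean shouldVisit(Page referringPage, WebURL url) {
                String href = url.getURL().toLowerCase();
                // Only follow links under the seed's host (example rule)
                return !FILTERS.matcher(href).matches() && href.startsWith("http://r.cnki.net/");
            }

            @Override
            public void visit(Page page) {
                if (page.getParseData() instanceof HtmlParseData) {
                    HtmlParseData htmlParseData = (HtmlParseData) page.getParseData();
                    // The parsed HTML and the links extracted from it
                    String html = htmlParseData.getHtml();
                    int outgoing = htmlParseData.getOutgoingUrls().size();
                    logger.info("Visited {} ({} chars, {} outgoing links)",
                                page.getWebURL().getURL(), html.length(), outgoing);
                }
            }
        }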
    

    After everything is configured in code, we call:

    controller.start(MyCrawler.class, numberOfCrawlers);
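
    For reference, a complete launch sequence looks roughly like this (the storage folder is illustrative, and MyCrawler is the subclass sketched above):

        import edu.uci.ics.crawler4j.crawler.CrawlConfig;
        import edu.uci.ics.crawler4j.crawler.CrawlController;
        import edu.uci.ics.crawler4j.fetcher.PageFetcher;
        import edu.uci.ics.crawler4j.robotstxt.RobotstxtConfig;
        import edu.uci.ics.crawler4j.robotstxt.RobotstxtServer;

        public class CrawlerLauncher {
            public static void main(String[] args) throws Exception {
                CrawlConfig config = new CrawlConfig();
                // Folder where intermediate crawl data (the Berkeley DB environment) is kept
                config.setCrawlStorageFolder("/tmp/crawler4j");
                config.setMaxDepthOfCrawling(3);

                PageFetcher pageFetcher = new PageFetcher(config);
                RobotstxtConfig robotstxtConfig = new RobotstxtConfig();
                RobotstxtServer robotstxtServer = new RobotstxtServer(robotstxtConfig, pageFetcher);

                CrawlController controller = new CrawlController(config, pageFetcher, robotstxtServer);
                controller.addSeed("http://r.cnki.net/kns/brief/result.aspx?dbprefix=gwkt");

                // Blocks until every crawler thread has finished
                int numberOfCrawlers = 4;
                controller.start(MyCrawler.class, numberOfCrawlers);
            }
        }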
    

    CrawlController then runs our crawler class in new threads. In the run method of the parent class WebCrawler, the Frontier keeps pulling URLs to be processed out of the database, and processPage parses and handles each of them:

        @Override
        public void run() {
            onStart();
            while (true) {
                List<WebURL> assignedURLs = new ArrayList<>(50);
                isWaitingForNewURLs = true;
                //Fetch the next batch of URLs from the database
                frontier.getNextURLs(50, assignedURLs);
                isWaitingForNewURLs = false;
                if (assignedURLs.isEmpty()) {
                    if (frontier.isFinished()) {
                        return;
                    }
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        logger.error("Error occurred", e);
                    }
                } else {
                    for (WebURL curURL : assignedURLs) {
                        if (myController.isShuttingDown()) {
                            logger.info("Exiting because of controller shutdown.");
                            return;
                        }
                        if (curURL != null) {
                            curURL = handleUrlBeforeProcess(curURL);
                            //Parse and process the URL
                            processPage(curURL);
                            frontier.setProcessed(curURL);
                        }
                    }
                }
            }
        }
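
    Note the handleUrlBeforeProcess hook called in the loop above: it receives each WebURL just before processPage and returns the URL that will actually be processed, so a subclass can override it. A minimal sketch, assuming the hook keeps the protected visibility it has in the crawler4j 4.x source:

        import edu.uci.ics.crawler4j.crawler.WebCrawler;
        import edu.uci.ics.crawler4j.url.WebURL;

        public class LoggingCrawler extends WebCrawler {
            @Override
            protected WebURL handleUrlBeforeProcess(WebURL curURL) {
                // Log the URL that is about to be processed, then hand it back unchanged
                logger.debug("About to process {}", curURL.getURL());
                return curURL;
            }
        }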
    

    When processPage parses a page, it collects all the clickable links on it. Any link that passes the overridable shouldVisit check, i.e. a link we want to crawl, is written to the database:

    //The Parser class parses the page's HTML and extracts every link on the page
    parser.parse(page, curURL.getURL());
    //Decide whether to follow the links found on this page; the default implementation simply returns true
    if (shouldFollowLinksIn(page.getWebURL())) {
        ParseData parseData = page.getParseData();
        List<WebURL> toSchedule = new ArrayList<>();
        int maxCrawlDepth = myController.getConfig().getMaxDepthOfCrawling();
        //Iterate over all the links found on the page
        for (WebURL webURL : parseData.getOutgoingUrls()) {
            webURL.setParentDocid(curURL.getDocid());
            webURL.setParentUrl(curURL.getURL());
            int newdocid = docIdServer.getDocId(webURL.getURL());
            if (newdocid > 0) {
                // This is not the first time that this Url is visited. So, we set the
                // depth to a negative number.
                webURL.setDepth((short) -1);
                webURL.setDocid(newdocid);
            } else {
                webURL.setDocid(-1);
                webURL.setDepth((short) (curURL.getDepth() + 1));
                if ((maxCrawlDepth == -1) || (curURL.getDepth() < maxCrawlDepth)) {
                    //Check whether the link meets our crawl criteria
                    if (shouldVisit(page, webURL)) {
                        if (robotstxtServer.allows(webURL)) {
                            webURL.setDocid(docIdServer.getNewDocID(webURL.getURL()));
                            toSchedule.add(webURL);
                        } else {
                            logger.debug(
                                "Not visiting: {} as per the server's \"robots.txt\" " +
                                "policy", webURL.getURL());
                        }
                    } else {
                        logger.debug(
                            "Not visiting: {} as per your \"shouldVisit\" policy",
                            webURL.getURL());
                    }
                }
            }
        }
        //Queue the page's links in the database of URLs waiting to be crawled
        frontier.scheduleAll(toSchedule);
    

    parseData.setOutgoingUrls uses the open-source URL detector project https://github.com/linkedin/URL-Detector/. The relevant code:

                //input is the HTML string
                UrlDetector detector = new UrlDetector(input, getOptions());
                //collect all the links found in the HTML
                List<Url> urls = detector.detect();
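
    The detector can also be exercised on its own. A small standalone sketch (the input string is made up; UrlDetectorOptions.HTML is one of the library's predefined option sets):

        import java.util.List;

        import com.linkedin.urls.Url;
        import com.linkedin.urls.detection.UrlDetector;
        import com.linkedin.urls.detection.UrlDetectorOptions;

        public class UrlDetectorDemo {
            public static void main(String[] args) {
                String input = "<a href=\"http://r.cnki.net/kns\">CNKI</a> see also example.com/page";
                // HTML mode tells the detector to expect markup around the URLs
                UrlDetector detector = new UrlDetector(input, UrlDetectorOptions.HTML);
                List<Url> urls = detector.detect();
                for (Url url : urls) {
                    System.out.println(url.getFullUrl());
                }
            }
        }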
    

    3. How does crawler4j resume a crawl after an interruption?

    The Frontier defines a dedicated database, InProcessPagesDB, which holds the batch of links currently being crawled.
    Each time a batch of links is handed out for crawling, it is put into InProcessPagesDB:

        public void getNextURLs(int max, List<WebURL> result) {
            while (true) {
                synchronized (mutex) {
                    if (isFinished) {
                        return;
                    }
                    try {
                        List<WebURL> curResults = workQueues.get(max);
                        workQueues.delete(curResults.size());
                        if (inProcessPages != null) {
                            for (WebURL curPage : curResults) {
                                inProcessPages.put(curPage);
                            }
                        }
                        result.addAll(curResults);
    

    As each link in the batch finishes crawling, its entry is removed from InProcessPagesDB:

        public void setProcessed(WebURL webURL) {
            counters.increment(Counters.ReservedCounterNames.PROCESSED_PAGES);
            if (inProcessPages != null) {
                if (!inProcessPages.removeURL(webURL)) {
                    logger.warn("Could not remove: {} from list of processed pages.", webURL.getURL());
                }
            }
        }
    

    When the crawl is started again after an interruption, the Frontier constructor pulls the links left unfinished by the previous run out of InProcessPagesDB and reschedules them:

        public Frontier(Environment env, CrawlConfig config) {
            this.config = config;
            this.counters = new Counters(env, config);
            try {
                workQueues = new WorkQueues(env, DATABASE_NAME, config.isResumableCrawling());
                //Check whether resumable crawling is enabled
                if (config.isResumableCrawling()) {
                    scheduledPages = counters.getValue(Counters.ReservedCounterNames.SCHEDULED_PAGES);
                    //The database that stores the pages the previous crawl did not finish
                    inProcessPages = new InProcessPagesDB(env);
                    long numPreviouslyInProcessPages = inProcessPages.getLength();
                    if (numPreviouslyInProcessPages > 0) {
                        logger.info("Rescheduling {} URLs from previous crawl.",
                                    numPreviouslyInProcessPages);
                        scheduledPages -= numPreviouslyInProcessPages;
                        //Fetch the unfinished URLs from the database
                        List<WebURL> urls = inProcessPages.get(IN_PROCESS_RESCHEDULE_BATCH_SIZE);
                        while (!urls.isEmpty()) {
                            //Put the urls into this run's database of pages to crawl
                            scheduleAll(urls);
                            inProcessPages.delete(urls.size());
                            urls = inProcessPages.get(IN_PROCESS_RESCHEDULE_BATCH_SIZE);
                        }
                    }
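
    For this to kick in, resumable crawling must be enabled before the controller is created; otherwise inProcessPages is never instantiated (hence the null checks seen earlier) and nothing is rescheduled. In the launch sequence sketched above, that amounts to:

        // Before constructing the CrawlController (path is illustrative)
        CrawlConfig config = new CrawlConfig();
        config.setCrawlStorageFolder("/tmp/crawler4j");
        config.setResumableCrawling(true);   // keep frontier state across restarts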
    

    One caveat: if the crawl happens to be interrupted right after InProcessPagesDB has been cleared at the end of one batch, but before the next batch has been written into it, the crawl cannot be resumed from the point of interruption. Interested readers can verify whether this is really the case.

    4. How the multi-threaded crawler is implemented

    CrawlController starts one thread for each crawler configured:

                for (int i = 1; i <= numberOfCrawlers; i++) {
                    T crawler = crawlerFactory.newInstance();
                    Thread thread = new Thread(crawler, "Crawler " + i);
                    crawler.setThread(thread);
                    crawler.init(i, this);
                    thread.start();
                    crawlers.add(crawler);
                    threads.add(thread);
                    logger.info("Crawler {} started", i);
                }
    

    So what does crawler4j do to synchronize these threads? The answer is... almost nothing, because very little is needed. The only data that has to be shared across threads is the set of URLs waiting to be crawled, and access to it is already serialized by the synchronized blocks in Frontier and the Berkeley DB-backed work queues. Each thread simply crawls the pages assigned to it, without stepping on the others.
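
    Related to thread management: besides the blocking start used above, CrawlController also offers a non-blocking variant together with shutdown and waitUntilFinish, which is useful when the crawl should be stopped programmatically. A brief sketch, reusing controller and MyCrawler from the launch example:

        // Launch the crawler threads without blocking the caller
        controller.startNonBlocking(MyCrawler.class, numberOfCrawlers);

        // ...do other work, then ask the crawlers to stop...
        controller.shutdown();
        // Wait for all crawler threads to exit
        controller.waitUntilFinish();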
