美文网首页
爬虫究竟是怎样开始的

爬虫究竟是怎样开始的

作者: 老钊 | 来源:发表于2018-06-22 16:08 被阅读0次

    爬虫究竟是怎样开始的?

    爬虫究竟是怎样开始的?这个问题是一个很难的哲学问题。如果看官对技术术语更敏感,那么可以表述为,爬虫,严格说爬虫的scheduler,如果需要保持一个status,到底是poll还是push?
    本文将讨论并解决这个问题。(注:不说人话)

    从“一次性”爬虫开始说起

    image.png

    上图为最简单的一次爬虫过程。究其本质,这个过程的核心是一个"种子页面"->"目标页面"->"目标页面链接"->"目标页面"的循环。也就是说,给定若干种子页面,和一些目标页面链接的匹配规则Rule,一定可以通过不断循环,把这个网站Site上所有被覆盖到的、符合匹配规则的页面集合P={page ∈ Site|Rule}获取到。这个P是一个有穷集合。P获取到了,爬虫结束。

    讨论到这,大概勉强达到大学计算机专业本科期末作业的水平。但是想要及格,或许还应该考虑这么几个方面:

    • 数据库:这个话题能截好多图:)
    • 多线程:P的数量如果很大,我需要把“下载网页、保存数据库”这样的动作放进多线程里跑跑。
    • 前端队列:我需要一个前端队列!“生产者-消费者”模式,呵呵。
    • 历史队列:是的,我还需要一个历史队列!不然爬重复了怎么办?
    • Javascript:淘宝的网页怎么爬?
    • 反爬:这是一个long story,甚至比人类的历史还要久远。

    Done。至此我们已经实现了一个异常牛逼的爬虫程序。如果想把“程序”变成“系统”,则需要考虑更多工程上的东西:如何分布式。

    稳定性迁移

    分布式的终极目的,是要把单机程序(进程)里任何不稳定因素,通过迁移到外部更稳定的程序(进程)的方式,达到系统全局更稳定的目的。另外一方面,通过这几年被“微服务”概念不断地洗脑,工程人员最喜欢用的一个词是“解耦合”。

    更进一步,如果我们希望我们的爬虫是一个“永远在线”的服务(引擎),“爬某一个网站的网页”这样的事情被当成某一个任务(task),随时启动、暂停、停止,这不得不让我们重新在架构上考虑得更多。

    document-service

    之前讨论的几个方面中,最应该第一个抽取出来的服务就是存储服务。无论选择使用MongoDB还是ES来保存网页,对外屏蔽底层的存储方案无疑是个优雅的idea。对外透出save()和saveBatch()操作。

    queue-service

    抽取出“前端队列”和“历史队列”作为队列服务。如果你将Redis作为选型方案,那么“前端队列”是一个List,支持类似fetch(30)这样的操作;“历史队列”则是一个Set,支持hasVisit()和visit(link)这样的操作。

    downloader-service

    作为高阶玩家,抽离出网页下载服务也是有必要的。URL进,Document出。downloader对外屏蔽了诸如“异常重试”、“中文编码自适应”、“代理IP池维护和切换”、“Headless浏览器渲染”、“http连接池复用”等一切跟网络IO有关的细节。

    sql-service

    如前所述,爬取的工作被当成任务来执行。每一项任务自身有着各种meta信息,启动一个任务的示例(instance)也关联运行时的meta信息,这些信息保存在关系数据库中并对外透出CRUD接口,在此不做赘述。

    回到正题:scheduler

    前文所述的各种service作为工具一样的组件封装完备后,为了让整个爬虫run起来,接下来该讨论整个爬虫引擎中最核心的scheduler-worker问题了。

    Master-Worker模式的分布式框架,从Doug Cutting写下Hadoop的第一行代码开始逐渐深入人心。很不幸的是我们并不能从中获得太多启发。

    先说worker。由于爬虫引擎是“永远在线”的,那么worker(一个独立的进程)也是永远在线的。因此我们想到了push方案:

    • worker-push方案。即,worker们在启动时,把自身注册到scheduler中。scheduler中维护了worker们的通讯ip列表,当有任务启动时,scheduler在列表中随机挑选worker,并在queue-service中fetch(N)条待爬取的URLs,然后post给worker去抓取。

    好的,开始自我挑战吧。push最大的一个毛病,就是scheduler需要与worker建立了直接的通信并时刻测试通信。不要试图从Hadoop中找方案,凭直觉我们又想到了poll方案:

    • worker-poll方案。即,worker保持一个while/true循环(sleeps may be),不断的看看queue-service里有没有要爬的网页。没有就continue,有就该干啥干啥。当然,worker们监听一个消息队列也是可取的,这样queue-service就不得不做一些改造。

    Worker以poll的方式监听消息队列自然是一个省事的好方式,但scheduler侧该如何设计呢?先看push方案:

    • scheduler-push方案。worker保持一个while/true循环(sleeps may be),不断的看看种子列表(或队列)里有没有要爬的网页。没有就continue,有就该干啥干啥。

    这样做的好处是:首先容易实现,逻辑简单明了,还可以动态调整sleep的时间,我靠太牛逼了!当然坏处是:不管怎么动态调整sleep的时间,始终是有滞后和开销的矛盾。不太符合对实时性有要求的场景,说好的“事件驱动”呢?

    屌,我们终于说到“事件驱动了”。我们现在要解决的问题是,如何感知到并且以最小的开销让任务启动?那么下面提供一个push+pull的方案,也是本文作为一篇“记叙文”的中心思想,权当抛砖引玉:

    • scheduler-push+pull方案。即,scheduler以较快频率(30秒)poll种子列表的返回数据(网页)是否有变动,如没有变动continue,如有变动解析出任务URL,作为事件push到消息队列中待worker爬取。

    再接再厉,现在似乎剩下的最后一个问题就是,如何以最小开销检测种子列表里的网页是否有变动呢?我们联想到了浏览器+刷新的方式。基于开源框架selenium,甚至可以很容易实现对于“网页局部元素是否有更新”这种监听的动作。

    show me the code

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import io.reactivex.schedulers.Schedulers;
    import jodd.util.StringUtil;
    import org.jsoup.Jsoup;
    import org.jsoup.nodes.Document;
    import org.openqa.selenium.By;
    import org.openqa.selenium.JavascriptExecutor;
    import org.openqa.selenium.PageLoadStrategy;
    import org.openqa.selenium.WebDriver;
    import org.openqa.selenium.WebElement;
    import org.openqa.selenium.chrome.ChromeDriver;
    import org.openqa.selenium.chrome.ChromeOptions;
    import org.openqa.selenium.support.ui.ExpectedConditions;
    import org.openqa.selenium.support.ui.WebDriverWait;
    import utils.MD5;
    
    /**
     * @author craig
     * @since 2018年6月21日 上午10:50:54
     */
    public class SiteMonitor {
    
        private WebDriver driver;
        private Map<SiteBean, Integer> siteBeanMap;
        private Map<SiteBean, String> siteTokenMap;
    
        /**
         * 
         */
        public SiteMonitor(String chromePath) {
            System.setProperty("webdriver.chrome.driver", chromePath);
            ChromeOptions co = new ChromeOptions();
            co.setPageLoadStrategy(PageLoadStrategy.NORMAL);
            co.setHeadless(true);
            driver = new ChromeDriver(co);
            siteBeanMap = Maps.newHashMap();
            siteTokenMap = Maps.newHashMap();
        }
        
        /**
         * @throws InterruptedException 
         * 
         */
        public Document openInNewTab(WebDriver webDriver, SiteBean siteBean) throws InterruptedException {
            List<String> tabs = Lists.newArrayList(driver.getWindowHandles());
            ((JavascriptExecutor) driver).executeScript("window.open('about:blank','_blank');");
            tabs = Lists.newArrayList(driver.getWindowHandles());
            siteBeanMap.put(siteBean, tabs.size() - 1);
            siteTokenMap.put(siteBean, "");
            driver.switchTo().window(tabs.get(tabs.size() - 1));
            driver.navigate().to(siteBean.getSiteURL());
            return Jsoup.parse(driver.getPageSource(), siteBean.getHost());
        }
    
        /**
         * @throws InterruptedException 
         * 
         */
        public void monitoring(List<SiteBean> siteBeanList) throws InterruptedException {
    
            for (int i = 0; i < siteBeanList.size(); i++) {
                openInNewTab(driver, siteBeanList.get(i));
            }
    
            Schedulers.trampoline().createWorker().schedulePeriodically(new Runnable() {
                
                @Override
                public void run() {
    
                    for (int i = 0; i < siteBeanList.size(); i++) {
                        SiteBean sb = siteBeanList.get(i);
                        driver.switchTo().window(Lists.newArrayList(driver.getWindowHandles()).get(siteBeanMap.get(sb)));
                        
                        WebElement newContent = new WebDriverWait(driver, 60)
                                .until(ExpectedConditions.presenceOfElementLocated(By.cssSelector(sb.getElementLocated())));
    
                        String newToken = MD5.getMD5(newContent.getText());
                        if (!StringUtil.equals(siteTokenMap.get(sb), newToken)) {
                            siteTokenMap.put(sb, newToken);
                            
                            String html = newContent.getAttribute("outerHTML");
                            Document doc = Jsoup.parseBodyFragment(html);
                            System.out.println(sb.getSiteName() + "有更新:" + doc.text());
                            System.out.println("~~~~~~~~~~~~~~~~~~~~~~update!~~~~~~~~~~~~~~~~~~`");
                        } else {
                            System.out.println(sb.getSiteName() + "无更新");
                        }
                    }
                }
            }, 0, 1, TimeUnit.MINUTES);
        }
    
        public static void main(String[] args) throws Exception {
    
            SiteMonitor sm = new SiteMonitor("/Users/craig/chromedriver");
            //
            List<SiteBean> siteBeanList = Lists.newArrayList();
            
            SiteBean sb = new SiteBean();
            sb.setSiteName("财经");
            sb.setElementLocated("#instantPanel");
            sb.setHost("163.com");
            sb.setSiteURL("http://money.163.com/latest/");
            siteBeanList.add(sb);
            
            SiteBean sb2 = new SiteBean();
            sb2.setSiteName("体育");
            sb2.setElementLocated("#instantPanel");
            sb2.setHost("163.com");
            sb2.setSiteURL("http://sports.163.com/latest");
            siteBeanList.add(sb2);
            
            sm.monitoring(siteBeanList);
        }
    }
    

    如你所见,通过以上代码我们做到了近实时监听种子页面的变化,从而做到了让爬虫引擎永远在线,通过消息队列的方式解耦合了scheduler和worker,从而让爬虫朝稳定性上又迈进了一步。所以这也是本文作为一篇杂文的一个推论。也所以作为一篇杂文如果连推论都写了出来,可想而知这是什么屌杂文。周末了不如回家吃饭好了^^。

    相关文章

      网友评论

          本文标题:爬虫究竟是怎样开始的

          本文链接:https://www.haomeiwen.com/subject/cswkyftx.html