美文网首页
爬虫究竟是怎样开始的

爬虫究竟是怎样开始的

作者: 老钊 | 来源:发表于2018-06-22 16:08 被阅读0次

爬虫究竟是怎样开始的?

爬虫究竟是怎样开始的?这个问题是一个很难的哲学问题。如果看官对技术术语更敏感,那么可以表述为,爬虫,严格说爬虫的scheduler,如果需要保持一个status,到底是poll还是push?
本文将讨论并解决这个问题。(注:不说人话)

从“一次性”爬虫开始说起

image.png

上图为最简单的一次爬虫过程。究其本质,这个过程的核心是一个"种子页面"->"目标页面"->"目标页面链接"->"目标页面"的循环。也就是说,给定若干种子页面,和一些目标页面链接的匹配规则Rule,一定可以通过不断循环,把这个网站Site上所有被覆盖到的、符合匹配规则的页面集合P={page ∈ Site|Rule}获取到。这个P是一个有穷集合。P获取到了,爬虫结束。

讨论到这,大概勉强达到大学计算机专业本科期末作业的水平。但是想要及格,或许还应该考虑这么几个方面:

  • 数据库:这个话题能截好多图:)
  • 多线程:P的数量如果很大,我需要把“下载网页、保存数据库”这样的动作放进多线程里跑跑。
  • 前端队列:我需要一个前端队列!“生产者-消费者”模式,呵呵。
  • 历史队列:是的,我还需要一个历史队列!不然爬重复了怎么办?
  • Javascript:淘宝的网页怎么爬?
  • 反爬:这是一个long story,甚至比人类的历史还要久远。

Done。至此我们已经实现了一个异常牛逼的爬虫程序。如果想把“程序”变成“系统”,则需要考虑更多工程上的东西:如何分布式。

稳定性迁移

分布式的终极目的,是要把单机程序(进程)里任何不稳定因素,通过迁移到外部更稳定的程序(进程)的方式,达到系统全局更稳定的目的。另外一方面,通过这几年被“微服务”概念不断地洗脑,工程人员最喜欢用的一个词是“解耦合”。

更进一步,如果我们希望我们的爬虫是一个“永远在线”的服务(引擎),“爬某一个网站的网页”这样的事情被当成某一个任务(task),随时启动、暂停、停止,这不得不让我们重新在架构上考虑得更多。

document-service

之前讨论的几个方面中,最应该第一个抽取出来的服务就是存储服务。无论选择使用MongoDB还是ES来保存网页,对外屏蔽底层的存储方案无疑是个优雅的idea。对外透出save()和saveBatch()操作。

queue-service

抽取出“前端队列”和“历史队列”作为队列服务。如果你将Redis作为选型方案,那么“前端队列”是一个List,支持类似fetch(30)这样的操作;“历史队列”则是一个Set,支持hasVisit()和visit(link)这样的操作。

downloader-service

作为高阶玩家,抽离出网页下载服务也是有必要的。URL进,Document出。downloader对外屏蔽了诸如“异常重试”、“中文编码自适应”、“代理IP池维护和切换”、“Headless浏览器渲染”、“http连接池复用”等一切跟网络IO有关的细节。

sql-service

如前所述,爬取的工作被当成任务来执行。每一项任务自身有着各种meta信息,启动一个任务的示例(instance)也关联运行时的meta信息,这些信息保存在关系数据库中并对外透出CRUD接口,在此不做赘述。

回到正题:scheduler

前文所述的各种service作为工具一样的组件封装完备后,为了让整个爬虫run起来,接下来该讨论整个爬虫引擎中最核心的scheduler-worker问题了。

Master-Worker模式的分布式框架,从Doug Cutting写下Hadoop的第一行代码开始逐渐深入人心。很不幸的是我们并不能从中获得太多启发。

先说worker。由于爬虫引擎是“永远在线”的,那么worker(一个独立的进程)也是永远在线的。因此我们想到了push方案:

  • worker-push方案。即,worker们在启动时,把自身注册到scheduler中。scheduler中维护了worker们的通讯ip列表,当有任务启动时,scheduler在列表中随机挑选worker,并在queue-service中fetch(N)条待爬取的URLs,然后post给worker去抓取。

好的,开始自我挑战吧。push最大的一个毛病,就是scheduler需要与worker建立了直接的通信并时刻测试通信。不要试图从Hadoop中找方案,凭直觉我们又想到了poll方案:

  • worker-poll方案。即,worker保持一个while/true循环(sleeps may be),不断的看看queue-service里有没有要爬的网页。没有就continue,有就该干啥干啥。当然,worker们监听一个消息队列也是可取的,这样queue-service就不得不做一些改造。

Worker以poll的方式监听消息队列自然是一个省事的好方式,但scheduler侧该如何设计呢?先看push方案:

  • scheduler-push方案。worker保持一个while/true循环(sleeps may be),不断的看看种子列表(或队列)里有没有要爬的网页。没有就continue,有就该干啥干啥。

这样做的好处是:首先容易实现,逻辑简单明了,还可以动态调整sleep的时间,我靠太牛逼了!当然坏处是:不管怎么动态调整sleep的时间,始终是有滞后和开销的矛盾。不太符合对实时性有要求的场景,说好的“事件驱动”呢?

屌,我们终于说到“事件驱动了”。我们现在要解决的问题是,如何感知到并且以最小的开销让任务启动?那么下面提供一个push+pull的方案,也是本文作为一篇“记叙文”的中心思想,权当抛砖引玉:

  • scheduler-push+pull方案。即,scheduler以较快频率(30秒)poll种子列表的返回数据(网页)是否有变动,如没有变动continue,如有变动解析出任务URL,作为事件push到消息队列中待worker爬取。

再接再厉,现在似乎剩下的最后一个问题就是,如何以最小开销检测种子列表里的网页是否有变动呢?我们联想到了浏览器+刷新的方式。基于开源框架selenium,甚至可以很容易实现对于“网页局部元素是否有更新”这种监听的动作。

show me the code

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.reactivex.schedulers.Schedulers;
import jodd.util.StringUtil;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.openqa.selenium.By;
import org.openqa.selenium.JavascriptExecutor;
import org.openqa.selenium.PageLoadStrategy;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.chrome.ChromeDriver;
import org.openqa.selenium.chrome.ChromeOptions;
import org.openqa.selenium.support.ui.ExpectedConditions;
import org.openqa.selenium.support.ui.WebDriverWait;
import utils.MD5;

/**
 * @author craig
 * @since 2018年6月21日 上午10:50:54
 */
public class SiteMonitor {

    private WebDriver driver;
    private Map<SiteBean, Integer> siteBeanMap;
    private Map<SiteBean, String> siteTokenMap;

    /**
     * 
     */
    public SiteMonitor(String chromePath) {
        System.setProperty("webdriver.chrome.driver", chromePath);
        ChromeOptions co = new ChromeOptions();
        co.setPageLoadStrategy(PageLoadStrategy.NORMAL);
        co.setHeadless(true);
        driver = new ChromeDriver(co);
        siteBeanMap = Maps.newHashMap();
        siteTokenMap = Maps.newHashMap();
    }
    
    /**
     * @throws InterruptedException 
     * 
     */
    public Document openInNewTab(WebDriver webDriver, SiteBean siteBean) throws InterruptedException {
        List<String> tabs = Lists.newArrayList(driver.getWindowHandles());
        ((JavascriptExecutor) driver).executeScript("window.open('about:blank','_blank');");
        tabs = Lists.newArrayList(driver.getWindowHandles());
        siteBeanMap.put(siteBean, tabs.size() - 1);
        siteTokenMap.put(siteBean, "");
        driver.switchTo().window(tabs.get(tabs.size() - 1));
        driver.navigate().to(siteBean.getSiteURL());
        return Jsoup.parse(driver.getPageSource(), siteBean.getHost());
    }

    /**
     * @throws InterruptedException 
     * 
     */
    public void monitoring(List<SiteBean> siteBeanList) throws InterruptedException {

        for (int i = 0; i < siteBeanList.size(); i++) {
            openInNewTab(driver, siteBeanList.get(i));
        }

        Schedulers.trampoline().createWorker().schedulePeriodically(new Runnable() {
            
            @Override
            public void run() {

                for (int i = 0; i < siteBeanList.size(); i++) {
                    SiteBean sb = siteBeanList.get(i);
                    driver.switchTo().window(Lists.newArrayList(driver.getWindowHandles()).get(siteBeanMap.get(sb)));
                    
                    WebElement newContent = new WebDriverWait(driver, 60)
                            .until(ExpectedConditions.presenceOfElementLocated(By.cssSelector(sb.getElementLocated())));

                    String newToken = MD5.getMD5(newContent.getText());
                    if (!StringUtil.equals(siteTokenMap.get(sb), newToken)) {
                        siteTokenMap.put(sb, newToken);
                        
                        String html = newContent.getAttribute("outerHTML");
                        Document doc = Jsoup.parseBodyFragment(html);
                        System.out.println(sb.getSiteName() + "有更新:" + doc.text());
                        System.out.println("~~~~~~~~~~~~~~~~~~~~~~update!~~~~~~~~~~~~~~~~~~`");
                    } else {
                        System.out.println(sb.getSiteName() + "无更新");
                    }
                }
            }
        }, 0, 1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws Exception {

        SiteMonitor sm = new SiteMonitor("/Users/craig/chromedriver");
        //
        List<SiteBean> siteBeanList = Lists.newArrayList();
        
        SiteBean sb = new SiteBean();
        sb.setSiteName("财经");
        sb.setElementLocated("#instantPanel");
        sb.setHost("163.com");
        sb.setSiteURL("http://money.163.com/latest/");
        siteBeanList.add(sb);
        
        SiteBean sb2 = new SiteBean();
        sb2.setSiteName("体育");
        sb2.setElementLocated("#instantPanel");
        sb2.setHost("163.com");
        sb2.setSiteURL("http://sports.163.com/latest");
        siteBeanList.add(sb2);
        
        sm.monitoring(siteBeanList);
    }
}

如你所见,通过以上代码我们做到了近实时监听种子页面的变化,从而做到了让爬虫引擎永远在线,通过消息队列的方式解耦合了scheduler和worker,从而让爬虫朝稳定性上又迈进了一步。所以这也是本文作为一篇杂文的一个推论。也所以作为一篇杂文如果连推论都写了出来,可想而知这是什么屌杂文。周末了不如回家吃饭好了^^。

相关文章

网友评论

      本文标题:爬虫究竟是怎样开始的

      本文链接:https://www.haomeiwen.com/subject/cswkyftx.html