Reading the source with a few questions in mind:
1. What HTTP client library does crawler4j use?
2. After the seed pages are set, how does crawler4j keep crawling deeper and deeper automatically?
3. How does crawler4j resume crawling after an interruption?
4. How is the multi-threaded crawler implemented?
1. What HTTP client library does crawler4j use?
It uses Apache HttpClient:
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.7'
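As a quick standalone illustration of that library (plain HttpClient 4.5 usage, not crawler4j's internal PageFetcher code):
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class HttpClientDemo {
    public static void main(String[] args) throws Exception {
        // Plain HttpClient usage sketch; crawler4j's PageFetcher builds and configures its own client.
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpGet request = new HttpGet("https://example.com/");
            try (CloseableHttpResponse response = client.execute(request)) {
                System.out.println(response.getStatusLine().getStatusCode());
                System.out.println(EntityUtils.toString(response.getEntity()));
            }
        }
    }
}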
2. After the seed pages are set, how does crawler4j keep crawling deeper and deeper automatically?
First, the seed page is added:
CrawlController controller = new CrawlController(config, pageFetcher, robotstxtServer);
controller.addSeed("http://r.cnki.net/kns/brief/result.aspx?dbprefix=gwkt");
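For context, the objects passed to the CrawlController constructor above are typically built as shown in the sketch below, which follows crawler4j's standard setup; the storage folder path is an arbitrary example value. The classes come from the edu.uci.ics.crawler4j.crawler, .fetcher and .robotstxt packages.
// Minimal setup sketch (standard crawler4j usage)
CrawlConfig config = new CrawlConfig();
config.setCrawlStorageFolder("/tmp/crawler4j-data"); // where the intermediate crawl data is stored
config.setResumableCrawling(true);                   // relevant to question 3 below

PageFetcher pageFetcher = new PageFetcher(config);
RobotstxtConfig robotstxtConfig = new RobotstxtConfig();
RobotstxtServer robotstxtServer = new RobotstxtServer(robotstxtConfig, pageFetcher);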
The seed URL is wrapped in a WebURL object and handed to the Frontier class, which manages the pages waiting to be crawled. The Frontier stores the seed in an embedded Berkeley DB JE database:
// CrawlController hands the seed page to the Frontier
public void schedule(WebURL url) {
    int maxPagesToFetch = config.getMaxPagesToFetch();
    synchronized (mutex) {
        try {
            if (maxPagesToFetch < 0 || scheduledPages < maxPagesToFetch) {
                workQueues.put(url);
                scheduledPages++;
                counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES);
            }
        } catch (DatabaseException e) {
            logger.error("Error while putting the url in the work queue", e);
        }
    }
}
// WorkQueues.put()
public void put(WebURL url) {
    DatabaseEntry value = new DatabaseEntry();
    webURLBinding.objectToEntry(url, value);
    Transaction txn = beginTransaction();
    urlsDB.put(txn, getDatabaseEntryKey(url), value);
    commit(txn);
}
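The workQueues/urlsDB handles above are backed by Berkeley DB JE (the com.sleepycat.je types DatabaseEntry, Transaction and DatabaseException give this away). Below is a minimal, illustrative sketch of that storage pattern, not crawler4j's exact code; the environment path and database name are assumptions.
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Transaction;
import java.io.File;

public class FrontierStorageSketch {
    public static void main(String[] args) {
        // The environment directory must already exist on disk.
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setTransactional(true);
        Environment env = new Environment(new File("/tmp/crawler4j-frontier"), envConfig);

        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(true);
        Database urlsDB = env.openDatabase(null, "PendingURLsDB", dbConfig); // illustrative name

        // Store one key/value record inside a transaction, mirroring the shape of WorkQueues.put().
        Transaction txn = env.beginTransaction(null, null);
        DatabaseEntry key = new DatabaseEntry("http://example.com/".getBytes());
        DatabaseEntry value = new DatabaseEntry(new byte[] {1});
        urlsDB.put(txn, key, value);
        txn.commit();

        urlsDB.close();
        env.close();
    }
}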
To use crawler4j we define our own class extending WebCrawler, and WebCrawler itself implements Runnable:
public class WebCrawler implements Runnable {
Once all the configuration is in place, we start the crawl with:
controller.start(MyCrawler.class, numberOfCrawlers);
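For reference, a minimal MyCrawler subclass looks roughly like the sketch below. It follows the standard crawler4j override pattern; the host filter and the printed output are arbitrary example choices.
import edu.uci.ics.crawler4j.crawler.Page;
import edu.uci.ics.crawler4j.crawler.WebCrawler;
import edu.uci.ics.crawler4j.parser.HtmlParseData;
import edu.uci.ics.crawler4j.url.WebURL;

public class MyCrawler extends WebCrawler {

    // Only follow links that stay on the seed's host (example policy).
    @Override
    public boolean shouldVisit(Page referringPage, WebURL url) {
        return url.getURL().toLowerCase().startsWith("http://r.cnki.net/");
    }

    // Called once a page has been fetched and parsed.
    @Override
    public void visit(Page page) {
        if (page.getParseData() instanceof HtmlParseData) {
            HtmlParseData htmlParseData = (HtmlParseData) page.getParseData();
            System.out.println(page.getWebURL().getURL() + " -> " + htmlParseData.getTitle());
        }
    }
}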
CrawlController then runs instances of our class on new threads. In the run method of the parent class WebCrawler, the Frontier repeatedly pulls the URLs to be processed out of the database, and each URL is parsed and handled in processPage:
@Override
public void run() {
    onStart();
    while (true) {
        List<WebURL> assignedURLs = new ArrayList<>(50);
        isWaitingForNewURLs = true;
        // Fetch the next batch of URLs from the database
        frontier.getNextURLs(50, assignedURLs);
        isWaitingForNewURLs = false;
        if (assignedURLs.isEmpty()) {
            if (frontier.isFinished()) {
                return;
            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                logger.error("Error occurred", e);
            }
        } else {
            for (WebURL curURL : assignedURLs) {
                if (myController.isShuttingDown()) {
                    logger.info("Exiting because of controller shutdown.");
                    return;
                }
                if (curURL != null) {
                    curURL = handleUrlBeforeProcess(curURL);
                    // Parse and process the URL
                    processPage(curURL);
                    frontier.setProcessed(curURL);
                }
            }
        }
    }
}
When processPage parses a page, it collects every outgoing link on it. If a link passes the overridable shouldVisit check, i.e. it is a link we want to crawl, it is put into the database:
// The Parser parses the URL's HTML and extracts all the links on the page
parser.parse(page, curURL.getURL());
// Decide whether to follow the links on this page; the default implementation simply returns true
if (shouldFollowLinksIn(page.getWebURL())) {
    ParseData parseData = page.getParseData();
    List<WebURL> toSchedule = new ArrayList<>();
    int maxCrawlDepth = myController.getConfig().getMaxDepthOfCrawling();
    // Iterate over every outgoing link found on the page
    for (WebURL webURL : parseData.getOutgoingUrls()) {
        webURL.setParentDocid(curURL.getDocid());
        webURL.setParentUrl(curURL.getURL());
        int newdocid = docIdServer.getDocId(webURL.getURL());
        if (newdocid > 0) {
            // This is not the first time that this Url is visited. So, we set the
            // depth to a negative number.
            webURL.setDepth((short) -1);
            webURL.setDocid(newdocid);
        } else {
            webURL.setDocid(-1);
            webURL.setDepth((short) (curURL.getDepth() + 1));
            if ((maxCrawlDepth == -1) || (curURL.getDepth() < maxCrawlDepth)) {
                // Check whether this link should be crawled
                if (shouldVisit(page, webURL)) {
                    if (robotstxtServer.allows(webURL)) {
                        webURL.setDocid(docIdServer.getNewDocID(webURL.getURL()));
                        toSchedule.add(webURL);
                    } else {
                        logger.debug(
                            "Not visiting: {} as per the server's \"robots.txt\" " +
                            "policy", webURL.getURL());
                    }
                } else {
                    logger.debug(
                        "Not visiting: {} as per your \"shouldVisit\" policy",
                        webURL.getURL());
                }
            }
        }
    }
    // Put the page's links into the database of pages waiting to be crawled
    frontier.scheduleAll(toSchedule);
}
In the parseData.setOutgoingUrls method, the open-source URL detector project https://github.com/linkedin/URL-Detector/ is used; the relevant code looks like this:
// input is the HTML string
UrlDetector detector = new UrlDetector(input, getOptions());
// Returns all the links found in the HTML
List<Url> urls = detector.detect();
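As a standalone sketch of that detector API (the UrlDetectorOptions.HTML option and the sample input are assumptions; crawler4j supplies its own options via getOptions()):
import com.linkedin.urls.Url;
import com.linkedin.urls.detection.UrlDetector;
import com.linkedin.urls.detection.UrlDetectorOptions;
import java.util.List;

public class UrlDetectorDemo {
    public static void main(String[] args) {
        String html = "<a href=\"http://example.com/a\">a</a> plain text with www.example.org/b";
        // The HTML option tells the detector to treat the input as HTML markup.
        UrlDetector detector = new UrlDetector(html, UrlDetectorOptions.HTML);
        List<Url> urls = detector.detect();
        for (Url url : urls) {
            System.out.println(url.getFullUrl());
        }
    }
}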
3. How does crawler4j resume crawling after an interruption?
The Frontier defines a dedicated database, InProcessPagesDB, which holds the batch of URLs currently being crawled.
Each time a batch of URLs is handed out for crawling, the URLs are first recorded in InProcessPagesDB:
public void getNextURLs(int max, List<WebURL> result) {
    while (true) {
        synchronized (mutex) {
            if (isFinished) {
                return;
            }
            try {
                List<WebURL> curResults = workQueues.get(max);
                workQueues.delete(curResults.size());
                if (inProcessPages != null) {
                    for (WebURL curPage : curResults) {
                        inProcessPages.put(curPage);
                    }
                }
                result.addAll(curResults);
As each URL in the batch finishes processing, it is removed from InProcessPagesDB:
public void setProcessed(WebURL webURL) {
    counters.increment(Counters.ReservedCounterNames.PROCESSED_PAGES);
    if (inProcessPages != null) {
        if (!inProcessPages.removeURL(webURL)) {
            logger.warn("Could not remove: {} from list of processed pages.", webURL.getURL());
        }
    }
}
When a crawl is interrupted and then started again, the Frontier constructor picks up the URLs left unfinished in InProcessPagesDB and reschedules them:
public Frontier(Environment env, CrawlConfig config) {
    this.config = config;
    this.counters = new Counters(env, config);
    try {
        workQueues = new WorkQueues(env, DATABASE_NAME, config.isResumableCrawling());
        // Check whether resumable crawling is enabled
        if (config.isResumableCrawling()) {
            scheduledPages = counters.getValue(Counters.ReservedCounterNames.SCHEDULED_PAGES);
            // Database holding the pages left unfinished by the previous crawl
            inProcessPages = new InProcessPagesDB(env);
            long numPreviouslyInProcessPages = inProcessPages.getLength();
            if (numPreviouslyInProcessPages > 0) {
                logger.info("Rescheduling {} URLs from previous crawl.",
                            numPreviouslyInProcessPages);
                scheduledPages -= numPreviouslyInProcessPages;
                // Pull all the unfinished URLs out of the database
                List<WebURL> urls = inProcessPages.get(IN_PROCESS_RESCHEDULE_BATCH_SIZE);
                while (!urls.isEmpty()) {
                    // Put them back into this run's database of pages to crawl
                    scheduleAll(urls);
                    inProcessPages.delete(urls.size());
                    urls = inProcessPages.get(IN_PROCESS_RESCHEDULE_BATCH_SIZE);
                }
            }
One caveat: if the crawl is interrupted right after a batch has been processed and its entries have just been removed from InProcessPagesDB, but before the next batch has been written into it, then the crawl cannot resume from exactly where it left off. Interested readers may want to verify whether this is actually the case.
4. How is the multi-threaded crawler implemented?
CrawlController starts one thread per crawler, according to the configured number of crawlers:
for (int i = 1; i <= numberOfCrawlers; i++) {
    T crawler = crawlerFactory.newInstance();
    Thread thread = new Thread(crawler, "Crawler " + i);
    crawler.setThread(thread);
    crawler.init(i, this);
    thread.start();
    crawlers.add(crawler);
    threads.add(thread);
    logger.info("Crawler {} started", i);
}
So what does crawler4j do to synchronize its threads? The answer is: almost nothing, because almost nothing is needed. The only data the threads share is the set of URLs waiting to be crawled, and handing those out is already serialized by the Frontier's synchronized blocks and the underlying database. Each thread simply crawls the pages assigned to it, with no conflicts between threads.