目标
首先CrawlerSQL会包含两个服务:
调度服务,比如每个url的抓取周期。
抓取服务:调度会给url,抓取服务负责实际的抓取
在StreamingPro里,我们仅仅会实现抓取服务,也就是写一段SQL脚本。至于每个脚本什么时候执行是调度服务的事情,这里我们需要区分开来。而且调度也算是一件非常复杂的事情。
我希望尽可能复用MLSQL的语法格式,尽量不添加新的SQL语法。实际上经过基本的尝试,发现是完全可以做到的。
抽象
我这里简单的把抓取分成两个类型:
- url列表抓取,也就是通常我们说的入口页,比如博客首页通常都是一堆文章列表。
- 内容抓取,也就是要把标题,时间,内容扣取出来。
每个入口页,在我看来都是一张表,里面有两个字段: url,root_url。 url 就是入口也里的内容的url,root_url则是入口页的url地址。
具体语法如下:
load crawlersql.`https://www.csdn.net/nav/ai`
options matchXPath="//ul[@id='feedlist_id']//div[@class='title']//a/@href"
and fetchType="list"
as aritle_url_table_source;
内容详情的抓取,在我看来,用UDF就足够了,我提供了三个方法:
crawler_auto_extract_title
crawler_auto_extract_body
crawler_extract_xpath
标题和正文一般可以做到自动抽取。其他比如时间,作者等则需要通过xpath抽取。
这里举一个例子:
select
crawler_auto_extract_title(html) as title,
crawler_auto_extract_body(html) as body,
crawler_extract_xpath(html,"//main/article//span[@class='time']") as created_time
from aritle_list
where html is not null
as article_table;
当然上面都是形式上的,一个爬虫真正要解决的问题有比如:
- IP 代理池,黑白名单。
- 异步加载网页的抓取
- 登录/验证码
- 动态更新周期
- 去重url等
现阶段重点还是考量语法层面的东西。
资源
目前我实现了一个探索版的,可参看这里: streamingpro-crawler,具体的案例有:
set tempStore="/tmp/streamingpro_crawler"
-- 抓取列表页的url
load crawlersql.`https://www.csdn.net/nav/ai`
options matchXPath="//ul[@id='feedlist_id']//div[@class='title']//a/@href"
and fetchType="list"
as aritle_url_table_source;
-- 抓取的url去重
select url,first(root_url) as root_url from aritle_url_table_source
group by url
as aritle_url_table;
-- 获得历史已经抓取的url
load parquet.`${tempStore}`
as aritcle_url_history;
-- 排除掉已经抓去过的url
select aut.url as url ,aut.root_url as root_url from aritle_url_table aut
left join aritcle_url_history auh
on aut.url=auh.url
where auh.url is null
as filter_aritle_url_table;
-- 抓取全文,并且存储
select crawler_request(regexp_replace(url,"http://","https://")) as html
from filter_aritle_url_table
where url is not null
as aritle_list;
save overwrite aritle_list as parquet.`/tmp/streamingpro_crawler_content`;
-- 对内容进行解析
load parquet.`/tmp/streamingpro_crawler_content` as aritle_list;
select
crawler_auto_extract_title(html) as title,
crawler_auto_extract_body(html) as body,
crawler_extract_xpath(html,"//main/article//span[@class='time']") as created_time
from aritle_list
where html is not null
as article_table;
-- 对最后的抓取结果进行保存
save overwrite article_table as json.`/tmp/article_table`;
-- 已经抓取过的url也需要进行增量存储,方便后续过滤
save append filter_aritle_url_table as parquet.`${tempStore}`;
运行时,需要先保证/tmp/streamingpro_crawler
不能为空,你可以通过下面脚本初始化:
select "" as url ,"" as root_url
as em;
save overwrite em parquet.`/tmp/streamingpro_crawler`
网友评论