Takeaways:
- Design evolves from simple to complex.
- Every change should be driven by an actual requirement.
- Abstraction is the process of carving out clear concepts and logical boundaries.
Single-task crawler architecture
- Step 1: put the seed requests into the engine's queue.
- Step 2: the engine takes a request off the queue and hands it to the fetcher to download; the fetcher returns the downloaded page to the engine.
- Step 3: the engine passes the page to the parser; the parse result goes back to the engine for further scheduling.
![](https://img.haomeiwen.com/i2119458/77a6a0b722a35843.png)
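The engine code below uses `Request` and `ParseResult` types that the post never shows. Here is a minimal sketch reconstructed from how they are used; the exact definitions are an assumption:

```go
package engine

// Assumed definitions, reconstructed from usage in Run below.

// Request pairs a URL with the parser that should handle its response.
type Request struct {
	Url        string
	ParserFunc func([]byte) ParseResult
}

// ParseResult carries what a parser extracted: follow-up requests
// to crawl and the items scraped from the page.
type ParseResult struct {
	Requests []Request
	Items    []interface{}
}
```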
```go
package engine

import (
	"hans/learn/spider/crawler/fetcher"
	"log"
)

// Run drives the single-task crawl loop: it keeps a queue of
// requests and processes them one at a time.
func Run(seeds ...Request) {
	var requests []Request
	for _, r := range seeds {
		requests = append(requests, r)
	}
	for len(requests) > 0 {
		// Dequeue the next request.
		r := requests[0]
		requests = requests[1:]

		log.Printf("Fetching %s", r.Url)
		body, err := fetcher.Fetch(r.Url)
		if err != nil {
			log.Printf("Fetcher: error fetching url %s: %v", r.Url, err)
			continue
		}

		// Parse the page, queue the follow-up requests, and report items.
		parseResult := r.ParserFunc(body)
		requests = append(requests, parseResult.Requests...)
		for _, item := range parseResult.Items {
			log.Printf("Got item %v", item)
		}
	}
}
```
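The `fetcher.Fetch` called above is also not shown in the post. A minimal sketch, assuming it simply does an HTTP GET and returns the raw body (status handling only; charset detection and rate limiting omitted):

```go
package fetcher

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

// Fetch downloads the page at url and returns its raw body.
func Fetch(url string) ([]byte, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("wrong status code: %d", resp.StatusCode)
	}
	return ioutil.ReadAll(resp.Body)
}
```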
Improvement 1
In the diagram above, the parser and fetcher can be merged into a single step, called the worker.
![](https://img.haomeiwen.com/i2119458/9de5a9ff05252745.png)
```go
func Run(seeds ...Request) {
	var requests []Request
	for _, r := range seeds {
		requests = append(requests, r)
	}
	for len(requests) > 0 {
		r := requests[0]
		requests = requests[1:]

		log.Printf("Fetching %s", r.Url)
		parseResult, err := worker(r)
		if err != nil {
			continue
		}
		requests = append(requests, parseResult.Requests...)
		for _, item := range parseResult.Items {
			log.Printf("Got item %v", item)
		}
	}
}

// worker combines fetching and parsing into one step.
func worker(request Request) (ParseResult, error) {
	body, err := fetcher.Fetch(request.Url)
	if err != nil {
		log.Printf("Fetcher: error fetching url %s: %v", request.Url, err)
		return ParseResult{}, err
	}
	return request.ParserFunc(body), nil
}
```
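For reference, kicking off a crawl with this version looks something like the following. The seed URL and the `ParsePage` parser are hypothetical placeholders, not part of the original post:

```go
package main

import "hans/learn/spider/crawler/engine"

// ParsePage is a placeholder parser: a real one would extract items
// and follow-up links from the page body. Here it just reports the
// page size as an "item".
func ParsePage(body []byte) engine.ParseResult {
	return engine.ParseResult{
		Items: []interface{}{len(body)},
	}
}

func main() {
	engine.Run(engine.Request{
		Url:        "http://example.com",
		ParserFunc: ParsePage,
	})
}
```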
Improvement 2: the concurrent crawler
- Step 1: requests are handed to the engine.
- Step 2: the engine submits each request to the scheduler.
- Step 3: the scheduler puts requests into a channel for the workers to fetch and parse; each parse result goes into an output channel, from which the engine picks up the parseResult and continues scheduling.
What does the engine do?
- Starts WorkerCount goroutines, per the user's configuration, so the work runs concurrently.
- Creates the channels shared by all workers.
- Loops reading parse results from the output channel, submitting any follow-up requests to the scheduler.
What does the scheduler do? How does it fulfill its scheduling duty?
- It keeps feeding requests into the channel for the workers to consume.
![](https://img.haomeiwen.com/i2119458/18522d8b437670ca.png)
engine.go
```go
package engine

import "log"

type ConcurrentEngine struct {
	Scheduler   Scheduler
	WorkerCount int
}

type Scheduler interface {
	Submit(Request)
	ConfigMasterWorkerChan(chan Request)
}

func (e *ConcurrentEngine) Run(seeds ...Request) {
	// Create the channels the workers read from and write to.
	in := make(chan Request)
	out := make(chan ParseResult)
	e.Scheduler.ConfigMasterWorkerChan(in)

	// Start WorkerCount goroutines to process tasks concurrently.
	for i := 0; i < e.WorkerCount; i++ {
		createWorker(in, out)
	}

	// Hand the seed requests to the scheduler.
	for _, r := range seeds {
		e.Scheduler.Submit(r)
	}

	// Main loop: read each parse result from the out channel,
	// count the items, and resubmit follow-up requests.
	countItem := 0
	for {
		result := <-out
		for _, item := range result.Items {
			log.Printf("Got item #%d %v\n", countItem, item)
			countItem++
		}
		// Hand follow-up requests back to the scheduler.
		for _, request := range result.Requests {
			log.Printf("Got url %s", request.Url)
			e.Scheduler.Submit(request)
		}
	}
}

func createWorker(in chan Request, out chan ParseResult) {
	// Each worker is a goroutine combining the fetcher and parser.
	go func() {
		for {
			// Take a request from in; after processing, put the
			// result on out.
			request := <-in
			result, err := worker(request)
			if err != nil {
				continue
			}
			out <- result
		}
	}()
}
```
scheduler.go
```go
package scheduler

import "hans/learn/spider/crawler/engine"

type SimpleScheduler struct {
	WorkerChan chan engine.Request
}

// Submit hands a request to the workers. The send runs in its own
// goroutine so Submit never blocks the engine's main loop.
func (s *SimpleScheduler) Submit(r engine.Request) {
	go func() {
		s.WorkerChan <- r
	}()
}

func (s *SimpleScheduler) ConfigMasterWorkerChan(c chan engine.Request) {
	s.WorkerChan = c
}
```
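Wiring the two together: a minimal sketch of a `main` that runs the concurrent engine, reusing the hypothetical `ParsePage` parser from earlier:

```go
package main

import (
	"hans/learn/spider/crawler/engine"
	"hans/learn/spider/crawler/scheduler"
)

func main() {
	e := engine.ConcurrentEngine{
		Scheduler:   &scheduler.SimpleScheduler{},
		WorkerCount: 10,
	}
	e.Run(engine.Request{
		Url:        "http://example.com",
		ParserFunc: ParsePage, // placeholder parser sketched earlier
	})
}
```

One design note on `Submit`: the channels are unbuffered and the engine goroutine is the only reader of `out`. If `Submit` sent on `WorkerChan` directly, the engine could block inside `Submit` while every worker blocks sending to `out`, deadlocking the whole system. Spawning a goroutine per pending request keeps the engine loop live, at the cost of an unbounded number of goroutines.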
Note: link to the sample code