主从设计模式的Go实现

作者: 山中散人的博客 | 来源:发表于2019-05-11 19:08 被阅读9次

    在流水线设计模式之外,主从模式(Boss-worker)也是一种重要的多线程设计模式。在主从模式中,存在一个主人线程(Boss),它负责将工作分成同样的几份,并分配给从线程(Worker),Worker各自分头完成工作,最后Boss负责将多个Worker线程的工作成果合并,下面用Go来演示一下这种设计模式。

    演示用的例子实现的是类似于Linux中grep的功能,搜索文本中匹配的字符,并列出匹配的行。文本搜索的目标可能是多个文件,对每个文件的搜索是独立的,因此可以利用主从模式提高在多核CPU上提高多文件搜索的效率。

    首先定义数据结构,Worker线程的数目取决于CPU核的数目,因为文件的搜索涉及文件I/O和大量内存计算,Worker线程的数目超过CPU核数时会带来线程切换的额外负担,而对提高搜索效率没有效果。

    Result结构定义了搜索结果,文件名-匹配行数-匹配行内容。Job结构用于Boss线程和Worker线程的通信,搜索文件-搜索结果。

    var workers = runtime.NumCPU() //number of workers
    
    type Result struct {
        filename string //file name
        lino     int    //line number
        line     string //string content of line
    }
    
    type Job struct {
        filename string        //the name of the file on procesing
        results  chan<- Result //channel that any result to be sent
    }
    

    首先设置Go会使用的CPU核数,然后解析命令行参数,获得 搜索超时,匹配用的正则表达式和搜索目标文件。正则表达式在编译后获得一个表达式指针,这个指针会被Worker线程共享使用。通常,使用共享指针不是线程安全的,但是*regexp.Regexp在Go文档中已经被声明为线程安全了,因此可以放心使用。最后,调用grep()开始工作。

        runtime.GOMAXPROCS(runtime.NumCPU()) // Use all the machine's cores
        ...
        //Compile the input regex
        // lineRx is a shared pointer to value, which shall be a cause of
        //  concern since it's not thread safe, but Go doc *regexp.Regexp is
        //  safe to be shared in as many routines
        if lineRx, err := regexp.Compile(pattern); err != nil {
            log.Fatalf("invalid regexp: %s\n", err)
        } else {
            var timeout int64 = 1e9 * 60 * 10 // 10 minutes!
            if *timeoutOpt != 0 {
                timeout = *timeoutOpt * 1e9
            }
            grep(timeout, lineRx, commandLineFiles(files))
        }
    
    

    在grep()中,创建三个双向的channel,jobs用于Boss线程分配工作给Worker, results用于Worker线程汇报搜索结果,done中是标志结束的channel。results channel设置了最长1000的缓冲区,当缓冲区满,而Worker线程需要向results中添加数据时, Worker线程会被阻塞直到results的数据被处理缓冲区有空余。

    func grep(timeout int64, lineRx *regexp.Regexp, filenames []string) {
        //create channels
        jobs := make(chan Job, workers)
        results := make(chan Result, minimum(1000, len(filenames)))
        done := make(chan struct{}, workers)
    
        go addJobs(jobs, filenames, results) //boss assign jobs
        for i := 0; i < workers; i++ {
            go doJobs(done, lineRx, jobs) //worker do jobs
        }
        //wait for work to submit work result
        waitAndProcessResults(timeout, done, results)
    }
    

    addJobs()将文件名和result channel发送到job channel。doJobs()接受job channel和正则表达式,进行搜索。所有的工作完成后,通过done channel发送完成标志。

    func addJobs(jobs chan<- Job, filenames []string, results chan<- Result) {
        for _, filename := range filenames {
            jobs <- Job{filename, results}
        }
        close(jobs)
    }
    
    func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
        for job := range jobs {
            job.Do(lineRx)
        }
        done <- struct{}{}
    }
    

    搜索的实现按下不表, waitAndProcessResults()等待Worker线程完成并打印结果。select会阻塞Boss线程直到接收到result, finish 或者 done channel 数据。阻塞的Boss线程会睡眠,从而不会消耗CPU资源来死等。每次收到一个done, 表明一个Worker线程的工作完成,当所有的Worker线程都完成后,Boss线程不需要阻塞,可以顺畅地打印所有的结果。

    func waitAndProcessResults(timeout int64, done <-chan struct{},
        results <-chan Result) {
        finish := time.After(time.Duration(timeout))
        for working := workers; working > 0; {
            select { // Blocking
            case result := <-results:
                fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                    result.line)
            case <-finish:
                fmt.Println("timed out")
                return // Time's up so finish with what results there were
            case <-done:
                working--
            }
        }
        for {
            select { // Nonblocking
            case result := <-results:
                fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                    result.line)
            case <-finish:
                fmt.Println("timed out")
                return // Time's up so finish with what results there were
            default:
                return
            }
        }
    }
    

    搜索的具体实现对于本文的主题没有太大的意义,因此做简要说明,将文件全部读入缓存后,按行检测匹配。

    func (job Job) Do(lineRx *regexp.Regexp) {
        file, err := os.Open(job.filename)
        if err != nil {
            log.Printf("error: %s\n", err)
            return
        }
        defer file.Close()
        reader := bufio.NewReader(file)
        for lino := 1; ; lino++ {
            line, err := reader.ReadBytes('\n')
            line = bytes.TrimRight(line, "\n\r")
            if lineRx.Match(line) {
                job.results <- Result{job.filename, lino, string(line)}
            }
            if err != nil {
                if err != io.EOF {
                    log.Printf("error:%d: %s\n", lino, err)
                }
                break
            }
        }
    }
    

    最后验证一下程序。

    ./cgrep runtime.GOOS cgrep.go
    cgrep.go:90:    if runtime.GOOS == "windows" {
    

    代码清单:[https://github.com/KevinACoder/gobook/blob/master/src/cgrep3/cgrep.go]

    相关文章

      网友评论

        本文标题:主从设计模式的Go实现

        本文链接:https://www.haomeiwen.com/subject/thcfaqtx.html