并行

作者: petit_kayak | 来源:发表于2018-06-15 22:18 被阅读5次

    并行能力是GO语言的一大强项,甚至我个人认为并行能力是GO最重要的竞争力,没有之一。

    启动一个并行任务

    启动一个并行任务在GO语言里简单到了不能再简单的程度,只需要go关键词加上需要在并行线程中启动的函数就可以了。例如:

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func say(s string) {
        for i := 0; i < 5; i++ {
            time.Sleep(100 * time.Millisecond)
            fmt.Println(s)
        }
    }
    
    func main() {
        go say("world")
        go say("hello")
        
        time.Sleep(1000 * time.Millisecond)
    }
    

    这段代码在主程序中启动两个并行任务,分别打印helloworld两个字符串五次,由于是并行启动的,所以打印出来的顺序每一次运行都不同。

    Tip: 调用time模块的睡眠函数Sleep,参数需要明确的写成“数量*单位”的模式,这样就可以非常明确的指出睡眠的时间,避免了C/C++Java等语言中msleepusleepsleep调用乱七八糟的问题,更别提Windows上面的坑爹的Sleep函数了。

    通道

    GO语言中设计的这个通道简直是并行同步的神奇。首先,就如它的名字那样,通道可以实现数据在不同并行任务中的传递,例如:

    package main
    
    import "fmt"
    
    func sum(s []int, c chan int) {
        sum := 0
        for _, v := range s {
            sum += v
        }
        fmt.Printf("Finished %d\n", sum)
        c <- sum // send sum to c
    }
    
    func main() {
        s := []int{7, 2, 8, -9, 4, 0}
    
        c := make(chan int)
        go sum(s[:len(s)/2], c)
        go sum(s[len(s)/2:], c)
        x, y := <-c, <-c // receive from c
    
        fmt.Println(x, y, x+y)
    }
    

    从这个例子可以看出,由于通过通道获取数据的时候,要等待其他并行任务向通道放入数据,因此,通道在这里实际上还承担了join的功能。同样的功能,如果使用C/C++实现,需要至少定义一个存储结果的变量和一个用于保护这个变量的互斥量,同时还需要在主函数中调用join函数等待子线程结束。

    不止于此,通道同时还可以当作缓冲区使用,例如:

    package main
    
    import "fmt"
    
    func fibonacci(n int, c chan int) {
        x, y := 0, 1
        for i := 0; i < n; i++ {
            c <- x
            x, y = y, x+y
        }
        close(c)
    }
    
    func main() {
        c := make(chan int, 5)
        go fibonacci(20, c)
        index := 0
        for val := range c {
            fmt.Println(index,val)
            index += 1
        }
    }
    

    从这个例子里,我仿佛看到了通道在网络通信中的逆天使用:

    • 使用通道定义缓冲区
    • 在网络接收的并行任务中,将接收到的数据包塞入通道
    • 在协议解析任务中,从通道中获取数据包,直到通道关闭

    下面就是我设想的伪代码:

    func net_process(..., ch chan BUFFER_TYPE) {
        ...
        for {
            if ... {//received a new datagram
                ch <- new_datagram
            }
            if ... {//escape condition
                break
            }
        }
        close(ch)
    }
    
    func protocol_process(..., ch chan BUFFER_TYPE) {
        for datagram := range ch {
            //process protocol
        }
    }
    
    func main() {
        ...
        ch := make(chan BUFFER_TYPE, BUFFER_SIZE)
        go net_process(...,ch)
        go protocol_process(...,ch)
        ...
    }
    
    

    GO语言异常简洁的并行运行指令go加上强大的通道,让它并行处理能力卓尔不群。

    互斥量

    当然,只有通道对于并行任务之间的数据保护仍然是不够的,GO语言同样给出了互斥量sync.Mutex。互斥量的使用和其他语言没有什么不同,也是LockUnlock函数的配对使用,唯一不同的是可以利用GO语言的defer语句,在调用Lock函数之后就可以写下Unlock函数,例如:

    // SafeCounter is safe to use concurrently.
    type SafeCounter struct {
        v   map[string]int
        mux sync.Mutex
    }
    
    // Inc increments the counter for the given key.
    func (c *SafeCounter) Inc(key string) {
        c.mux.Lock()
        // Lock so only one goroutine at a time can access the map c.v.
        c.v[key]++
        c.mux.Unlock()
    }
    
    // Value returns the current value of the counter for the given key.
    func (c *SafeCounter) Value(key string) int {
        c.mux.Lock()
        // Lock so only one goroutine at a time can access the map c.v.
        defer c.mux.Unlock()
        return c.v[key]
    }
    

    例子:网络爬虫

    下面是我在gotour最后的网络爬虫任务中作出的答案,因为是练习并行处理,所以尽可能将go、通道和互斥量都用了起来,实现了功能,但并不是最优的实现方式:

    type Fetcher interface {
        // Fetch returns the body of URL and
        // a slice of URLs found on that page.
        Fetch(url string) (body string, urls []string, err error)
    }
    
    type UrlCaches struct {
        cached *list.List
        mux sync.Mutex
    }
    
    func (caches *UrlCaches) TryCache(url string) bool {
        caches.mux.Lock()
        defer caches.mux.Unlock()
        for url_c := caches.cached.Front(); url_c!=nil; url_c = url_c.Next() {
            if url_c.Value==url {
                return false
            }
        }
        caches.cached.PushBack(url)
        return true
    }
    
    type FetchCounter struct {
        count int
        mux sync.Mutex
    }
    
    func (c *FetchCounter) Inc() {
        c.mux.Lock()
        c.count = c.count+1
        c.mux.Unlock()
    }
    
    func (c *FetchCounter) Dec() {
        c.mux.Lock()
        if c.count>0 {
            c.count = c.count-1
        }
        c.mux.Unlock()
    }
    
    func (c *FetchCounter) Value() int {
        c.mux.Lock()
        defer c.mux.Unlock()
        return c.count
    }
    
    func Crawl_p(url string, depth int, fetcher Fetcher, counter *FetchCounter, caches *UrlCaches) {
        defer counter.Dec()
        if depth<=0 {
            return
        }
        body, urls, err := fetcher.Fetch(url)//fetch
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)
        for _, u := range urls {
            if caches.TryCache(u) {
                counter.Inc()
                go Crawl_p(u,depth-1,fetcher,counter,caches)
            }
        }
    }
    
    // Crawl uses fetcher to recursively crawl
    // pages starting with url, to a maximum of depth.
    func Crawl(url string, depth int, fetcher Fetcher) {
        if depth<=0 {
            return
        }
        counter := FetchCounter{count: 0}
        cached := UrlCaches{cached: list.New()}
        sleep_cnt := 0
        if cached.TryCache(url) {
            counter.Inc()
            go Crawl_p(url,depth,fetcher,&counter,&cached)
        }
        for {
            if counter.Value()==0 {
                break
            }
            time.Sleep(time.Millisecond)
            sleep_cnt++
        }
        fmt.Println("crawling finished, used ", sleep_cnt, " ms")
    }
    

    到此,我就完成了全部的gotour内容,后面打算转到Effective GO进行学习,并且思考如何使用GO语言设计和实现微服务架构。

    相关文章

      网友评论

        本文标题:并行

        本文链接:https://www.haomeiwen.com/subject/johveftx.html