美文网首页
视频笔记二:RPC 和线程

视频笔记二:RPC 和线程

作者: xiaohao204 | 来源:发表于2020-07-14 17:04 被阅读0次

    为什么用 Go

    1. 语法先进。在语言层面支持线程(goroutine)和管道(channel)。对线程间的加锁、同步支持良好。
    2. 类型安全(type safe)。内存访问安全(memory safe),很难写出像 C++ 一样内存越界访问等 bug。
    3. 支持垃圾回收(GC)。不需要用户手动管理内存,这一点在多线程编程中尤为重要,因为在多线程中你很容易引用某块内存,然后忘记了在哪引用过。
    4. 简洁直观。没 C++ 那么多复杂的语言特性,并且在报错上很友好。

    线程(Threads)

    线程为什么这么重要?因为他是我们控制并发的主要手段,而并发是构成分布式系统的基础。在 Go 中,你可以将 goroutine 认为是线程,以下这两者混用。 每个线程可以有自己的内存栈、寄存器,但是他们可以共享一个地址空间。

    使用原因

    IO concurrency(IO 并发):一个历史说法,以前单核时,IO 是主要瓶颈,为了充分利用 CPU,一个线程在进行 IO 时,可以让出 CPU,让另一个线程进行计算、读取或发送网络消息等。在这里可以理解为:你可以通过多个线程并行的发送多个网络请求(比如 RPC、HTTP 等),然后分别等待其回复。

    Parallelism(并行):充分利用多核 CPU。

    关于并发(concurrency)和并行(parallelism)的区别和联系,可以看这篇文章。记住两个关键词即可:逻辑并发设计 vs 物理并行执行。

    Convenience(方便):比如可以在后台启动一个线程,定时执行某件事、周期性的检测什么东西(比如心跳)。

    Q&A:

    1. 不使用线程还能如何处理并发?基于事件驱动的异步编程。但是多线程模型更容易理解一些,毕竟每个线程内执行顺序和你的代码顺序是大体一致的。
    2. 进程和线程的区别?进程是操作系统提供的一种包含有独立地址空间的一种抽象,一个 Go 程序启动时作为一个进程,可以启动很多线程(不过我记得 Goroutine 是用户态的执行流)。

    使用难点(challenges)

    共享内存易出错。一个经典的问题是,多个线程并行执行语句:n = n + 1 时,由于该操作不是原子操作,在不加锁时,很容易出现 n 为非期望值。

    我们称这种情况为竞态 (race):即两个以上的线程同时试图改变某个共享变量。

    解决的方法是加锁,但如何科学的加锁以兼顾性能避免死锁又是一门学问。

    Q&A:

    1. Go 是否知道锁和资源(一些共享的变量)间的映射?Go 并不知道,它仅仅就是等待锁、获取锁、释放锁。需要程序员在脑中、逻辑上来自己维护。
    2. Go 会锁上一个 Object 的所有变量还是部分?和上个问题一样,Go 不知道任何锁与变量之间的关系。Lock 本身的源语很简单,goroutine0 调用 mu.Lock 时,没有其他 goroutine 持有锁,则 goroutine0 获取锁;如果其他 goroutine 持有锁,则一直等待直到其释放锁;当然,在某些语言,如 Java 里,会将对象或者实例等与锁绑定,以指明锁的作用域。
    3. Lock 应该是某个对象的私有变量?如果可以的话,最好这样做。但如果由跨对象的加锁需求,就需要拿出来了,但要注意避免死锁。

    线程协调(Coordination)

    1. channels:go 中比较推荐的方式,分阻塞和带缓冲。
    2. sync.Cond:信号机制。
    3. waitGroup:阻塞知道一组 goroutine 执行完毕,后面还会提到。

    死锁(DeadLock)

    产生条件:多个锁,循环依赖,占有并等待。

    如果你的程序不干活了,但是又没死,那你就需要看看是否死锁了。

    爬虫(Web crawler)

    1. 从一个种子网页 URL 开始
    2. 通过 HTTP 请求,获取其内容文本
    3. 解析其内容包含的所有 URL,针对所有 URL 重复过程 2,3
      为了避免重复抓取,需要记下所有抓取过的 URL。

    由于:

    1. 网页数量巨大

    2. 网络请求较慢

      一个接一个的抓取用时太长,因此需要并行抓取。这里面有个难点,就是如何判断已经抓取完所有网页,并需要结束抓取。

    抓取代码

    代码在https://pdos.csail.mit.edu/6.824/notes/crawler.go中有。

    串行爬取。深度优先遍历(DFS )全部网页构成的图结构,利用一个名为 fetched 的 set 来保存所有已经抓取过的 URL。

    func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
      if fetched[url] {
        return
      }
      fetched[url] = true
      urls, err := fetcher.Fetch(url)
      if err != nil {
        return
      }
      for _, u := range urls {
        Serial(u, fetcher, fetched)
      }
      return
    }
    

    并行爬取

    1. 将抓取部分使用 go 关键字变为并行。但如果仅这么改造,不利用某些手段(sync.WaitGroup)等待子 goroutine,而直接返回,那么可能只会抓取到种子 URL,同时造成子 goroutine 的泄露。
    2. 如果访问已经抓取的 URL 集合 fetched 不加锁,很可能造成多次拉取同一个网页。

    WaitGroup

    var done sync.WaitGroup
    for _, u := range urls {
      done.Add(1)
      go func(u string) {
        defer done.Done()
        ConcurrentMutex(u, fetcher, f)
      }(u) // u 被拷贝
    }
    done.Wait()
    

    WaitGroup 内部维护了一个计数器:调用 wg.Add(n) 时候会增加 n;调用 wait.Done() 时候会减少 1。这时候调用 wg.Wait() 会一直阻塞直到当计数器变为 0 。所以 WaitGroup 很适合等待一组 goroutine 都结束的场景。

    Q&A

    1. 如果 goroutine 异常退出没有调用 wg.Done() 怎么办?可以使用 defer 将其写在 goroutine 开始:defer wg.Done()

    2. 两个 goroutine 同时调用 wg.Done() 会有竞争(race),以至于内部计数器不能正确减少两次吗?WaitGroup 应该有相应机制(锁什么的)来保证 Done() 的原子性。

    3. 定义匿名函数时,匿名函数中变量和外层函数同名变量间的关系?这是个闭包(closure)问题。如果匿名函数中变量没有被参数覆盖(如上述代码中 fetcher),就会和外层同名变量引用同一个地址。如果通过传参传递(如上述代码中 u),哪怕参数和外层变量看起来一样,但匿名函数使用的也是传进来的参数,而非外层变量;尤其针对 for 循环变量,我们通常通过参数来将其在调用时拷贝一次,否则 for 循环启动的所有 goroutine 都会指向这个不断被 for 循环赋值改变的变量。

      对于闭包,go 中有个“变量逃逸”(Variable Escape)的说法,如果某个变量在函数声明周期结束时仍被引用,则将其分被到堆而非函数栈上。对闭包来说,某个变量同时被内层和外层函数引用,则其会被分配到堆上。

    4. 既然字符串 u 是不可变(immutable)的,为什么所有 goroutine 还会引用到不断变化的值?string 的确是不可变的,但是 u 的值一直在变,而 goroutine 和外层 goroutine 共享 u 的引用。

    去掉锁

    如果在更新 map 的时候去掉锁,运行几次发现并没有什么异常,因为 race 其实很难检测。好在 go 提供了竞态分析工具帮你来找到潜在含有竞态的地方:go run -race crawler.go

    注意该工具没有做静态分析,而是在动态执行过程中观察、记录各个 goroutine 的执行轨迹,进行分析。

    线程数量

    Q&A

    1. 该代码在整个运行中会同时多少线程在运行(goroutine)?
      该代码并没有做明显的限制,但是其明显和 URL 数量、抓取时间正相关。例子中输入只有五个 URL,因此没有什么问题。但在现实中,这么做可能会同时启动上百万个 goroutine。因此一个改进是,实现启动一个固定数量的 worker 池子,每个 worker 干完后就去要 / 被分配下一个任务。

    使用 channel 通信

    我们可以实现一个新的爬虫版本,不用锁 + 共享变量,而用 go 中内置的语法:channel 来通信。具体做法类似实现一个生产者消费者模型,使用 channel 做消息队列。

    1. 初始将种子 url 塞进 channel。
    2. 消费者:master 不断从 channel 中取出 urls,判断是否抓取过,然后启动新的 worker goroutine 去抓取。
    3. 生产者:worker goroutine 抓取到给定的任务 url,并将解析出的结果 urls 塞回 channel。
    4. master 使用一个变量 n 来追踪发出的任务数;往发出一份任务增加一;从 channel 中获取并处理完一份结果(即将其再安排给 worker)减掉一;当所有任务都处理完时,退出程序。
    func worker(url string, ch chan []string, fetcher Fetcher) {
      urls, err := fetcher.Fetch(url)
      if err != nil {
        ch <- []string{}
      } else {
        ch <- urls
      }
    }
    
    func master(ch chan []string, fetcher Fetcher) {
      n := 1
      fetched := make(map[string]bool)
      for urls := range ch {
        for _, u := range urls {
          if fetched[u] == false {
            fetched[u] = true
            n += 1
            go worker(u, ch, fetcher)
          }
        }
        n -= 1
        if n == 0 {
          break
        }
      }
    }
    
    func ConcurrentChannel(url string, fetcher Fetcher) {
      ch := make(chan []string)
      go func() {
        ch <- []string{url}
      }()
      master(ch, fetcher)
    }
    

    Q&A:

    1. master 读 channel,多 worker 写 channel,不会有竞争问题吗?channel 是线程安全的。
    2. channel 不需要最后 close 吗?我们用 n 追踪了所有执行中的任务数,因此当 n 为 0 退出时,channel 中不存在任何任务 / 结果,因此 master/worker 都不会对 channel 存在引用,稍后 gc collector 会将其回收。
    3. 为什么在 ConcurrentChannel 需要用 goroutine 往 channel 中写一个 url?否则 master 在读取的时候会一直阻塞。并且 channel 是一个非缓冲 channel,如果不用 goroutine,将会永远阻塞在写的时候。

    相关文章

      网友评论

          本文标题:视频笔记二:RPC 和线程

          本文链接:https://www.haomeiwen.com/subject/peyfhktx.html