美文网首页
go_教程_03

go_教程_03

作者: Kate_Blog | 来源:发表于2018-11-29 08:55 被阅读1次

    channels

    channels: routines之前可以传递数据的管道

    • 默认的channel只能有对应的接受者(<- chan)
    • 才能发送数据到这个channel(chan <-)
    • 发送数据的channel (<- chan)
    • 接收数据的channel (chan <-)
        fmt.Println("-----channels------")
        messages := make(chan string)
        go func() {
            fmt.Println("i am routines")
            messages <- "ping"
        }()
        msg := <-messages
        fmt.Println(msg)
    

    buffering channels

    • 可以配置一个接收一定量参数列表的channel
    • 可配置接收一定数量的参数
        fmt.Println("-----buffering channels------")
        messages1 := make(chan string, 2)
        messages1 <- "buffered"
        messages1 <- "channel"
        fmt.Println(<-messages1)
        fmt.Println(<-messages1)
    

    channel synchronization 阻塞等待(同步)

    • 同步不同的goroutines的操作
    • 例如一个goroutine阻塞,等待另外一个goroutine结束
    // 使用channel通知,别的goroutine工作做完了
    func worker(done chan bool) {
        fmt.Println("working....")
        time.Sleep(time.Second)
        fmt.Println("done")
        done <- true
    }
    
        done := make(chan bool, 1)
        go worker(done) //启动一个异步的goroutine,传一个channel进去
        <-done          // 主线程阻塞直到收到这个channel里面的信号,继续往下执行
        // 去掉<- done, 程序会在worker没开始就退出
    

    channel directions 数据流方向

    //把msg写入pings这个channel
    func ping(pings chan<- string, msg string) {
        pings <- msg
    }
    
    //把ping这个channel的消息拿出来,写入pong这个channel
    func pong(pings <-chan string, pongs chan<- string) {
        msg := <-pings
        pongs <- msg
    }
    
        fmt.Println("------channel directions-------")
        //channel directions
        pings := make(chan string, 1)
        pongs := make(chan string, 1)
        ping(pings, "pass message")
        pong(pings, pongs)
        fmt.Println(<-pongs)
    

    select

    • 多个channel操作的等待
    • 结合goroutine和channels功能强大
    c1 := make(chan string)
        c2 := make(chan string)
    
        go func() {
            time.Sleep(1 * time.Second)
            c1 <- "one"
        }()
    
        go func() {
            time.Sleep(2 * time.Second)
            c2 <- "two"
        }()
    
        //同事等待两个并发的goroutines到来
        for i := 0; i < 2; i++ {
            select {
            case msg1 := <-c1:
                fmt.Println("received", msg1)
            case msg2 := <-c2:
                fmt.Println("received", msg2)
            }
        }
        //另一种实现方式
        //msg1 := ""
        //msg2 := ""
        // for {
        //   select {
        //   case msg1 = <-c1:
        //       fmt.Println("received", msg1)
        //   case msg2 = <-c2:
        //       fmt.Println("received",msg2)
        //   }
        //   if msg1 == "one" && msg2 == "two" {
        //       break
        //   }
        // }
    
    

    Timeouts 超时

    • 调用外部资源,或者有执行时间限制的时候,timeout很有用
    • 使用channel和select实现很方便
    • select和timeout的模式,通过channel交流的goroutine实现
    • go的很多重要的功能都是基于channel和select
    c3 := make(chan string, 1)
        go func() {
            time.Sleep(2 * time.Second) // 模拟2秒
            c3 <- "result 1"
        }()
        select {
        case res := <-c3: //获取channel的值
            fmt.Println(res)
        case <-time.After(1 * time.Second): //获取channel的值超时
            fmt.Println("timeout 1")
        }
    
        c4 := make(chan string, 1)
        go func() {
            time.Sleep(2 * time.Second)
            c4 <- "result 2"
        }()
        select {
        case res := <-c4: //获取channel的值
            fmt.Println(res)
        case <-time.After(3 * time.Second): //获取channel的值超时
            fmt.Println("timeout 2")
        }
    
    

    Non-Blocking Channel Operations

    • 非阻塞接收,使用select的default实现:
    • 如果messages2接收到消息,执行case
    • 如果messages2没有接收到消息,执行default
        messages2 := make(chan string)
        signals := make(chan bool)
        select {
            case msg := <-messages2:
                fmt.Println("received message2", msg)
            default:
                fmt.Println("no messages2 received")
            }
        
    
    • 非阻塞发送,使用select的default实现:
    • msg2是无法发送到messages2,因为这个channel没有buffer,
    • 也没有接收者,所以default执行
        msg2 := "hi"
        select {
        case messages2 <- msg2:
            fmt.Println("sent message", msg2)
        default:
            fmt.Println("no message sent")
        }
    
    • 对多个channel实现,非阻塞的接收:
    • 使用多个case实现
    select {
        case msg := <-messages2:
            fmt.Println("received message", msg)
        case sig := <-signals:
            fmt.Println("received signal", sig)
        default:
            fmt.Println("no activity")
        }
    

    Closing Channels

    Closing Channels: 没有值可以发送到这个channel

    • 体现出接收者把工作做完
    • 当主线程没有工作输入到jobs这个channel,就将它关闭
    • 其中goroutine是worker线程
        jobs := make(chan int, 5)
        done1 := make(chan bool)
    
    • work goroutine
    • 当jobs这channel已经关闭并且所有值都被接收后,more=false
    • 当more=false, 把信息输入到done1这个channel通知主线程
        go func() {
            for {
                j, more := <-jobs
                if more {
                    fmt.Println("received job", j)
                } else {
                    fmt.Println("received all jobs")
                    done1 <- true
                    return
                }
            }
    
        }()
    
    • 主线程发送3个job到jobs这个channel
    • 发送完成就关闭这个channel
    for j := 1; j <= 3; j++ {
            jobs <- j
            fmt.Println("sent job", j)
        }
        close(jobs)
        fmt.Println("send all jobs")
    
    • 主线程阻塞直到done1这个channel接收到信息,叫醒主线程
        <-done1
    

    Range over Channels

    • 使用for loop遍历channel里面的值
    • for loop 循环完2次会结束,因为我们前面关闭了channel
    • 说明:关闭一个为不空的channel是可以的,
    • 并且关闭后,值还是可以被接收的
    queue := make(chan string, 2)
        queue <- "one"
        queue <- "two"
        close(queue)
    
        for elem := range queue {
            fmt.Println(elem)
        }
    
    

    Timers

    • 指定在未来的一个时间点执行一次
    • timer1创建的时候往timer1.C这个channel发送一个值
    • 当从timer1.C接收到值的时候这个channel就过期
    • 这个接收到值的时间就是创建timer1的时候设置的时间
        timer1 := time.NewTimer(2 * time.Second)
    
        <-timer1.C // timer.C 是一个channel,接收值
        fmt.Println("timer 1 expired")
    
        timer2 := time.NewTimer(time.Second)
        go func() {
            <-timer2.C //异步接收值
            fmt.Println("timer 2 expired")
        }()
        stop2 := timer2.Stop() //手动停止这个timer
        if stop2 {
            fmt.Println("timer 2 stopped")
        }
    

    Tickers

    • 你想在一个时间间隔内重复做一件事
    • 创建一个ticker的时候就会往ticker.C发送值,
    • 遍历这个ticker.C的channel的值,就可以在间隔的时间做操作
    • 每个遍历出值刚刚好ticker设置好的间隔时间
    fmt.Println("---Tickers---")
        //创建一个500ms的ticker,
        ticker := time.NewTicker(500 * time.Millisecond)
        go func() {
            // 遍历这个channel的range
            for t := range ticker.C {
                fmt.Println("ticker at ", t)
            }
        }()
        //1600ms之后手动停止这个ticker
        time.Sleep(1600 * time.Millisecond)
        ticker.Stop()
        fmt.Println("ticker stopped")
    

    worker pools

    • 使用goroutine和channel实现worker pool
    • <-chan是发送的channel
    • chan<-是接收的channel
    func worker2(id int, jobs <-chan int, results chan<- int) {
        for j := range jobs {
            fmt.Println("worker", id, "started  job", j)
            time.Sleep(time.Second)
            fmt.Println("worker", id, "finished job", j)
            results <- j * 2
        }
    }
    
    
        fmt.Println("---worker pools---")
        /**
        - 使用goroutine和channel实现worker pool
        */
        //jobs1的channel用来发送工作
        //result的channel用来收集结果
        jobs1 := make(chan int, 100)
        result := make(chan int, 100)
        //启动3个worker
        for w := 1; w <= 3; w++ {
            go worker2(w, jobs1, result)
        }
    
        //发送5个工作到jobs1这个channel,发送完后关闭
        for j := 1; j <= 5; j++ {
            jobs1 <- j
        }
        close(jobs1)
    
        //收集所有工作的结果
        for a := 1; a <= 5; a++ {
            <-result
        }
    

    rate limiting

        fmt.Println("---rate limiting---")
        requests := make(chan int, 5)
        for i := 1; i <= 5; i++ {
            requests <- i
        }
        close(requests)
        //limiter channel 每200ms接收一个值
        limiter := time.Tick(200 * time.Millisecond)
    
        for req := range requests {
            //阻塞,通过获取limiter channel的值,再执行req
            //即使每200ms才会执行一次req
            <-limiter
            fmt.Println("requst :", req, time.Now())
        }
        burstyLimiter := make(chan time.Time, 3)
    
        //Fill up the channel to represent allowed bursting
        for i := 0; i < 3; i++ {
            burstyLimiter <- time.Now()
        }
    
        // 异步填充这个channel
        go func() {
            //200ms内,允许添加3个值到burstyLimiter channel
            for t := range time.Tick(200 * time.Millisecond) {
                fmt.Println("t : ", t)
                burstyLimiter <- t
            }
        }()
    
        //请求过来,放入burstyRequest channel
        burstyRequest := make(chan int, 9)
        for i := 1; i <= 9; i++ {
            burstyRequest <- i
        }
        close(burstyRequest)
    
        // 9个请求的前3个请求,可以在200毫秒内处理完
        for req := range burstyRequest {
            <-burstyLimiter
            fmt.Println("request", req, time.Now())
        }
    
        /**
    
        Running our program we see the first batch of requests handled once every ~200 milliseconds as desired.
        For the second batch of requests we serve the first 3 immediately because of the burstable rate limiting, then serve the remaining 2 with ~200ms delays each.
        */
    

    atomic counters

    //被多个goroutines使用的原子化的计数器,使用sync/atomic package
        var ops uint64
        //开启50个goroutines每个线程都不断累加这个变量
        for i := 0; i < 50; i++ {
            go func() {
                //for { //改为每个线程都加1,结果刚刚好50说明,这个变量是并发安全的
                //原子化增加方法,传入变量内存地址
                atomic.AddUint64(&ops, 1)
                fmt.Println("tmp ops: ", atomic.LoadUint64(&ops))
                //等待1ms
                time.Sleep(time.Millisecond)
                //}
            }()
        }
        time.Sleep(time.Second)
        //为了安全地使用这个变量,当这个变量被过个goroutines使用时,
        // 我们从当前值复制一个副本
        opsFinal := atomic.LoadUint64(&ops)
        fmt.Println("ops: ", opsFinal)
    

    Mutexes

    /**
        在被多个goroutines调用时,
        上面使用了atomic operations去管理简单的计数器状态,
        对于更加复杂的情况,我们使用mutex去安全获取数据
        */
    
        var state = make(map[int]int)
    
        var mutex = &sync.Mutex{}
    
        var readOps uint64
        var writeOps uint64
    
        for r := 0; r < 100; r++ {
            go func() {
                total := 0
                for {
                    key := rand.Intn(5)
                    mutex.Lock()
                    total += state[key]
                    mutex.Unlock()
                    atomic.AddUint64(&readOps, 1)
                    time.Sleep(time.Millisecond)
                }
            }()
    
            for w := 0; w < 10; w++ {
                go func() {
                    key := rand.Intn(5)
                    val := rand.Intn(100)
                    mutex.Lock()
                    state[key] = val
                    mutex.Unlock()
                    atomic.AddUint64(&writeOps, 1)
                    time.Sleep(time.Millisecond)
                }()
            }
        }
    
        time.Sleep(time.Second)
    
        readOpsFinal := atomic.LoadUint64(&readOps)
        fmt.Println("readOps: ", readOpsFinal)
        writeOpsFinal := atomic.LoadUint64(&writeOps)
        fmt.Println("writeOps: ", writeOpsFinal)
    
        mutex.Lock()
        fmt.Println("state: ", state)
        mutex.Unlock()
    
    

    Stateful Goroutines

    • 上面使用显式锁和互斥(mutexes)来实现同步代码块在不同的goroutines之间
    • 另外一种方式是使用goroutines和channel内置的同步功能,去实现。
    • channel-based方式的原理:共享内存只属于一个goroutine所有
     // TODO:
    

    sort 排序

    • sort方法对于内置类型有效
    • in-place sorting, 所以返回同一个slice而不是一个新的对象
    strs := []string{"c","a","b"}
        sort.Strings(strs)
        fmt.Println("strings: ",strs)
    
        ints := []int{7,2,4}
        sort.Ints(ints)
        fmt.Println("Ints: ",ints)
    
        //- 判断是否已经排序
        s := sort.IntsAreSorted(ints)
        fmt.Println("sorted:",s)
    

    sort by function 重写sort接口自定义查询

       // todo
    

    panic 异常捕捉

        panic("a problem") //这函数会主断程序
        _, err := os.Create("./tmp/file")
        if err != nil {
            panic(err)
        }
    

    defer 延迟执行

    func createFile(p string) *os.File {
        fmt.Println("creating")
        f, err := os.Create(p)
        if err != nil {
            panic(err)
        }
        return f
    }
    func writeFile(f *os.File) {
        fmt.Println("writing")
        fmt.Fprintf(f,"data")
    }
    func closeFile(f *os.File){
        fmt.Println("closing")
        f.Close()
    }
    
    func main(){
        f := createFile("/tmp/defer.txt")
        defer closeFile(f) //这个方法会在writeFile方法执行后,关闭main主程序之前执行
        writeFile(f)
    
    

    相关文章

      网友评论

          本文标题:go_教程_03

          本文链接:https://www.haomeiwen.com/subject/ijuoqqtx.html