美文网首页Golang
redigo源码解析

redigo源码解析

作者: solohunter | 来源:发表于2018-12-09 14:13 被阅读0次

    前言

    redigo是用Go语言开发的Redis客户端,受到Redis官方的推荐

    redigo的源码量比较少,也比较清晰易读。redigo主要做了以下事项

    • 与Redis server建立连接
    • 按照RESP协议组装指令
    • 向Redis server发用指令
    • 接收Redis server返回的数据
    • 将返回数据解析成Go的数据类型
    • 连接池(Pool),支持设置最大的活动连接数,最大的空闲连接数

    源码概述

    redigo主要使用了Go官方的net、io和bufio包,这些包是Go对网络连接,I/O,Buffered I/O的抽象实现。

    在看Pool代码的过程中,对于如何控制最大连接数,笔者多花了一些时间才看明白。Pool使用buffered channel (可缓冲的管道,以下简称 buff chan) 来控制最大连接数。

    首先看一下活动连接和空闲连接的定义

    • 活动连接:经Pool创建,未被关闭。这里的关闭是指关闭网络连接,也就是断开与Redis server的连接
    • 空闲连接:活动连接使用完之后,需交还给Pool,Pool会将连接放入空闲连接列表,列表中的连接数量就是空闲连接数

    可以看出,活动连接是包含空闲连接的。

    Pool原理解读

    概述部分提到,Pool使用buff chan来控制最大连接数,这里也是源码中比较难以理解的地方,需要对Go的channel有较深的理解。

    Go有两种channel,chan和 buff chan,它们的特性如下

    • chan,读写都是同步、阻塞操作。写操作阻塞,等待写入的数据被读取;读操作阻塞,等待数据写入。
    • buff chan,读操作和chan类似;写操作在缓冲区未满时非阻塞,否则阻塞等待缓冲区有空位。

    Pool利用buff chan的缓冲区长度固定,及读操作的阻塞等待来控制最大连接数。

    源码解析

    redis/
    | -- redis.go // 定义接口
    | -- conn.go // 实现redis.go中定义的接口,用于与Redis server建立连接
    | -- -- type conn struct // 链接结构体,实现了 redis.go中定义的Conn接口
    | -- -- type dialOptions struct // 拨号选项,包括拨号器、拨号函数,超时配置,TLS配置等
    | -- -- type DialOption struct // 拨号选项设置结构体,只包含函数f,用于修改拨号选项
    | -- -- func Dial // 拨号函数,参数:网络协议,网络地址,拨号选项设置结构体,返回 conn
    | -- -- func write* // 写命令函数,用于将命Redis指令写入buffer io
    | -- reply.go // 将redis返回数据解析成go的数据类型

    连接池如何用channel控制打开的连接数

    redis/
    | -- pool.go // Get()用于获取连接,Close()用于归还连接
    | -- -- Pool.active // 活动连接数,每建立一个连接(Dial)active加1,每关闭(Close)一个连接,active减1
    | -- -- Pool.ch chan struct{} // 用来控制最大的活动连接数
    | -- -- Pool.lazyInit() // 将Pool.ch 初始化为buffered channel,长度为Pool.MaxActive,并将ch填满
    | -- -- Pool.get() // 先从Pool.ch中取出一个元素,如果最终建立新连接(Dial)失败,再往ch写入一个元素
    | -- -- Pool.put() // 往Pool.ch写入数据,所以get读取数据,put写入数据

    IF Wait == 1 AND MaxActive > 0
    AND MaxActive >= MaxIdle
    AND MaxActive == 3 AND MaxIdle == 2

    初始化:ch长度等于MaxActive, len(ch)=3
    Pool.Get() 首先从ch读出数据, 如果新建连接失败,再向ch写入数据,len(ch)不变,否则len(ch)-1
    GET#1-没有空闲连接,新建连接,len(ch)=2
    GET#2-没有空闲连接,新建连接,len(ch)=1
    GET#3-没有空闲连接,新建连接,len(ch)=0
    GET#4-ch空了,所以ch读操作阻塞,直到有连接被归还,put()往ch中写入数据,ch读阻塞解除,继续往下获取空闲连接,否则只能等待。

    这保证了最大活动连接数不会超过MaxActive。

    部分源代码

    func (p *Pool) Get() Conn {
        pc, err := p.get(nil)
        if err != nil {
            return errorConn{err}
        }
        return &activeConn{p: p, pc: pc}
    }
    
    func (p *Pool) get(ctx interface {
        Done() <-chan struct{}
        Err() error
    }) (*poolConn, error) {
    
        // Handle limit for p.Wait == true.
        if p.Wait && p.MaxActive > 0 {
            p.lazyInit()        // 初始化ch 
            if ctx == nil {
                <-p.ch          // 从ch读数据,如果ch为空则阻塞等待
            } else {
                select {
                case <-p.ch:
                case <-ctx.Done():
                    return nil, ctx.Err()
                }
            }
        }
        // ... ...
    }
    
    func (ac *activeConn) Close() error {
        // ... ...
        ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
        return nil
    }
    
    func (p *Pool) put(pc *poolConn, forceClose bool) error {
        p.mu.Lock()
        
        // ... ...
    
        if p.ch != nil && !p.closed {
            p.ch <- struct{}{}    // 归还连接后往ch写入数据
        }
        p.mu.Unlock()
        return nil
    }
    

    参考链接

    相关文章

      网友评论

        本文标题:redigo源码解析

        本文链接:https://www.haomeiwen.com/subject/ztahhqtx.html