美文网首页网络编程魔法
Redispy 源码学习(四) --- 创建连接

Redispy 源码学习(四) --- 创建连接

作者: 人世间 | 来源:发表于2017-05-02 22:36 被阅读555次

    了解RESP协议,并实现了pack编码,下一步就是发送命令给redis服务器,等待结果返回就好啦。

    当然,想要发送命令,还需要先创建连接。如果连接都不存在,就无所谓发送命令了。我们已经了解发送网络数据需要先创建socket连接。因此下面写了一个简单的客户端代码,基于前面编码RESP的代码:

        command = Connection().pack_command(*args)
        print(command)
    
        import socket
    
        address = ('127.0.0.1', 6379)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(address)
        sock.sendall(command[0])
        print(sock.recv(1024))
    

    可以看到服务返回了并打印了b'+PONG\r\n'结果。一切工作正常。等等,如果每次我们发送数据都需要这么原始的创建socket,然后调用连接,发送数据,那么这个脚本也太丑了。因此,我们的主题就是学习redispy是如何抽象封装连接管理的。

    连接

    redispy定义了一个Connection类用于连接的管理,而定义了一个PythonParser类用来适配Hiredis。同时还实现了一个SocketBuffer方法用于收发处理socket数据。当然,本篇只讨论创建连接发送数据。

    我们的客户端代码为

        conn = Connection()
        conn.send_command("PING")
        print(conn.read_response())
    

    创建一个Connection实例,然后发送命令到redis服务器,最后再读取服务器的响应。当然,conn.read_response的源码分析暂时可以忽略。

    send_command 方法内会先打包编码命令,然后再调用send_packed_command方法发送命令。send_packed_command方法的源码如下:

        def send_packed_command(self, command):
            """将编码后的redis命令发送到redis服务器"""
            if not self._sock:
                self.connect()
            try:
                if isinstance(command, str):
                    command = [command]
                for item in command:
                    self._sock.sendall(item)
            except socket.timeout:
                self.disconnect()
                raise TimeoutError('Timeout writing to socket')
            except socket.error:
                e = sys.exc_info()[1]
                self.disconnect()
                if len(e.args) == 1:
                    errno, errmsg = 'UNKNOWN', e.args[0]
                else:
                    errno = e.args[0]
                    errmsg = e.args[1]
                raise ConnectionError("Error {} while writing to socket. {}.".format(errno, errmsg))
            except:
                self.disconnect()
                raise
    

    Connection实例化的时候会初始一些熟悉,例如 _sock和_parser对象,两者分别是用于通信的socket和解析resp协议的适配器。

    send_packed_command首先会检查_sock,即socket是否创建。如果尚未创建,则会调用connect方法创建连接。connect返回创建的socket连接。

    一旦有了socket,就可以直接调用sendall方法发送命令了。再socket交互的过程中,还得注意可能产生的异常和错误。

    创建连接

    Conneciton使用connect方法创建连接,该方法如下:

        def connect(self):
            if self._sock:
                return
            try:
                sock = self._connect()
            except socket.error:
                e = sys.exc_info()[1]
                raise ConnectionError(self._error_message(e))
    
            self._sock = sock
    
            try:
                self.on_connect()
            except RedisError:
                self.disconnect()
                raise
    
    

    同样也会先检查一下_sock对象,然后调用_connect方法创建socket。再创建完毕socket之后,还调用了一个on_connect方法做一些连接后的工作。

    _connect的源码略多,但是很清晰:

        def _connect(self):
            """ 创建 socket 连接,并返回socket对象
            """
            err = None
            for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
                family, socktype, proto, canonname, socket_address = res
                try:
                    # 创建 tcp socket 对象
                    sock = socket.socket(family, socktype, proto)
                    # TCP_NODELAY
                    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    
                    if self.socket_keepalive:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                        for k, v in iteritems(self.socket_keepalive_options):
                            sock.setsockopt(socket.SOL_TCP, k, v)
                    # 设置创建连接的超时时间
                    sock.settimeout(self.socket_connect_timeout)
                    sock.connect(socket_address)
                    # 设置连接之后的超时时间
                    sock.settimeout(self.socket_timeout)
                    return sock
                except socket.error as _:
                    err = _
                    if sock is not None:
                        sock.close()
            if err is not None:
                raise err
            raise socket.error("socket.getaddrinfo returned an empty list")
    

    首先调用socket的getaddrinfo方法,确定通信的族协议和socket类型。即IP4和tcp通信方式。然后设置socket选项,开启TCP_NODELAY模式,即禁用Nagle算法。TCP为了保证网络的性能,通常会把缓冲区的数据积累到一定程度再一起发送。而对于redis这种一问一答的通信方式,需要你尽可能的发送消息,而不用等待,因此会禁用掉该算法。

    Nagle算法的基本定义是任意时刻,最多只能有一个未被确认的小段。 所谓“小段”,指的是小于MSS尺寸的数据块,
    所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的ACK确认该数据已收到。
    Nagle算法只允许一个未被ACK的包存在于网络,它并不管包的大小,因此它事实上就是一个扩展的停-等协议,只不过它是基于包停-等的,而不是基于字节停-等的。
    TCP_NODELAY 选项
    默认情况下,发送数据采用Nagle 算法。这样虽然提高了网络吞吐量,但是实时性却降低了,在一些交互性很强的应用程序来说是不允许的,
    使用TCP_NODELAY选项可以禁止Nagle 算法。

    然后就是对长连接(keepalive)的选项设置,最后设置连接的超时时间,然后调用socket的connect方法创建连接。创建连接后,自然要把连接对象的socket返回。

    连接后初始化

    redispy在创建了连接之后,会调用on_connect 方法做一些初始化工作:

        def on_connect(self):
            
            self._parser.on_connect(self)
            
            if self.password:
                self.send_command('AUTH', self.password)
                if nativestr(self.read_response()) != 'OK':
                    raise AuthenticationError('Invalid Password')
    
    
            if self.db:
                self.send_command('SELECT', self.db)
                if nativestr(self.read_response()) != 'OK':
                    raise ConnectionError('Invalid Database')
    

    on_connect会先通过PythonParser的on_connect方法,初始化ScoketBuffer,这样做的目的就是为了接下来接受redis的回复。暂时可以忽略,主要看其他的逻辑。

    如果需要认证,就再调用 send_command 方法发送认证的命令,如果创建连接指定了数据库,就发送选择数据库的指令,默认是选择数据库0。

    这里的代码很有意思,我们的入口就是 send_command 方法。send_command 内的流程中又会先检查连接,连接没有就创建,连接有了才返回。此时再次调用send_command方法,显然已经有了连接,因此代码会走到self._sock.sendall(item)发送命令。

    从这里也可以看出,redispy创建连接是惰性的,只有发送查询的命令的时候,才会检查连接,从而决定是否创建socket连接。

    总结

    redispy中创建连接并没有很多高深的内容。通过封装一个连接对象管理连接。代码不复杂并不代码没有用。实际上,正式因为设计了这么简单的连接方式,才为后面的连接池实现带来了便利。

    收发数据都是基于socket,socket创建是表示可以进入数据的收发状态,但这只是最底层的连接。redis的逻辑连接还需要我们认证或者选择数据库。因为后者已经涉及了socket数据通信,因此也是再连接创立之后的操作。redispy为了考虑这种方法,很好的复用了send_command 方法。通过on_connect的方式创建redis的逻辑连接。

    至于read_response的内容,将会是RESP协议的解析,和redis回复内容的话题。我们将会在接下来的文中分析。

    相关文章

      网友评论

        本文标题:Redispy 源码学习(四) --- 创建连接

        本文链接:https://www.haomeiwen.com/subject/gqbhtxtx.html