On happybase's Connection Pool

Author: throwsterY | Published 2017-12-17 22:22

Let's first look at how the happybase connection pool is used:

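    import happybase

    # conf is the application's own configuration dict
    hbase_pool = happybase.ConnectionPool(host=conf['hbase']['host'], port=conf['hbase']['port'], size=100)
    with hbase_pool.connection() as conn:
        # do something with conn, e.g. CRUD through conn.table(...)
        pass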
    

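The code first creates an HBase connection pool of size 100 for the given host and port. The with statement takes a connection out of the pool, and that connection is used for CRUD operations on HBase. Knowing how to use the pool is not enough, though. When a tricky problem comes up, you may not know where to start. So I read the happybase connection pool source to understand how it manages HBase connections.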

    class ConnectionPool(object):
        """
        Thread-safe connection pool.
    
        .. versionadded:: 0.5
    
        The `size` argument specifies how many connections this pool
        manages. Additional keyword arguments are passed unmodified to the
        :py:class:`happybase.Connection` constructor, with the exception of
        the `autoconnect` argument, since maintaining connections is the
        task of the pool.
    
        :param int size: the maximum number of concurrently open connections
        :param kwargs: keyword arguments passed to
                       :py:class:`happybase.Connection`
        """
        def __init__(self, size, **kwargs):
            if not isinstance(size, int):
                raise TypeError("Pool 'size' arg must be an integer")
    
            if not size > 0:
                raise ValueError("Pool 'size' arg must be greater than zero")
    
            logger.debug(
                "Initializing connection pool with %d connections", size)
    
            self._lock = threading.Lock()
            self._queue = queue.LifoQueue(maxsize=size)
            self._thread_connections = threading.local()
    
            connection_kwargs = kwargs
            connection_kwargs['autoconnect'] = False
    
            for i in range(size):
                connection = Connection(**connection_kwargs)
                self._queue.put(connection)
    
            # The first connection is made immediately so that trivial
            # mistakes like unresolvable host names are raised immediately.
            # Subsequent connections are connected lazily.
            with self.connection():
                pass
    
        def _acquire_connection(self, timeout=None):
            """Acquire a connection from the pool."""
            try:
                return self._queue.get(True, timeout)
            except queue.Empty:
                raise NoConnectionsAvailable(
                    "No connection available from pool within specified "
                    "timeout")
    
        def _return_connection(self, connection):
            """Return a connection to the pool."""
            self._queue.put(connection)
    
        @contextlib.contextmanager
        def connection(self, timeout=None):
            """
            Obtain a connection from the pool.
    
            This method *must* be used as a context manager, i.e. with
            Python's ``with`` block. Example::
    
                with pool.connection() as connection:
                    pass  # do something with the connection
    
            If `timeout` is specified, this is the number of seconds to wait
            for a connection to become available before
            :py:exc:`NoConnectionsAvailable` is raised. If omitted, this
            method waits forever for a connection to become available.
    
            :param int timeout: number of seconds to wait (optional)
            :return: active connection from the pool
            :rtype: :py:class:`happybase.Connection`
            """
    
            connection = getattr(self._thread_connections, 'current', None)
    
            return_after_use = False
            if connection is None:
                # This is the outermost connection requests for this thread.
                # Obtain a new connection from the pool and keep a reference
                # in a thread local so that nested connection requests from
                # the same thread can return the same connection instance.
                #
                # Note: this code acquires a lock before assigning to the
                # thread local; see
                # http://emptysquare.net/blog/another-thing-about-pythons-
                # threadlocals/
                return_after_use = True
                connection = self._acquire_connection(timeout)
                with self._lock:
                    self._thread_connections.current = connection
    
            try:
                # Open connection, because connections are opened lazily.
                # This is a no-op for connections that are already open.
                connection.open()
    
                # Return value from the context manager's __enter__()
                yield connection
    
            except (TException, socket.error):
                # Refresh the underlying Thrift client if an exception
                # occurred in the Thrift layer, since we don't know whether
                # the connection is still usable.
                logger.info("Replacing tainted pool connection")
                connection._refresh_thrift_client()
                connection.open()
    
                # Reraise to caller; see contextlib.contextmanager() docs
                raise
    
            finally:
                # Remove thread local reference after the outermost 'with'
                # block ends. Afterwards the thread no longer owns the
                # connection.
                if return_after_use:
                    del self._thread_connections.current
                    self._return_connection(connection)
    

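The core of the pool is the code above, so let's walk through it.
Inside __init__:

    self._lock = threading.Lock()

creates a lock, which is used later when assigning to the thread-local;

    self._queue = queue.LifoQueue(maxsize=size)

creates a thread-safe last-in, first-out (LIFO) queue whose capacity equals the pool size; it holds the HBase connections;

    self._thread_connections = threading.local()

creates a thread-local object, so each thread can keep its own value for the same attribute;

    for i in range(size):
        connection = Connection(**connection_kwargs)
        self._queue.put(connection)

creates size connections (with autoconnect=False, so none of them is opened yet) and puts them into the queue. The constructor then opens one connection right away with "with self.connection(): pass", so that mistakes such as an unresolvable host name are raised immediately.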
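To see what the constructor sets up, here is a stdlib-only sketch that mirrors it. It uses a hypothetical LazyConnection class as a stand-in for happybase.Connection(autoconnect=False), so no HBase is involved. The sketch shows three things. The queue starts with size unopened connections. LIFO order hands back the most recently returned connection first. A get with a timeout on an empty queue raises queue.Empty, which _acquire_connection translates into NoConnectionsAvailable. The real constructor also opens one connection up front, and this sketch skips that.

```python
import queue


class LazyConnection:
    """Hypothetical stand-in for happybase.Connection(autoconnect=False)."""

    def __init__(self, name):
        self.name = name
        self.is_open = False  # nothing is opened at construction time


def build_pool(size):
    # Mirror ConnectionPool.__init__: a bounded LIFO queue filled with
    # `size` connections that have not been opened yet.
    pool = queue.LifoQueue(maxsize=size)
    for i in range(size):
        pool.put(LazyConnection("conn-%d" % i))
    return pool


pool = build_pool(3)
print(pool.qsize())  # 3

# LIFO: the connection put in last comes out first.
first = pool.get(True, None)
print(first.name)  # conn-2
pool.put(first)  # give it back
again = pool.get(True, None)
print(again is first)  # True: the most recently returned connection is reused

# Drain the pool, then wait with a timeout: queue.Empty is raised.
others = [pool.get(True, None) for _ in range(2)]
try:
    pool.get(True, 0.1)
except queue.Empty:
    print("no connection available within timeout")
```

One plausible benefit of LIFO over FIFO is that a recently used, already-open connection is handed out first. The connections that are never needed then stay unopened.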
    

So how is a connection handed out, and how is thread safety ensured? Let's take a closer look at the connection method.


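The connection method is decorated with @contextlib.contextmanager, which turns the generator function into a context manager. It is meant to be used in a with statement: calling pool.connection() on its own returns a context-manager object, not a connection. Before looking at how the pool obtains a connection, let's look at the code that yields and releases it: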
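The following stdlib-only sketch has the same setup / yield / except / finally shape as ConnectionPool.connection(). It shows two consequences of @contextlib.contextmanager. First, calling the function gives a context manager rather than the yielded value. Second, an exception raised in the with body is re-raised at the yield, so the generator's except and finally clauses run. The connection object and the use of ValueError are hypothetical. happybase catches TException and socket.error there instead.

```python
import contextlib

events = []


@contextlib.contextmanager
def connection():
    # Same shape as ConnectionPool.connection(): setup, yield, except, finally.
    conn = object()  # hypothetical connection object
    events.append("acquire")
    try:
        yield conn
    except ValueError:
        # An exception from the with-body is re-raised *at the yield*,
        # so this handler sees it (the pool refreshes the Thrift client here).
        events.append("refresh")
        raise
    finally:
        events.append("release")  # runs on normal exit and on error


cm = connection()
print(hasattr(cm, "__enter__"))  # True: a context manager, not a connection

with connection() as conn:
    events.append("use")
print(events)  # ['acquire', 'use', 'release']

events.clear()
try:
    with connection() as conn:
        raise ValueError("tainted")
except ValueError:
    pass
print(events)  # ['acquire', 'refresh', 'release']
```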

    try:
        # Open connection, because connections are opened lazily.
        # This is a no-op for connections that are already open.
        connection.open()
    
        # Return value from the context manager's __enter__()
        yield connection
    
    except (TException, socket.error):
        # Refresh the underlying Thrift client if an exception
        # occurred in the Thrift layer, since we don't know whether
        # the connection is still usable.
        logger.info("Replacing tainted pool connection")
        connection._refresh_thrift_client()
        connection.open()
    
        # Reraise to caller; see contextlib.contextmanager() docs
        raise
    
    finally:
        # Remove thread local reference after the outermost 'with'
        # block ends. Afterwards the thread no longer owns the
        # connection.
        if return_after_use:
            del self._thread_connections.current
            self._return_connection(connection)
    

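After a connection is obtained, it is yielded to the caller, and the finally clause returns it to the pool. The except clause deserves attention. If a Thrift or socket error occurs while the connection is in use, the pool catches it, refreshes the connection's underlying Thrift client and reopens it, and then re-raises. This ensures the connection that finally puts back into the pool is usable.

The except clause can only catch exceptions raised inside the with block, because contextmanager re-raises them at the yield. Anything that happens after the block has exited is invisible to it. So make sure your use of the connection really ends when the with block ends. Otherwise two threads can end up sharing one connection. scan is a typical case: its result is a generator, so it is best to turn it into a list inside the with block. If you return the generator itself and iterate over it outside the with block, you are still using the connection. The block has already exited, though, so the pool considers the connection free and may hand it to another thread.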
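This pitfall can be reproduced without HBase. The stdlib-only sketch below uses a size-1 pool and a hypothetical FakeConnection whose scan() is a lazy generator. The checked_out flag is a hypothetical marker for "inside the pool's with block". Each row records whether the connection was still checked out when it was actually read.

```python
import contextlib
import queue


class FakeConnection:
    """Hypothetical connection whose scan() yields rows lazily."""

    def __init__(self):
        self.checked_out = False  # True only while inside the pool's with-block
        self.seen = []  # checked_out value observed as each row is read

    def scan(self, rows):
        for row in rows:
            # A row is only fetched when the caller iterates.
            self.seen.append(self.checked_out)
            yield row


_pool = queue.LifoQueue(maxsize=1)
_pool.put(FakeConnection())


@contextlib.contextmanager
def connection():
    conn = _pool.get()
    conn.checked_out = True
    try:
        yield conn
    finally:
        conn.checked_out = False
        _pool.put(conn)  # back in the pool: another thread may take it now


def scan_lazy():
    with connection() as conn:
        return conn.scan(["r1", "r2"])  # the generator escapes the with-block


def scan_eager():
    with connection() as conn:
        return list(conn.scan(["r1", "r2"]))  # fully consumed inside


lazy_rows = list(scan_lazy())
eager_rows = scan_eager()
conn = _pool.get()
print(lazy_rows, eager_rows)  # ['r1', 'r2'] ['r1', 'r2']
print(conn.seen)  # [False, False, True, True]
```

Both versions return the same rows. In the lazy version, however, every row is read after the connection has already gone back to the pool.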
Now let's see how a connection is obtained:

    connection = getattr(self._thread_connections, 'current', None)
    
    return_after_use = False
    if connection is None:
        # This is the outermost connection requests for this thread.
        # Obtain a new connection from the pool and keep a reference
        # in a thread local so that nested connection requests from
        # the same thread can return the same connection instance.
        #
        # Note: this code acquires a lock before assigning to the
        # thread local; see
        # http://emptysquare.net/blog/another-thing-about-pythons-
        # threadlocals/
        return_after_use = True
        connection = self._acquire_connection(timeout)
        with self._lock:
            self._thread_connections.current = connection
    

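It first reads the current attribute of the thread-local object _thread_connections, and each thread sees its own value of current. Note that _thread_connections itself is a single object shared by all threads, because it is created once when the pool is initialized. What makes the attribute per-thread is threading.local. The pure-Python implementation in _threading_local overrides attribute access (__getattribute__, __setattr__, __delattr__) and wraps each access in a _patch helper that swaps in the calling thread's own attribute dictionary. CPython normally uses the C implementation, _thread._local, which behaves the same way.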
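The following stdlib-only sketch checks that behavior. One threading.local object is shared by two threads, yet each thread reads back only the value it wrote. The names local, worker and results are illustrative. A Barrier makes sure both writes happen before either thread reads.

```python
import threading

# One shared object, created once (like self._thread_connections in __init__).
local = threading.local()
results = {}
barrier = threading.Barrier(2)


def worker(name):
    local.current = name  # both threads write the same attribute name...
    barrier.wait()  # ...both writes finish before either thread reads...
    results[name] = local.current  # ...yet each reads back only its own value


threads = [threading.Thread(target=worker, args=(n,)) for n in ("t1", "t2")]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results.items()))  # [('t1', 't1'), ('t2', 't2')]
print(getattr(local, "current", None))  # None: the main thread never set it
```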
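The rest is straightforward. If the thread has no connection yet, it takes one from the queue and stores it on the thread-local. If it already holds one, as with a nested with on the same thread, the same connection is reused, and only the outermost block returns it to the pool.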

    connection = self._acquire_connection(timeout)
    with self._lock:
        self._thread_connections.current = connection
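The following sketch keeps only the thread-local logic of ConnectionPool.connection(). MiniPool is a hypothetical, simplified class, not part of happybase. It has no connection.open(), no Thrift error handling and no NoConnectionsAvailable mapping, and plain object() instances stand in for connections. It shows that a nested request on the same thread gets the same connection, and that the connection goes back to the queue only once.

```python
import contextlib
import queue
import threading


class MiniPool:
    """Simplified sketch of the pool's thread-local logic (no HBase)."""

    def __init__(self, size):
        self._lock = threading.Lock()
        self._queue = queue.LifoQueue(maxsize=size)
        self._thread_connections = threading.local()
        for _ in range(size):
            self._queue.put(object())  # placeholder "connections"

    @contextlib.contextmanager
    def connection(self, timeout=None):
        connection = getattr(self._thread_connections, "current", None)
        return_after_use = False
        if connection is None:
            # Outermost request on this thread: take one from the queue.
            return_after_use = True
            connection = self._queue.get(True, timeout)
            with self._lock:
                self._thread_connections.current = connection
        try:
            yield connection
        finally:
            # Only the outermost block gives the connection back.
            if return_after_use:
                del self._thread_connections.current
                self._queue.put(connection)


pool = MiniPool(size=2)
with pool.connection() as outer:
    with pool.connection() as inner:
        same = inner is outer  # nested request reuses the same connection
        inside = pool._queue.qsize()  # only one connection is checked out
after = pool._queue.qsize()  # returned once, after the outer block
print(same, inside, after)  # True 1 2
```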
    

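A question worth considering: does this pool still work under a coroutine model?
If the program is monkey-patched by gevent, it does. gevent replaces threading.local, so each greenlet (coroutine) gets its own object.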
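Native asyncio coroutines without gevent patching behave differently, because they all run on one thread. The stdlib-only sketch below shows this. Two tasks share one threading.local, so the second task's write is visible to the first. A contextvars.ContextVar, by contrast, keeps a separate value per task. The names tlocal, cvar and task are illustrative, and this is not something happybase itself provides.

```python
import asyncio
import contextvars
import threading

tlocal = threading.local()  # what the pool relies on
cvar = contextvars.ContextVar("current", default=None)  # per-task alternative
seen = {}


async def task(name):
    tlocal.current = name
    cvar.set(name)
    await asyncio.sleep(0)  # let the other coroutine run on the same thread
    seen[name] = (tlocal.current, cvar.get())


async def main():
    await asyncio.gather(task("a"), task("b"))


asyncio.run(main())
print(seen["a"])  # ('b', 'a'): task b overwrote the shared threading.local
print(seen["b"])  # ('b', 'b')
```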


Article link: https://www.haomeiwen.com/subject/ayeuwxtx.html