美文网首页我爱编程
Hbase之happybase[TTransportExcept

Hbase之happybase[TTransportExcept

作者: vcancy | 来源:发表于2018-05-23 17:58 被阅读0次

Hbase提供Thrift实现多语言的支持,python的happybase就是其中一个实现。

最近在使用happybase出现TTransportException和Broken pipe的异常,记录一下解决方案。

环境:
python:3.6.2
happbase:1.1.0
hbase:1.2.4

现象:要每几分钟左右往HBase插入一条数据,但是,在大约1分钟过后,被服务器断开了连接,然后再尝试插入的时候,抛出broken pipe.

复现代码:

import happybase
import time

def put(pool,data):
    with pool.connection() as conn:
        table = conn.table('Log')
        row_key = str(int(time.time())) + '000001'
        data = data
        table.put(row_key, data)

if __name__ == '__main__':
    pool = happybase.ConnectionPool(size=3, host='127.0.0.1', table_prefix='TL')
    put(pool,{})
    time.sleep(60) # 模拟间隔时间
    put(pool, {})

分析:Broken pipe往往是因为客户端和服务器的TCP连接断开引起的。

通过google发现hbase thrift server有一个超时的配置hbase.thrift.server.socket.read.timeout,当客户端和服务器的连接超过该值没有收到消息的时候,该连接会被断开,默认值为60秒,正好跟本次现象发生的场景一致,每隔60秒连接断开。参考[HBASE-14926]https://issues.apache.org/jira/browse/HBASE-14926

查看happybase连接池相关的代码发现在with完成后connection并没有close,直接返回到连接池当中。

    @contextlib.contextmanager
    def connection(self, timeout=None):
        connection = getattr(self._thread_connections, 'current', None)
        return_after_use = False
        if connection is None:
            return_after_use = True
            connection = self._acquire_connection(timeout)
            with self._lock:
                self._thread_connections.current = connection
        try:
            connection.open()
            yield connection
        except (TException, socket.error):
            logger.info("Replacing tainted pool connection")
            connection._refresh_thrift_client()
            connection.open()
            raise
        finally:
            if return_after_use:
                del self._thread_connections.current
                self._return_connection(connection)

而问题就出现在这里,返回到连接池中的connection在1分钟后没有使用,被断开了连接,当下一次再拿这个connection去请求资源就会出现Broken pipe的异常。

接下来就到了解决问题的时候,一般来讲可以有两个解决方案:

1.配置hbase.thrift.server.socket.read.timeout,增加超时时间

2.在使用connection后关闭连接

import happybase
import time

def put(pool,data):
    with pool.connection() as conn:
        table = conn.table('Log')
        row_key = str(int(time.time())) + '000001'
        data = data
        table.put(row_key, data)
        conn.close() # 关闭连接

if __name__ == '__main__':
    pool = happybase.ConnectionPool(size=3, host='127.0.0.1', table_prefix='TL')
    put(pool,{})
    time.sleep(60) # 模拟间隔时间
    put(pool, {})

相关文章

网友评论

    本文标题:Hbase之happybase[TTransportExcept

    本文链接:https://www.haomeiwen.com/subject/osfdjftx.html