Hbase提供Thrift实现多语言的支持,python的happybase就是其中一个实现。
最近在使用happybase出现TTransportException和Broken pipe的异常,记录一下解决方案。
环境:
python:3.6.2
happbase:1.1.0
hbase:1.2.4
现象:要每几分钟左右往HBase插入一条数据,但是,在大约1分钟过后,被服务器断开了连接,然后再尝试插入的时候,抛出broken pipe.
复现代码:
import happybase
import time
def put(pool,data):
with pool.connection() as conn:
table = conn.table('Log')
row_key = str(int(time.time())) + '000001'
data = data
table.put(row_key, data)
if __name__ == '__main__':
pool = happybase.ConnectionPool(size=3, host='127.0.0.1', table_prefix='TL')
put(pool,{})
time.sleep(60) # 模拟间隔时间
put(pool, {})
分析:Broken pipe往往是因为客户端和服务器的TCP连接断开引起的。
通过google发现hbase thrift server有一个超时的配置hbase.thrift.server.socket.read.timeout,当客户端和服务器的连接超过该值没有收到消息的时候,该连接会被断开,默认值为60秒,正好跟本次现象发生的场景一致,每隔60秒连接断开。参考[HBASE-14926]https://issues.apache.org/jira/browse/HBASE-14926
查看happybase连接池相关的代码发现在with完成后connection并没有close,直接返回到连接池当中。
@contextlib.contextmanager
def connection(self, timeout=None):
connection = getattr(self._thread_connections, 'current', None)
return_after_use = False
if connection is None:
return_after_use = True
connection = self._acquire_connection(timeout)
with self._lock:
self._thread_connections.current = connection
try:
connection.open()
yield connection
except (TException, socket.error):
logger.info("Replacing tainted pool connection")
connection._refresh_thrift_client()
connection.open()
raise
finally:
if return_after_use:
del self._thread_connections.current
self._return_connection(connection)
而问题就出现在这里,返回到连接池中的connection在1分钟后没有使用,被断开了连接,当下一次再拿这个connection去请求资源就会出现Broken pipe的异常。
接下来就到了解决问题的时候,一般来讲可以有两个解决方案:
1.配置hbase.thrift.server.socket.read.timeout,增加超时时间
2.在使用connection后关闭连接
import happybase
import time
def put(pool,data):
with pool.connection() as conn:
table = conn.table('Log')
row_key = str(int(time.time())) + '000001'
data = data
table.put(row_key, data)
conn.close() # 关闭连接
if __name__ == '__main__':
pool = happybase.ConnectionPool(size=3, host='127.0.0.1', table_prefix='TL')
put(pool,{})
time.sleep(60) # 模拟间隔时间
put(pool, {})
网友评论