美文网首页大数据
使用Python操作Hive

使用Python操作Hive

作者: 9c46ece5b7bd | 来源:发表于2018-10-28 17:30 被阅读59次

    Python操作Hive

    注意:想要使用hive,必须要有一个可用的hive集群,同时为了保证可用使用API操作hive,我们需要要求提供hiveserver2服务

    假设我们的hiveserver2地址为10.0.1.18:10000,且用户为hdfs.使用PyHive库链接Hive.

    安装pyhive模块

    # 过程中可能需要依赖sasl,thrift等相关服务,如有需要可以使用系统的包管理器安装(apt-get或yum)
    pip install sasl thrift thrift-sasl PyHive
    

    Python链接Hive以及基本使用

    $ cat pytest_hive.py
    # 导入hive模块
    from pyhive import hive
    
    # 获取一个hive链接对象(链接到HiveServer2上)
    ## Connection类的__init__方法:__init__(self, host=None, port=None, username=None, database=u'default', auth=None, configuration=None, kerberos_service_name=None, password=None, thrift_transport=None)
    hiveconn = hive.Connection(host='10.0.1.18', port=10000, username='hdfs', database='aiops')
    
    # 使用连接的cursor()方法获取一个游标对象
    hivecur = hiveconn.cursor()
    
    # 使用游标对象的execute()方法进行执行hivesql语句
    ## execute(self, operation, parameters=None, **kwargs)
    hivecur.execute("show databases")
    ## executemany(self, operation, seq_of_parameters) method of pyhive.hive.Cursor instance 参数是一个序列
    hivecur.executemany()
    
    # 使用游标对象的fetch类方法获取执行结果(fetchone和fetchall以及fetchmany)
    onedata = hivecur.fetchone()
    alldata = hivecur.fetchall()
    ## fetchmany(self, size=None) method of pyhive.hive.Cursor instance
    manydata = hivecur.fetchmany()
    
    # 关闭cursor游标对象和conn连接对象
    hivecur.close()
    hiveconn.close()
    
    # hive的回滚操作
    hiveconn.rollback()
    
    

    尝试用python脚本进行数据库操作

    1. 数据库查询操作

    # 首先我们使用pyhive库链接hive并查看指定数据库下的表
    $ cat pyhive_test.py
    import sys
    reload(sys)
    sys.setdefaultencoding('utf-8')
    from pyhive import hive
    
    class hiveObj:
        def __init__(self,host,user,dbname=u'default',port=10000):
            self.host = host
            self.dbname = dbname
            self.user = user
            self.port = port
        def hiveConIns(self):
            conn = hive.Connection(host=self.host, port=self.port, username=self.user, database=self.dbname)
            return conn
        #通常查询个别数量的数据建议在sql中进行优化,可以仅使用cursor的fetchall()方法进行批量操作
        def querydata(self,sql,args=None):
            conn = self.hiveConIns()
            cur = conn.cursor()
            cur.execute(sql,args)
            alldata = cur.fetchall()
            cur.close()
            #cur.fetch类方法返回一个[tuple,tuple]
            for data in alldata:
                print(data)
            conn.close()
    
    if __name__ == '__main__':
        #默认database为default,默认port为10000
        hiveobj = hiveObj("10.0.1.18","hdfs")
        #查询数据
        sql = '''show tables'''
        hiveobj.querydata(sql)
    
    $ python pyhive_test.py
    (u'asset',)
    
    
    

    2. 数据库更新操作

    思考:其实数据库可以分为两种操作(读和写),一种为单纯的查询操作,不会对库表结构和数据造成变更,也即为读操作;另外一种为写操作,会对库表结构和数据造成的变更操作,也即写操作.

    # 给我们的hiveObj类增加一个写数据操作方法
    $ cat pyhive_test.py
    ....
    ....
        def changedata(self,sql,args=None):
            conn = self.hiveConIns()
            cur = conn.cursor()
            try:
                #做一个粗暴的判断当args是list时就进行批量插入
                if isinstance(args,list):
                    #executemany(sql,args)方法args支持tuple或者list类型
                    cur.executemany(sql,args)
                else:
                    #execute(sql,args)方法args支持string,tuple,list,dict
                    cur.execute(sql,args)
                conn.commit()
            except Exception as e:
                #因为hive不支持事务,因此虽然提供了rollback()但是是没用的
                #conn.rollback()
                print(e)
            finally:
                cur.close()
                conn.close()
    
    
    
    # 使用创建表来模拟库表变更(实际上库的变更操作应该由专业的管理员进行审核后操作)
    if __name__ == '__main__':
        #默认database为default,默认port为10000
        hiveobj = hiveObj("10.0.1.18","hdfs")
        #查询数据
        sql = '''show tables'''
        hiveobj.querydata(sql)
    
        #hive库表变更操作
        tabledesc = '''
         create table appinfo (
            appname string,
            level string,
            leader string,
            dep string,
            ips  array<string>)
         ROW FORMAT DELIMITED
         FIELDS TERMINATED BY '|'
         COLLECTION ITEMS TERMINATED BY ','
        '''
        print("creating a table....")
        hiveobj.changedata(tabledesc)
        hiveobj.querydata(sql)
    
    $ python pyhive_test.py
    (u'asset',)
    creating a table....
    (u'appinfo',)
    (u'asset',)
    

    3. 进行数据加载和读取操作

    注意:上面其实我们已经封装了两个抽象的读写方法,可以对hive表进行数据加载和读取操作了

    # 假如我们的hdfs上已经存在一份如下结构化的数据
    $ hdfs dfs -cat /ips.txt;
    data-web|p0|bgbiao|ops1|10.0.0.1,10.0.0.2
    data-api|p0|biaoge|sre1|192.168.0.1,192.168.0.2
    data-models|p1|xxbandy|sre1|10.0.0.3,192.168.0.3
    
    $ cat pyhive_test.py
    ...
    ...
    if __name__ == '__main__':
            #首先进行将hdfs中的数据加载到appinfo表中,加载完成后查询appinfo表
        sql1 = "load data  inpath 'hdfs://hdfs-name/ips.txt' overwrite into table appinfo"
        hiveobj.changedata(sql1)
        hiveobj.querydata('select * from appinfo')
    
    $ python pyhive_test.py
    (u'data-web', u'p0', u'bgbiao', u'ops1', u'["10.0.0.1","10.0.0.2"]')
    (u'data-api', u'p0', u'biaoge', u'sre1', u'["192.168.0.1","192.168.0.2"]')
    (u'data-models', u'p1', u'xxbandy', u'sre1', u'["10.0.0.3","192.168.0.3"]')
    
    # 接下来我们对上述表进行一个拆分查询
    $ cat pyhive_test.py
    ...
    ...
    if __name__ == '__main__':
        #对array对象中的元素进行遍历查询
        sql = "select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
        hiveobj.querydata(sql)
    
    # 这样子我们就知道每个ip对应的关联关系了
    $ python pyhive_test.py
    (u'10.0.0.1', u'data-web', u'bgbiao', u'ops1')
    (u'10.0.0.2', u'data-web', u'bgbiao', u'ops1')
    (u'192.168.0.1', u'data-api', u'biaoge', u'sre1')
    (u'192.168.0.2', u'data-api', u'biaoge', u'sre1')
    (u'10.0.0.3', u'data-models', u'xxbandy', u'sre1')
    (u'192.168.0.3', u'data-models', u'xxbandy', u'sre1')
    
    # 临时表的创建和使用
        #对array对象中的元素进行遍历查询[临时表的创建第一次必须使用create table name as select ],更新数据需要使用[insert into|overwrite table name select] into是追加数据,overwrite是覆盖数据
        #sql = "create  table tmpapp as select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
        #sql = "insert into table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
        sql = "insert overwrite table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
        hiveobj.changedata(sql)
        hiveobj.querydata('select * from tmpapp limit 1')
    
    

    4. 源码文件

    $ cat pyhive_test.py
    import sys
    reload(sys)
    sys.setdefaultencoding('utf-8')
    from pyhive import hive
    
    class hiveObj:
        def __init__(self,host,user,dbname=u'default',port=10000):
            self.host = host
            self.dbname = dbname
            self.user = user
            self.port = port
        def hiveConIns(self):
            conn = hive.Connection(host=self.host, port=self.port, username=self.user, database=self.dbname)
            return conn
        #通常查询个别数量的数据建议在sql中进行优化,可以仅使用cursor的fetchall()方法进行批量操作
        def querydata(self,sql,args=None):
            conn = self.hiveConIns()
            cur = conn.cursor()
            cur.execute(sql,args)
            alldata = cur.fetchall()
            cur.close()
            #cur.fetch类方法返回一个[tuple,tuple]
            for data in alldata:
                print(data)
            conn.close()
        #注意:hivesql的execute类方法的args是执行过程的参数,而不是sql的参数.比如cursor.execute('SELECT * FROM my_awesome_data LIMIT 10', async=True)表示异步执行
        def changedata(self,sql,args=None):
            conn = self.hiveConIns()
            cur = conn.cursor()
            try:
                #做一个粗暴的判断当args是list时就进行批量插入
                if isinstance(args,list):
                    #executemany(sql,args)方法args支持tuple或者list类型
                    cur.executemany(sql,args)
                else:
                    #execute(sql,args)方法args支持string,tuple,list,dict
                    cur.execute(sql,args)
                conn.commit()
            except Exception as e:
                #因为hive不支持事务,因此虽然提供了rollback()但是是没用的
                #conn.rollback()
                print(e)
            finally:
                cur.close()
                conn.close()
    
    if __name__ == '__main__':
        #默认database为default,默认port为10000
        hiveobj = hiveObj("10.0.1.18","hdfs")
        '''
        #查询数据
        sql = "show tables"
        hiveobj.querydata(sql)
    
        #hive创建表
        tabledesc = "create table appinfo (appname string,level string,leader string,dep string,ips  array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' "
        print("creating a table....")
        hiveobj.changedata(tabledesc)
        hiveobj.querydata(sql)
    
        #插入数据
        sql1 = "load data  inpath 'hdfs://hdfs-name/ips.txt' overwrite into table appinfo"
        hiveobj.changedata(sql1)
        hiveobj.querydata('select * from appinfo')
        '''
        #对array对象中的元素进行遍历查询[临时表的创建第一次必须使用create table name as select ],更新数据需要使用[insert into|overwrite table name select] into是追加数据,overwrite是覆盖数据
        #sql = "create  table tmpapp as select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
        #sql = "insert into table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
        sql = "insert overwrite table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
        hiveobj.changedata(sql)
        hiveobj.querydata('select * from tmpapp limit 1')
    

    相关文章

      网友评论

        本文标题:使用Python操作Hive

        本文链接:https://www.haomeiwen.com/subject/kwcmtqtx.html