美文网首页
PySpark操作Hive的常用语句函数封装包

PySpark操作Hive的常用语句函数封装包

作者: 小甜瓜Melon | 来源:发表于2017-07-21 14:45 被阅读0次

    目的:将 Hive 常用的查看类操作(列出表、预览数据、统计表和字段的记录数)封装成函数。

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    
    # Standard libraries
    import sys
    import os
    import time
    
    # PyData stack
    import pandas as pd
    import numpy as np
    import matplotlib
    import matplotlib.pyplot as plt
    #matplotlib.use('Agg')
    
    # Spark Libraries
    from pyspark import SparkContext, SparkConf
    
    # Spark sql Libraries
    from pyspark.sql import HiveContext
    from pyspark.sql import SQLContext
    from pyspark.sql import Row
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    import pyspark.sql as sprksql
    
    # Spark ml Libraries
    from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
    from pyspark.ml import Pipeline
    
    # from utils import *
    # from utils.plotting import *
    
    # Notebook setup: inject CSS so the page's `.container` element uses 90%
    # of the available width.
    from IPython.core.display import display, HTML, Markdown
    display(HTML("<style>.container { width:90% !important; }</style>"))
    
    # Raise the pandas display limits used when frames are rendered in Jupyter.
    pd.options.display.max_columns = 1000
    pd.options.display.max_rows = 1000
    # Same effect as pd.set_option('display.max_columns', 1000) and
    # pd.set_option('display.max_rows', 1000).
    
    # Module-level Hive entry point; the helper functions in this file read it
    # as a global.
    # NOTE(review): `sc` is not defined anywhere in this file -- presumably the
    # SparkContext injected by the pyspark shell / notebook kernel; confirm.
    hiveContext = HiveContext(sc)
    
    #--*--*----*--*----*--*----*--*----*--*----*--*----*--*----*--*----*--*----*--*----*--*--#
    
    """利用Python查看单个数据库中的所有表"""
    def scanTable(db_name):
        """Print the name of every table in one Hive database.

        Switches the session to ``db_name`` with ``USE`` and then prints the
        first column of each ``SHOW TABLES`` row, one per line.

        Args:
            db_name: Name of the database to list. It is interpolated into
                the SQL text as-is, so pass trusted identifiers only.

        Returns:
            None. Output goes to stdout.
        """
        # The USE statement changes the current database for the whole
        # session, not just for this call.
        hiveContext.sql("use %s" % db_name)
        # Iterate the collected rows directly instead of indexing through
        # xrange(len(...)); print() with a single argument behaves the same
        # on Python 2 and Python 3.
        # NOTE(review): column 0 is assumed to hold the table name -- verify
        # against the SHOW TABLES schema of the Spark version in use.
        for row in hiveContext.sql("show tables").collect():
            print(row[0])
    # scanTable("source_data")
    
    """查看单个表中的数据示例"""
    # 参数说明:number代表查看几条数据
    def scanData(db_name, table_name, number):
        """Preview the first rows of a Hive table as a transposed pandas frame.

        Args:
            db_name: Database that owns the table.
            table_name: Table to sample.
            number: Row limit placed in the ``LIMIT`` clause (formatted
                with ``%d``).

        Returns:
            The query result converted with ``toPandas()`` and transposed,
            so each fetched record becomes a column and each table field
            becomes a row.
        """
        query = "select * from %s.%s limit %d" % (db_name, table_name, number)
        preview = hiveContext.sql(query).toPandas()
        return preview.transpose()
    # scanData("source_data", "users_basic", 3)
    # hiveContext.sql("select * from user_profile_project.user_pro_address_auto limit 2").toPandas().T
    
    
    """计算单个表中所有字段的记录数"""
    def countColumnsNums(db_name, table_name):
        """Print a table's total row count and ``COUNT(column)`` per column.

        Runs ``SELECT COUNT(*)`` on ``db_name.table_name``, lists its columns
        with ``SHOW COLUMNS``, then issues one ``SELECT COUNT(<column>)``
        query per column and prints each result with a progress message.

        Args:
            db_name: Database that owns the table.
            table_name: Table to inspect.

        Both names, and the column names read back from the metastore, are
        interpolated into the SQL text unquoted; pass trusted identifiers only.

        Returns:
            None. All results are written to stdout.
        """
        qualified = "%s.%s" % (db_name, table_name)

        print(u"当前数据库为%s" % db_name)
        total = hiveContext.sql("SELECT COUNT(*) FROM %s" % qualified).collect()[0][0]
        # Single-argument print() keeps the output of the old
        # `print label, value` statement while also running on Python 3.
        print(u"%s %s" % (u"当前表名称为%s,总记录数为:" % table_name, total))

        column_rows = hiveContext.sql("""SHOW COLUMNS FROM %s""" % qualified).collect()
        column_total = len(column_rows)

        print(u"开始计算每一列的记录数")
        # enumerate(..., 1) yields the 1-based position used in the progress
        # message, replacing the xrange index and the `i+1` arithmetic.
        for position, column_row in enumerate(column_rows, 1):
            column = column_row[0]
            count_sql = "SELECT COUNT(%s) FROM %s" % (column, qualified)
            counted = hiveContext.sql(count_sql).collect()[0][0]
            progress = u"%s表里边一共包含%s个column,已经计算完第%s个column, 列名称为%s" % (
                table_name, column_total, position, column)
            # The original printed an empty-string item between the count and
            # the message, which rendered as two spaces; keep that layout.
            print(u"%s %s  %s" % (column, counted, progress))
        print(u"计算结束!")
    
    
    """计算所有表中的记录数"""
    def hiveCountTables(db_name):
        """Print the row count of every table in a Hive database.

        Switches the session to ``db_name``, lists its tables with
        ``SHOW TABLES`` and runs ``SELECT count(*)`` against each one,
        printing ``<table>_number <count>`` per table.

        Args:
            db_name: Database whose tables are counted. Interpolated into
                the SQL text as-is, so pass trusted identifiers only.

        Returns:
            None. Results are written to stdout.

        Side effects:
            For every table ``t`` a module-level global named ``t_number`` is
            bound to the object returned by ``hiveContext.sql`` for its count
            query. This is kept so existing notebooks that read those globals
            keep working.
        """
        # USE changes the current database for the whole session.
        hiveContext.sql("use %s" % db_name)
        table_rows = hiveContext.sql("show tables").collect()

        print(u"开始统计")
        # Guard clause instead of an if/else pyramid; isinstance replaces the
        # exact-type comparison `type(x) == list`.
        if not isinstance(table_rows, list):
            print("showtables is not a list")
            return

        for table_row in table_rows:
            # NOTE(review): column 0 is assumed to hold the table name --
            # verify against the SHOW TABLES schema of the Spark version used.
            name = table_row[0]
            # Fix: the query used to hard-code the `source_data` database and
            # ignore db_name, so any other database was counted incorrectly
            # (or failed with a missing-table error).
            count_query = hiveContext.sql(
                "SELECT count(*) FROM %s.%s" % (db_name, name))
            global_name = name + '_number'
            globals()[global_name] = count_query
            # Reuse the same query object rather than building it twice.
            print(u"%s %s" % (global_name, count_query.collect()[0][0]))
        print(u"计算结束!")
    # hiveCountTables("source_data")
    
    
    
    """计算所有表的总记录数和所有字段的记录数"""
    def hiveCountTablesColumns(db_name):
        """Print row counts and per-column counts for every table in a database.

        For each table listed by ``SHOW TABLES`` in ``db_name`` this prints
        ``<table>_number <COUNT(*)>``, then one ``<column> <COUNT(column)>``
        line per column, then a progress line and a separator.

        Args:
            db_name: Database to scan. Interpolated into the SQL text as-is
                (as are the table and column names read back from the
                metastore), so pass trusted identifiers only.

        Returns:
            None. Results are written to stdout.

        Side effects:
            For every table ``t`` a module-level global named ``t_number`` is
            bound to the object returned by ``hiveContext.sql`` for its
            ``COUNT(*)`` query. This is kept so existing notebooks that read
            those globals keep working.
        """
        # USE changes the current database for the whole session; the
        # unqualified table names further down rely on it.
        hiveContext.sql("use %s" % db_name)
        table_rows = hiveContext.sql("show tables").collect()

        print(u"开始统计")
        # Guard clause instead of an if/else pyramid; isinstance replaces the
        # exact-type comparison `type(x) == list`.
        if not isinstance(table_rows, list):
            print("showtables is not a list")
            return

        table_total = len(table_rows)
        for position, table_row in enumerate(table_rows, 1):
            # NOTE(review): column 0 is assumed to hold the table name --
            # verify against the SHOW TABLES schema of the Spark version used.
            name = table_row[0]

            # Fix: the query used to hard-code the `source_data` database and
            # ignore db_name, so the total came from the wrong database
            # whenever db_name was anything else.
            count_query = hiveContext.sql(
                "SELECT COUNT(*) FROM %s.%s" % (db_name, name))
            global_name = name + '_number'
            globals()[global_name] = count_query
            print(u"%s %s" % (global_name, count_query.collect()[0][0]))

            # Unqualified names resolve against the database selected by the
            # USE statement above.
            column_rows = hiveContext.sql("SHOW COLUMNS FROM %s" % name).collect()
            for column_row in column_rows:
                column = column_row[0]
                column_sql = "SELECT COUNT(%s) FROM %s" % (column, name)
                print(u"%s %s" % (column, hiveContext.sql(column_sql).collect()[0][0]))

            print(u"该database里边一共包含%s个table,已经计算完第%s个table, 表名称为%s" % (
                table_total, position, name))
            print('*-' * 40 + '*')
        print(u"计算结束!")
            
    # hiveCountTablesColumns("source_data")
    
    

    相关文章

      网友评论

          本文标题:PySpark操作Hive的常用语句函数封装包

          本文链接:https://www.haomeiwen.com/subject/hqlekxtx.html