慕课网 Spark SQL Log Analysis - 4. A Smooth Transition from Hive to Spark SQL


Author: 9c0ddf06559c | Published 2018-07-11 22:03

    4.1 SQLContext/HiveContext/SparkSession

    1. SQLContext


    Documentation for the old version: http://spark.apache.org/docs/1.6.1/

    • SQLContext example file:
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    
    /**
     * SQLContext usage
     * Note: IDEA runs locally while the test data lives on the server; can we develop and test locally?
     */
    object SQLContextApp {
    
      def main(args: Array[String]): Unit = {
    
        val path = args(0)
    
        // 1) Create the corresponding context
        val sparkConf = new SparkConf()
    
        // In test/production, AppName and Master are supplied by the submit script
        sparkConf.setAppName("SQLContextApp")
          .setMaster("local[2]")
          .set("spark.driver.bindAddress", "127.0.0.1")
    
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
    
        // 2) Processing: read a JSON file
        val people = sqlContext.read.format("json").load(path)
        people.printSchema()
        people.show()
    
        // 3) Release resources
        sc.stop()
      }
    }
    
    • Package:
    mvn clean package -DskipTests
    
    ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      --deploy-mode <deploy-mode> \
      --conf <key>=<value> \
      ... # other options
      <application-jar> \
      [application-arguments]
    
    ./bin/spark-submit \
      --class com.gwf.spark.SQLContextApp \
      --master local[2] \
      /Users/gaowenfeng/Downloads/MySparkSqlProject/target/sql-1.0.jar \
      file:///Users/gaowenfeng/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json
    
    
    • Submit via script:
      Put the command above into a shell script, grant it execute permission, and run it.

    2. HiveContext usage

    To use a HiveContext, you do not need to have an existing Hive setup

    The code is almost identical to the SQLContext example above; just replace SQLContext with HiveContext. When running it, however, the MySQL driver (used by the Hive metastore) must be passed onto the classpath via --jars; see the sketch below.
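
    A minimal sketch of the HiveContext variant, assuming a Hive-enabled Spark build with conf/hive-site.xml pointing at the metastore; the object name and the table `emp` are just examples:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    
    object HiveContextApp {
    
      def main(args: Array[String]): Unit = {
        // AppName and Master are supplied by the submit script in test/production
        val sparkConf = new SparkConf()
        val sc = new SparkContext(sparkConf)
        val hiveContext = new HiveContext(sc)
    
        // query a Hive table; the MySQL metastore driver is added via --jars at submit time
        hiveContext.table("emp").show()
    
        sc.stop()
      }
    }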

    3. SparkSession

    def main(args: Array[String]): Unit = {
      val path = args(0)
    
      val spark = SparkSession
        .builder()
        .appName("SQLContextApp")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .master("local[2]")
        .getOrCreate()
    
      val people = spark.read.format("json").load(path)
      people.printSchema()
      people.show()
      spark.stop()
    }
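
    SparkSession (Spark 2.x) unifies SQLContext and HiveContext. To read Hive tables through it, Hive support has to be enabled on the builder; a minimal sketch (the object name and the table `emp` are just examples):

    import org.apache.spark.sql.SparkSession
    
    object SparkSessionHiveApp {
    
      def main(args: Array[String]): Unit = {
        // enableHiveSupport() wires the session catalog to the Hive metastore (hive-site.xml)
        val spark = SparkSession
          .builder()
          .appName("SparkSessionHiveApp")
          .master("local[2]")
          .enableHiveSupport()
          .getOrCreate()
    
        spark.table("emp").show()
        spark.stop()
      }
    }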
    

    4.2 Using spark-shell/spark-sql

    1. Add hive-site.xml to the conf directory
    2. Pass the MySQL driver jar via --jars
    # spark-shell
    spark-shell --master local[2] --jars /Users/gaowenfeng/.m2/repository/mysql/mysql-connector-java/5.1.45/mysql-connector-java-5.1.45.jar
    
    # inside the shell, run SQL via spark.sql("<statement>").show() (example below)
    
    # spark-sql
    spark-sql --master local[2] --jars /Users/gaowenfeng/.m2/repository/mysql/mysql-connector-java/5.1.45/mysql-connector-java-5.1.45.jar
    # SQL statements can be executed directly
    
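    In spark-shell the pre-built `spark` session is available directly; for example (assuming the Hive tables from hive-site.xml are visible, and `emp` is just an example table):

    // typed at the spark-shell prompt
    spark.sql("show tables").show()
    spark.sql("select * from emp").show()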

    Analyzing the execution plan to understand the Spark SQL architecture

    create table t(key string, value string);
    explain extended select a.key * (2+3),b.value from t a join t b on a.key = b.key and a.key > 3;
    
    
    # parsed into a logical plan
    == Parsed Logical Plan ==
    # unresolvedalias: not fully resolved yet
    'Project [unresolvedalias(('a.key * (2 + 3)), None), 'b.value] # the two selected columns
    +- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3)) # the conditions after ON
    :- 'SubqueryAlias a
    : +- 'UnresolvedRelation `t`
    +- 'SubqueryAlias b
    +- 'UnresolvedRelation `t`
    
    # analysis step (needs to talk to the underlying metastore)
    == Analyzed Logical Plan ==
    (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double, value: string # a.key and (2+3) are each cast to double
    Project [(cast(key#8 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11] # the two selected columns
    +- Join Inner, ((key#8 = key#10) && (cast(key#8 as int) > 3))
    :- SubqueryAlias a
    : +- SubqueryAlias t # already resolved to the concrete table in the metastore
    : +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
    +- SubqueryAlias b
    +- SubqueryAlias t
    +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]
    
    # optimization step
    == Optimized Logical Plan ==
    Project [(cast(key#8 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11]
    +- Join Inner, (key#8 = key#10)
    :- Project [key#8]
    : +- Filter (isnotnull(key#8) && (cast(key#8 as int) > 3)) # a.key > 3 is pushed down so rows are filtered first
    : +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
    +- Filter (isnotnull(key#10) && (cast(key#10 as int) > 3))
    +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]
    
    # physical plan
    == Physical Plan ==
    *Project [(cast(key#8 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11]
    +- *SortMergeJoin [key#8], [key#10], Inner
    :- *Sort [key#8 ASC NULLS FIRST], false, 0
    : +- Exchange hashpartitioning(key#8, 200)
    : +- *Filter (isnotnull(key#8) && (cast(key#8 as int) > 3))
    : +- HiveTableScan [key#8], CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
    +- *Sort [key#10 ASC NULLS FIRST], false, 0
    +- Exchange hashpartitioning(key#10, 200)
    +- *Filter (isnotnull(key#10) && (cast(key#10 as int) > 3))
    +- HiveTableScan [key#10, value#11], CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]
    
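    The same four plans can also be printed from code or from spark-shell with Dataset.explain; a small sketch, assuming the table `t` created above:

    // explain(true) prints the parsed, analyzed, optimized, and physical plans
    spark.sql(
      "select a.key * (2+3), b.value from t a join t b on a.key = b.key and a.key > 3"
    ).explain(true)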

    4.3 Using thriftserver/beeline

    1. Start the thriftserver (the default port is 10000). hive.server2.thrift.port changes the port and hive.server2.thrift.bind.host changes the bind host:
    ./sbin/start-thriftserver.sh \
      --hiveconf hive.server2.thrift.port=<listening-port> \
      --hiveconf hive.server2.thrift.bind.host=<listening-host> \
      --master <master-uri>
    
    2. Start beeline
      beeline -u jdbc:hive2://localhost:10000 -n gaowenfeng

    3. Differences between thriftserver and spark-shell/spark-sql:

    1. Each spark-shell or spark-sql instance is a separate Spark application
    2. The thriftserver is always a single Spark application no matter how many clients (beeline/code) connect to it; this solves the data-sharing problem, because multiple clients can share the same data

    4.4 Programmatic access via JDBC

    1. Add the Maven dependency

    <dependency>
      <groupId>org.spark-project.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.2.1.spark2</version>
    </dependency>
    

    2. Write code to access the thriftserver

    Note: when developing with JDBC, make sure the thriftserver is started first

    import java.sql.DriverManager
    
    def main(args: Array[String]): Unit = {
      // register the Hive JDBC driver
      Class.forName("org.apache.hive.jdbc.HiveDriver")
    
      val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "gaowenfeng", "")
      val pstmt = conn.prepareStatement("select * from emp")
      val rs = pstmt.executeQuery()
    
      while (rs.next()) {
        println(rs.getInt("id") + "\t" + rs.getString("name"))
      }
    
      rs.close()
      pstmt.close()
      conn.close()
    }
    
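    If error handling is desired, the same logic can close its resources in try/finally blocks so the connection is released even when the query fails; a sketch under the same connection settings as above (the object name is just an example):

    import java.sql.DriverManager
    
    object SparkSQLThriftJdbcApp {
    
      def main(args: Array[String]): Unit = {
        Class.forName("org.apache.hive.jdbc.HiveDriver")
    
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "gaowenfeng", "")
        try {
          val pstmt = conn.prepareStatement("select * from emp")
          val rs = pstmt.executeQuery()
          try {
            while (rs.next()) {
              println(rs.getInt("id") + "\t" + rs.getString("name"))
            }
          } finally {
            rs.close()
            pstmt.close()
          }
        } finally {
          // always release the connection
          conn.close()
        }
      }
    }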
