美文网首页
Spark Sql日志分析项目实战

Spark Sql日志分析项目实战

作者: kangapp | 来源:发表于2019-03-15 10:45 被阅读0次
    项目简介
    • 统计主站最受欢迎的课程Top N 访问次数
    • 按地市统计主站最受欢迎的Top N 课程
    • 按流量统计主站最受欢迎的Top N 课程
    环境安装

    CDH相关软件下载地址

    Spark环境搭建

    1、官网下载相应版本源码包
    参考编译过程
    2、spark源码编译中的坑
    pom.xml添加

    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    

    设置内存

    export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
    

    选择scala版本

    ./dev/change-scala-version.sh 2.10
    
    • 环境搭建
      设置环境变量

    local模式启动:spark-shell --master local[2]

    standalone模式:

    • 修改spark-env.sh配置文件
      SPARK_MASTER_HOST=master
      SPARK_WORKER_CORES=2
      SPARK_WORKER_CORES=2g
      SPARK_WORKER_INSTANCES=1
    • 启动
      sbin/start-all.sh:启动slaves配置的所有节点的worker
      spark-shell --master spark://master:7077:启动spark
      spark-shell --help 可以查看启动参数
      --total-executor-cores 1 指定core总数量

    Spark on Yarn
    spark-env.sh 添加 Hadoop conf 的目录

    • [Client]
      Driver运行在Client端
      Client会和请求到的Container进行通信来完成作业的调度和执行,Client不能退出
      日志信息会在控制台输出,方便调试
    • [Cluster]
      Driver运行在ApplicationMaster中
      Client只要提交完作业之后就可以关闭
      日志在终端看不到,可以通过yarn logs -applicationId <app ID>查看日志
      ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ [--deploy-mode cluster \] //默认client模式 --executor-memory 1G \ --num-executors 1 \ /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \ 4
    Spark SQL 框架介绍

    Spark SQL is Apache Spark's module for working with structured(结构化) data.

    • Integrated(集成)
      Seamlessly mix(无缝混合) SQL queries with Spark programs.
      Spark SQL lets you query structured data inside Spark programs, using either SQL or a familiar DataFrame API. Usable in Java, Scala, Python and R.
    • Uniform Data Access(统一的数据访问)
      Connect to any data source the same way.
      DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.
    • Hive Integration(Hive集成)
      Run SQL or HiveQL queries on existing warehouses
      Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing you to access existing Hive warehouses.
    • Standard Connectivity(标准连接)
      Connect through JDBC or ODBC.
      A server mode provides industry standard JDBC and ODBC connectivity for business intelligence tools.
    从Hive平滑过渡到Spark SQL
    • Spark1.x中Spark SQL的入口点:SQLContext
    val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    

    本地可直接运行

    package com.test
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    
    /**
      * SQLContext的使用
      */
    object SQLContextApp {
      def main(args: Array[String]): Unit = {
    
        //创建相应的Context
        val sparkConf = new SparkConf()
    
        //在测试和或者生产中,参数一般通过脚本进行指定
        sparkConf.setAppName("SQLContext").setMaster("spark://192.168.247.100:7077")//测试通常采用本地模式“local[2]”
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
    
        //相应的处理:json
        val path = args(0)
        val people = sqlContext.read.format("json").load(path)
        people.printSchema()
        people.show()
    
        //关闭资源
        sc.stop()
      }
    }
    

    打包上服务器运行

    spark-submit \
      --name SQLContext \
      --class com.test.SQLContextApp \
      --master spark://192.168.247.100:7077 \
      /home/kang/lib/SparkTest-1.0.jar \
      /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json
    
    • Spark1.x中Spark SQL的入口点:HiveContext
      要获取hive中的元数据信息,需把hive-site.xml配置文件复制到spark的/conf目录下
    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    
    package com.test
    
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * HiveContext的使用
      */
    object HiveContextApp {
      def main(args: Array[String]): Unit = {
    
        //创建相应的Context
        val sparkConf = new SparkConf()
    
        //在测试和或者生产中,参数一般通过脚本进行指定
        sparkConf.setAppName("HiveContext").setMaster("spark://192.168.247.100:7077")//测试通常采用本地模式“local[2]”
        val sc = new SparkContext(sparkConf)
        val hiveContext = new HiveContext(sc)
    
        //相应的处理:json
        hiveContext.table("test").show
    
        //关闭资源
        sc.stop()
      }
    }
    
    

    打包上传提交时要添加MySQL连接包

    spark-submit \
      --name HiveContext \
      --class com.test.HiveContextApp \
      --master spark://master:7077\
      --jars /home/kang/lib/mysql-connector-java-5.1.34.jar \
      /home/kang/lib/HiveContext.jar \
    
    • Spark2.x中Spark SQL的入口点:SparkSession
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    
    spark-shell/spark-sql的使用
    • 添加hive-site.xml配置文件
    • -- jars传递mysql驱动包
    • spark-shell --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
      spark.sql("show tables").show
    • spark-sql --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
      直接输入sql语句
      explain extended + sql(查看详细执行计划)
    thriftserver/beeline的使用
    • 启动thriftserver
      默认端口是10000,可以修改
    ./sbin/start-thriftserver.sh --master local[2] \
    --jars ~/lib/mysql-connector-java-5.1.34.jar \
    --hiveconf hive.server2.thrift.port=10040
    
    • 启动beeline
    ./bin/beeline -u jdbc:hive2://localhost:10040 -n kang
    
    • thriftserver和普通的spark-shell/spark-sql有什么区别?
      spark-shell、spark-sql都是一个spark application
      thriftserver,不管启动多少个客户端(beeline/code),永远只有一个spark application,多个客户端可以共享缓存数据。
    • code连接thriftserver
      添加相关的依赖包
    <dependency>
          <groupId>org.spark-project.hive</groupId>
          <artifactId>hive-jdbc</artifactId>
          <version>1.2.1.spark2</version>
    </dependency>
    
    package com.test
    
    import java.sql.DriverManager
    
    object SparkSQLThriftServerApp {
    
      def main(args: Array[String]): Unit = {
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        val conn = DriverManager.getConnection("jdbc:hive2://192.168.247.100:10040","kang","")
        val pstmt = conn.prepareStatement("select * from test")
        val rs = pstmt.executeQuery()
        while (rs.next()){
          println("context:" + rs.getString("context"))
        }
        rs.close()
        pstmt.close()
        conn.close()
      }
    }
    
    用户行为日志概述
    • 用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击)

    • 日志数据内容
      访问的系统属性:操作系统、浏览器等等
      访问特征:点击的url,从哪个url跳转过来、页面工停留的时间等
      访问信息:session_id、访问ip(访问城市)等

    • 分析的意义

    离线数据处理架构(流程)
    • 数据采集
      nginx记录日志信息
      Flume:web日志写入HDFS
    • 数据清洗
      spark、Hive、Mapreduce等
    • 使用Spark SQL解析访问日志
    • 解析出课程编号、类型
    • 根据IP解析出城市信息
    • 使用Spark SQL将访问时间按天进行分区输出

    输入:访问时间、访问url、耗费的流量、访问的IP信息
    输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天

    ip地址解析
    下载:https://github.com/kangapp/ipdatabase
    编译:mvn clean package -DskipTests
    jar包入库:mvn install:install-file -Dfile=F:\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
    pom文件引入,resources文件两个表格文件引入

    • 数据处理
      spark、Hive、Mapreduce进行业务统计和分析
      任务调度:Oozie、Azkaban

    调优点:
    1)控制文件输出大小:coalesce
    2)分区字段的数据类型调整
    .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")

    • 处理结果入库
      RDBMS、NoSQL

    需求一
    create table day_video_access_topn_stat (
    day varchar(8) not null,
    cms_id bigint(10) not null,
    times bigint(10) not null,
    primary key (day,cms_id)
    )

    需求二
    create table day_video_city_access_topn_stat (
    day varchar(8) not null,
    cms_id bigint(10) not null,
    city varchar(20) not null,
    times bigint(10) not null,
    times_rank int not null,
    primary key (day,city,cms_id)
    )
    需求三
    create table day_video_traffics_access_topn_stat (
    day varchar(8) not null,
    cms_id bigint(10) not null,
    traffics bigint(20) not null,
    primary key (day,cms_id)
    )

    • 数据可视化
      Echarts、HUE、Zeppelin
    项目需求
    • 统计imooc主站最受欢迎的课程/手记Top N访问次数
    • 按地市统计imooc主站最受欢迎Top N课程
      根据IP提取城市信息
      窗口函数在Spark SQL中的使用
    • 按流量统计imocc主站最受欢迎的Top N课程
    项目打包

    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
    <archive>
    <manifest>
    <mainClass></mainClass>
    </manifest>
    </archive>
    <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    </configuration>
    </plugin>
    mvn assembly:assembly
    spark-submit \ --class com.test.SparkStatCleanJobYARN \ --name SparkStatCleanJobYARN \ --master yarn \ --executor-memory 1G \ --num-executors 1 \ --files /home/kang/project/SprakSQL/resource/ipDatabase.csv,/home/kang/project/SprakSQL/resource/ipRegion.xlsx \ /home/kang/project/SprakSQL/lib/sparksql.jar \ hdfs://192.168.247.100:9000/data/spark/output/* hdfs://192.168.247.100:9000/data/spark/partitionByDay

    项目性能调优

    https://segmentfault.com/a/1190000014876069

    代码优化
    • 选用高性能的算子
    • 复用已有的数据
    参数优化

    并行度:spark.sql.shuffle.partitions
    分区字段类型推测:spark.sql.sources.partitionColumnTypeInference.enabled

    相关文章

      网友评论

          本文标题:Spark Sql日志分析项目实战

          本文链接:https://www.haomeiwen.com/subject/gmhpkqtx.html