Spark SQL UDF/Join/DataFrame Combined Usage

Author: 喵星人ZC | Published 2019-06-20 21:00

    1. ScalikeJDBC configuration file and pom file
    application.conf

    db.default.driver="com.mysql.jdbc.Driver"
    db.default.url="jdbc:mysql://hadoop000:3306/hadoop_train?characterEncoding=utf-8"
    db.default.user="root"
    db.default.password="root"
    dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource
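
    With this file on the classpath, scalikejdbc-config can bootstrap the default connection pool from the db.default.* keys. A minimal sketch of that wiring, assuming the city_info table from section 2 already exists (ScalikejdbcQuickCheck is just an illustrative name):

    import scalikejdbc._
    import scalikejdbc.config.DBs

    object ScalikejdbcQuickCheck {
      def main(args: Array[String]): Unit = {
        // DBs.setup() reads db.default.* from application.conf and creates the default pool
        DBs.setup()

        val areas: List[String] = DB.readOnly { implicit session =>
          sql"select distinct area from city_info".map(_.string("area")).list.apply()
        }
        println(areas)

        DBs.close()
      }
    }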
    

    pom.xml

     <properties>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.4.2</spark.version>
            <hive.version>1.1.0-cdh5.7.0</hive.version>
            <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
            <scalikejdbc.version>3.3.2</scalikejdbc.version>
        </properties>
    
     <!--scalikejdbc 依赖 -->
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc_2.11</artifactId>
                <version>${scalikejdbc.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc-config_2.11</artifactId>
                <version>${scalikejdbc.version}</version>
            </dependency>
     <!--Spark 依赖 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <!--mysql 依赖 -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.6</version>
            </dependency>
     <!-- GSON -->
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.3.1</version>
            </dependency>
       </dependencies>
    

    2. Data and scripts
    The two MySQL tables:
    city_info

    CREATE TABLE `city_info` (
      `city_id` int(11) DEFAULT NULL,
      `city_name` varchar(255) DEFAULT NULL,
      `area` varchar(255) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
     
    
    insert  into `city_info`(`city_id`,`city_name`,`area`) values (1,'BEIJING','NC'),(2,'SHANGHAI','EC'),(3,'NANJING','EC'),(4,'GUANGZHOU','SC'),(5,'SANYA','SC'),(6,'WUHAN','CC'),(7,'CHANGSHA','CC'),(8,'XIAN','NW'),(9,'CHENGDU','SW'),(10,'HAERBIN','NE');
    
    

    product_info

    CREATE TABLE IF NOT EXISTS product_info(
    product_id INT UNSIGNED AUTO_INCREMENT,
    product_name VARCHAR(100),
    extend_info VARCHAR(100),
    PRIMARY KEY (product_id )
    );
    
    insert  into product_info(product_id,product_name,extend_info) values (1,'product1','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (2,'product2','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (3,'product3','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (4,'product4','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (5,'product5','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (6,'product6','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (7,'product7','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (8,'product8','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (9,'product9','{"product_status":0}');
    insert  into product_info(product_id,product_name,extend_info) values (10,'product10','{"product_status":1}');
    insert  into product_info(product_id,product_name,extend_info) values (11,'product11','{"product_status":0}');
    insert  into product_info(product_id,product_name,extend_info) values (12,'product12','{"product_status":0}');
    insert  into product_info(product_id,product_name,extend_info) values (13,'product13','{"product_status":0}');
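
    The extend_info column holds a JSON string; in section 3 a UDF extracts product_status from it with Gson (pulled in via the pom above). A standalone sketch of just that parse, assuming Gson 2.3.1 is on the classpath (ExtendInfoParseCheck is a made-up name for illustration):

    import com.google.gson.JsonParser

    object ExtendInfoParseCheck {
      def main(args: Array[String]): Unit = {
        val extendInfo = """{"product_status":1}"""
        // parse the JSON string and read product_status as an Int
        val status = new JsonParser().parse(extendInfo).getAsJsonObject.get("product_status").getAsInt
        println(s"product_status = $status") // product_status = 1
      }
    }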
    

    Hive table

    create table user_click(
    user_id int,
    session_id string,
    action_time string,
    city_id int,
    product_id int
    ) partitioned by (day string)
    row format delimited fields terminated by ',';
    
    load data local inpath '/home/hadoop/soul/data/user_click.txt' overwrite into table user_click partition(day='2016-05-05');
    
    Data format
    95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:01:56,1,72
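
    After the load, a quick sanity check from spark-shell (assuming Hive support is enabled and the table lives in the g6_hadoop database that the code below uses):

    spark.sql("select count(*) from g6_hadoop.user_click where day = '2016-05-05'").show()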
    

    3. Use the Spark SQL DataFrame API to compute
    product_id, product_name, product_status, area, click_count, rank, day

    import java.io.File
    import com.google.gson.{JsonObject, JsonParser}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.{Row, SparkSession, functions}
    import org.apache.spark.sql.functions._
    import scalikejdbc.config.DBs
    import scalikejdbc.{DB, SQL}
    
    import scala.collection.mutable.ListBuffer
    
    /**
      * Join two MySQL tables and one Hive table, and use the DataFrame API to produce
      *
      * product_id, product_name, product_status, area, click_count, rank, day
      */
    object SparkSQLJoinApp {
      def main(args: Array[String]): Unit = {
        val warehouseLocation = new File("spark-warehouse").getAbsolutePath
    
        val spark = SparkSession.builder()
          .appName("SparkSQLJoinApp")
          .master("local[2]")
          .config("spark.sql.warehouse.dir", warehouseLocation)
          .enableHiveSupport().getOrCreate()
        val format = "yyyy-MM-dd'T'HH:mm:ssz"
        import spark.implicits._
    
        /**
          * userclickDF: click data read from the Hive table user_click
          */
        import spark.sql
        sql("use g6_hadoop")
        //    spark.table("user_click").show(10,false)
        val userclickDF = sql("select user_id,city_id,product_id,day from user_click")
    
    
        val dbURL = "jdbc:mysql://hadoop000:3306/hadoop_train"
        val dbUSerName = "root"
        val dbPasswd = "root"
    
    
        /**
          * cityDF: the MySQL table city_info
          */
        val cityDF = spark.read.format("jdbc").option("url", dbURL)
          .option("dbtable", "city_info")
          .option("user", dbUSerName)
          .option("password", dbPasswd).load()
    
    
        // UDF: parse the extend_info JSON string and extract product_status
        val splitUDF = udf((extend_info: String) => {
          val obj = new JsonParser().parse(extend_info).getAsJsonObject
          val ele = obj.get("product_status")
          ele.toString.toInt
        })
        spark.udf.register("splitUDF", splitUDF)
    
    
        /**
          * productDF: the MySQL table product_info, with product_status extracted via splitUDF
          */
        spark.read.format("jdbc").option("url", dbURL)
          .option("dbtable", "product_info")
          .option("user", dbUSerName)
          .option("password", dbPasswd).load().createOrReplaceTempView("product_info")

        val productDF = spark.sql("select product_id, product_name, splitUDF(extend_info) as product_status from product_info")
    
    
    
        // Click count per product per area; the equivalent SQL:
        /**
          *
          * select
          * product_id, area, count(1) click_count
          * from
          * tmp_product_click_basic_info
          * group by
          * product_id, area
          *
          */
        val productTempDF = userclickDF.join(cityDF, "city_id").select("product_id", "city_id", "city_name", "area", "day")
    
        val pro_area_countDF = productTempDF.groupBy("product_id", "area").count().join(productDF, "product_id")
          .select($"product_id", $"product_name", $"product_status", $"area", $"count".as("click_count"))
    
        //    pro_area_countDF.show(false)
    
        classOf[com.mysql.jdbc.Driver] // force-load the MySQL JDBC driver
        DBs.setup() // read db.default.* from application.conf; in local mode the executors share this JVM, so the pool is visible inside foreachPartition
    
        // Save the per-area click counts per product to MySQL
        pro_area_countDF.foreachPartition(partitionOfRecords => {
    
          val list = ListBuffer[area_prod_info]()
          partitionOfRecords.foreach(record => {
    
            val product_id = record.getAs[Int]("product_id")
            val product_name = record.getAs[String]("product_name")
            val product_status = record.getAs[Int]("product_status")
            val area = record.getAs[String]("area")
            val click_count = record.getAs[Long]("click_count")
            list.append(area_prod_info(product_id, product_name, product_status, area, click_count))
    
            // delete any existing rows for this product_id before inserting
            deleteByID(product_id)
          })
    
    
          insert(list)
        })
    
        DBs.close()
    
        // Top 3 products by click count in each area, using the window function row_number() over()
        val window_spec = Window.partitionBy("area").orderBy($"click_count".desc)
    
        val rankDF = pro_area_countDF.select(pro_area_countDF("product_id")
          , pro_area_countDF("product_name"),
          pro_area_countDF("product_status"),
          pro_area_countDF("area"),
          pro_area_countDF("click_count"),
          row_number().over(window_spec).as("rank")).where("rank <=3")
    
        rankDF.show(100, false) // TODO: save to the database (a sketch follows the code)
    
    
        spark.stop()
      }
    
      def insert(area_prod_infos: ListBuffer[area_prod_info]): Unit = {
        DB.localTx { implicit session =>
          for (area_prod_info <- area_prod_infos) {
            SQL("insert into area_product_click_count_full_info(product_id,product_name,product_status,area,click_count) values(?,?,?,?,?)")
              .bind(area_prod_info.product_id, area_prod_info.product_name, area_prod_info.product_status, area_prod_info.area, area_prod_info.click_count)
              .update().apply()
          }
        }
      }
    
    
      def deleteByID(id: Int): Unit = {
        DB.localTx {
          implicit session =>
            SQL(s"delete from area_product_click_count_full_info where product_id = ${id}").update().apply()
        }
    
      }
    
      case class area_prod_info(product_id: Int, product_name: String, product_status: Int, area: String, click_count: Long)
    
    }
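
    The last TODO in the code is to persist rankDF. One way to do that (a sketch, not the original author's implementation) is Spark's DataFrame JDBC writer, reusing the dbURL, dbUSerName and dbPasswd values from main and writing to a hypothetical MySQL table named area_product_click_top3; it would go right before spark.stop():

    import java.util.Properties

    // assumption: area_product_click_top3 exists in MySQL with matching columns
    val props = new Properties()
    props.put("user", dbUSerName)
    props.put("password", dbPasswd)
    props.put("driver", "com.mysql.jdbc.Driver")

    rankDF.write
      .mode("append") // or "overwrite", depending on how reruns should behave
      .jdbc(dbURL, "area_product_click_top3", props)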
    
    

Source: https://www.haomeiwen.com/subject/vvlsqctx.html