1. ScalikeJDBC configuration file and pom.xml
application.conf
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop000:3306/hadoop_train?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"
dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource
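With this application.conf on the classpath, DBs.setup() initializes the default connection pool. A minimal smoke test sketch to confirm the config is picked up; it assumes the city_info table from section 2 already exists:
import scalikejdbc._
import scalikejdbc.config._

object ConfigSmokeTest {
  def main(args: Array[String]): Unit = {
    DBs.setup() // reads db.default.* from application.conf
    val rows = DB.readOnly { implicit session =>
      SQL("select count(1) from city_info").map(_.long(1)).single().apply()
    }
    println(s"city_info rows: ${rows.getOrElse(0L)}")
    DBs.close()
  }
}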
pom.xml
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.4.2</spark.version>
<hive.version>1.1.0-cdh5.7.0</hive.version>
<hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
<scalikejdbc.version>3.3.2</scalikejdbc.version>
</properties>
<dependencies>
<!-- ScalikeJDBC -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- MySQL connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
<!-- GSON -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
2. Data and scripts
The two MySQL tables are as follows.
city_info
CREATE TABLE `city_info` (
`city_id` int(11) DEFAULT NULL,
`city_name` varchar(255) DEFAULT NULL,
`area` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into `city_info`(`city_id`,`city_name`,`area`) values (1,'BEIJING','NC'),(2,'SHANGHAI','EC'),(3,'NANJING','EC'),(4,'GUANGZHOU','SC'),(5,'SANYA','SC'),(6,'WUHAN','CC'),(7,'CHANGSHA','CC'),(8,'XIAN','NW'),(9,'CHENGDU','SW'),(10,'HAERBIN','NE');
product_info
CREATE TABLE IF NOT EXISTS product_info(
product_id INT UNSIGNED AUTO_INCREMENT,
product_name VARCHAR(100),
extend_info VARCHAR(100),
PRIMARY KEY (product_id )
);
insert into product_info(product_id,product_name,extend_info) values (1,'product1','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (2,'product2','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (3,'product3','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (4,'product4','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (5,'product5','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (6,'product6','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (7,'product7','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (8,'product8','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (9,'product9','{"product_status":0}');
insert into product_info(product_id,product_name,extend_info) values (10,'product10','{"product_status":1}');
insert into product_info(product_id,product_name,extend_info) values (11,'product11','{"product_status":0}');
insert into product_info(product_id,product_name,extend_info) values (12,'product12','{"product_status":0}');
insert into product_info(product_id,product_name,extend_info) values (13,'product13','{"product_status":0}');
Hive table
create table user_click(
user_id int,
session_id string,
action_time string,
city_id int,
product_id int
) partitioned by (day string)
row format delimited fields terminated by ',';
load data local inpath '/home/hadoop/soul/data/user_click.txt' overwrite into table user_click partition(day='2016-05-05');
Data format
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:01:56,1,72
3. Use the Spark SQL DataFrame API to produce
product_id, product_name, product_status, area, click_count, rank, day
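The job below also writes its per-area counts into a MySQL table named area_product_click_count_full_info, which is not created in section 2. A minimal ScalikeJDBC sketch that creates it; the column types are assumptions inferred from the area_prod_info case class used in the code:
import scalikejdbc._
import scalikejdbc.config._

object CreateSinkTable {
  def main(args: Array[String]): Unit = {
    DBs.setup() // reuses db.default.* from application.conf
    DB.autoCommit { implicit session =>
      SQL("""
        create table if not exists area_product_click_count_full_info (
          product_id int,
          product_name varchar(100),
          product_status int,
          area varchar(255),
          click_count bigint
        )
      """).execute().apply()
    }
    DBs.close()
  }
}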
import java.io.File
import com.google.gson.{JsonObject, JsonParser}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Row, SparkSession, functions}
import org.apache.spark.sql.functions._
import scalikejdbc.config.DBs
import scalikejdbc.{DB, SQL}
import scala.collection.mutable.ListBuffer
/**
 * Join two MySQL tables with one Hive table and use the DataFrame API to produce
 * product_id, product_name, product_status, area, click_count, rank, day.
 */
object SparkSQLJoinApp {
def main(args: Array[String]): Unit = {
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession.builder()
.appName("SparkSQLJoinApp")
.master("local[2]")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport().getOrCreate()
val format = "yyyy-MM-dd'T'HH:mm:ssz"
import spark.implicits._
/**
 * userclickDF: read the Hive table user_click
 */
import spark.sql
sql("use g6_hadoop")
// spark.table("user_click").show(10,false)
val userclickDF = sql("select user_id,city_id,product_id,day from user_click")
val dbURL = "jdbc:mysql://hadoop000:3306/hadoop_train"
val dbUSerName = "root"
val dbPasswd = "root"
/**
 * cityDF: read the MySQL table city_info
 */
val cityDF = spark.read.format("jdbc").option("url", dbURL)
.option("dbtable", "city_info")
.option("user", dbUSerName)
.option("password", dbPasswd).load()
// TODO: implement the UDF that extracts product_status from the extend_info JSON
val splitUDF = udf((extend_info: String) => {
val obj = new JsonParser().parse(extend_info).getAsJsonObject
val ele = obj.get("product_status")
ele.toString.toInt
})
spark.udf.register("splitUDF", splitUDF)
/**
 * productDF: read the MySQL table product_info and extract product_status with the UDF
 */
spark.read.format("jdbc").option("url", dbURL)
.option("dbtable", "product_info")
.option("user", dbUSerName)
.option("password", dbPasswd).load().createOrReplaceTempView("product_info")
val productDF = spark.sqlContext.sql("select product_id,product_name,splitUDF(extend_info) as product_status from product_info")
// TODO: compute the click count of each product in each area
/**
*
* select
* product_id, area, count(1) click_count
* from
* tmp_product_click_basic_info
* group by
* product_id, area
*
*/
val productTempDF = userclickDF.join(cityDF, "city_id").select("product_id", "city_id", "city_name", "area", "day")
val pro_area_countDF = productTempDF.groupBy("product_id", "area").count().join(productDF, "product_id")
.select($"product_id", $"product_name", $"product_status", $"area", $"count".as("click_count"))
// pro_area_countDF.show(false)
classOf[com.mysql.jdbc.Driver]
DBs.setup()
// Save the per-area click counts per product to MySQL
pro_area_countDF.foreachPartition(partitionOfRecords => {
val list = ListBuffer[area_prod_info]()
partitionOfRecords.foreach(record => {
val product_id = record.getAs[Int]("product_id")
val product_name = record.getAs[String]("product_name")
val product_status = record.getAs[Int]("product_status")
val area = record.getAs[String]("area")
val click_count = record.getAs[Long]("click_count")
list.append(area_prod_info(product_id, product_name, product_status, area, click_count))
// delete any existing rows for this product_id before inserting
deleteByID(product_id)
})
insert(list)
})
DBs.close()
// TODO: get the top 3 products by clicks in each area, using the window function row_number() over()
val window_spec = Window.partitionBy("area").orderBy($"click_count".desc)
val rankDF = pro_area_countDF.select(pro_area_countDF("product_id")
, pro_area_countDF("product_name"),
pro_area_countDF("product_status"),
pro_area_countDF("area"),
pro_area_countDF("click_count"),
row_number().over(window_spec).as("rank")).where("rank <=3")
rankDF.show(100, false) // TODO: save to the database (a sketch follows this listing)
spark.stop()
}
def insert(area_prod_infos: ListBuffer[area_prod_info]): Unit = {
DB.localTx { implicit session =>
for (area_prod_info <- area_prod_infos) {
SQL("insert into area_product_click_count_full_info(product_id,product_name,product_status,area,click_count) values(?,?,?,?,?)")
.bind(area_prod_info.product_id, area_prod_info.product_name, area_prod_info.product_status, area_prod_info.area, area_prod_info.click_count)
.update().apply()
}
}
}
def deleteByID(id: Int): Unit = {
DB.localTx {
implicit session =>
SQL(s"delete from area_product_click_count_full_info where product_id = ${id}").update().apply()
}
}
case class area_prod_info(product_id: Int, product_name: String, product_status: Int, area: String, click_count: Long)
}
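The TODO after rankDF.show() is left open. One way to finish it is Spark's built-in JDBC writer; a sketch of a helper, where the helper name and the target table area_product_click_top3 are assumptions, not part of the original code:
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper for the remaining TODO: write the top-3 DataFrame back to MySQL.
def saveRankToMySQL(rankDF: DataFrame, url: String, user: String, passwd: String): Unit = {
  val props = new Properties()
  props.setProperty("user", user)
  props.setProperty("password", passwd)
  rankDF.write
    .mode(SaveMode.Overwrite) // overwrite keeps reruns idempotent, like the delete-then-insert above
    .jdbc(url, "area_product_click_top3", props)
}
It could be called as saveRankToMySQL(rankDF, dbURL, dbUSerName, dbPasswd) just before spark.stop().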