读取hive库数据
pom.xml依赖配置
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
读取hive数据demo
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("spark://master:7077")//申明spark运行模式
.setAppName("risk")//设置job名称(可不写)
val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
.config(conf)
.enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
.getOrCreate()
import spark.implicits._
spark.sql("use bmkp")
val df= spark.sql("select * from customer")//在hive中执行sql语句,返回DataSet格式数据
df.show()
spark.stop()
}
}
读取mysql数据
pom.xml配置文件
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
读取mysql数据demo
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("spark://master:7077")//申明spark运行模式
.setAppName("risk")//设置job名称(可不写)
val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
.config(conf)
.enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
.getOrCreate()
//读取mysql中数据,返回数据类型为DataSet
val df = spark.read
.format("jdbc")
.options(Map("url" ->
//配置mysql连接参数,包括mysql ip 端口 数据库名称 登录名和密码
"jdbc:mysql://***.***.***.***:3036/bmkpstress?user=root&password=**********",
//定义驱动程序
"driver"->"com.mysql.jdbc.Driver",
//编写sql 在mysql中执行该sql并返回数据
"dbtable" -> "(select * from test group by id) as aaa"))
.load()
spark.stop()
}
}
SPARKSTREAMING读取kafka数据
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
读取kafka数据demo
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
object Main {
def main(args:Array[String]):Unit={
val conf = new SparkConf().setMaster("spark://master:7077")
.setAppName("kafka_hive");
val spark = SparkSession.builder().master("spark://master:7077").config(conf).enableHiveSupport().getOrCreate()
var ssc = new StreamingContext(conf, Seconds(10));
var topics = Array("service_cksc","service_ckxc","service_dcyy");//kafka topic名称
var group = "bmkp" //定义groupID
val kafkaParam = Map( //申明kafka相关配置参数
"bootstrap.servers" -> "***.104.42.127:19092,***.104.202.222:19092,***.135.73.152:19092", //kafka 集群IP及端口
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> group, //定义groupID
"auto.offset.reset" -> "earliest",//设置丢数据模式 有 earliest,latest, none
"enable.auto.commit" -> (false: java.lang.Boolean)//设置是否自动存储offset 这里设置为否
);
val offsetRanges = Array()
var stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam))//从kafka读取数据 获取数据流
stream.foreachRDD { rdd =>
import spark.implicits._
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //获取offset
/*
这里处理从kafka获取的数据,在确定获取的数据已经存储或者处理后将该RDD的offset存储
*/
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) //存储offset
}
}
}
SPARK写数据到HIVE
pom.xml配置信息
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency
写数据到hive库demo
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Main {
case class Person(name:String,col1:Int,col2:String)
def main(args:Array[String]):Unit={
val conf = new SparkConf()
.setMaster("spark://master:7077")//申明spark运行模式
.setAppName("kettle")//设置job名称(可不写)
val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
.config(conf)
.enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
.getOrCreate()
import spark.implicits._ //引入隐式转换 否则RDD无法转换成DataSet(DataFrame)
spark.sql("use DataBaseName"//在hive中执行sql语句
val data = spark.read.textFile("path")//读取hdfs中的文件,返回的是RDD格式数据,RDD格式数据不能直接写入hive,(这里代表任意的RDD类型数据)
.map(x=>x.split(","))
.map(x=>Person(x(0),x(1).toInt,x(2)))//利用用例类将RDD格式居转换成DataSetG格式数据,从而可以写入hive中
data.toDF().createOrReplaceTempView("table1") //将DataSet格式数据映射到临时表中
spark.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")//在hive上运行sql语句将临时表中数据抽出并存入hive中
spark.close()
}
}
写数据到mysql
pom.xml配置
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
spark 写数据到mysql库demo 1
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Main {
case class Blog(name: String, count: Int)
def main(args: Array[String]) {
val conf = new SparkConf()
.setMaster("spark://master:7077")//申明spark运行模式
.setAppName("kettle")//设置job名称(可不写)
val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
.config(conf)
.enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
.getOrCreate()
//获取RDD数据 这里只是做一个实例 代表spark处理产生的所有RDD类型的数据
val data = spark.sparkContext.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
var conn: Connection = null//定义mysql连接
var ps: PreparedStatement = null
val sql = "insert into blog(name, count) values (?, ?)"//需要执行的sql语句,两个 “?”代表后面需要替换的数据
data.foreachPartition(rdd=>
try {
//具体定义mysql的驱动管理器,主要设置mysql地址 端口 数据库 用户名 密码
conn = DriverManager.getConnection("jdbc:mysql://***.***.***.***:3306/test","root", "******")
rdd.toIterator.foreach(data => {
ps = conn.prepareStatement(sql)
ps.setString(1, data._1)//将需要写入mysql的数据进行映射
ps.setInt(2, data._2)
ps.executeUpdate()//在mysql上执行sql语句将数据插入到相应的表中
})
} catch {
case e: Exception => println("Mysql Exception")
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()//关闭mysql连接
}
})
}
}
写数据到mysql库demo 2
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object Main {
case class Blog(name: String, count: Int)
def main(args: Array[String]) {
val conf = new SparkConf()
.setMaster("spark://master:7077") //申明spark运行模式
.setAppName("kettle")
//设置job名称(可不写)
val spark = SparkSession.builder() //spark-2.0采用SparkSession代替sparkContext
.config(conf)
.enableHiveSupport() //添加对HIVE的支持,否则无法访问hive库
.getOrCreate()
//获取RDD数据 这里只是做一个实例 代表spark处理产生的所有RDD类型的数据
val data = spark.sparkContext.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
import spark.implicits._
val df = data.map(x=>new Blog(x._1,x._2)).toDF()//将RDD类型数据转换成DataSet类型
df.write.mode(SaveMode.Append).format("jdbc")
.option("url", "jdbc:mysql://***.***.***.***:3306/test")//定义mysql 地址 端口 数据库
.option("dbtable", "blog")//定义需要插入的mysql目标表
.option("user", "****")//定义登录用户名
.option("password", "************")//定义登录密码
.save()//保存数据
}
}
网友评论