RDD的创建
从集合中创建
- makeRDD函数创建
// 设定spark计算框架的运行环境
val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 创建spark上下文
val sc = new SparkContext(config)
// 从集合中调用makeRDD创建RDD
val list = sc.makeRDD(List(1,2,3,4))
// 打印结果
list.collect().foreach(println)
- parallelize函数创建
// 设定spark计算框架的运行环境
val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 创建spark上下文
val sc = new SparkContext(config)
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(List(1,2,3,4))
// 打印结果
list.collect().foreach(println)
从外部存储中创建
// 读取文件,将文件内容一行一行读取出来
// 默认情况下可以读取项目路径,也可以读取其他路径,HDFS
// 默认从文件读取的数据都是字符串类型
// 读取文件时,传递的分区参数为最小分区数,但不一定是这个分区数,取决于hadoop文件的时分片规则
val lines : RDD[String] = sc.textFile("in",2)
从其他RDD创建
RDD的转换
value类型
- map(func)的案例
作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(1 to 10)
// 给每个元素乘以2组成新值
val newValue = list.map(x => x*2)
newValue.foreach(println)
- flatmap(func)的案例
作用:类似于map,但是每一个输入元素可以被映射为0个或者多个输出元素(所以func应返回一个序列,而不是单一元素)
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(Array(List(1,2), List(3, 4)))
// flatmap
// 1,2,3,4
val result = list.flatMap(list => list)
result.collect().foreach(println)
- groupBy(func)的案例
作用:分组,按照传入函数的返回值进行分组,将相同的key对应的值放入一个迭代器
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(1 to 10)
val groupDatas = list.groupBy(x => x%2)
groupDatas.collect().foreach(println)
- filter(func)的案例
作用:过滤,返回一个新的RDD,该RDD由经过func函数计算后返回为true的输入元素组成
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(1 to 10)
val filterNums = list.filter(x => x%2==0)
filterNums.collect().foreach(println)
- distinct([numTasks])的案例
作用:对源RDD进行去重后返回一个新的RDD,默认情况下,只有8个并行任务来操作,但是可以传入一个numTasks参数改变它
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(List(1,2,3,4,1,2))
// 去重
list.distinct().collect().foreach(println)
- sortBy(func,[ascending],[numTasks])的案例
作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为升序
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(List(1,2,3,4,1,2))
// 降序
val result = list.sortBy(x => x, false)
result.collect().foreach(println)
双value类型
- union(otherDataset)的案例
作用:对源RDD和参数RDD求并集后返回一个新的RDD
// 从集合中调用makeRDD创建RDD
val list1 = sc.parallelize(1 to 10)
// 集合2
val list2 = sc.parallelize(10 to 15)
// 并集
val lists = list1.union(list2).distinct().collect()
lists.foreach(println)
- substract(otherDataset)的案例
作用:计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将被保留下来
// 从集合中调用makeRDD创建RDD
val list1 = sc.parallelize(1 to 10)
// 集合2
val list2 = sc.parallelize(10 to 15)
val result = list1.subtract(list2)
result.collect().foreach(println)
- intersection(otherDataset)的案例
作用:对源RDD和参数RDD求交集饭后一个新的RDD
// 从集合中调用makeRDD创建RDD
val list1 = sc.parallelize(1 to 10)
// 集合2
val list2 = sc.parallelize(10 to 15)
val result = list1.intersection(list2)
result.collect().foreach(println)
- cartesian(otherDataset)的案例
作用:笛卡尔积
// 从集合中调用makeRDD创建RDD
val list1 = sc.parallelize(1 to 10)
// 集合2
val list2 = sc.parallelize(10 to 15)
val result = list1.cartesian(list2)
result.collect().foreach(println)
- zip(otherDataset)的案例
作用:将两个RDD组合成key/value形式的RDD,这里默认两个RDD的partition数量以及元素数量相同,否则会抛出异常
// 从集合中调用makeRDD创建RDD
val list1 = sc.parallelize(Array("x","y","z"), 2)
// 集合2
val list2 = sc.parallelize(Array(1,2,3), 2)
val result = list1.zip(list2)
// (x,1) (y,2) (z,3)
result.collect().foreach(println)
key-value类型
- groupByKey的案例
作用:groupByKey也是对每个key进行分组,但只生成一个sequence
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(Array("hello", "hello", "world"))
// 组装成map
val map = list.map(x => (x, 1))
val group = map.groupByKey()
group.collect().foreach(println)
- reduceByKey(func,[numTasks])案例
作用:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同的key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数设置
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(Array("hello", "hello", "world"))
// 组装成map
val map = list.map(x => (x, 1))
val group = map.reduceByKey(_+_)
group.collect().foreach(println)
- aggregateByKey的案例
参数:(zeroValue:U,)
- sortByKey([ascending],[numTasks])的案例
作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(Array((1,"a"),(5, "b"),(3,"c")))
// 降序
val result = list.sortByKey( false)
// (5,b) (3,c) (1,a)
result.collect().foreach(println)
- mapValues的案例
针对于(K,V)的形式的类型只对V进行操作
// 从集合中调用makeRDD创建RDD
val list = sc.parallelize(Array((1,"a"),(5, "b"),(3,"c")))
// 降序
val result = list.mapValues(x => x.concat("|||"))
result.collect().foreach(println)
- join(otherDataset,[numTasks])的案例
作用:在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
// 从集合中调用makeRDD创建RDD
val list1 = sc.parallelize(Array((1,"a"),(5, "b"),(3,"c")))
val list2 = sc.parallelize(Array((1,"f"),(5, "g"),(4,"h")))
val result = list1.join(list2)
//(1,(a,f)) (5,(b,g))
result.collect().foreach(println)
action
- reduce(func)的案例
作用:通过func函数聚集RDD中的所有元素,先聚合分区内的数据,再聚合分区间的数据
// 从集合中调用makeRDD创建RDD
val list = sc.makeRDD(1 to 10, 2)
// 聚合RDD[int]中所有元素
val sum = list.reduce(_+_)
// 55
print(sum)
- collect()案例
作用:在驱动程序中,以数组的形式返回数据集的所有元素
val result = sc.makeRDD(Array(1 to 10))
result.collect()
3.count()案例
作用:返回RDD中元素的个数
val result = sc.makeRDD(1 to 10)
// 统计个数为10
print(result.count())
- first()案例
作用:返回RDD中的第一个元素 - tabke(n)案例
作用:返回一个由RDD的前n个元素组成的数组 - takeOrdered(n)
作用:返回该RDD排序后的前n个元素组成的数组 - countByKey()
作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的个数
val map = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 1)))
val result = map.countByKey()
println(result)
- foreach(func)的案例
作用:在数据集的每一个元素上,运行函数fucn进行更新
数据的读取和保存
文件数据读取与保存
- text文件
数据读取:text(String)
// 读取文件,将文件内容一行一行读取出来
// 默认情况下可以读取项目路径,也可以读取其他路径,HDFS
// 默认从文件读取的数据都是字符串类型
val lines : RDD[String] = sc.textFile("in")
数据保存:saveAsTextFile
list.saveAsTextFile("output")
- json文件
如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析
// 读取json文件
val file = sc.textFile("in/user.json")
val json = file.map(JSON.parseFull)
json.foreach(println)
user.json(注意:每行不能用逗号隔开)
{"name": "xuwei", "age": 10}
{"name": "xuwei2", "age": 101}
{"name": "xuwei2", "age": 103}
输出结果
Some(Map(name -> xuwei2, age -> 103.0))
Some(Map(name -> xuwei, age -> 10.0))
Some(Map(name -> xuwei2, age -> 101.0))
文件系统类数据读取与保存
- mysql读取与保存
mysql读取
支持通过Java jdbc访问关系数据库,需要通过JdbcRDD进行
(1) 添加依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>
(2) mysql读取
// 设定spark计算框架的运行环境
val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 创建spark上下文
val sc = new SparkContext(config)
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://192.168.6.70:3306/watson"
val userName = "watson"
val password = "watson@150922"
val sql = "select * from user where userid >= ? and userid<= ?"
val jdbc = new JdbcRDD(
sc,
() => {
// 获取数据库的连接对象
//Class.forName(driver)
java.sql.DriverManager.getConnection(url, userName, password)
},
sql,
1,
3,
2,
(rs)=>{
println(rs.getString(1)+","+rs.getString(2))
}
)
jdbc.collect()
}
(3) mysql保存
// 设定spark计算框架的运行环境
val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 创建spark上下文
val sc = new SparkContext(config)
val data = sc.makeRDD(List(("xuwei", "test")))
val url = "jdbc:mysql://192.168.6.70:3306/watson"
val userName = "watson"
val password = "watson@150922"
data.foreachPartition(datas =>{
val connection = java.sql.DriverManager.getConnection(url, userName, password)
datas.foreach{
case (userName, password)=>{
val sql = "insert into user(username, password) values (?,?)"
val statement = connection.prepareStatement(sql)
statement.setString(1, userName)
statement.setString(2, password)
statement.executeUpdate()
statement.close()
}
}
connection.close()
})
}
- hbase读取与保存
网友评论