1.以 List 作为源创建 RDD
val df = sc.parallelize(List(2,8,66,9,34,5))
- 初始化 sbgma --> 创建spark实例
val ss = SparkSession
.builder()
.master("local")
.getOrCreate()
.appName(" spark 2.0")
val sc = ss.sparkContext //获取socket
val df = sc.parallelize(List(2,8,66,9,34,5))
3.初始化SparkStreaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
------------这里是类名和主函数入口,自己补充
val ss = SparkSession
.builder()
.appName("word count streaming")
//流式处理中 ,一个job需要开启两个线程。
.master("local[2]")
//.enableHiveSupport() //把hive功能打开
//用于存放临时数据 不设置 默认会在当前目录下创建一个 spark-warehouse
.config("spark.sql.warehouse.dir", "D:\\BDTC11")
.getOrCreate()
val sc = ss.sparkContext
//Spark Streaming
//def this(sparkContext: SparkContext, batchDuration: Duration)
//SparkSteaming 设置batch 一般是秒级别
//每隔10S处理一次
val ssc = new StreamingContext(sc, Seconds(4))
//输入源
//监听端口 192.168.0.251 7777
val sourceDStream = ssc.socketTextStream("192.168.1.4", 7777)
- RDD的合并(基于内存的,会生成新的RDD,而不是向rdd1里面塞rdd2)
合并RDD , 这里是取并集 , 相当于数学里面的加法
val rdd1=sc.parallelize(List(1,2,3))
val rdd2=sc.parallelize(List(4,5,6))
val rdd3=rdd1.union(rdd2)
rdd3.foreach(println)
1
2
3
4
5
6
网友评论