美文网首页
(五)transform、Join的使用、DStream和RDD

(五)transform、Join的使用、DStream和RDD

作者: 白面葫芦娃92 | 来源:发表于2018-11-16 21:01 被阅读0次

数据一:日志信息
domain.time.traffic
ruozedata.com
baidu.com
ruozedata.com
数据二:已有文件 黑名单
domain
baidu.com
需求:把数据二当做一个黑名单,也就是把数据一中的baidu.com数据剔除掉,只留下ruozedata.com
一、用SparkCore实现:(IDEA)

import scala.collection.mutable.ListBuffer

object LeftJoinApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LeftJoinApp")
    val sc = new SparkContext(conf)
//    数据一
    val input1 = new ListBuffer[(String,Long)]
    input1.append(("www.ruozedata.com",8888))
    input1.append(("www.ruozedata.com",8889))
    input1.append(("www.ruozedata.com",8887))
    input1.append(("www.baidu.com",8886))
    val data1 = sc.parallelize(input1)

//    数据二
    val input2 = new ListBuffer[(String,Boolean)]
    input2.append(("www.baidu.com",true))
    val data2 = sc.parallelize(input2)

    val res1 = data1.leftOuterJoin(data2)//.collect().foreach(println)
    val res2 = res1.filter(_._2._2.getOrElse(false) != true)//.collect().foreach(println)
    val res3 = res2.map(x=>(x._1,x._2._1))//.collect().foreach(println)

    sc.stop()
  }
}

输出结果:

res1:
(www.ruozedata.com,(8888,None))
(www.ruozedata.com,(8889,None))
(www.ruozedata.com,(8887,None))
(www.baidu.com,(8886,Some(true)))
----------------------------------------------------------------
res3:
(www.ruozedata.com,(8888,None))
(www.ruozedata.com,(8889,None))
(www.ruozedata.com,(8887,None))
----------------------------------------------------------------
res3:
(www.ruozedata.com,8888)
(www.ruozedata.com,8889)
(www.ruozedata.com,8887)

二、用SparkStreaming实现,其中最重要的是使用了transform算子(DStream和RDD做操作,一定要用transform)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer

object LeftJoinStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LeftJoinStreamingApp")
    val ssc = new StreamingContext(conf,Seconds(5))
    //    数据二:RDD
    val input2 = new ListBuffer[(String,Boolean)]
    input2.append(("www.baidu.com",true))
    val data2 = ssc.sparkContext.parallelize(input2)

//    数据一:nc
    val lines = ssc.socketTextStream("hadoop000",9999)
    lines.map(x=>{(x.split(",")(0),x)}).transform(rdd=> rdd.leftOuterJoin((data2))).filter(_._2._2.getOrElse(false) != true).map(_._2._1).print()

  ssc.start()
    ssc.awaitTermination()
  }
}

nc输入数据一:

[hadoop@hadoop000 ~]$ nc -lk 9999
www.ruozedata.com,9999
www.baidu.com,4444
www.ruozedata.com,8888
www.baidu.com,3333
www.ruozedata.com,9999
www.ruozedata.com,7777

IDEA 输出结果:

-------------------------------------------
Time: 1538102140000 ms
-------------------------------------------
www.ruozedata.com,9999
www.ruozedata.com,8888
www.ruozedata.com,9999
www.ruozedata.com,7777

实现相同的功能,用广播变量的方式,因为广播出去之后匹配没有shuffle,性能更高

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer

object BroadcastStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastStreamingApp")
    val ssc = new StreamingContext(conf,Seconds(5))
    //    数据二:RDD
    val input2 = new ListBuffer[(String,Boolean)]
    input2.append(("www.baidu.com",true))
    val data2 = ssc.sparkContext.parallelize(input2).collectAsMap()
    val broaddata2 = ssc.sparkContext.broadcast(data2)

    val lines = ssc.socketTextStream("hadoop000",9999)
    lines.map(x=>{(x.split(",")(0),x)})
      .transform(rdd=> {
        rdd.mapPartitions(partition => {
          val map = broaddata2.value
          for ((key, value) <- partition if !map.contains(key))
            yield value
        })
      }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

相关文章

网友评论

      本文标题:(五)transform、Join的使用、DStream和RDD

      本文链接:https://www.haomeiwen.com/subject/otwigftx.html