可以继承Receiver类来实现自定义采集器,需要实现方法如下:
- onStart:接收器启动方法
- onStop:接收器停止方法
同时还要在构造Receiver时指定存储级别(StorageLevel),例如 StorageLevel.MEMORY_ONLY
- 代码实现
/**
 * Custom Spark Streaming receiver that reads UTF-8 text lines from a TCP socket
 * and hands each line to Spark via `store`.
 *
 * The receiver is constructed with `StorageLevel.MEMORY_ONLY`.
 *
 * @param host host name or IP address of the TCP server to connect to
 * @param port TCP port of the server
 */
class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // Assigned by the receiving thread and closed by onStop() from a different
  // thread, hence @volatile.
  @volatile var socket: Socket = _

  /** Starts the receiving loop on its own thread so that this method returns immediately. */
  override def onStart(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = receive()
    }, "MyReceiver-socket-reader").start()
  }

  /** Closes the current socket, if any, which also unblocks a pending `readLine()`. */
  override def onStop(): Unit = {
    val current = socket
    if (current != null) {
      current.close()
      socket = null
    }
  }

  /**
   * Connects to `host:port` and stores every received line until the stream ends
   * or the receiver is stopped. On end-of-stream or on an error, a restart is
   * requested unless the receiver has already been stopped.
   */
  def receive(): Unit = {
    import scala.util.control.NonFatal
    try {
      val conn = new Socket(host, port)
      socket = conn
      try {
        val reader = new BufferedReader(
          new InputStreamReader(conn.getInputStream, StandardCharsets.UTF_8))
        // In Scala an assignment evaluates to Unit, so the Java idiom
        // `while ((line = reader.readLine()) != null)` never terminates.
        // Read explicitly before the loop and at the end of each iteration instead.
        var line = reader.readLine()
        while (!isStopped() && line != null) {
          store(line)
          line = reader.readLine()
        }
      } finally {
        // Closing the socket also closes its input stream.
        conn.close()
      }
      // The server closed the connection: ask for a restart so we reconnect.
      if (!isStopped()) {
        restart(s"Connection to $host:$port closed, trying to connect again")
      }
    } catch {
      case NonFatal(e) =>
        // After onStop() has closed the socket the resulting exception is expected;
        // only report failures that happen while the receiver should be running.
        if (!isStopped()) {
          restart(s"Error receiving data from $host:$port", e)
        }
    }
  }
}
- 使用自定义接收器
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Demo application: counts the space-separated words received through
 * [[MyReceiver]] in each 5-second batch and prints the counts.
 */
object MyReceiverDemo {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("MyReceiverDemo")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Use StreamingContext.receiverStream to receive data through the custom Receiver.
    val lines = ssc.receiverStream(new MyReceiver("192.168.0.100", 9999))

    // Split each line into words, pair every word with 1, then sum the counts per word.
    lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((left, right) => left + right)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
网友评论