Spark Streaming 有时候需要使用 nc -lk 9999
开启一个终端来手动键入一些数据供 Streaming 来拉取数据,这种方式不能很好地模拟实时流,所以使用 Perl 6 的 react .. whenever
和 Promise
来搞很合适:
my $vin = 'LSJA0000000000091';
my $last_meter = 0; # 当前里程数
react {
whenever IO::Socket::Async.listen('0.0.0.0', 3333) -> $conn {
loop {
react {
whenever Supply.interval(1) {
$conn.print: sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter);
}
whenever Promise.in(5) {
done;
}
whenever signal(SIGINT) {
say "Done.";
done;
}
}
sleep 10;
}
}
CATCH {
default {
say .^name, ': ', .Str;
say "handled in $?LINE";
}
}
}
上面的代码会不断发送
{'vin':'LSJA0000000000091','createTime':1542358572,'mileage':0}
这样的带当前时间戳的数据。怎么验证能不能接收到数据呢?使用 telnet
:
telnet 0.0.0.0 3333
或者拷贝一个 Streaming Demo:
package com.github.yuvalitzchakov.structuredstateful
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
/**
*
*/
object readSocket {
def main(args: Array[String]): Unit = {
val host = "127.0.0.1"
val port = 3333
val spark: SparkSession = SparkSession.builder
.master("local[*]")
.appName("Stateful Structured Streaming")
.getOrCreate()
import spark.implicits._
val ds: Dataset[String] = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()
.as[String]
ds.writeStream
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("2 seconds"))
.format("console")
.option("truncate", "false") // 不截断显示
.start()
.awaitTermination()
}
}
网友评论