创建流式的dataset和dataframe
流式dataframe可以通过DataStreamReader接口来创建,DataStreamReader对象是通过SparkSession的readStream()方法返回的。与创建静态dataframe的read()方法类似,我们可以指定数据源的一些配置信息,比如data format、schema、option等。spark 2.0中初步提供了一些内置的source支持。
- file source
以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。文件必须是被移动到目录中的,比如用mv命令。 - socket source
从socket连接中读取文本内容。driver是负责监听请求的server socket。socket source只能被用来进行测试。
代码
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
socketDF.isStreaming
socketDF.printSchema
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema)
.csv("/path/to/directory")
上面的例子都是产生untyped类型的dataframe,这就意味着在编译时是无法检查其schema的,只有在计算被提交并运行时才会进行检查。一些操作,比如map、flatMap等,需要在编译时就知道具体的类型。为了使用一些typed类型的操作,我们可以将dataframe转换为typed类型的dataset,比如df.as[String]。
网友评论