美文网首页
203、Spark 2.0之Structured Streami

203、Spark 2.0之Structured Streami

作者: ZFH__ZJ | 来源:发表于2019-02-12 17:03 被阅读0次

    创建流式的dataset和dataframe

    流式dataframe可以通过DataStreamReader接口来创建,DataStreamReader对象是通过SparkSession的readStream()方法返回的。与创建静态dataframe的read()方法类似,我们可以指定数据源的一些配置信息,比如data format、schema、option等。spark 2.0中初步提供了一些内置的source支持。

    1. file source
      以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。文件必须是被移动到目录中的,比如用mv命令。
    2. socket source
      从socket连接中读取文本内容。driver是负责监听请求的server socket。socket source只能被用来进行测试。

    代码

    val socketDF = spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
    
    socketDF.isStreaming    
    socketDF.printSchema 
    
    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val csvDF = spark
        .readStream
        .option("sep", ";")
        .schema(userSchema)      
        .csv("/path/to/directory")    
    

    上面的例子都是产生untyped类型的dataframe,这就意味着在编译时是无法检查其schema的,只有在计算被提交并运行时才会进行检查。一些操作,比如map、flatMap等,需要在编译时就知道具体的类型。为了使用一些typed类型的操作,我们可以将dataframe转换为typed类型的dataset,比如df.as[String]。

    相关文章

      网友评论

          本文标题:203、Spark 2.0之Structured Streami

          本文链接:https://www.haomeiwen.com/subject/cygceqtx.html