发送数据到 socket

作者: 焉知非鱼 | 来源:发表于2018-11-16 17:10 被阅读0次

    Spark Streaming 有时候需要使用 nc -lk 9999 开启一个终端来手动键入一些数据供 Streaming 来拉取数据,这种方式不能很好地模拟实时流,所以使用 Perl 6 的 react .. wheneverPromise 来搞很合适:

    my $vin = 'LSJA0000000000091';
    my $last_meter = 0;             # 当前里程数
    
    react {
        whenever IO::Socket::Async.listen('0.0.0.0', 3333) -> $conn {
            loop {
                react {
                    whenever Supply.interval(1) {
                        $conn.print: sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter);
                    }
            
                    whenever Promise.in(5) {
                        done;
                    }
                
                    whenever signal(SIGINT) {
                        say "Done.";
                        done;
                    }
                } 
            sleep 10;
            }
        }
        CATCH {
            default {
                say .^name, ': ', .Str;
                say "handled in $?LINE";
            }
        }
    }
    

    上面的代码会不断发送

    {'vin':'LSJA0000000000091','createTime':1542358572,'mileage':0}
    

    这样的带当前时间戳的数据。怎么验证能不能接收到数据呢?使用 telnet

    telnet 0.0.0.0 3333
    

    或者拷贝一个 Streaming Demo:

    package com.github.yuvalitzchakov.structuredstateful
    
    import org.apache.spark.sql._
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}
    
    
    /**
      * 
      */
    object readSocket {
    
      def main(args: Array[String]): Unit = {
    
        val host = "127.0.0.1"
        val port = 3333
    
        val spark: SparkSession = SparkSession.builder
          .master("local[*]")
          .appName("Stateful Structured Streaming")
          .getOrCreate()
    
        import spark.implicits._
    
        val ds: Dataset[String] = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .load()
          .as[String]
    
        ds.writeStream
          .outputMode(OutputMode.Append())
          .trigger(Trigger.ProcessingTime("2 seconds"))
          .format("console")
          .option("truncate", "false") // 不截断显示
          .start()
          .awaitTermination()
      }
    }
    

    相关文章

      网友评论

        本文标题:发送数据到 socket

        本文链接:https://www.haomeiwen.com/subject/brnmfqtx.html