发送数据到 socket

作者: 焉知非鱼 | 来源:发表于2018-11-16 17:10 被阅读0次

Spark Streaming 有时候需要使用 nc -lk 9999 开启一个终端来手动键入一些数据供 Streaming 来拉取数据,这种方式不能很好地模拟实时流,所以使用 Perl 6 的 react .. wheneverPromise 来搞很合适:

my $vin = 'LSJA0000000000091';
my $last_meter = 0;             # 当前里程数

react {
    whenever IO::Socket::Async.listen('0.0.0.0', 3333) -> $conn {
        loop {
            react {
                whenever Supply.interval(1) {
                    $conn.print: sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter);
                }
        
                whenever Promise.in(5) {
                    done;
                }
            
                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            } 
        sleep 10;
        }
    }
    CATCH {
        default {
            say .^name, ': ', .Str;
            say "handled in $?LINE";
        }
    }
}

上面的代码会不断发送

{'vin':'LSJA0000000000091','createTime':1542358572,'mileage':0}

这样的带当前时间戳的数据。怎么验证能不能接收到数据呢?使用 telnet

telnet 0.0.0.0 3333

或者拷贝一个 Streaming Demo:

package com.github.yuvalitzchakov.structuredstateful

import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}


/**
  * 
  */
object readSocket {

  def main(args: Array[String]): Unit = {

    val host = "127.0.0.1"
    val port = 3333

    val spark: SparkSession = SparkSession.builder
      .master("local[*]")
      .appName("Stateful Structured Streaming")
      .getOrCreate()

    import spark.implicits._

    val ds: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
      .as[String]

    ds.writeStream
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .format("console")
      .option("truncate", "false") // 不截断显示
      .start()
      .awaitTermination()
  }
}

相关文章

  • 发送数据到 socket

    Spark Streaming 有时候需要使用 nc -lk 9999 开启一个终端来手动键入一些数据供 Stre...

  • 多播的发送与接收

    多播发送 流程 创建socket 发送数据(是多播地址发送数据) 实例 多播接收 流程 创建socket 将套接字...

  • 深入理解 Socket 缓冲区

    网络编程离不开 Socket,Socket 就是发送和接收网络数据,Socket 有发送缓冲也有接收缓冲,这些缓冲...

  • 网络编程3

    1.1 udp网络程序-发送数据 Socket函数 mySocket = socket(family, type...

  • python 网络编程②

    一.udp网络程序-发送数据 socket函数 mySocket = socket(family, type) 函...

  • iOS中NSStream实现发送和接受数据

    实现NSSstream发送和接受数据 CFStream进行socket通信

  • UDP

    好文推荐# 【Java TCP/IP Socket】UDP Socket(含代码)黑马程序员——UDP发送数据,接...

  • Socket,你需要知道的事儿

    what is socket socket作为一种抽象层,应用程序通过它来发送和接收数据,使用socket可以将应...

  • AsynSocket 源码解析之二

    CocoaAsyncSocket 源码学习摘要: GCDAsynSocket 读取socket数据(接收对方发送过...

  • 10.socket缓冲区以及阻塞模式

    在《socket数据的接收和发送》一节中讲到,可以使用 write()/send() 函数发送数据,使用 read...

网友评论

    本文标题:发送数据到 socket

    本文链接:https://www.haomeiwen.com/subject/brnmfqtx.html