美文网首页半栈工程师
Akka HTTP之服务器发送的事件支持

Akka HTTP之服务器发送的事件支持

作者: 乐言笔记 | 来源:发表于2017-10-25 16:32 被阅读124次

    服务器发送的事件(SSE)是一个用于将通知从 HTTP 服务器推送到客户端轻量级、标准化协议。与提供双向通信的 WebSocket 相比, SSE 只允许从服务器到客户端的单向通信。如果这是你所需要的, SSE 的优点是要简单得多, 只能依赖于 HTTP, 并提供浏览器中断的连接的重试语义。

    根据 SSE 规范, 客户端可以通过 HTTP 从服务器请求事件流。服务器使用具有固定字符编码 UTF-8 的媒体类型text/event-stream进行响应, 并保持响应打开, 以便在可用时将事件发送到客户端。事件是文本结构, 它持有字段并以空行终止, 例如

    data: { "username": "John Doe" }
    event: added
    id: 42
    
    data: another event
    

    重新连接后,客户端可以选择发送Last-Event-ID(标识最后一个已接受事件)头部给服务器。

    模型

    Akka HTTP 将事件流表示为Source[ServerSentEvent, NotUsed], 其中 ServerSentEvent 是具有以下只读属性的样例类:

    • data: String – 实际有效载荷, 可能跨越多行
    • eventType: Option[String] – 可选限定符, 例如. “added”, “removed”, 等等.
    • id: Option[String] – 可选标识符
    • retry: Option[Int] – 可选的重新连接延迟 (毫秒)

    根据 SSE 规范Akka HTTP 还提供了Last-Event-ID头部和text/event-stream媒体类型。

    服务器端用法: 编组

    为了响应带有事件流的 HTTP 请求, 必须将 EventStreamMarshalling 定义的 ToResponseMarshaller[Source[ServerSentEvent, Any]] 隐式引入到各自路由定义的范围中:

    import akka.NotUsed
    import akka.stream.scaladsl.Source
    
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.http.scaladsl.model.sse.ServerSentEvent
    import scala.concurrent.duration._
    
    import java.time.LocalTime
    import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME
    
    def route: Route = {
      import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
    
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map(_ => LocalTime.now())
              .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }
    }
    

    客户端用法:解组

    为了解组作为Source[ServerSentEvent, NotUsed]的事件流, 必须将 EventStreamUnmarshalling 定义的 FromEntityUnmarshaller[Source[ServerSentEvent, NotUsed]] 隐式引入到范围中:

    import akka.NotUsed
    import akka.stream.scaladsl.Source
    
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.http.scaladsl.model.sse.ServerSentEvent
    import scala.concurrent.duration._
    
    import java.time.LocalTime
    import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME
    
    def route: Route = {
      import akka.http.scaladsl.marshalling.sse.EventStreamUnmarshalling._
    
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map(_ => LocalTime.now())
              .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }
    }
    

    请注意, 如果您正在寻找一种能够永久订阅事件流的弹性方法, Alpakka 提供的 EventSource 连接器可以使用上次收到的事件 id 自动重新连接。

    相关文章

      网友评论

        本文标题:Akka HTTP之服务器发送的事件支持

        本文链接:https://www.haomeiwen.com/subject/tipxpxtx.html