服务器发送的事件(SSE)是一个用于将通知从 HTTP 服务器推送到客户端轻量级、标准化协议。与提供双向通信的 WebSocket 相比, SSE 只允许从服务器到客户端的单向通信。如果这是你所需要的, SSE 的优点是要简单得多, 只能依赖于 HTTP, 并提供浏览器中断的连接的重试语义。
根据 SSE 规范, 客户端可以通过 HTTP 从服务器请求事件流。服务器使用具有固定字符编码 UTF-8 的媒体类型text/event-stream
进行响应, 并保持响应打开, 以便在可用时将事件发送到客户端。事件是文本结构, 它持有字段并以空行终止, 例如
data: { "username": "John Doe" }
event: added
id: 42
data: another event
重新连接后,客户端可以选择发送Last-Event-ID
(标识最后一个已接受事件)头部给服务器。
模型
Akka HTTP 将事件流表示为Source[ServerSentEvent, NotUsed]
, 其中 ServerSentEvent 是具有以下只读属性的样例类:
-
data: String
– 实际有效载荷, 可能跨越多行 -
eventType
: Option[String] – 可选限定符, 例如. “added”, “removed”, 等等. -
id: Option[String]
– 可选标识符 -
retry
: Option[Int] – 可选的重新连接延迟 (毫秒)
根据 SSE 规范Akka HTTP 还提供了Last-Event-ID
头部和text/event-stream
媒体类型。
服务器端用法: 编组
为了响应带有事件流的 HTTP 请求, 必须将 EventStreamMarshalling 定义的 ToResponseMarshaller[Source[ServerSentEvent, Any]] 隐式引入到各自路由定义的范围中:
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._
import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME
def route: Route = {
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
path("events") {
get {
complete {
Source
.tick(2.seconds, 2.seconds, NotUsed)
.map(_ => LocalTime.now())
.map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
}
}
}
客户端用法:解组
为了解组作为Source[ServerSentEvent, NotUsed]
的事件流, 必须将 EventStreamUnmarshalling
定义的 FromEntityUnmarshaller[Source[ServerSentEvent, NotUsed]]
隐式引入到范围中:
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._
import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME
def route: Route = {
import akka.http.scaladsl.marshalling.sse.EventStreamUnmarshalling._
path("events") {
get {
complete {
Source
.tick(2.seconds, 2.seconds, NotUsed)
.map(_ => LocalTime.now())
.map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
}
}
}
请注意, 如果您正在寻找一种能够永久订阅事件流的弹性方法, Alpakka 提供的 EventSource 连接器可以使用上次收到的事件 id 自动重新连接。
网友评论