Zuul 2.0 支持消息推送,例如送服务端发送消息给客户端。自持两种协议,WebSockets和Server Sent Events(SSE) 来发送推送消息。
官方也提供了示例sample app来使用WebSockets和SSE发送消息。
授权 Authentication
Zuul 推送服务器必须对进入的推送连接进行授权验证。可以在Zuul 推送服务器中以插件化的方式实现自己的认证。可以通过继承抽象类PushAuthHandler
实现该抽象类的doAuth()
方法。可以参考官方提供的实例SamplePushAuthHandler。
参考示例代码:
protected PushUserAuth doAuth(FullHttpRequest req) {
final Cookies cookies = parseCookies(req);
for (final Cookie c : cookies.getAll()) {
if(c.getName().equals("userAuthCookie")) {
final String customerId = c.getValue();
if (!Strings.isNullOrEmpty(customerId)) {
return new SamplePushUserAuth(customerId);
}
}
}
return new SamplePushUserAuth(HttpResponseStatus.UNAUTHORIZED.code());
}
该类的doAuth()
方法经过自己的处理后返回一个PushUserAuth接口的实例,该接口定义了三个方法:
public interface PushUserAuth {
boolean isSuccess();
int statusCode();
String getClientIdentity();
}
客户端注册和查找
在认证成功后,Zuul推送服务根据客户端或者用户身份注册记录每个授权的连接以便之后的查找和发送推送消息给指定的客户端或者用户。可以在认证通过后实现PushUserAuth
接口并且在上文提到的Handler类PushAuthHandler
类的doAuth()
方法中返回PushUserAuth
的实例。可以参考示例SamplePushUserAuth。
示例源码如下:
public class SamplePushUserAuth implements PushUserAuth {
private String customerId;
private int statusCode;
private SamplePushUserAuth(String customerId, int statusCode) {
this.customerId = customerId;
this.statusCode = statusCode;
}
// Successful auth
public SamplePushUserAuth(String customerId) {
this(customerId, 200);
}
// Failed auth
public SamplePushUserAuth(int statusCode) {
this("", statusCode);
}
@Override
public boolean isSuccess() {
return statusCode == 200;
}
@Override
public int statusCode() {
return statusCode;
}
@Override
public String getClientIdentity() {
return customerId;
}
每个Zuul的推送服务使用PushConnectionRegistry
管理了一个本地基于内存的与其连接的所有客户端注册表。对单个节点的推送集群来说,基于内存的本地记录是足够满足使用的。但是对于多节点推送集群的场景,除了内存注册表,还需要第二级别:即全局数据存储注册表,来将推送记录扩展单节点限制。在这种情况下查找一个特定的客户端需要两个步骤。
- 在全局数据存储注册表中查找特定客户端连接到的推送服务器
- 在返回的推送服务器中,通过其本地的内存推送记录可以查找到实际的客户端连接
可以通过继承PushRegistrationHandler并且覆盖registerClient()
方法来集成全局的数据数据推送注册表。Zuul推送服务插入任何类型的数据存储来作为你选择的全局数据存储注册表但是最好选择的数据存储支持如下的特性:
- 读低延迟
- TTL或者记录以某种顺序自动到期
- 支持分区
- 可复制
拥有这些特性意味着你的推送集群可以根据需求水平扩展到上百万的推送链接。Redis,Cassandra,Amazon DynamoDB都是不错的选择。
可以参考类SampleWebSocketPushRegistrationHandler和SampleSSEPushRegistrationHandler,看下如何让WebSocket和SSE连接与推送注册表集成。
这里以WebSocket连接和推送注册表的关联进行分析: - 首先初始化ChannelConfig和ChannelGroup,在类
BaseServerStartup
中。
public void init() throws Exception
{
ChannelConfig channelDeps = new ChannelConfig();
addChannelDependencies(channelDeps);
ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
clientConnectionsShutdown = new ClientConnectionsShutdown(clientChannels,
GlobalEventExecutor.INSTANCE, discoveryClient);
portsToChannelInitializers = choosePortsAndChannels(clientChannels, channelDeps);
server = new Server(portsToChannelInitializers, serverStatusManager, clientConnectionsShutdown, eventLoopGroupMetrics);
}
- 根据初始化信息选择端口和通道(协议),在类
SampleServerStartup
中,这里为了方便演示,去掉与WebSocket无关代码。
protected Map<Integer, ChannelInitializer> choosePortsAndChannels(
ChannelGroup clientChannels,
ChannelConfig channelDependencies) {
Map<Integer, ChannelInitializer> portsToChannels = new HashMap<>();
int port = new DynamicIntProperty("zuul.server.port.main", 7001).get();
ChannelConfig channelConfig = BaseServerStartup.defaultChannelConfig();
int pushPort = new DynamicIntProperty("zuul.server.port.http.push", 7008).get();
ServerSslConfig sslConfig;
/* These settings may need to be tweaked depending if you're running behind an ELB HTTP listener, TCP listener,
* or directly on the internet.
*/
channelConfig.set(CommonChannelConfigKeys.allowProxyHeadersWhen, StripUntrustedProxyHeadersHandler.AllowWhen.NEVER);
channelConfig.set(CommonChannelConfigKeys.preferProxyProtocolForClientIp, true);
channelConfig.set(CommonChannelConfigKeys.isSSlFromIntermediary, false);
channelConfig.set(CommonChannelConfigKeys.withProxyProtocol, true);
channelDependencies.set(ZuulDependencyKeys.pushConnectionRegistry, pushConnectionRegistry);
portsToChannels.put(port, new SampleWebSocketPushChannelInitializer(port, channelConfig, channelDependencies, clientChannels));
logPortConfigured(port, null);
// port to accept push message from the backend, should be accessible on internal network only.
portsToChannels.put(pushPort, pushSenderInitializer);
logPortConfigured(pushPort, null);
return portsToChannels;
}
上面的倒数第四行代码,找到了SampleWebSocketPushChannelInitializer,把两个参数channelDependencies
和clientChannels
传给了类SampleWebSocketPushChannelInitializer
- 在SampleWebSocketPushChannelInitializer类添加推送处理
SampleWebSocketPushChannelInitializer继承了Netty的ChannelInitializer,在初始化的时候会调用addPushHandlers(final ChannelPipeline pipeline)方法,在示例的该方法中调用类SampleWebSocketPushRegistrationHandler
的构造方法,将第三步初设置的pushConnectionRegistry拿到,这里就将WebSocket协议和pushConnectionRegistry关联。
接受新的推送连接
SampleWebSocketPushChannelInitializer 和 SampleSSEPushChannelInitializer分别展示了安装Netty通道管线(channel pipeline)来接受进入的WebSocket和SSE连接。这些类基于使用的协议为每个请求安装认证和注册表处理器,其实就是上边的最后一步。
负载均衡器与WebSockets和SSE的对比
推送连接与通常的HTTP类型的请求/响应是不同的。推送链接是持久的和长期存活的(long-lived)。一旦连接被创建,即使是没有请求进入,也会被客户端和服务端保持打开状态。这对常见的负载均衡器来说就会有个问题:在一定的空闲时间后连接会被断开。Amazon的ELB以及低版本的HAProxy和Nginx都会有该问题。你有两种选择来让集群和负载均衡器一起工作:
1.使用最新版本的支持WebSocket代理的负载均衡器如最新版本的HAProxy或者Nginx或者Amazon的ALB。
2.运行存在的负载均衡器作为4层网络协议的TCP负载均衡器,而不是7层网络协议的HTTP负载均衡器。多数负载均衡器,包括ELB,都支持作为TCP负载均衡器的模式。在这种模式下,它们只是代理TCP包的流入流出,不会对出现这些问题的应用层协议做解析或者阻断。
你也很可能需要增加负载均衡器的空闲超值,因为默认空间超时值通常以秒为单位这对典型的长时间存活、持久性和激活没有推送连接的情况是不够用的。
配置选项
名称 | 描述 | 默认值 |
---|---|---|
zuul.push.registry.ttl.seconds | Record expiry (TTL) of a client registration record in the global registry: 记录在全局注册表中客户注册记录的超时时间 | 1800 s |
zuul.push.reconnect.dither.seconds | Randomization window for each client's max connection lifetime. Helps in spreading subsequent client reconnects across time:每个客户端最大连接生存期的随机化窗口,有助于随后时间客户端重连 | 180 s |
zuul.push.client.close.grace.period | Number of seconds the server will wait for the client to close the connection before it closes it forcefully from its side:服务端在强制关掉其自身一端的连接之前等待客户端的时间 | 4 s |
如果使用了Netflix OSS Auchaius模块,则可以在运行时修改上表的所有配置,服务不用重启配置就会立即生效。
网友评论