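Introduction to WebSocket
The WebSocket protocol was designed from the ground up to provide a practical solution to bidirectional data transmission on the Web. It lets the client and the server send messages at any time, which in turn requires each side to handle incoming messages asynchronously.
Our example WebSocket application
WebSocket application logic
Adding WebSocket support
Server logic
- Handling HTTP requests
Code for HttpRequestHandler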
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class
.getProtectionDomain()
.getCodeSource().getLocation();
try {
String path = location.toURI() + "index.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate index.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx,FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
// If a WebSocket upgrade was requested, increment the reference count (retain())
// and pass the request on to the next ChannelInboundHandler
ctx.fireChannelRead(request.retain());
} else {
if (HttpHeaders.is100ContinueExpected(request)) { // Handle 100 Continue requests to comply with HTTP 1.1
send100Continue(ctx);
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r"); // Read index.html
HttpResponse response = new DefaultHttpResponse(
request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8"); // index.html must be served as HTML, not plain text
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) {
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set( HttpHeaders.Names.CONNECTION,HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response); // Write the HttpResponse to the client
if (ctx.pipeline().get(SslHandler.class) == null) {
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length())); // Write index.html to the client
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); // Write the LastHttpContent and flush to the client
if (!keepAlive) { // If keep-alive was not requested, close the Channel once the write completes
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
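If the HTTP request targets the URI /ws, HttpRequestHandler calls retain() on the FullHttpRequest and forwards it to the next ChannelInboundHandler by calling fireChannelRead(msg). The retain() call is needed because, once channelRead() completes, SimpleChannelInboundHandler calls release() on the FullHttpRequest to free its resources.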
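The retain()/release() pairing described above can be illustrated with a small stand-alone sketch. The CountedMessage class and the forward() helper below are hypothetical and are not Netty's implementation; they only mimic the rule that a message starts with a reference count of 1 and that the base handler releases it once after channelRead0() returns.

```java
public class RefCountSketch {
    // Hypothetical stand-in for a reference-counted message (not Netty's ByteBuf or FullHttpRequest).
    static final class CountedMessage {
        private int refCnt = 1; // a new message starts with one reference

        CountedMessage retain() {
            if (refCnt == 0) {
                throw new IllegalStateException("message was already released");
            }
            refCnt++;
            return this;
        }

        boolean release() {
            if (refCnt == 0) {
                throw new IllegalStateException("message was released too many times");
            }
            refCnt--;
            return refCnt == 0; // true when the resources would be freed
        }

        int refCnt() {
            return refCnt;
        }
    }

    // Mimics a handler that passes the message on, after which the base class releases it once.
    static int forward(CountedMessage msg, boolean retainFirst) {
        if (retainFirst) {
            msg.retain(); // extra reference owned by the next handler
        }
        msg.release();    // the automatic release after channelRead0() returns
        return msg.refCnt();
    }

    public static void main(String[] args) {
        System.out.println(forward(new CountedMessage(), true));  // prints 1: still usable downstream
        System.out.println(forward(new CountedMessage(), false)); // prints 0: already freed
    }
}
```

Without the retain() call the count drops to 0 before the next handler has had a chance to use the message, which is the situation the handler code avoids.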
- Handling WebSocket frames
WebSocketFrame types
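As a rough illustration of how the frame types are told apart on the wire, the sketch below maps the opcode in the first header byte of a frame (as defined in RFC 6455) to the name of the corresponding Netty WebSocketFrame subclass. The FrameOpcodeSketch class and its methods are hypothetical; Netty's own frame decoder does this work internally, so application code normally never parses opcodes itself.

```java
public class FrameOpcodeSketch {
    // The low four bits of the first header byte carry the opcode (RFC 6455).
    static String frameType(int firstHeaderByte) {
        int opcode = firstHeaderByte & 0x0F;
        switch (opcode) {
            case 0x0: return "ContinuationWebSocketFrame";
            case 0x1: return "TextWebSocketFrame";
            case 0x2: return "BinaryWebSocketFrame";
            case 0x8: return "CloseWebSocketFrame";
            case 0x9: return "PingWebSocketFrame";
            case 0xA: return "PongWebSocketFrame";
            default:  return "reserved opcode " + opcode;
        }
    }

    // The highest bit of the first header byte is the FIN flag (last fragment of a message).
    static boolean isFinalFragment(int firstHeaderByte) {
        return (firstHeaderByte & 0x80) != 0;
    }

    public static void main(String[] args) {
        System.out.println(frameType(0x81));       // prints TextWebSocketFrame
        System.out.println(isFinalFragment(0x81)); // prints true
        System.out.println(frameType(0x89));       // prints PingWebSocketFrame
    }
}
```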
The following ChannelInboundHandler handles TextWebSocketFrames and also tracks all active WebSocket connections in its ChannelGroup.
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}
@Override // Override userEventTriggered() to handle custom events
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// If the event signals a successful handshake, remove HttpRequestHandler from the ChannelPipeline, since no further HTTP messages will be received
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().remove(HttpRequestHandler.class);
// Notify all connected WebSocket clients that a new client has joined
group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
// Add the new WebSocket Channel to the ChannelGroup so that it receives all subsequent messages
group.add(ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
group.writeAndFlush(msg.retain()); // Increment the message's reference count and write it to every connected client in the ChannelGroup
}
}
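As before, the call to retain() is required because the reference count of the TextWebSocketFrame is decremented when channelRead0() returns. Since all operations are asynchronous, writeAndFlush() may complete after channelRead0() has returned, and it must not access a reference that has already been released.
- Initializing the ChannelPipeline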
public class ChatServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup group;
public ChatServerInitializer(ChannelGroup group) {
this.group = group;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler(group));
}
}
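The call to initChannel() sets up the ChannelPipeline of the newly registered Channel by installing all the required ChannelHandlers.
ChannelHandlers for the WebSocket chat server
Netty's WebSocketServerProtocolHandler handles all mandated WebSocket frame types as well as the upgrade handshake itself. If the handshake succeeds, the ChannelHandlers that are needed are added to the ChannelPipeline, and those that are no longer needed are removed.
ChannelPipeline before the WebSocket protocol upgrade
ChannelPipeline after the WebSocket protocol upgrade completes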
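To give a feel for what the upgrade handshake involves, the sketch below computes the Sec-WebSocket-Accept response header from the client's Sec-WebSocket-Key, as specified by RFC 6455. WebSocketServerProtocolHandler performs this step for you; the HandshakeAcceptSketch class and its acceptKey() method are hypothetical names used only for illustration, and the sample key is the one given in the RFC.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class HandshakeAcceptSketch {
    // Fixed GUID that RFC 6455 appends to the client's key.
    private static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Returns Base64(SHA-1(key + GUID)), the value the server sends back in Sec-WebSocket-Accept.
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WEBSOCKET_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

A client that receives a matching value knows the server understood the upgrade request, after which both sides switch from HTTP messages to WebSocket frames.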
- Bootstrapping
public class ChatServer {
private final ChannelGroup channelGroup =
new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture start(InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(createInitializer(channelGroup));
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
channel = future.channel();
return future;
}
protected ChannelInitializer<Channel> createInitializer(
ChannelGroup group) {
return new ChatServerInitializer(group);
}
public void destroy() {
if (channel != null) {
channel.close();
}
channelGroup.close();
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
final ChatServer endpoint = new ChatServer();
ChannelFuture future = endpoint.start(
new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
Adding encryption
public class SecureChatServerInitializer extends ChatServerInitializer {
private final SslContext context;
public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
super(group);
this.context = context;
}
@Override
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);
SSLEngine engine = context.newEngine(ch.alloc());
engine.setUseClientMode(false);
ch.pipeline().addFirst(new SslHandler(engine));
}
}
package com.hehe.netty.bootstrap;
import com.hehe.netty.initializer.SecureChatServerInitializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.net.InetSocketAddress;
public class SecureChatServer extends ChatServer {
private final SslContext context;
public SecureChatServer(SslContext context) {
this.context = context;
}
@Override
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new SecureChatServerInitializer(group, context); // Return the SecureChatServerInitializer created earlier to enable encryption
}
public static void main(String[] args) throws Exception {
// if (args.length != 1) {
// System.err.println("Please give port as argument");
// System.exit(1);
// }
int port = 9999;
// int port = Integer.parseInt(args[0]);
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
final SecureChatServer endpoint = new SecureChatServer(context);
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}