Learning Netty: WebSocket

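Author: 何何与呵呵呵 | Published: 2019-01-25 11:33

Introduction to WebSocket

The WebSocket protocol was designed from the ground up to provide a practical solution to the problem of bidirectional data transfer on the Web. It lets the client and the server send messages to each other at any time, which in turn requires both sides to handle incoming messages asynchronously.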

Our WebSocket example application
WebSocket application logic
Adding WebSocket support
Server logic
  • Handling HTTP requests
    Code for HttpRequestHandler
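import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {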
    private final String wsUri;
    private static final File INDEX;
    static {
        URL location = HttpRequestHandler.class
            .getProtectionDomain()
            .getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate index.html", e);
        }
    }
    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            // A WebSocket upgrade was requested: increment the reference count (retain())
            // and pass the request on to the next ChannelInboundHandler
            ctx.fireChannelRead(request.retain());
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) { // Handle 100 Continue requests to comply with HTTP 1.1
                send100Continue(ctx);
            }
            RandomAccessFile file = new RandomAccessFile(INDEX, "r"); // Read index.html
            HttpResponse response = new DefaultHttpResponse(
                    request.getProtocolVersion(), HttpResponseStatus.OK);
            // index.html is an HTML page, so it is served as text/html rather than text/plain
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
            boolean keepAlive = HttpHeaders.isKeepAlive(request);
            if (keepAlive) {
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response); // Write the HttpResponse to the client
            if (ctx.pipeline().get(SslHandler.class) == null) {
                // No SslHandler in the pipeline: write index.html to the client as a file region
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                // With SSL the data must pass through the SslHandler, so write it in chunks
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            // Write the LastHttpContent and flush everything to the client
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            if (!keepAlive) { // If keep-alive was not requested, close the Channel once the write completes
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }
    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(
        HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

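If the HTTP request targets the URI /ws, HttpRequestHandler calls retain() on the FullHttpRequest and forwards it to the next ChannelInboundHandler by calling fireChannelRead(msg). The call to retain() is needed because once channelRead() completes, SimpleChannelInboundHandler calls release() on the FullHttpRequest to free its resources.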
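The effect of retain() before forwarding can be sketched in plain Java without Netty. The names below (RetainSketch, CountedMessage, dispatch) are hypothetical and only model the counting; they are not Netty's implementation. The sketch assumes a message starts with a count of 1 and that the dispatching code always releases once after the handler body runs.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of reference counting; none of these types are Netty classes.
class RetainSketch {

    /** Hypothetical stand-in for a reference-counted message such as FullHttpRequest. */
    static final class CountedMessage {
        private int refCnt = 1;          // a new message starts with one reference
        private final String content;

        CountedMessage(String content) { this.content = content; }

        CountedMessage retain() {
            if (refCnt == 0) throw new IllegalStateException("already released");
            refCnt++;
            return this;
        }

        void release() {
            if (refCnt == 0) throw new IllegalStateException("already released");
            refCnt--;
        }

        String content() {
            if (refCnt == 0) throw new IllegalStateException("message was already released");
            return content;
        }

        int refCnt() { return refCnt; }
    }

    /** Stands in for the next handler in the chain: it just remembers what it was given. */
    static final List<CountedMessage> forwarded = new ArrayList<>();

    /** Mimics the auto-release wrapper: run the handler body, then always release once. */
    static void dispatch(CountedMessage msg, boolean retainBeforeForwarding) {
        try {
            forwarded.add(retainBeforeForwarding ? msg.retain() : msg);
        } finally {
            msg.release();
        }
    }

    public static void main(String[] args) {
        CountedMessage kept = new CountedMessage("GET /ws");
        dispatch(kept, true);
        System.out.println("with retain:    refCnt=" + kept.refCnt());

        CountedMessage lost = new CountedMessage("GET /ws");
        dispatch(lost, false);
        System.out.println("without retain: refCnt=" + lost.refCnt());
    }
}
```

With retain() the forwarded message is still usable by the next handler after the automatic release. Without it the count drops to zero, and any later call to content() fails.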

  • Handling WebSocket frames
WebSocketFrame types

The following ChannelInboundHandler handles TextWebSocketFrames; it also tracks all active WebSocket connections in its ChannelGroup.

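import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {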
    private final ChannelGroup group;
    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }
    @Override // Override userEventTriggered() to handle custom events
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // If the event signals a successful handshake, remove HttpRequestHandler from the
        // ChannelPipeline, since no further HTTP messages will be received
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            ctx.pipeline().remove(HttpRequestHandler.class);
            // Notify all connected WebSocket clients that a new client has connected
            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
            // Add the new WebSocket Channel to the ChannelGroup so that it receives all messages
            group.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // Increment the message's reference count and write it to all connected clients in the ChannelGroup
        group.writeAndFlush(msg.retain());
    }
    }
}
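
The order of operations in userEventTriggered() above (announce first, then add the new Channel) can be illustrated with a small plain-Java sketch. Client and Group are hypothetical stand-ins introduced here for illustration; they are not Netty's ChannelGroup.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class BroadcastGroupSketch {

    /** Hypothetical client that records every message delivered to it. */
    static final class Client {
        final String name;
        final List<String> inbox = new ArrayList<>();
        Client(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for a ChannelGroup: a set of members plus a broadcast operation. */
    static final class Group {
        private final Set<Client> members = new LinkedHashSet<>();

        void broadcast(String text) {
            for (Client c : members) {
                c.inbox.add(text);
            }
        }

        /** Same order as the handler: announce to the existing members, then add the newcomer. */
        void join(Client newcomer) {
            broadcast("Client " + newcomer.name + " joined");
            members.add(newcomer);
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        Client alice = new Client("alice");
        Client bob = new Client("bob");
        group.join(alice);
        group.join(bob);
        group.broadcast("hello");
        System.out.println("alice: " + alice.inbox);
        System.out.println("bob:   " + bob.inbox);
    }
}
```

Because the announcement is sent before the newcomer is added, a client never receives its own "joined" message, but it does receive every message broadcast afterwards.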

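As before, the call to retain() is required, because the reference count of the TextWebSocketFrame is decremented when channelRead0() returns. Since all operations are asynchronous, writeAndFlush() may complete after channelRead0() has returned, and it must never access a reference that is no longer valid.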
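
The timing problem described above can be sketched by modelling the asynchronous write as a task that is queued inside the handler and only runs after the handler has returned. Frame, writeLater, handle and flush are hypothetical names for this illustration; real Netty writes run on an event loop, which the queue here only approximates.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class DeferredWriteSketch {

    /** Hypothetical reference-counted frame; not Netty's TextWebSocketFrame. */
    static final class Frame {
        private int refCnt = 1;
        private final String text;
        Frame(String text) { this.text = text; }
        Frame retain() { refCnt++; return this; }
        void release() { refCnt--; }
        boolean isValid() { return refCnt > 0; }
        String text() {
            if (!isValid()) throw new IllegalStateException("frame already released");
            return text;
        }
    }

    /** Writes are queued and only executed later, like an asynchronous writeAndFlush(). */
    static final Queue<Runnable> pendingWrites = new ArrayDeque<>();
    static final StringBuilder wire = new StringBuilder();

    static void writeLater(Frame frame) {
        pendingWrites.add(() -> {
            wire.append(frame.text());   // fails if the frame was released too early
            frame.release();             // the write owns one reference and gives it back
        });
    }

    /** Handler body plus the automatic release that follows it. */
    static void handle(Frame frame, boolean retain) {
        try {
            writeLater(retain ? frame.retain() : frame);
        } finally {
            frame.release();
        }
    }

    /** Runs all queued writes; called only after the handler has returned. */
    static void flush() {
        Runnable task;
        while ((task = pendingWrites.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        handle(new Frame("hi"), true);
        flush();
        System.out.println("written: " + wire);

        handle(new Frame("lost"), false);
        try {
            flush();
        } catch (IllegalStateException e) {
            System.out.println("write failed: " + e.getMessage());
        }
    }
}
```

The retained frame survives until the deferred write has used it. The unretained frame is already released when the write finally runs, which is the invalid access the paragraph warns about.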

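  • Initializing the ChannelPipeline
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChatServerInitializer extends ChannelInitializer<Channel> {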
    private final ChannelGroup group;
    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler(group));
    }
}

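The call to initChannel() sets up the ChannelPipeline of the newly registered Channel by installing all the required ChannelHandlers.

ChannelHandlers for the WebSocket chat server

Netty's WebSocketServerProtocolHandler handles all mandated WebSocket frame types as well as the upgrade handshake itself. If the handshake succeeds, the required ChannelHandlers are added to the ChannelPipeline, and those that are no longer needed are removed.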
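
To give a feel for what the upgrade handshake involves, here is a minimal sketch of one of its steps as specified by RFC 6455: deriving the Sec-WebSocket-Accept response header from the client's Sec-WebSocket-Key. This is not Netty's code; the class and method names are made up for illustration, and only JDK classes are used.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class WebSocketAcceptSketch {

    /** Fixed GUID that RFC 6455 appends to the client's key. */
    static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns Base64(SHA-1(key + GUID)), the value of the Sec-WebSocket-Accept header. */
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WEBSOCKET_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key taken from the handshake example in RFC 6455
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
        // prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    }
}
```

In the chat server none of this has to be written by hand, since WebSocketServerProtocolHandler performs the handshake on the application's behalf.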

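The ChannelPipeline before the WebSocket protocol upgrade
The ChannelPipeline after the WebSocket protocol upgrade has completed
  • Bootstrapping
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;

import java.net.InetSocketAddress;

public class ChatServer {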

    private final ChannelGroup channelGroup =
            new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;
    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(createInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        channel = future.channel();
        return future;
    }
    protected ChannelInitializer<Channel> createInitializer(
            ChannelGroup group) {
        return new ChatServerInitializer(group);
    }
    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);
        final ChatServer endpoint = new ChatServer();
        ChannelFuture future = endpoint.start(
                new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

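Test entry point

How to add encryption
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;

import javax.net.ssl.SSLEngine;

public class SecureChatServerInitializer extends ChatServerInitializer {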
    private final SslContext context;
    public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
        super(group);
        this.context = context;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        super.initChannel(ch);
        SSLEngine engine = context.newEngine(ch.alloc());
        engine.setUseClientMode(false);
        ch.pipeline().addFirst(new SslHandler(engine));
    }
}

Code for SecureChatServer
package com.hehe.netty.bootstrap;

import com.hehe.netty.initializer.SecureChatServerInitializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;

import java.net.InetSocketAddress;

public class SecureChatServer extends ChatServer {

    private final SslContext context;
    public SecureChatServer(SslContext context) {
        this.context = context;
    }
    @Override
    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
        return new SecureChatServerInitializer(group, context); // Return the SecureChatServerInitializer created earlier to enable encryption
    }
    public static void main(String[] args) throws Exception {
        // The port is hard-coded for local testing; read it from args[0] as ChatServer does if needed
        int port = 9999;
        SelfSignedCertificate cert = new SelfSignedCertificate();
        SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
        final SecureChatServer endpoint = new SecureChatServer(context);
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }

}
