美文网首页
websocket介绍和demo

websocket介绍和demo

作者: guideEmotion | 来源:发表于2019-05-19 10:01 被阅读0次

一 介绍

参考:WebSocket的故事(一)—— WebSocket的由来
websocket的特点

  1. 服务端可以主动推送信息给客户端
  2. Websocket只需要一次HTTP交互,来进行协议上的切换,整个通讯过程是建立在一次连接/状态中,也就避免了HTTP的无状态性,服务端会一直知道你的信息,直到你关闭请求,这样就解决了服务端要反复解析HTTP请求头的问题
  3. 数据格式比较轻量,性能开销小,通信高效。可以发送*文本,也可以发送二进制数据
  4. 没有同源限制,客户端可以与任意服务器通信。

二 建立过程

2.1 请求报文

WebSocket的握手使用HTTP来实现,客户端发送带有Upgrade头的HTTP Request消息。服务端根据请求,做Response。
请求报文:

GET wss://www.example.cn/webSocket HTTP/1.1
Host: www.example.cn
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Version: 13
Origin: http://example.cn
Sec-WebSocket-Key: afmbhhBRQuwCLmnWDRWHxw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

  • 第1、2行:与HTTP的Request的请求行一样,这里使用的是HTTPS协议,所以对应的是wss请求。
  • 第3行:Connection:HTTP1.1中规定Upgrade只能应用在直接连接中。带有Upgrade头的HTTP1.1消息必须含有Connection头,因为Connection头的意义就是,任何接收到此消息的人(往往是代理服务器)都要在转发此消息之前处理掉Connection中指定的域(即不转发Upgrade域)。
  • 第4行:Upgrade是HTTP1.1中用于定义转换协议的header域。 如果服务器支持的话,客户端希望使用已经建立好的HTTP(TCP)连接,切换到WebSocket协议
  • 第5行:Sec-WebSocket-Version标识了客户端支持的WebSocket协议的版本列表。
  • 第6行:Origin为安全使用,防止跨站攻击,浏览器一般会使用这个来标识原始域。
  • 第7行:Sec-WebSocket-Key是一个Base64encode的值,这个是客户端随机生成的,用于服务端的验证,服务器会使用此字段组装成另一个key值放在握手返回信息里发送客户端。
  • 第8行:Sec_WebSocket-Protocol是一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议,标识了客户端支持的子协议的列表
  • 第9行:Sec-WebSocket-Extensions是客户端用来与服务端协商扩展协议的字段,permessage-deflate表示协商是否使用传输数据压缩,client_max_window_bits表示采用LZ77压缩算法时,滑动窗口相关的SIZE大小。

2.2 响应报文

HTTP/1.1 101
Server: nginx/1.12.2
Date: Sat, 11 Aug 2018 13:21:27 GMT
Connection: upgrade
Upgrade: websocket
Sec-WebSocket-Accept: sLMyWetYOwus23qJyUD/fa1hztc=
Sec-WebSocket-Protocol: chat
Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15

  • 第1行:HTTP的版本为HTTP1.1,返回码是101,开始解析Header域(不区分大小写)
  • 第2,3行:服务器信息与时间
  • 第4行:Connection字段,包含Upgrade。
  • 第5行:Upgrade字段,包含websocket。
  • 第6行:Sec-WebSocket-Accept字段,详细介绍一下:
  • 第7行:Sec-WebSocket-Protocol字段,要判断是否之前的Request握手带有此协议,如果没有,则连接失败。
  • 第8行:扩展协议协商,支持压缩,且LZZ的滑动窗口大小为15。

握手过程就完成后,TCP连接不会释放

和http的不同点

  • WebSocket的连接不能通过中间人来转发,它必须是一个直接连接。如果通过代理转发,一个代理要承受如此多的WebSocket连接不释放,就类似于一次DDOS攻击了
  • WebSocket传输的数据是二进制流,是以帧为单位的,HTTP传输的是明文传输,是字符串传输,WebSocket的数据帧有序。

三 springboot中使用websocket

参考:# SpringBoot + WebSocket 开发笔记

3.1 @ServerEndPoint注解

注意点
配置ServerEndpointExporter时,如果springboot要打包成war包,这段代码需要注释掉

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
    
}

问题

  • WebSocket实现过程中,尤其是通过“@ServerEndpoint”实现的时候,可能会出现注入失败的问题,即注入的Bean为null的问题。可以通过手动注入的方式来解决,需要改造实现类和SpringBoot启动类,如下:

将ApplicationContext 设置成类的静态变量,需要的bean通过ApplicationContext去取

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import WebSocketController;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
//        SpringApplication.run(Application.class, args);
        SpringApplication springApplication = new SpringApplication(Application.class);
        ConfigurableApplicationContext configurableApplicationContext = springApplication.run(args);
        WebSocketController.setApplicationContext(configurableApplicationContext);  // 解决WebSocket不能注入的问题
    }

}

3.2 第二种

是可以添加拦截器在WebSocket连接建立和断开前进行一些额外操作

1.HandShake 拦截器实现

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

public class TestHandShakeInterceptor extends HttpSessionHandshakeInterceptor {
    
    private final Logger LOGGER = LoggerFactory.getLogger(TestHandShakeInterceptor.class);
    
    /*
     * 在WebSocket连接建立之前的操作,以鉴权为例
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, 
            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        
        LOGGER.info("Handle before webSocket connected. ");
        
        // 获取url传递的参数,通过attributes在Interceptor处理结束后传递给WebSocketHandler
        // WebSocketHandler可以通过WebSocketSession的getAttributes()方法获取参数
        ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
        String id = serverRequest.getServletRequest().getParameter("id");
        String name = serverRequest.getServletRequest().getParameter("name");

        if (tokenValidation.validateSign()) {
            LOGGER.info("Validation passed. WebSocket connecting.... ");
            attributes.put("id", id);
            attributes.put("name", name);
            return super.beforeHandshake(request, response, wsHandler, attributes);
        } else {
            LOGGER.error("Validation failed. WebSocket will not connect. ");
            return false;
        }
    }
    
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Exception ex) {
        // 省略
    }

}

2.WebSocket 业务逻辑实现。参数传递采用类似GET请求的方式传递,服务端的参数在拦截器中获取之后通过attributes传递给WebSocketHandler

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

@RestController
public class TestWebSocketController implements WebSocketHandler {
    
    private static AtomicInteger onlineCount = new AtomicInteger(0);
    
    private static final ArrayList<WebSocketSession> sessions = new ArrayList<>();
    
    private final Logger LOGGER = LoggerFactory.getLogger(TestWebSocketController.class);
    
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        int onlineNum = addOnlineCount();
        LOGGER.info("Oprn a WebSocket. Current connection number: " + onlineNum);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        int onlineNum = subOnlineCount();
        LOGGER.info("Close a webSocket. Current connection number: " + onlineNum);
    }

    @Override
    public void handleMessage(WebSocketSession wsSession, WebSocketMessage<?> message) throws Exception {
        LOGGER.info("Receive a message from client: " + message.toString());
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        LOGGER.error("Exception occurs on webSocket connection. disconnecting....");
        if (session.isOpen()) {
            session.close();
        }
        sessions.remove(session);
        subOnlineCount();
    }

    /*
     * 是否支持消息拆分发送:如果接收的数据量比较大,最好打开(true), 否则可能会导致接收失败。
     * 如果出现WebSocket连接接收一次数据后就自动断开,应检查是否是这里的问题。
     */
    @Override
    public boolean supportsPartialMessages() {
        return true;
    }

    
    public static int getOnlineCount() {
        return onlineCount.get();
    }
    
    public static int addOnlineCount() {
        return onlineCount.incrementAndGet();
    }
    
    public static int subOnlineCount() {
        return onlineCount.decrementAndGet();
    }

}

3.WebSocket 配置类实现(注册WebSocket实现类,绑定接口,同时将实现类和拦截器绑定)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import TestWebSocketController;
import TestHandShakeInterceptor;

@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

    @Autowired
    private TestWebSocketController testWebSocketController;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(TestWebSocketController, "/testWebSocket")
                .addInterceptors(new TestHandShakeInterceptor()).setAllowedOrigins("*");
    }

}

四 netty实现websocket

参考: websocket(三) 进阶!netty框架实现websocket达到高并发
1.导入依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.12.Final</version>
        </dependency>

2.server类

package com.example.demo.service;

import javax.annotation.PostConstruct;
import org.springframework.stereotype.Service;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

@Service
public class NettyServer {
    public static void main(String[] args) {
        new NettyServer().run();
    }
    
    @PostConstruct
    public void initNetty(){
        new Thread(){
            public void run() {
                new NettyServer().run();
            }
        }.start();
    }
    
    
    public void run(){
        System.out.println("===========================Netty端口启动========");
        // Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket, (有点像门卫)然后把这些socket传给worker线程池。
        // 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只有一个boss线程来处理所有的socket。
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Worker线程:Worker线程执行所有的异步I/O,即处理操作
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 启动NIO服务的辅助启动类,负责初始话netty服务器,并且开始监听端口的socket请求
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup);
            // 设置非阻塞,用它来建立新accept的连接,用于构造serversocketchannel的工厂类
            b.channel(NioServerSocketChannel.class);
            // ChildChannelHandler 对出入的数据进行的业务操作,其继承ChannelInitializer
            b.childHandler(new ChildChannelHandler());
            System.out.println("服务端开启等待客户端连接 ... ...");
            Channel ch = b.bind(8083).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

3.channle注册类

package com.example.demo.service;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
    @Override
    protected void initChannel(SocketChannel e) throws Exception {
    // 设置30秒没有读到数据,则触发一个READER_IDLE事件。
    // pipeline.addLast(new IdleStateHandler(30, 0, 0));
    // HttpServerCodec:将请求和应答消息解码为HTTP消息
    e.pipeline().addLast("http-codec",new HttpServerCodec());
    // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
    e.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
    // ChunkedWriteHandler:向客户端发送HTML5文件
    e.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
    // 在管道中添加我们自己的接收数据实现方法
    e.pipeline().addLast("handler",new MyWebSocketServerHandler());
    }
}


4.handler处理类

package com.example.demo.service;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;

public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger logger = Logger.getLogger(WebSocketServerHandshaker.class.getName());
    private WebSocketServerHandshaker handshaker;

    /**
     * channel 通道 action 活跃的 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 添加
        Global.group.add(ctx.channel());
//      System.out.println("客户端与服务端连接开启:" + ctx.channel().remoteAddress().toString());
    }

    /**
     * channel 通道 Inactive 不活跃的 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 移除
        Global.group.remove(ctx.channel());
//      System.out.println("客户端与服务端连接关闭:" + ctx.channel().remoteAddress().toString());
    }
    
//  /**
//  * 接收客户端发送的消息 channel 通道 Read 读 简而言之就是从通道中读取数据,也就是服务端接收客户端发来的数据。但是这个数据在不进行解码时它是ByteBuf类型的
//  */
//  @Override
//  protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
//      // 传统的HTTP接入
//      if (msg instanceof FullHttpRequest) {
//          handleHttpRequest(ctx, ((FullHttpRequest) msg));
//          // WebSocket接入
//      } else if (msg instanceof WebSocketFrame) {
//          System.out.println(handshaker.uri());
//          if("anzhuo".equals(ctx.attr(AttributeKey.valueOf("type")).get())){
//              handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
//          }else{
//              handlerWebSocketFrame2(ctx, (WebSocketFrame) msg);
//          }
//      }
//  }
    
    /**
    * channel 通道 Read 读取 Complete 完成 在通道读取完成后会在这个方法里通知,对应可以做刷新操作 ctx.flush()
    */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            System.out.println(1);
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            System.out.println("本例程仅支持文本消息,不支持二进制消息");
            throw new UnsupportedOperationException(
                    String.format("%s frame types not supported", frame.getClass().getName()));
        }
        // 返回应答消息
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("服务端收到:" + request);
        if (logger.isLoggable(Level.FINE)) {
            logger.fine(String.format("%s received %s", ctx.channel(), request));
        }
        
        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);
        // 群发
        Global.group.writeAndFlush(tws);
        // 返回【谁发的发给谁】
        // ctx.channel().writeAndFlush(tws);
    
    }
    
    private void handlerWebSocketFrame2(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        
        // 判断是否ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            System.out.println("本例程仅支持文本消息,不支持二进制消息");
            throw new UnsupportedOperationException(
                    String.format("%s frame types not supported", frame.getClass().getName()));
        }
        // 返回应答消息
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("服务端2收到:" + request);
        if (logger.isLoggable(Level.FINE)) {
            logger.fine(String.format("%s received %s", ctx.channel(), request));
        }
        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);
        // 群发
        Global.group.writeAndFlush(tws);
        // 返回【谁发的发给谁】
        // ctx.channel().writeAndFlush(tws);
    }
    
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // 如果HTTP解码失败,返回HHTP异常
        if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        //获取url后置参数
        HttpMethod method=req.getMethod();
        String uri=req.getUri();
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
        Map<String, List<String>> parameters = queryStringDecoder.parameters();
//      System.out.println(parameters.get("request").get(0));
        if(method==HttpMethod.GET&&"/webssss".equals(uri)){
        //....处理
        ctx.attr(AttributeKey.valueOf("type")).set("anzhuo");
        }else if(method==HttpMethod.GET&&"/websocket".equals(uri)){
        //...处理
        ctx.attr(AttributeKey.valueOf("type")).set("live");
        }
        // 构造握手响应返回,本机测试
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws://"+req.headers().get(HttpHeaders.Names.HOST)+uri, null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
                handshaker.handshake(ctx.channel(), req);
        }
    
    
    }
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回应答给客户端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * exception 异常 Caught 抓住 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 传统的HTTP接入
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, ((FullHttpRequest) msg));
            // WebSocket接入
        } else if (msg instanceof WebSocketFrame) {
//          System.out.println(handshaker.uri());
            if("anzhuo".equals(ctx.attr(AttributeKey.valueOf("type")).get())){
                handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
            }else{
                handlerWebSocketFrame2(ctx, (WebSocketFrame) msg);
            }
        }
    }
}

相关文章

网友评论

      本文标题:websocket介绍和demo

      本文链接:https://www.haomeiwen.com/subject/fukdaqtx.html