美文网首页
自定义Web服务器和WebSocket通信

自定义Web服务器和WebSocket通信

作者: ruoshy | 来源:发表于2019-10-10 16:48 被阅读0次

概述

  套接字(socket)是一个抽象层,应用程序可以通过它发送或接收数据,可对其进行像对文件一样的打开、读写和关闭等操作。套接字允许应用程序将I/O插入到网络中,并与网络中的其他应用程序进行通信。网络套接字是IP地址与端口的组合。

一、Web服务器

  Socket最初是加利福尼亚大学Berkeley分校为Unix系统开发的网络通信接口。后来随着TCP/IP网络的发展,Socket成为最为通用的应用程序接口,也是在Internet上进行应用开发最为通用的API。

下面开始创建一个简陋的Web服务器

创建Main类监听端口等待请求:

/**
 * 简陋的Web服务器
 */
public class Main {
    // 静态文件位置
    private final String classPath = this.getClass().getClassLoader().getResource("./").getPath();

    // 静态文件类型
    public static Set<String> fileType;

    static {
        fileType = new HashSet<>();
        fileType.add("png");
        fileType.add("ico");
    }

    public static void main(String[] args) {
        new Main(80);
    }

    public Main(int port) {
        try {
            // 开启服务监听端口
            ServerSocket server = new ServerSocket(port);
            while (true) {
                // 阻塞,等待请求
                Socket client = server.accept();
                // TODO 使用线程池
                // 创建线程处理请求
                new Thread(new Handle(client, classPath)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

创建请求处理类Handle:

/**
 * web请求处理
 */
public class Handle implements Runnable {
    private java.net.Socket socket;
    private String classPath;

    public Handle(java.net.Socket socket, String classPath) {
        this.socket = socket;
        this.classPath = classPath;
    }

    @Override
    public void run() {
        try (InputStream inputStream = socket.getInputStream();
             OutputStream outputStream = socket.getOutputStream()) {
            BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
            PrintWriter out = new PrintWriter(outputStream, true);
            // 请求类型
            String method = null;
            // 请求头属性
            String[] headers;
            // 请求地址
            String path = null;
            // 接收请求头属性
            String line;
            while ((line = br.readLine()) != null && !"".equals(line)) {
                if (method == null) {
                    headers = line.split(" ");
                    method = headers[0];
                    path = headers[1];
                }
            }
            // 处理路径
            if (path != null) {
                // 请求地址处理
                String[] pathVariable = path.split("/");
                if (pathVariable.length == 0) {
                    out.println("HTTP/1.1 200 OK");
                    out.println();
                    out.println("<font style=\"color:red;font-size:50\">Hello world!</font>");
                    return;
                }
                // 请求静态文件处理
                String lastRoute = pathVariable[pathVariable.length - 1];
                if (lastRoute.contains(".")) {
                    // 处理静态文件
                    if (Main.fileType.contains(lastRoute.split("[.]")[1])) {
                        File file = new File(classPath + lastRoute);
                        if (file.exists()) {
                            try (FileInputStream fis = new FileInputStream(file)) {
                                this.socket.getOutputStream().write(fis.readAllBytes());
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            return;
                        }
                    }
                }
                // 404
                out.println("<font style=\"font-size:50\">404</font>");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (this.socket != null) {
                    this.socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

以上一个非常简陋的Web服务器就完成了,接下来对本地80端口进行测试

localhost favicon.ico favicon.png

二、WebSocket

  WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。

  WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

下面开始创建WebSocket服务器:

创建后续需要用到的工具类CodingUtil:

public class CodingUtil {

    /**
     * 字节数组转长整型
     *
     * @param bytes 字节数组
     * @return Long
     */
    public static long bytesToLong(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.put(bytes);
        buffer.flip();
        return buffer.getLong();
    }

    /**
     * 对Sec-WebSocket-Key密钥进行加密
     * 生成Sec-WebSocket-Accept值
     *
     * @param key 密钥
     * @return String
     * @throws NoSuchAlgorithmException
     */
    public static String encryption(String key) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA1");
        md.update((key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes());
        byte[] bytes = md.digest();
        return new String(Base64.getEncoder().encode(bytes));
    }

    /**
     * 取指定位置bit
     *
     * @param b     字节
     * @param index 位置
     * @return
     */
    public static int getByteBit(byte b, int index) {
        return (b & 0x80 >> index) >> (7 - index);
    }

    /**
     * 取指定区间bit
     *
     * @param b     字节
     * @param start 开始位置
     * @param end   结束位置
     * @return int
     */
    public static int getByteBits(byte b, int start, int end) {
        return (b << start & 0xff) >> (7 - end + start);
    }
    
}

CodingUtil类中encryption()方法是根据WebSocket协议https://tools.ietf.org/html/rfc6455#section-1.3,对接收到的Sec-WebSocket-Key值与258EAFA5-E914-47DA-95CA-C5AB0DC85B11连接进行SHA1加密后再通过Base64编码得到Sec-WebSocket-Accept值再返回给客户端。

创建WebSocketService类作为服务器启动入口:

public class WebSocketService {

    private SocketManager socketManager;

    public WebSocketService() {
        this.socketManager = new SocketManager();
    }

    /**
     * 监听指定端口 默认80
     * 等待客户端连接
     * 连接后创建一个线程进行处理
     *
     * @return WebSocketService
     */
    public WebSocketService start(int port) {
        new Thread(new ManageCore(socketManager)).start();
        try {
            ServerSocket server = new ServerSocket(port);
            while (true) {
                Socket client = server.accept();
                new Thread(new SocketHandle(client, socketManager)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return this;
    }
}

该类中创建了SocketManager类来管理客户端连接,实例化ManageCore类作为控制中心交给一个线程进行管理。开启Socket服务监听指定端口,使用socket的accept()方法阻塞线程等待请求。当请求到来开启一个线程进行处理(SocketHandle类)。

创建SocketManager类:

public class ManageCore implements Runnable {

    private SocketManager socketManager;

    public ManageCore(SocketManager socketManager) {
        this.socketManager = socketManager;
    }

    @Override
    public void run() {
        Scanner scanner = new Scanner(System.in);
        Map<String, String> message = new HashMap<>();
        message.put("name", "admin");
        // 广播消息
        while (true) {
            message.put("content", scanner.nextLine());
            socketManager.broadcast(JSON.toJSONString(message));
        }
    }
}

创建SocketManager类:

public class SocketManager {

    private LinkedList<OutputStream> outputStreams;

    public SocketManager() {
        outputStreams = new LinkedList<>();
    }

    /**
     * 添加缓存
     *
     * @param outputStream
     */
    public void add(OutputStream outputStream) {
        outputStreams.add(outputStream);
    }

    /**
     * 删除缓存
     *
     * @param outputStream
     */
    public void remove(OutputStream outputStream) {
        outputStreams.remove(outputStream);
    }

    /**
     * 广播
     *
     * @param message 消息
     */
    public void broadcast(String message) {
        Iterator<OutputStream> iterator = outputStreams.listIterator();
        while (iterator.hasNext()) {
            try {
                OutputStream outputStream = iterator.next();
                push(message.getBytes(), outputStream);
            } catch (IOException e) {
                iterator.remove();
            }
        }
    }

    /**
     * 通知
     *
     * @param message 消息
     */
    public void notice(String message, OutputStream outputStream) {
        try {
            push(message.getBytes(), outputStream);
        } catch (IOException e) {
            remove(outputStream);
        }
    }

    /**
     * 推消息
     *
     * @param bytes
     * @param outputStream
     * @throws IOException
     */
    public void push(byte[] bytes, OutputStream outputStream) throws IOException {
        outputStream.write(new byte[]{(byte) 0x81, (byte) bytes.length});
        outputStream.write(bytes);
    }

    /**
     * 返回当前缓存的连接数
     *
     * @return int
     */
    public int getCurrentConnNum() {
        return outputStreams.size();
    }
}

接下来创建SocketHandle类来对请求进行处理:

public class SocketHandle implements Runnable {

    private Socket socket;
    private SocketManager socketManager;

    /**
     * @param socket 与客户端之间的连接
     */
    public SocketHandle(Socket socket, SocketManager socketManager) {
        this.socket = socket;
        this.socketManager = socketManager;
    }

    @Override
    public void run() {
        try (InputStream inputStream = socket.getInputStream();
             OutputStream outputStream = socket.getOutputStream()) {
            // 处理报文,过滤请求
            MessageFilter messageFilter = new MessageFilter();
            messageFilter.doFilter(inputStream, outputStream);
            String path = messageFilter.getPath();
            // 缓存与客户端的消息发送通道
            socketManager.add(outputStream);
            // 开启接收消息
            Thread receiveThread = new Thread(new WebSocketReceive(inputStream, socketManager));
            receiveThread.start();
            // 发送心跳包
            while (receiveThread.isAlive()) {
                Thread.sleep(15000);
                socketManager.notice("h", outputStream);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (this.socket != null) {
                    this.socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

当浏览器使用websocket请求时服务端接收到的报文如下:

GET / HTTP/1.1
Host: localhost
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36
Upgrade: websocket
Origin: file://
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9
Cookie: _ga=GA1.1.420931640.1569149707
Sec-WebSocket-Key: q+RL7D/fr9/WQHlF2OK/Nw==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

我们可以创建一个过滤器来对报文进行过滤(根据自己的需求进行过滤),判断是WebSocket请求还是其他请求,若修改请求处理线程类SocketHandle,可根据过滤结果使用对应的处理器,使WebSocket服务器与上面的Web服务器结合使用:

public class MessageFilter {

    private String path;

    public void doFilter(InputStream inputStream, OutputStream outputStream) throws Exception {
        // 读取请求头属性
        BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        HashMap<String, String> headers = new HashMap<>();
        while ((line = br.readLine()) != null && !"".equals(line)) {
            String[] header = line.split(": ");
            if (header.length == 2) {
                headers.put(header[0], header[1]);
            } else {
                String[] parame = line.split(" ");
                if (parame.length > 2) {
                    this.path = parame[1];
                }
            }
        }
        // 验证请求
        if (whether(headers)) {
            throw new Exception("缺少必要的header");
        }
        // 返回请求头
        PrintWriter out = new PrintWriter(outputStream);
        out.println("HTTP/1.1 101 Switching Protocols");
        out.println("Connection: Upgrade");
        out.println("Sec-WebSocket-Accept: " + CodingUtil.encryption(headers.get("Sec-WebSocket-Key")));
        out.println("Upgrade: websocket");
        out.println();
        out.flush();
    }

    /**
     * 确定是否存在对应属性
     *
     * @param headers 请求头属性
     * @return boolean
     */
    public boolean whether(HashMap<String, String> headers) {
        if (!"Upgrade".equals(headers.get("Connection"))) {
            return false;
        }

        if (headers.get("Sec-WebSocket-Accept") == null) {
            return false;
        }

        if (!"websocket".equals(headers.get("Upgrade"))) {
            return false;
        }
        return true;
    }

    public String getPath() {
        return path;
    }

}

过滤请求,确定服务后缓存与客户端的消息发送通道,开启接收客户端消息的线程并发送心跳。

创建WebSocketReceive类接收客户端消息:

public class WebSocketReceive implements Runnable {

    private InputStream inputStream;
    private SocketManager socketManager;

    public WebSocketReceive(InputStream inputStream, SocketManager socketManager) {
        this.inputStream = inputStream;
        this.socketManager = socketManager;
    }

    @Override
    public void run() {
        while (true) {
            try {
                receive();
            } catch (Exception e) {
                break;
            }![![connect.png](https://img.haomeiwen.com/i18713780/fe629e48150c46ce.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
](https://img.haomeiwen.com/i18713780/9b5d9abfc607e017.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

        }
    }

    public void receive() throws IOException {
        byte[] frameReader = inputStream.readNBytes(1);
        // FIN表示这是消息中的最后一个片段
        // 当FIN为0时表示不是最后一个片段,为1时表示最后一个片段
        int fin = getByteBit(frameReader[0], 0);
        // RSV 是保留的扩展定义位,没有扩展的情况下为0
        int rsv1 = getByteBit(frameReader[0], 1);
        int rsv2 = getByteBit(frameReader[0], 2);
        int rsv3 = getByteBit(frameReader[0], 3);

        // opcode操作码
        int opcode = getByteBits(frameReader[0], 4, 7);
        switch (opcode) {
            case 0:
                // 连续帧
                break;
            case 1:
                // 文本帧
                break;
            case 2:
                // 二进制帧
                break;
            case 8:
                // 连接关闭
                throw new IOException("socket close");
            case 9:
                // ping
                break;
            case 10:
                // pone
                break;
        }
        // mask掩码
        frameReader = inputStream.readNBytes(1);
        int mask = getByteBit(frameReader[0], 0);

        // payload len 有效载荷
        int payloadLen = getByteBits(frameReader[0], 1, 7);

        // extended payload length 有效载荷长度延长
        long extendedPayloadLen = payloadLen;
        if (payloadLen == 126) {
            // 读2个字节
            extendedPayloadLen = CodingUtil.bytesToLong(inputStream.readNBytes(2));
        } else if (payloadLen == 127) {
            // 读8个字节
            extendedPayloadLen = CodingUtil.bytesToLong(inputStream.readNBytes(8));
        }

        // 获得屏蔽键
        byte[] maskingKey = null;
        if (mask == 1) {
            maskingKey = inputStream.readNBytes(4);
        }

        // 解码
        frameReader = inputStream.readNBytes(Long.valueOf(extendedPayloadLen).intValue());
        if (maskingKey != null) {
            byte[] encodeBytes = new byte[frameReader.length];
            for (int i = 0; i < encodeBytes.length; i++) {
                encodeBytes[i] = (byte) (frameReader[i] ^ maskingKey[i % 4]);
            }
            String message = new String(encodeBytes);
            socketManager.broadcast(message);
            System.out.println(message);
            // 自定义的消息格式 JSON解析使用了alibaba的fastjson
            // JSONObject messager = JSON.parseObject(message);
            // System.out.println(messager.get("name") + " : " + messager.get("content"));
        }

    }

    /**
     * 取指定位置bit
     *
     * @param b     字节
     * @param index 位置
     * @return
     */
    private int getByteBit(byte b, int index) {
        return (b & 0x80 >> index) >> (7 - index);
    }

    /**
     * 取指定区间bit
     *
     * @param b     字节
     * @param start 开始位置
     * @param end   结束位置
     * @return int
     */
    private int getByteBits(byte b, int start, int end) {
        return (b << start & 0xff) >> (7 - end + start);
    }
}

在WebSocket协议中,使用帧传输数据,以上类只完成了对帧数据的获取和解码,为了简单性对于其他机制的处理并没有按照协议(可仔细阅读协议自行补充),但是足够完成与客户端的通信要求。

WebSocket Frame的协议:

最后可在WebSocketReceive类中使用SocketManager类进行消息广播或者自定义一对一的消息通信。

完成以上步骤后就可以创建一个WebSocket类指定端口开启WebSocket服务了:

public class WebSocket {

    public static void main(String[] args) {
        new WebSocketService().start(80);
    }
}

接下来编写html页面来请求WebSocket服务器:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>群聊</title>
    <script src="https://cdn.bootcss.com/jquery/3.4.0/jquery.slim.min.js"></script>
</head>
<body>
<div>
    <label>请输入用户名:</label>
    <input type="text" id="name" placeholder="用户名">
</div>
<div>
    <button type="button" id="connect">连接</button>
    <button type="button" id="disconnect">断开连接</button>
</div>
<div id="chat">
</div>
<div>
    <input type="text" id="content" placeholder="内容" style="display: inline-block;">
    <button type="button" id="send" style="display: inline-block;">发送</button>
</div>
<ul id="info" style="list-style: none">
</ul>
<script>
    function connect() {
        var ws = new WebSocket("ws://localhost");
        ws.onopen = function(){
            $('#chat').text('连接成功');
        };

        ws.onmessage = function (evt){
            try{
                update(JSON.parse(evt.data));
            }catch(err){
            }
        };

        ws.onclose = function(){
            $('#chat').text('');
        };


        $('#send').click(function () {
            ws.send(JSON.stringify({'name': $('#name').val(), 'content': $('#content').val()}));
        });

        $('#disconnect').click(function () {
            ws.close();
        });
    }

    function update(message) {
        $('#info').append('<li><b>' + message.name + ' : </b>' + message.content + '</li>')
    }

    $(function () {
        $('#connect').click(function () {
            connect();
        });
    })
</script>
</body>
</html>

测试:

连接WebSocket服务器:

connect

服务端接收、发送消息:

service-send

客户端发送消息:

client-send

实例地址

相关文章

  • 自定义Web服务器和WebSocket通信

    概述   套接字(socket)是一个抽象层,应用程序可以通过它发送或接收数据,可对其进行像对文件一样的打开、读写...

  • iOS简单的WebSocket连接

    概述WebSocket 1.1 为什么我们需要WebSocket这样的实时的通信协议? WebSocket是web...

  • WebSocket协议

    一旦 Web 服务器与客户端之间建立起 WebSocket 协议的通信连接,之后所有的通信都依靠这个专用协议进行。...

  • 0依赖解析Websocket协议(Node)

    Websocket 是HTML5中的一种新的Web通信技术,它实现了浏览器与服务器之间的双向通信(full-dup...

  • HTML5服务器通信

    一,web服务器通信历史及通信流程 web服务器通信历史,之前的web通信就是浏览器请求网页,然后服务器返回响应...

  • 03-webSocket学习

    WebSocket 是什么? WebSocket 是一种网络通信协议。RFC6455 定义了它的通信标准。 Web...

  • 浅谈WebSocket

    概述 WebSocket 是什么? WebSocket是一种网络通信协议。RFC6455定义了它的通信标准。Web...

  • Web通信中传统轮询、长轮询和WebSocket简介

    Web通信中传统轮询、长轮询和WebSocket简介https://zhuanlan.zhihu.com/p/25...

  • WebSocket协议分析及实现

    1.概述 Webscoket是Web浏览器和服务器之间的一种全双工通信协议,其中WebSocket协议由IETF定...

  • 使用websocket协议完成推送(tornado.websoc

    关于WebSocket WebSocket API是下一代客户端-服务器的异步通信方法。该通信取代了单个的TCP套...

网友评论

      本文标题:自定义Web服务器和WebSocket通信

      本文链接:https://www.haomeiwen.com/subject/wmnnpctx.html