概述
套接字(socket)是一个抽象层,应用程序可以通过它发送或接收数据,可对其进行像对文件一样的打开、读写和关闭等操作。套接字允许应用程序将I/O插入到网络中,并与网络中的其他应用程序进行通信。网络套接字是IP地址与端口的组合。
一、Web服务器
Socket最初是加利福尼亚大学Berkeley分校为Unix系统开发的网络通信接口。后来随着TCP/IP网络的发展,Socket成为最为通用的应用程序接口,也是在Internet上进行应用开发最为通用的API。
下面开始创建一个简陋的Web服务器
创建Main类监听端口等待请求:
/**
* 简陋的Web服务器
*/
public class Main {
// 静态文件位置
private final String classPath = this.getClass().getClassLoader().getResource("./").getPath();
// 静态文件类型
public static Set<String> fileType;
static {
fileType = new HashSet<>();
fileType.add("png");
fileType.add("ico");
}
public static void main(String[] args) {
new Main(80);
}
public Main(int port) {
try {
// 开启服务监听端口
ServerSocket server = new ServerSocket(port);
while (true) {
// 阻塞,等待请求
Socket client = server.accept();
// TODO 使用线程池
// 创建线程处理请求
new Thread(new Handle(client, classPath)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
创建请求处理类Handle:
/**
* web请求处理
*/
public class Handle implements Runnable {
private java.net.Socket socket;
private String classPath;
public Handle(java.net.Socket socket, String classPath) {
this.socket = socket;
this.classPath = classPath;
}
@Override
public void run() {
try (InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream()) {
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
PrintWriter out = new PrintWriter(outputStream, true);
// 请求类型
String method = null;
// 请求头属性
String[] headers;
// 请求地址
String path = null;
// 接收请求头属性
String line;
while ((line = br.readLine()) != null && !"".equals(line)) {
if (method == null) {
headers = line.split(" ");
method = headers[0];
path = headers[1];
}
}
// 处理路径
if (path != null) {
// 请求地址处理
String[] pathVariable = path.split("/");
if (pathVariable.length == 0) {
out.println("HTTP/1.1 200 OK");
out.println();
out.println("<font style=\"color:red;font-size:50\">Hello world!</font>");
return;
}
// 请求静态文件处理
String lastRoute = pathVariable[pathVariable.length - 1];
if (lastRoute.contains(".")) {
// 处理静态文件
if (Main.fileType.contains(lastRoute.split("[.]")[1])) {
File file = new File(classPath + lastRoute);
if (file.exists()) {
try (FileInputStream fis = new FileInputStream(file)) {
this.socket.getOutputStream().write(fis.readAllBytes());
} catch (IOException e) {
e.printStackTrace();
}
return;
}
}
}
// 404
out.println("<font style=\"font-size:50\">404</font>");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (this.socket != null) {
this.socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
以上一个非常简陋的Web服务器就完成了,接下来对本地80端口进行测试
localhost favicon.ico favicon.png二、WebSocket
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
下面开始创建WebSocket服务器:
创建后续需要用到的工具类CodingUtil:
public class CodingUtil {
/**
* 字节数组转长整型
*
* @param bytes 字节数组
* @return Long
*/
public static long bytesToLong(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.put(bytes);
buffer.flip();
return buffer.getLong();
}
/**
* 对Sec-WebSocket-Key密钥进行加密
* 生成Sec-WebSocket-Accept值
*
* @param key 密钥
* @return String
* @throws NoSuchAlgorithmException
*/
public static String encryption(String key) throws NoSuchAlgorithmException {
MessageDigest md = MessageDigest.getInstance("SHA1");
md.update((key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes());
byte[] bytes = md.digest();
return new String(Base64.getEncoder().encode(bytes));
}
/**
* 取指定位置bit
*
* @param b 字节
* @param index 位置
* @return
*/
public static int getByteBit(byte b, int index) {
return (b & 0x80 >> index) >> (7 - index);
}
/**
* 取指定区间bit
*
* @param b 字节
* @param start 开始位置
* @param end 结束位置
* @return int
*/
public static int getByteBits(byte b, int start, int end) {
return (b << start & 0xff) >> (7 - end + start);
}
}
CodingUtil类中encryption()方法是根据WebSocket协议https://tools.ietf.org/html/rfc6455#section-1.3,对接收到的Sec-WebSocket-Key值与258EAFA5-E914-47DA-95CA-C5AB0DC85B11连接进行SHA1加密后再通过Base64编码得到Sec-WebSocket-Accept值再返回给客户端。
创建WebSocketService类作为服务器启动入口:
public class WebSocketService {
private SocketManager socketManager;
public WebSocketService() {
this.socketManager = new SocketManager();
}
/**
* 监听指定端口 默认80
* 等待客户端连接
* 连接后创建一个线程进行处理
*
* @return WebSocketService
*/
public WebSocketService start(int port) {
new Thread(new ManageCore(socketManager)).start();
try {
ServerSocket server = new ServerSocket(port);
while (true) {
Socket client = server.accept();
new Thread(new SocketHandle(client, socketManager)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
return this;
}
}
该类中创建了SocketManager类来管理客户端连接,实例化ManageCore类作为控制中心交给一个线程进行管理。开启Socket服务监听指定端口,使用socket的accept()方法阻塞线程等待请求。当请求到来开启一个线程进行处理(SocketHandle类)。
创建SocketManager类:
public class ManageCore implements Runnable {
private SocketManager socketManager;
public ManageCore(SocketManager socketManager) {
this.socketManager = socketManager;
}
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
Map<String, String> message = new HashMap<>();
message.put("name", "admin");
// 广播消息
while (true) {
message.put("content", scanner.nextLine());
socketManager.broadcast(JSON.toJSONString(message));
}
}
}
创建SocketManager类:
public class SocketManager {
private LinkedList<OutputStream> outputStreams;
public SocketManager() {
outputStreams = new LinkedList<>();
}
/**
* 添加缓存
*
* @param outputStream
*/
public void add(OutputStream outputStream) {
outputStreams.add(outputStream);
}
/**
* 删除缓存
*
* @param outputStream
*/
public void remove(OutputStream outputStream) {
outputStreams.remove(outputStream);
}
/**
* 广播
*
* @param message 消息
*/
public void broadcast(String message) {
Iterator<OutputStream> iterator = outputStreams.listIterator();
while (iterator.hasNext()) {
try {
OutputStream outputStream = iterator.next();
push(message.getBytes(), outputStream);
} catch (IOException e) {
iterator.remove();
}
}
}
/**
* 通知
*
* @param message 消息
*/
public void notice(String message, OutputStream outputStream) {
try {
push(message.getBytes(), outputStream);
} catch (IOException e) {
remove(outputStream);
}
}
/**
* 推消息
*
* @param bytes
* @param outputStream
* @throws IOException
*/
public void push(byte[] bytes, OutputStream outputStream) throws IOException {
outputStream.write(new byte[]{(byte) 0x81, (byte) bytes.length});
outputStream.write(bytes);
}
/**
* 返回当前缓存的连接数
*
* @return int
*/
public int getCurrentConnNum() {
return outputStreams.size();
}
}
接下来创建SocketHandle类来对请求进行处理:
public class SocketHandle implements Runnable {
private Socket socket;
private SocketManager socketManager;
/**
* @param socket 与客户端之间的连接
*/
public SocketHandle(Socket socket, SocketManager socketManager) {
this.socket = socket;
this.socketManager = socketManager;
}
@Override
public void run() {
try (InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream()) {
// 处理报文,过滤请求
MessageFilter messageFilter = new MessageFilter();
messageFilter.doFilter(inputStream, outputStream);
String path = messageFilter.getPath();
// 缓存与客户端的消息发送通道
socketManager.add(outputStream);
// 开启接收消息
Thread receiveThread = new Thread(new WebSocketReceive(inputStream, socketManager));
receiveThread.start();
// 发送心跳包
while (receiveThread.isAlive()) {
Thread.sleep(15000);
socketManager.notice("h", outputStream);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (this.socket != null) {
this.socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
当浏览器使用websocket请求时服务端接收到的报文如下:
GET / HTTP/1.1
Host: localhost
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36
Upgrade: websocket
Origin: file://
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9
Cookie: _ga=GA1.1.420931640.1569149707
Sec-WebSocket-Key: q+RL7D/fr9/WQHlF2OK/Nw==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
我们可以创建一个过滤器来对报文进行过滤(根据自己的需求进行过滤),判断是WebSocket请求还是其他请求,若修改请求处理线程类SocketHandle,可根据过滤结果使用对应的处理器,使WebSocket服务器与上面的Web服务器结合使用:
public class MessageFilter {
private String path;
public void doFilter(InputStream inputStream, OutputStream outputStream) throws Exception {
// 读取请求头属性
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String line;
HashMap<String, String> headers = new HashMap<>();
while ((line = br.readLine()) != null && !"".equals(line)) {
String[] header = line.split(": ");
if (header.length == 2) {
headers.put(header[0], header[1]);
} else {
String[] parame = line.split(" ");
if (parame.length > 2) {
this.path = parame[1];
}
}
}
// 验证请求
if (whether(headers)) {
throw new Exception("缺少必要的header");
}
// 返回请求头
PrintWriter out = new PrintWriter(outputStream);
out.println("HTTP/1.1 101 Switching Protocols");
out.println("Connection: Upgrade");
out.println("Sec-WebSocket-Accept: " + CodingUtil.encryption(headers.get("Sec-WebSocket-Key")));
out.println("Upgrade: websocket");
out.println();
out.flush();
}
/**
* 确定是否存在对应属性
*
* @param headers 请求头属性
* @return boolean
*/
public boolean whether(HashMap<String, String> headers) {
if (!"Upgrade".equals(headers.get("Connection"))) {
return false;
}
if (headers.get("Sec-WebSocket-Accept") == null) {
return false;
}
if (!"websocket".equals(headers.get("Upgrade"))) {
return false;
}
return true;
}
public String getPath() {
return path;
}
}
过滤请求,确定服务后缓存与客户端的消息发送通道,开启接收客户端消息的线程并发送心跳。
创建WebSocketReceive类接收客户端消息:
public class WebSocketReceive implements Runnable {
private InputStream inputStream;
private SocketManager socketManager;
public WebSocketReceive(InputStream inputStream, SocketManager socketManager) {
this.inputStream = inputStream;
this.socketManager = socketManager;
}
@Override
public void run() {
while (true) {
try {
receive();
} catch (Exception e) {
break;
}![![connect.png](https://img.haomeiwen.com/i18713780/fe629e48150c46ce.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
](https://img.haomeiwen.com/i18713780/9b5d9abfc607e017.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
}
}
public void receive() throws IOException {
byte[] frameReader = inputStream.readNBytes(1);
// FIN表示这是消息中的最后一个片段
// 当FIN为0时表示不是最后一个片段,为1时表示最后一个片段
int fin = getByteBit(frameReader[0], 0);
// RSV 是保留的扩展定义位,没有扩展的情况下为0
int rsv1 = getByteBit(frameReader[0], 1);
int rsv2 = getByteBit(frameReader[0], 2);
int rsv3 = getByteBit(frameReader[0], 3);
// opcode操作码
int opcode = getByteBits(frameReader[0], 4, 7);
switch (opcode) {
case 0:
// 连续帧
break;
case 1:
// 文本帧
break;
case 2:
// 二进制帧
break;
case 8:
// 连接关闭
throw new IOException("socket close");
case 9:
// ping
break;
case 10:
// pone
break;
}
// mask掩码
frameReader = inputStream.readNBytes(1);
int mask = getByteBit(frameReader[0], 0);
// payload len 有效载荷
int payloadLen = getByteBits(frameReader[0], 1, 7);
// extended payload length 有效载荷长度延长
long extendedPayloadLen = payloadLen;
if (payloadLen == 126) {
// 读2个字节
extendedPayloadLen = CodingUtil.bytesToLong(inputStream.readNBytes(2));
} else if (payloadLen == 127) {
// 读8个字节
extendedPayloadLen = CodingUtil.bytesToLong(inputStream.readNBytes(8));
}
// 获得屏蔽键
byte[] maskingKey = null;
if (mask == 1) {
maskingKey = inputStream.readNBytes(4);
}
// 解码
frameReader = inputStream.readNBytes(Long.valueOf(extendedPayloadLen).intValue());
if (maskingKey != null) {
byte[] encodeBytes = new byte[frameReader.length];
for (int i = 0; i < encodeBytes.length; i++) {
encodeBytes[i] = (byte) (frameReader[i] ^ maskingKey[i % 4]);
}
String message = new String(encodeBytes);
socketManager.broadcast(message);
System.out.println(message);
// 自定义的消息格式 JSON解析使用了alibaba的fastjson
// JSONObject messager = JSON.parseObject(message);
// System.out.println(messager.get("name") + " : " + messager.get("content"));
}
}
/**
* 取指定位置bit
*
* @param b 字节
* @param index 位置
* @return
*/
private int getByteBit(byte b, int index) {
return (b & 0x80 >> index) >> (7 - index);
}
/**
* 取指定区间bit
*
* @param b 字节
* @param start 开始位置
* @param end 结束位置
* @return int
*/
private int getByteBits(byte b, int start, int end) {
return (b << start & 0xff) >> (7 - end + start);
}
}
在WebSocket协议中,使用帧传输数据,以上类只完成了对帧数据的获取和解码,为了简单性对于其他机制的处理并没有按照协议(可仔细阅读协议自行补充),但是足够完成与客户端的通信要求。
最后可在WebSocketReceive类中使用SocketManager类进行消息广播或者自定义一对一的消息通信。
完成以上步骤后就可以创建一个WebSocket类指定端口开启WebSocket服务了:
public class WebSocket {
public static void main(String[] args) {
new WebSocketService().start(80);
}
}
接下来编写html页面来请求WebSocket服务器:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>群聊</title>
<script src="https://cdn.bootcss.com/jquery/3.4.0/jquery.slim.min.js"></script>
</head>
<body>
<div>
<label>请输入用户名:</label>
<input type="text" id="name" placeholder="用户名">
</div>
<div>
<button type="button" id="connect">连接</button>
<button type="button" id="disconnect">断开连接</button>
</div>
<div id="chat">
</div>
<div>
<input type="text" id="content" placeholder="内容" style="display: inline-block;">
<button type="button" id="send" style="display: inline-block;">发送</button>
</div>
<ul id="info" style="list-style: none">
</ul>
<script>
function connect() {
var ws = new WebSocket("ws://localhost");
ws.onopen = function(){
$('#chat').text('连接成功');
};
ws.onmessage = function (evt){
try{
update(JSON.parse(evt.data));
}catch(err){
}
};
ws.onclose = function(){
$('#chat').text('');
};
$('#send').click(function () {
ws.send(JSON.stringify({'name': $('#name').val(), 'content': $('#content').val()}));
});
$('#disconnect').click(function () {
ws.close();
});
}
function update(message) {
$('#info').append('<li><b>' + message.name + ' : </b>' + message.content + '</li>')
}
$(function () {
$('#connect').click(function () {
connect();
});
})
</script>
</body>
</html>
测试:
连接WebSocket服务器:
connect服务端接收、发送消息:
service-send客户端发送消息:
client-send
网友评论