协议
-
双向（全双工）实时通信
-
基于TCP协议
-
通过HTTP握手
详细可以参考 RFC 6455《The WebSocket Protocol》
握手
- HTTP Request
GET / HTTP/1.1
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Key: b217JVMPYLsf0xXULygwPg==
- HTTP Response
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: HEzw6T2Rtwwtdask3X7uMvi5uSw=
详细可以参考编写 WebSocket 服务器
测试
-
浏览器打开websocket.org Echo Test 同时打开浏览器调试
-
Location => wss://echo.websocket.org => Connect
-
Message => Hello Websocket => Send
实现
vim websocket.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# https://docs.python.org/3.7/library/socketserver.html#module-socketserver
import logging
import struct
from base64 import b64encode
from hashlib import sha1
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler
'''
+-+-+-+-+-------+-+-------------+-------------------------------+
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                     Payload Data continued ...                |
+---------------------------------------------------------------+
'''
# Bit masks applied to the first two bytes of every frame header.
FIN = 0x80  # top bit of byte 1; also OR-ed into the first byte of outgoing frames
OPCODE = 0x0f  # low four bits of byte 1
MASKED = 0x80  # top bit of byte 2
PAYLOAD_LEN = 0x7f  # low seven bits of byte 2
# Values of the 7-bit length field (126 / 127) that announce an extended
# length: a 16-bit (">H") or 64-bit (">Q") big-endian integer follows.
PAYLOAD_LEN_EXT16 = 0x7e
PAYLOAD_LEN_EXT64 = 0x7f
# Opcode values compared against ``byte1 & OPCODE``.  The frame reader in this
# file dispatches only TEXT, PING and PONG; every other opcode ends the
# connection.
OPCODE_CONTINUATION = 0x0
OPCODE_TEXT = 0x1
OPCODE_BINARY = 0x2
OPCODE_CLOSE_CONN = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xA
class WebsocketServer(ThreadingMixIn, TCPServer):
    """Threaded TCP server that tracks WebSocket clients and relays events.

    Each accepted connection is served by a ``WebSocketHandler`` in its own
    thread.  Handlers report back through ``client_connected``,
    ``client_disconnected``, ``message_received``, ``ping_received`` and
    ``pong_received``; the first three are forwarded to the callbacks
    registered with ``set_callbacks``.

    A client is a dict with the keys ``'id'`` (sequential int), ``'handler'``
    (the handler object) and ``'address'`` (``handler.client_address``).
    """

    allow_reuse_address = True
    daemon_threads = True

    # Immutable class-level defaults; the per-server client list is created in
    # __init__ so that two servers never share one list.
    id_counter = 0
    client_connected_cb = None
    client_disconnected_cb = None
    message_received_cb = None

    def __init__(self, host='127.0.0.1', port=9000):
        """Bind to ``(host, port)``; ``port=0`` lets the OS choose a port."""
        self.clients = []
        self.id_counter = 0
        TCPServer.__init__(self, (host, port), WebSocketHandler)
        # Read the port back from the socket so an OS-assigned port is visible.
        self.port = self.socket.getsockname()[1]

    def run(self):
        """Serve until interrupted.

        Ctrl-C closes the listening socket and returns.  Any other exception
        is logged, the socket is closed and the process exits with status 1.
        """
        try:
            self.serve_forever()
        except KeyboardInterrupt:
            self.server_close()
        except Exception:
            # Report the failure instead of exiting without a trace.
            logging.getLogger(__name__).exception(
                "WebSocket server stopped unexpectedly")
            self.server_close()
            raise SystemExit(1)

    def send_message(self, client, msg):
        """Send the text ``msg`` to a single client dict."""
        self._unicast(client, msg)

    def send_message_to_all(self, msg):
        """Send the text ``msg`` to every currently registered client."""
        # Iterate over a snapshot: handler threads add and remove clients
        # while a broadcast may be in progress.
        for client in list(self.clients):
            self._unicast(client, msg)

    def set_callbacks(self, client_connected_cb, client_disconnected_cb, message_received_cb):
        """Register the three event callbacks.

        ``client_connected_cb(client, server)`` and
        ``client_disconnected_cb(client, server)`` receive the client dict;
        ``message_received_cb(client, server, msg)`` also receives the text.
        """
        self.client_connected_cb = client_connected_cb
        self.client_disconnected_cb = client_disconnected_cb
        self.message_received_cb = message_received_cb

    def client_connected(self, handler):
        """Register ``handler`` as a new client and fire the connect callback."""
        self.id_counter += 1
        client = {
            'id': self.id_counter,
            'handler': handler,
            'address': handler.client_address,
        }
        self.clients.append(client)
        if self.client_connected_cb is not None:
            self.client_connected_cb(client, self)

    def client_disconnected(self, handler):
        """Forget the client owned by ``handler`` and fire the disconnect callback.

        A handler that was never registered (for example because its
        handshake was rejected) is ignored.
        """
        client = self._handler_to_client(handler)
        if client is None:
            return
        self.clients.remove(client)
        if self.client_disconnected_cb is not None:
            self.client_disconnected_cb(client, self)

    def message_received(self, handler, msg):
        """Forward a text message from ``handler`` to the message callback."""
        if self.message_received_cb is not None:
            self.message_received_cb(self._handler_to_client(handler), self, msg)

    def ping_received(self, handler, msg):
        """Answer a ping by echoing its payload back as a pong."""
        handler.send_pong(msg)

    def pong_received(self, handler, msg):
        """Pongs need no reply; they are accepted and ignored."""
        pass

    def _unicast(self, client, msg):
        """Deliver ``msg`` through the client's handler."""
        client['handler'].send_message(msg)

    def _handler_to_client(self, handler):
        """Return the client dict owned by ``handler``, or ``None``."""
        for client in self.clients:
            if client['handler'] is handler:
                return client
        return None
class WebSocketHandler(StreamRequestHandler):
    """Serve one TCP connection: HTTP upgrade handshake, then WebSocket frames.

    Text, ping and pong frames are decoded and handed to the owning server
    (``message_received``, ``ping_received``, ``pong_received``).  Any other
    opcode (close, binary, continuation, ...) ends the connection.  The FIN
    bit is not inspected, so fragmented messages are not reassembled.
    """

    def __init__(self, socket, addr, server):
        # The base constructor runs setup(), handle() and finish() itself, so
        # the connection has already been served when it returns.
        StreamRequestHandler.__init__(self, socket, addr, server)
        self.server = server

    def setup(self):
        """Create the buffered streams and reset the connection state."""
        StreamRequestHandler.setup(self)
        self.keep_alive = True
        self.handshake_done = False

    def handle(self):
        """Run the handshake once, then read frames until the link is dropped."""
        while self.keep_alive:
            if not self.handshake_done:
                self._handshake()
            else:
                self._read_message()

    def finish(self):
        """Report the disconnect (if the client ever connected) and clean up."""
        try:
            # Only clients that completed the handshake were registered with
            # the server, so only those are reported as disconnected.
            if self.handshake_done:
                self.server.client_disconnected(self)
        finally:
            # Flush and close the rfile/wfile streams created in setup().
            StreamRequestHandler.finish(self)

    def send_message(self, message):
        """Send ``message`` (str) to the client as a text frame."""
        self._send_text(message)

    def send_pong(self, message):
        """Send ``message`` (str) to the client as a pong frame."""
        self._send_text(message, OPCODE_PONG)

    def _handshake(self):
        """Read the HTTP upgrade request and answer with ``101``.

        The connection is dropped (``keep_alive = False``) without a reply
        when the request is not a ``GET``, a header line is malformed, the
        ``Upgrade: websocket`` header is missing, or there is no
        ``Sec-WebSocket-Key``.
        """
        request_line = self.rfile.readline().decode('utf-8', 'replace').strip()
        if not request_line.upper().startswith('GET'):
            self.keep_alive = False
            return
        headers = {}
        while True:
            line = self.rfile.readline().decode('utf-8', 'replace').strip()
            if not line:
                break  # blank line (or EOF) ends the header section
            name, separator, value = line.partition(':')
            if not separator:
                self.keep_alive = False
                return
            headers[name.lower().strip()] = value.strip()
        if headers.get('upgrade', '').lower() != 'websocket':
            self.keep_alive = False
            return
        key = headers.get('sec-websocket-key')
        if key is None:
            self.keep_alive = False
            return
        response = self.make_handshake_response(key)
        # sendall() keeps writing until the whole response is out; send() may
        # stop after a partial write.
        self.request.sendall(response.encode())
        self.handshake_done = True
        self.server.client_connected(self)

    def _read_message(self):
        """Read one frame and dispatch its text payload to the server.

        The connection is dropped when the peer disconnects, sends an
        unmasked or unsupported frame, or sends a payload that is not valid
        UTF-8.
        """
        try:
            frame = self._read_frame()
        except ConnectionError:
            frame = None
        if frame is None:
            self.keep_alive = False
            return
        opcode_handler, payload = frame
        try:
            text = payload.decode('utf8')
        except UnicodeDecodeError:
            self.keep_alive = False
            return
        opcode_handler(self, text)

    def _read_exact(self, count):
        """Return exactly ``count`` bytes, or ``None`` if the stream ended early."""
        data = self.rfile.read(count)
        if len(data) != count:
            return None
        return data

    def _read_frame(self):
        """Read one frame; return ``(server_callback, payload_bytes)`` or ``None``.

        ``None`` means the connection must be closed: end of stream, an
        unmasked frame, or an opcode other than text/ping/pong.
        """
        head = self._read_exact(2)
        if head is None:
            return None
        b1, b2 = head
        # Frames sent by a client must carry the mask bit (RFC 6455, 5.1).
        if not b2 & MASKED:
            return None
        dispatch = {
            OPCODE_TEXT: self.server.message_received,
            OPCODE_PING: self.server.ping_received,
            OPCODE_PONG: self.server.pong_received,
        }
        opcode_handler = dispatch.get(b1 & OPCODE)
        if opcode_handler is None:
            return None
        payload_length = b2 & PAYLOAD_LEN
        # 126 / 127 announce a 16-bit / 64-bit big-endian extended length.
        extended_format = {
            PAYLOAD_LEN_EXT16: ">H",
            PAYLOAD_LEN_EXT64: ">Q",
        }.get(payload_length)
        if extended_format is not None:
            extended = self._read_exact(struct.calcsize(extended_format))
            if extended is None:
                return None
            payload_length = struct.unpack(extended_format, extended)[0]
        masks = self._read_exact(4)
        if masks is None:
            return None
        masked_payload = self._read_exact(payload_length)
        if masked_payload is None:
            return None
        # Each payload byte is XOR-ed with the mask byte at the same index mod 4.
        payload = bytes(
            byte ^ masks[index % 4]
            for index, byte in enumerate(masked_payload)
        )
        return opcode_handler, payload

    def _send_text(self, message, opcode=OPCODE_TEXT):
        """Encode ``message`` as UTF-8 and send it as one unmasked final frame.

        Raises:
            ValueError: if the payload is too long for a 64-bit length field.
        """
        payload = message.encode('UTF-8')
        payload_length = len(payload)
        header = bytearray([FIN | opcode])
        if payload_length <= 125:
            header.append(payload_length)
        elif payload_length <= 65535:
            header.append(PAYLOAD_LEN_EXT16)
            header.extend(struct.pack(">H", payload_length))
        elif payload_length < 18446744073709551616:
            header.append(PAYLOAD_LEN_EXT64)
            header.extend(struct.pack(">Q", payload_length))
        else:
            raise ValueError("payload too large for a single WebSocket frame")
        # sendall() guarantees the complete frame is written.
        self.request.sendall(header + payload)

    @classmethod
    def make_handshake_response(cls, key):
        """Return the ``101 Switching Protocols`` response text for ``key``."""
        return \
            'HTTP/1.1 101 Switching Protocols\r\n'\
            'Upgrade: websocket\r\n' \
            'Connection: Upgrade\r\n' \
            'Sec-WebSocket-Accept: %s\r\n' \
            '\r\n' % cls.calculate_response_key(key)

    @classmethod
    def calculate_response_key(cls, key):
        """Return the ``Sec-WebSocket-Accept`` value for a ``Sec-WebSocket-Key``.

        The value is the base64-encoded SHA-1 digest of the key concatenated
        with the fixed GUID below.
        """
        GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
        digest = sha1(key.encode() + GUID.encode()).digest()
        return b64encode(digest).decode('ASCII')
vim server.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from websocket import WebsocketServer
def client_connected_cb(client, server):
    """Log a newly connected client and announce it to every client."""
    client_id = client['id']
    print("[客户端连接] id=%d" % client_id)
    announcement = "[服务端消息] 客户端连接 id=%d" % client_id
    server.send_message_to_all(announcement)
def client_disconnected_cb(client, server):
    """Log a departed client and announce it to the remaining clients."""
    client_id = client['id']
    print("[客户端断开] id=%d" % client_id)
    announcement = "[服务端消息] 客户端断开 id=%d" % client_id
    server.send_message_to_all(announcement)
def message_received_cb(client, server, message):
    """Print a text message together with the id of the client that sent it."""
    fields = (client['id'], message)
    print("[客户端消息] id=%d message=%s" % fields)
# Build the server with its default host and port, hook up the three event
# callbacks defined in this file, then serve until interrupted.
server = WebsocketServer()
server.set_callbacks(
    client_connected_cb=client_connected_cb,
    client_disconnected_cb=client_disconnected_cb,
    message_received_cb=message_received_cb,
)
server.run()
vim client.html
<html>
<head>
<script type="text/javascript">
// Shared WebSocket connection: assigned in init(), used by onSubmit() and
// onCloseClick().
var ws;
// Open the connection to the local server and mirror every socket event
// (open, message, close, error) into the on-page log.
function init() {
    function logOpen() {
        output("[连接]");
    }

    function logMessage(e) {
        output("[消息] => " + e.data);
    }

    function logClose() {
        output("[关闭]");
    }

    function logError(e) {
        output("[错误] => " + e);
    }

    ws = new WebSocket("ws://localhost:9000/");
    ws.onopen = logOpen;
    ws.onmessage = logMessage;
    ws.onclose = logClose;
    ws.onerror = logError;
}
// Form submit handler: send the text box content over the socket, log it,
// then clear and refocus the box.
function onSubmit() {
    var input = document.getElementById("input");
    // send() throws while the socket is still connecting (or if it was never
    // created). The exception would skip the inline `return false` of the
    // form handler, so the browser would submit the form and reload the page.
    // A closed socket silently drops the data, so that case is reported too
    // instead of being logged as sent.
    if (!ws || ws.readyState !== WebSocket.OPEN) {
        output("[错误] => 连接未打开");
        return;
    }
    ws.send(input.value);
    output("[发送] => " + input.value);
    input.value = "";
    input.focus();
}
// Click handler of the "关闭" button: closes the shared WebSocket `ws`.
function onCloseClick() {
    ws.close();
}
// Prepend one line of text to the #log element, newest entry first.
//
// The text is inserted as a text node, never parsed as HTML, so messages
// received from the server cannot inject markup or scripts. This replaces the
// broken escape chain: its entity replacements had been lost and it only
// replaced the first occurrence of each character.
function output(str) {
    var log = document.getElementById("log");
    var lineBreak = document.createElement("br");
    // With an empty log firstChild is null and insertBefore() appends.
    log.insertBefore(lineBreak, log.firstChild);
    log.insertBefore(document.createTextNode(str), lineBreak);
}
</script>
</head>
<body onload="init();">
<form onsubmit="onSubmit(); return false;">
<input type="text" id="input">
<input type="submit" value="发送">
<button onclick="onCloseClick(); return false;">关闭</button>
</form>
<div id="log"></div>
</body>
</html>
-
启动服务端 => python server.py
-
启动客户端 => 浏览器打开client.html
完整代码参考websocket-server
网友评论