二、通讯原理
首先建立sockettongxin
import socket, base64, hashlib
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 9527))
sock.listen(5)
# 获取客户端socket对象
conn, address = sock.accept()
# 获取客户端的【握手】信息
data = conn.recv(1024)
print(data)
然后获取 Sec-WebSocket-Key
def get_headers(data):
header_dict = {}
header_str = data.decode("utf8")
for i in header_str.split("\r\n"):
if str(i).startswith("Sec-WebSocket-Key"):
header_dict["Sec-WebSocket-Key"] = i.split(":")[1].strip()
return header_dict
ws_key = get_headers(data).get("Sec-WebSocket-Key")
再用 Sec-WebSocket-Key 与魔法字符串拼接
魔法字符串:magic string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' 固定的不能更改
加密之前的值:socket_str = ws_key + magic_string
然后将得到的值用sha1算法和base64算法加密
socket_str_sha1 = hashlib.sha1(socket_str.encode("utf8")).digest()
socket_str_base64 = base64.b64encode(socket_str_sha1)
最后将加密的字符串返回给客户端,告诉客户端握手成功,并建立长链接
response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \
"Upgrade:websocket\r\n" \
"Connection: Upgrade\r\n" \
"Sec-WebSocket-Accept: %s\r\n" \
"WebSocket-Location: ws://127.0.0.1:9527\r\n\r\n" %(socket_str_base64.decode("utf8"))
conn.send(response_tpl.encode("utf8"))
while 1:
msg = conn.recv(8096)
print(msg)
三、加密
import struct
msg_bytes = "先帝创业未半而中道崩殂,今天下三分,益州疲弊,此诚危急存亡之秋也。" .encode("utf8")
token = b"\x81" # 一种验证规则
length = len(msg_bytes) # 计算要加密字符串的长度
if length < 126:
token += struct.pack("B", length) # 长度小于126用这种方法转换长度为字符串,然后与加密规则拼接
elif length == 126:
token += struct.pack("!BH", 126, length)
else:
token += struct.pack("!BQ", 127, length)
msg = token + msg_bytes # 将要加密的字符串拼接到最后
# 最后的结果是:b"\x81" + 长度字符串 + 要加密的字符串‘’
print(msg)
四、解密
要解密的字符串:hashstr = b'\x81\x83\xceH\xb6\x85\xffz\x85'
第一个字节是\x81,他是一种验证规则,是固定不变的
将第二个字节也就是 \x83 第9-16位 进行与127进行位运算
hashstr = b'\x81\x83\xceH\xb6\x85\xffz\x85'
# b'\x81 \x83 \xceH\xb6\x85\xffz\x85'
# 将第二个字节也就是 \x83 第9-16位 进行与127进行位运算
payload = hashstr[1] & 127
print(payload)
if payload == 127:
extend_payload_len = hashstr[2:10]
mask = hashstr[10:14]
decoded = hashstr[14:]
# 当位运算结果等于127时,则第3-10个字节为数据长度
# 第11-14字节为mask 解密所需字符串
# 则数据为第15字节至结尾
if payload == 126:
extend_payload_len = hashstr[2:4]
mask = hashstr[4:8]
decoded = hashstr[8:]
# 当位运算结果等于126时,则第3-4个字节为数据长度
# 第5-8字节为mask 解密所需字符串
# 则数据为第9字节至结尾
if payload <= 125:
extend_payload_len = None
mask = hashstr[2:6]
decoded = hashstr[6:]
# 当位运算结果小于等于125时,则这个数字就是数据的长度
# 第3-6字节为mask 解密所需字符串
# 则数据为第7字节至结尾
str_byte = bytearray()
for i in range(len(decoded)):
byte = decoded[i] ^ mask[i % 4] # mask 只有0,1,2,3 四个值
str_byte.append(byte)
print(str_byte.decode("utf8"))
网友评论