- Socket:(基于TCP、IP协议的网络通信)套接字
- 基于Socket实现聊天机器人
# 客户端 import socket obj = socket.socket() obj.connect(('127.0.0.1', 9999)) # 最多接收1024字节 res_bytes = obj.recv(1024) res_str = str(res_bytes, encoding='utf-8') print(res_str) while True: inp = input("请输入你想要发送的内容:") if inp == "q": obj.sendall(bytes(inp, encoding='utf-8')) break else: obj.sendall(bytes(inp, encoding='utf-8')) ret = str(obj.recv(1024), encoding='utf-8') print(ret) obj.close() #====================================================== # 服务端 import socket sk = socket.socket() sk.bind(('127.0.0.1', 9999, )) sk.listen(5) while True: # 接收客户端的请求 accept是阻塞的 conn, address = sk.accept() conn.sendall(bytes('欢迎致电中国移动', encoding='utf-8')) while True: ret_bytes = conn.recv(1024) ret_str = str(ret_bytes, encoding='utf-8') if ret_str == 'q': break conn.sendall(bytes(ret_str + "好的", encoding='utf-8'))
- 基于Socket 实现文件上传
# client.py 客户端 import socket import os obj = socket.socket() obj.connect(('127.0.0.1', 9999, )) # 阻塞,最多接收1024字节 res_bytes = obj.recv(1024) res_str = str(res_bytes, encoding='utf-8') print(res_str) # 发送当前文件大小 # size = os.stat('f.png').st_size() size = os.path.getsize('f.png') obj.sendall(bytes(str(size), encoding='utf-8')) # 接收反馈,解决粘包问题 obj.recv(1024) with open('f.png', 'rb') as f: for line in f: obj.sendall(line) obj.close() # ===================================================== # server.py 服务端 import socket sk = socket.socket() sk.bind(('127.0.0.1', 9999, )) sk.listen(5) while True: # 接收客户端的请求 accept是阻塞的 conn, address = sk.accept() conn.sendall(bytes("欢迎登录xxx平台FTP", encoding='utf-8')) # 先接收文件大小,再开始接收文件 file_size = str(conn.recv(1024), encoding='utf-8') # 反馈一下 conn.sendall(bytes("收到文件大小", encoding='utf-8')) total_size = int(file_size) has_recv = 0 f = open('new.png', 'wb') # 接收文件内容,直到获取完毕 while True: if total_size == has_recv: break data = conn.recv(1024) f.write(data) has_recv += len(data) f.close()
- scoketserver模块实现并发操作
# 处理多并发服务端 import socketserver # 自定义类,必须继承socketserver.BaseRequestHandler class MyServer(socketserver.BaseRequestHandler): # 实现handle方法 def handle(self): # self.request, self.client_address, self.server conn = self.request conn.sendall(bytes('欢迎致电中国移动', encoding='utf-8')) while True: ret_bytes = conn.recv(1024) ret_str = str(ret_bytes, encoding='utf-8') if ret_str == 'q': break conn.sendall(bytes(ret_str + '好的', encoding='utf-8')) if __name__ == '__main__': server = socketserver.ThreadingTCPServer(('127.0.0.1', 9999), MyServer) server.serve_forever()
网友评论