对象方法和属性
名 称 | 描 述 |
---|---|
服务端 | 服务器套接字方法 |
s.bind(ADDR) | 将地址(主机名、端口号对)绑定到套接字上 |
s.listen([backlog]) | 设置并启动 TCP 监听器,如果指定backlog,则必须至少为0(如果低于0,则设置为0); |
s.accept() | 被动接受 TCP 客户端连接,一直等待直到连接到达(阻塞) |
客户端 | 客户端套接字方法 |
s.connect() | 主动发起 TCP 服务器连接 |
s.connect_ex() | connect()的扩展版本,此时会以错误码的形式返回问题,而不是抛出一个异常 |
普通通用 | 普通的套接字方法 |
s.recv() | 接收 TCP 消息 |
s.recv_into() | 接收 TCP 消息到指定的缓冲区 |
s.send() | 发送 TCP 消息 |
s.sendall() | 完整地发送 TCP 消息 |
s.recvfrom() | 接收 UDP 消息 |
s.recvfrom_into() | 接收 UDP 消息到指定的缓冲区 |
s.sendto() | 发送 UDP 消息 |
s.getpeername() | 连接到套接字(TCP)的远程地址 |
s.getsockname() | 当前套接字的地址 |
s.getsockopt() | 返回给定套接字选项的值 |
s.setsockopt() | 设置给定套接字选项的值 |
s.shutdown() | 关闭连接 |
s.close() | 关闭套接字 |
s.detach() | 在未关闭文件描述符的情况下关闭套接字,返回文件描述符 |
s.ioctl() | 控制套接字的模式(仅支持 Windows) |
阻塞 | 面向阻塞的套接字方法 |
s.setblocking() | 设置套接字的阻塞或非阻塞模式 |
s.settimeout() | 设置阻塞套接字操作的超时时间 |
s.gettimeout() | 获取阻塞套接字操作的超时时间 |
文件方法 | 面向文件的套接字方法 |
s.fileno() | 套接字的文件描述符 |
s.makefile() | 创建与套接字关联的文件对象 |
属性 | 数据属性 |
s.family | 套接字家族 |
s.type | 套接字类型 |
s.proto | 套接字协议 |
创建服务和客户端
创建TCP服务
s = socket() # 创建服务器套接字
s.bind(ADDR) # 套接字与地址绑定
s.listen() # 监听连接
while True: # 服务器无限循环
c = s.accept() # 接受客户端连接
comm_loop: # 通信循环
c.recv()/cs.send() # 对话(接收 / 发送)
c.close() # 关闭客户端套接字
s.close() # 关闭服务器套接字 # (可选)
示例
import socket # 导入 socket 模块
s = socket.socket() # 创建 socket 对象
host = socket.gethostname() # 获取本地主机名
port = 12345 # 设置端口
s.bind((host, port)) # 绑定端口
s.listen(5) # 等待客户端连接
while True:
c, addr = s.accept() # 建立客户端连接
print('连接地址:', addr)
c.send('今天天气真好/哈哈'.encode(encoding="UTF-8"))
c.close() # 关闭连接
s.close()
创建 TCP 客户端
s = socket() # 创建客户端套接字
s.connect() # 尝试连接服务器
comm_loop: # 通信循环
s.send()/cs.recv() # 对话(发送 / 接收)
s.close() # 关闭客户端套接字
示例
import socket # 导入 socket 模块
s = socket.socket() # 创建 socket 对象
host = socket.gethostname() # 获取本地主机名
port = 12345 # 设置端口号
s.connect((host, port))
str1 = s.recv(1024)
str1=str1.decode(encoding="gbk")
print(str1)
s.close()
创建UDP服务
s = socket() # 创建服务器套接字
s.bind(ADDR) # 套接字与地址绑定
while True: # 服务器无限循环
s.sendto() # 发送
s.recvfrom() # 接收
s.close() # 关闭服务器套接字 # (可选)
创建 UDP 客户端
cs = socket() # 创建客户端套接字
comm_loop: # 通信循环
cs.sendto() # 发送
cs.recvfrom() # 接收
cs.close() # 关闭客户端套接字
demo
#server.py 打开终端先运行
import socket
from time import ctime
import json
import time
#HOST = ''
HOST = socket.gethostname()
PORT = 12346
ADDR = (HOST, PORT)
BUFFSIZE = 1024
MAX_LISTEN = 5
ENCODING = 'utf-8'
def tcpServer():
# TCP服务
# with socket.socket() as s:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# 绑定服务器地址和端口
s.bind(ADDR)
# 启动服务监听
s.listen(MAX_LISTEN)
print('等待用户接入。。。。。。。。。。。。')
while True:
# 等待客户端连接请求,获取connSock
conn, addr = s.accept()
print('警告,远端客户:{} 接入系统!!!'.format(addr))
with conn:
while True:
print('接收请求信息。。。。。')
# 接收请求信息
data = conn.recv(BUFFSIZE)
print('data=%s' % data)
print('接收数据:{!r}'.format(data.decode(ENCODING)))
if data.decode(ENCODING) == 'q':
print('当前用户退出,等待下个用户')
break
# 发送请求数据
conn.send(data)
print('发送返回完毕!!!')
conn.close()
s.close()
# 创建UDP服务
def udpServer():
# 创建UPD服务端套接字
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
# 绑定地址和端口
s.bind(ADDR)
# 等待接收信息
while True:
print('UDP服务启动,准备接收数据。。。')
# 接收数据和客户端请求地址
data, address = s.recvfrom(BUFFSIZE)
if not data:
break
print('接收请求信息:{}'.format(data.decode('utf-8')))
s.sendto(b'i am udp,i got it', address)
s.close()
if __name__ == '__main__':
while True:
choice = input('input choice t-tcp or u-udp:(t/u)')
if choice != 't' and choice != 'u':
print('please input t or u,ok?')
continue
if choice == 't':
print('execute tcpsever')
tcpServer()
else:
print('execute udpsever')
udpServer()
#client.py 打开终端先运行server.py后运行client.py,可以开启多个
import socket
from time import ctime
HOST = socket.gethostname()
PORT = 12346
ADDR = (HOST, PORT)
ENCODING = 'utf-8'
BUFFSIZE = 1024
def tcpClient():
# 创建客户套接字
with socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) as s:
# 尝试连接服务器
s.connect(ADDR)
print('连接服务成功!!')
#通信循环
while True:
inData = input('pleace input something:')
if inData == 'q':
s.send(inData.encode(encoding=ENCODING))
break
# 发送数据到服务器
inData = '[{}]:{}'.format(ctime(), inData)
s.send(inData.encode(encoding=ENCODING))
print('发送成功!')
# 接收返回数据
outData = s.recv(BUFFSIZE)
print('返回数据信息:{!r}'.format(outData))
# 关闭客户端套接字
s.close()
def udpClient():
# 创建客户端套接字
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
while True:
# 发送信息到服务器
data = input('please input message to server or input \'quit\':')
if data == 'quit':
break
data = '[{}]:{}'.format(ctime(), data)
s.sendto(data.encode('utf-8'), ADDR)
print('send success')
# 接收服务端返回信息
recvData, addrs = s.recvfrom(BUFFSIZE)
print('recv message : {}'.format(recvData.decode('utf-8')))
# 关闭套接字
s.close()
if __name__ == '__main__':
while True:
choice = input('input choice t-tcp or u-udp or q-quit:')
if choice == 'q':
break
if choice != 't' and choice != 'u':
print('please input t or u,ok?')
continue
if choice == 't':
print('execute tcpsever')
tcpClient()
else:
print('execute udpsever')
udpClient()
网友评论