转载:https://www.cnblogs.com/noright/p/python-socket-broadcast.html
#!/usr/bin/env python3
import time
import threading
import queue
import socket
# 三个线程:
# 线程1:产生递增的数字,转成字符串放到队列中
# 线程2:监听端口,将产生的连接放到列表中
# 线程3:从队列中取出数字,遍历连接列表,发送到所有客户端
# 线程1:产生递增的数字,转成字符串放到队列中
class Producer(threading.Thread):
def __init__(self, work_queue):
super().__init__() # 必须调用
self.work_queue = work_queue
def run(self):
#print("Begin produce...")
num = 1
while True:
self.work_queue.put(str(num))
num = num+1
time.sleep(1) # 暂停1秒
# 线程2:监听端口,将产生的连接放到列表中
class SocketServer(threading.Thread):
def __init__(self, socket_list):
super().__init__()
self.socket_list = socket_list
def run(self):
sock = socket.socket()
sock.bind(('', 9090))
sock.listen(5)
print("Start listen...")
while True:
conn, addr = sock.accept()
print("Connect by", addr)
self.socket_list.append((conn, addr))
# 线程3:从队列中取出数字,遍历连接列表,发送到所有客户端
class Printer(threading.Thread):
def __init__(self, work_queue, socket_list):
super().__init__() # 必须调用
self.work_queue = work_queue
self.socket_list = socket_list
def run(self):
while True:
num = self.work_queue.get() # 当队列为空时,会阻塞,直到有数据
for sock, addr in self.socket_list: # 遍历保存连接的列表
print("Send", num, "To", addr)
try:
sock.sendall(bytes(num + '\r\n', 'utf-8')) # 把字符串转换成字节数组发送
except:
print("Disconnect by", addr) # 如果连接断开,发送会失败
self.socket_list.remove((sock, addr)) # 从列表中删除断开的连接
def main():
work_queue = queue.Queue()
socket_list = [] # 为了更安全可靠,从多线程访问列表时应该加锁,
# 这里做了简化,因为列表的增加删除操作基本上可以认为是线程安全的
socket_server = SocketServer(socket_list)
socket_server.daemon = True
socket_server.start()
printer = Printer(work_queue, socket_list)
printer.daemon = True # 当主线程退出时子线程也退出
printer.start()
producer = Producer(work_queue)
producer.daemon = True # 当主线程退出时子线程也退出
producer.start()
time.sleep(1) # 这里要暂停一下,否则执行下一条语句时,会因队列为空而直接返回
work_queue.join() # 主线程会停在这里,直到所有数字被get(),并且task_done(),因为没有调用task_done(),所在这里会一直阻塞,直到用户按^C
if __name__ == '__main__':
main()
网友评论