先稍微拆分下服务端的逻辑,原先处理发布者和接收者都写在一个连接里,有点杂乱了
推送服务器
# coding : utf-8
# author : ['Wang Suyin', ]
# data : 2020/2/20 16:48
# software : PyCharm
# python_version : '3.5.3 64bit'
# file : 推送服务器.py
"""
说明文档:
"""
import json
from flask import Flask
from flask_sockets import Sockets
app = Flask(__name__)
sockets = Sockets(app)
ws_pool = [] # 推送目标池
def auth_admin_token(token):
return True
def auth_listener_token(token):
return True
@sockets.route('/admin')
def admin_socket(ws):
print('admin接入')
r_data = ws.receive()
r_data = json.loads(r_data)
token = r_data['data']['token']
if not r_data['type']=='init' or not auth_admin_token(token):
ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
ws.close()
return
ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))
while not ws.closed:
r_data = ws.receive()
if not r_data:
break
ws.send(json.dumps({'type':'message_r','code':200, 'msg':'发布成功'}))
data = json.loads(r_data)
if data['type'] == 'message':
print('将消息推送给{}'.format(ws_pool))
print(ws_pool)
#推送给在线用户
for e in ws_pool:
try:
e.send(json.dumps({'type':'message','data':{'message':data['data']['message']}}))
except:
ws_pool.remove(e)
try:
ws.close()
except:
pass
@sockets.route('/listener')
def listener_socket(ws):
print('listener接入')
r_data = ws.receive()
r_data = json.loads(r_data)
token = r_data['data']['token']
if not r_data['type']=='init' or not auth_admin_token(token):
ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
ws.close()
return
ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))
ws_pool.append(ws)
while not ws.closed:
r_data = ws.receive()
if not r_data:
break
#这里阻塞住就可以了,因为消息监听器只接收消息
try:
ws.close()
except:
pass
finally:
ws_pool.remove(ws)
if __name__ == '__main__':
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey
monkey.patch_all()
server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)
print('web server start ... ')
server.serve_forever()
消息发布器
import json
import websocket
# 这里就不写界面了,要推送的消息一并写在on_message里
def on_open(ws):
token = 'admin'
data = json.dumps({'type': 'init', 'data': {'token': token}})
ws.send(data)
def on_message(ws, r_data):
print('接收到消息:{}'.format(r_data))
data = json.loads(r_data)
if data['type'] == 'init_r':
if data['code'] != 200:
ws.close()
print('鉴权失败')
return
ws.send(json.dumps({'type': 'message', 'data': {'message': '测试消息,这是一条公告'}}))
elif data['type'] == 'message_r':
if data['code'] == 200:
print('发布成功')
else:
print('发布失败')
ws.close()
def on_error(ws):
print('连接异常')
def on_close(ws):
print('连接关闭')
if __name__ == '__main__':
websocket.enableTrace(True)
ws = websocket.WebSocketApp('ws://127.0.0.1:5003/admin')
ws.on_open = on_open
ws.on_message = on_message
ws.on_error = on_error
ws.on_close = on_close
ws.run_forever()
消息监听器
import json
import sys
import datetime
import threading
from PyQt5.QtWidgets import *
from PyQt5.QtGui import *
from PyQt5.QtCore import *
import websocket
websocket.enableTrace(True)
class ActionSet:
@staticmethod
def on_open( ws):
token = '123456qwe'
data = json.dumps({'type': 'init', 'data': {'token': token}})
ws.send(data)
@staticmethod
def on_message(ws, r_data):
print('接收到消息:{}'.format(r_data))
data = json.loads(r_data)
if data['type'] == 'init_r':
if data['code'] != 200:
ws.close()
print('鉴权失败')
return
print('鉴权成功')
if data['type'] == 'message':
print(data['data']['message'])
MainWindow._instance.new_message_got.emit(data['data']['message'])
@staticmethod
def on_error(ws):
print('连接异常')
@staticmethod
def on_close(ws):
print('连接关闭')
class MainWindow(QWidget):
_instance=None
new_message_got = pyqtSignal(str)
def __init__(self, parent = None):
super().__init__(parent)
MainWindow._instance = self
self.setWindowTitle('消息监听器')
self.resize(800, 600)
self.__btn = QPushButton('连接')
self.__lw =QListWidget()
main_layout = QVBoxLayout()
main_layout.setSpacing(0)
main_layout.setContentsMargins(0,0,0,0)
self.setLayout(main_layout)
main_layout.addWidget(self.__btn)
main_layout.addWidget(self.__lw)
self.__btn.clicked.connect(self.__on_btn_clicked)
self.new_message_got.connect(self.__show_new_message)
def __on_btn_clicked(self):
url='ws://127.0.0.1:5003/listener'
ws = websocket.WebSocketApp(url )
ws.on_open = ActionSet.on_open
ws.on_message = ActionSet.on_message
ws.on_error = ActionSet.on_error
ws.on_close = ActionSet.on_close
wst = threading.Thread(target = ws.run_forever)
wst.setDaemon(True)
wst.start()
def __show_new_message(self,text):
time_ = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
text= '{} {}'.format(time_, text)
self.__lw.insertItem(0,text)
if __name__ == '__main__':
app = QApplication(sys.argv)
w = MainWindow()
w.show()
app.exec_()
网友评论