参考资料
https://blog.csdn.net/z50l2o08e2u4aftor9a/article/details/80276522
首先介绍一下具体的功能设计。
整个系统由服务端、客户端组成。客户端包含前端(浏览器端、pyqt桌面端)和后台(用于添加要推送的消息)。
因为浏览器端不容易注意到实时提醒,所以我将实时提醒功能放到pyqt桌面端中实现(如托盘区闪烁等醒目的方式)。
浏览器端中只对消息进行拉取操作(刷新网页时),而不采取实时推的方式。
pyqt桌面端主要接收新消息提示即可,通知用户到网页端查看最新消息。(登录时主动拉取一次消息判断有无新消息,或者服务器主动推送所有新消息)
实时推送消息和主动拉取消息两个功能实际上是完全分离的,可以独立行使各自的职能。
不过下面我就不写浏览器端的代码了, 直接把相关的逻辑也放到pyqt里实现。
然后再来讲一下拉取服务端消息的逻辑。
每个用户拥有自己的消息库。当消息库为空时,一次性拉取服务端所有消息入用户库;当消息库内有消息时,每次拉取只拉取用户库内最新的消息发布时间之后的消息(将未入用户库的消息入库后再从用户库拉取)。
代码共5部分
1.鉴权服务器:用于服务和服务之间对用户提供的token做权限检查
2.消息数据服务器:用于读取和处理用户消息在数据库中的状态。
3.消息在线推送服务器:将管理员发布的消息在线实时推送给所有在线用户(若使用发布器的在线推送功能)
4.管理员消息发布器:分为在线推送和离线入库两个功能,可独立或组合使用。在线推送消息不会入消息库。
5.用户消息监听器:登陆后可接收管理员推送的在线消息,也可主动拉取用户对应消息库中的所有已读、未读消息,并标记消息的已读、未读状态。
下面是代码部分(为了简化代码逻辑,后面代码中的token即用户名。)
1.鉴权服务器
from flask import Flask, request, jsonify
app = Flask(__name__)
class Lib:
@staticmethod
def auth_admin_token(token):
return True
@staticmethod
def auth_user_token(token):
return True
@app.route('/admin', methods = ['POST'])
def admin_auth_token():
data = request.json
if Lib.auth_admin_token(data['token']):
return jsonify({'code': 200, 'msg': '鉴权成功'})
return jsonify({'code': 400, 'msg': '鉴权失败'})
@app.route('/user', methods = ['POST'])
def user_auth_token():
data = request.json
if Lib.auth_user_token(data['token']):
return jsonify({'code': 200, 'msg': '鉴权成功'})
return jsonify({'code':400, 'msg': '鉴权失败'})
if __name__ == '__main__':
app.run(host = '0.0.0.0', port = 5009)
2.消息数据服务器
# coding : utf-8
# author : ['Wang Suyin', ]
# data : 2020/2/21 14:14
# software : PyCharm
# python_version : '3.5.3 64bit'
# file : 2.消息数据服务器.py
"""
说明文档:
"""
import datetime
import requests
import json
from flask import Flask,request,jsonify
app=Flask(__name__)
# 虚拟数据
db = {
'message_table': [
{'id': 1, 'message': '消息1', 'publish_time': (2020, 1, 20, 3, 4, 5)},
{'id': 2, 'message': '消息2', 'publish_time': (2020, 1, 20, 3, 4, 6)},
{'id': 3, 'message': '消息3', 'publish_time': (2020, 1, 20, 3, 4, 7)},
],
'user_to_messages_table': [
{'username': 'tester', 'message_id': 1, 'read': True},
{'username': 'tester', 'message_id': 2, 'read': False},
],
}
class Utils:
@staticmethod
def auth_admin_token(token):
data = {'token': token}
r = requests.post(url = 'http://127.0.0.1:5009/admin', json = data)
r = json.loads(r.text)
if r['code'] == 200:
return True
return False
@staticmethod
def add_message(message):
t = datetime.datetime.now()
publish_time = (t.year, t.month, t.day, t.hour, t.minute, t.second)
db['message_table'].append({'id':len(db['message_table'])+1,'message': message, 'publish_time': publish_time})
@staticmethod
def get_user_messages(username):
print(db['message_table'])
# 拉取新消息入用户库
# 原先考虑的是拉取最新时间后的消息,目前看来求消息ID表的差集更简单些,也更保险些(防止遗漏)。当然,要根据具体场景优化,代码也要更换为数据库代码,这里仅为示意。
for message_id in list(
set([e['id'] for e in db['message_table']])
- set([e['message_id'] for e in [e_ for e_ in db['user_to_messages_table'] if e_['username']==username]])
):
db['user_to_messages_table'].append({'username': username, 'message_id': message_id, 'read': False})
# 在通知量少的情况下,可一次性返回所有内容。
# 在通知量多的情况下,可分页返回,也可只返回未读消息,已读消息由客户端本地保存。
# 简单起见,以下一次性返回所有内容
res = []
for e in db['user_to_messages_table']:
if e['username'] == username:
d = [e_ for e_ in db['message_table'] if e_['id'] == e['message_id']][0]
d.update({'read': e['read']})
res.append(d)
return res
@staticmethod
def mark_as_read(username, message_id,read_status):
#将消息标记为已读、未读状态,比较简单,就不具体谢写了
pass
@app.route('/add_message', methods=['POST'])
def add_message():
data = request.json
token = data['token']
r = requests.post(url='http://127.0.0.1:5009/admin',json={ 'token':token } )
r = json.loads(r.text)
if not r['code'] == 200:
return jsonify({'code':400,'msg':'鉴权失败'})
Utils.add_message(data['message'])
print('新的消息入库成功')
return jsonify({'code': 200, 'msg': '消息发布成功'})
@app.route('/get_user_messages', methods=['POST'])
def get_user_messages():
data = request.json
token = data['token']
if not Utils.auth_admin_token(token):
return jsonify({'code':400,'msg':'鉴权失败'})
username = token
data = Utils.get_user_messages(username)
return jsonify({'code': 200, 'msg': '拉取用户消息成功','data':data})
@app.route('/mark_as_read', methods=['POST'])
def mark_as_read():
data = request.json
token = data['token']
if not Utils.auth_admin_token(token):
return jsonify({'code':400,'msg':'鉴权失败'})
username = token
message_id = data['message_id']
read_status = data['read_status']
data = Utils.mark_as_read(username, message_id,read_status)
return jsonify({'code': 200, 'msg': '拉取用户消息成功','data':data})
if __name__ == '__main__':
app.run(host = '0.0.0.0', port = 5008)
3.消息在线推送服务器
# coding : utf-8
# author : ['Wang Suyin', ]
# data : 2020/2/20 16:48
# software : PyCharm
# python_version : '3.5.3 64bit'
# file : 3.消息在线推送服务器.py
"""
说明文档:
"""
import requests
import json
from flask import Flask
from flask_sockets import Sockets
app = Flask(__name__)
sockets = Sockets(app)
ws_pool = [] # 推送目标池
def auth_admin_token(token):
data = { 'token':token }
r = requests.post(url='http://127.0.0.1:5009/admin',json=data )
r = json.loads(r.text)
if r['code'] == 200:
return True
return False
def auth_user_token(token):
data = { 'token':token }
r = requests.post(url='http://127.0.0.1:5009/user',json=data )
r = json.loads(r.text)
if r['code'] == 200:
return True
return False
#新消息的插入可以通过ws也可以通过http
@sockets.route('/admin')
def admin_socket(ws):
print('admin接入')
r_data = ws.receive()
r_data = json.loads(r_data)
token = r_data['data']['token']
if not r_data['type']=='init' or not auth_admin_token(token):
ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
ws.close()
return
ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))
while not ws.closed:
r_data = ws.receive()
if not r_data:
break
ws.send(json.dumps({'type':'message_r','code':200, 'msg':'发布成功'}))
data = json.loads(r_data)
if data['type'] == 'message':
print('将消息推送给{}'.format(ws_pool))
print(ws_pool)
#推送给在线用户
for e in ws_pool:
try:
e.send(json.dumps({'type':'message','data':{'message':data['data']['message']}}))
except:
ws_pool.remove(e)
try:
ws.close()
except:
pass
@sockets.route('/listener')
def listener_socket(ws):
print('listener接入')
r_data = ws.receive()
r_data = json.loads(r_data)
token = r_data['data']['token']
if not r_data['type']=='init' or not auth_user_token(token):
ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
ws.close()
return
ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))
ws_pool.append(ws)
while not ws.closed:
r_data = ws.receive()
if not r_data:
break
#这里阻塞住就可以了,因为消息监听器只接收消息
try:
ws.close()
except:
pass
finally:
ws_pool.remove(ws)
if __name__ == '__main__':
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey
monkey.patch_all()
server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)
print('web server start ... ')
server.serve_forever()
4.管理员消息发布器
import json
import requests
import websocket
websocket.enableTrace(True)
# 这里就不写界面了,要推送的消息一并写在on_message里
class OnlinePublisher:
_message = ''
def __init__(self, url = 'ws://127.0.0.1:5003/admin'):
self.__ws = ws = websocket.WebSocketApp(url)
ws.on_open = self.on_open
ws.on_message = self.on_message
ws.on_error = self.on_error
ws.on_close = self.on_close
def publish(self, message):
OnlinePublisher._message = message
self.__ws.run_forever()
@staticmethod
def on_open(ws):
token = 'admin'
data = json.dumps({'type': 'init', 'data': {'token': token}})
ws.send(data)
@staticmethod
def on_message(ws, r_data):
print('接收到消息:{}'.format(r_data))
data = json.loads(r_data)
if data['type'] == 'init_r':
if data['code'] != 200:
ws.close()
raise Exception('鉴权失败')
ws.send(json.dumps({'type': 'message', 'data': {'message': OnlinePublisher._message}}))
elif data['type'] == 'message_r':
if data['code'] == 200:
print('发布成功')
else:
raise Exception('发布失败')
ws.close()
@staticmethod
def on_error(ws):
print('连接异常')
@staticmethod
def on_close(ws):
print('连接关闭')
def online_publish(message):
OnlinePublisher().publish(message)
def offline_publish(message):
data = {'token': 'admin', 'message': message}
r = requests.post(url = 'http://127.0.0.1:5008/add_message', json = data)
print(r.text)
r = json.loads(r.text)
if r['code'] == 400:
raise Exception('鉴权失败')
print('消息入库成功')
if __name__ == '__main__':
message = '测试消息,这是一条公告'
online_publish(message)
offline_publish(message)
5.用户消息监听器
import json
import sys
import datetime
import threading
import requests
from PyQt5.QtWidgets import *
from PyQt5.QtGui import *
from PyQt5.QtCore import *
import websocket
websocket.enableTrace(True)
class ActionSet:
@staticmethod
def on_open(ws):
token = 'user'
data = json.dumps({'type': 'init', 'data': {'token': token}})
ws.send(data)
@staticmethod
def on_message(ws, r_data):
print('接收到消息:{}'.format(r_data))
data = json.loads(r_data)
if data['type'] == 'init_r':
if data['code'] != 200:
ws.close()
print('鉴权失败')
return
print('鉴权成功')
if data['type'] == 'message':
print(data['data']['message'])
MainWindow._instance.new_message_got.emit(data['data']['message'])
@staticmethod
def on_error(ws):
print('连接异常')
@staticmethod
def on_close(ws):
print('连接关闭')
class MainWindow(QWidget):
_instance = None
new_message_got = pyqtSignal(str)
def __init__(self, parent = None):
super().__init__(parent)
MainWindow._instance = self
self.setWindowTitle('消息监听器')
self.resize(800, 600)
self.__btn_online_connect = QPushButton('连接')
self.__lw_online_message = QListWidget()
self.__le_username = QLineEdit()
self.__le_username.setPlaceholderText('要拉取消息的用户名')
self.__btn_pull_all_messages = QPushButton('拉取所有消息/刷新')
self.__lw_all_messages = QListWidget()
main_layout = QVBoxLayout()
main_layout.setSpacing(0)
main_layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(main_layout)
main_layout.addWidget(self.__btn_online_connect)
main_layout.addWidget(self.__lw_online_message)
main_layout.addWidget(self.__le_username)
main_layout.addWidget(self.__btn_pull_all_messages)
main_layout.addWidget(self.__lw_all_messages)
self.__btn_online_connect.clicked.connect(self.__on_btn_online_connect_clicked)
self.new_message_got.connect(self.__show_new_message)
self.__btn_pull_all_messages.clicked.connect(self.__on_btn_pull_all_messages_clicked)
def __on_btn_online_connect_clicked(self):
url = 'ws://127.0.0.1:5003/listener'
ws = websocket.WebSocketApp(url)
ws.on_open = ActionSet.on_open
ws.on_message = ActionSet.on_message
ws.on_error = ActionSet.on_error
ws.on_close = ActionSet.on_close
wst = threading.Thread(target = ws.run_forever)
wst.setDaemon(True)
wst.start()
def __on_btn_pull_all_messages_clicked(self):
data = {'token': self.__le_username.text()}
r = requests.post(url = 'http://127.0.0.1:5008/get_user_messages', json = data)
r = json.loads(r.text)
if r['code'] == 400:
raise Exception('鉴权失败')
print('消息拉取成功')
message_datas = sorted(r['data'][:],key = lambda x:-x['id'])
self.__lw_all_messages.clear()
for d in message_datas:
self.__lw_all_messages.addItem('【{}】{}'.format({True:'已读',False:'未读'}[d['read']],d['message'] ) )
def __show_new_message(self, text):
time_ = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
text = '{} {}'.format(time_, text)
self.__lw_online_message.insertItem(0, text)
if __name__ == '__main__':
app = QApplication(sys.argv)
w = MainWindow()
w.show()
app.exec_()
网友评论