美文网首页Django
在线消息推送和离线消息推送(3)

在线消息推送和离线消息推送(3)

作者: 独步江雪 | 来源:发表于2020-02-21 13:37 被阅读0次

    参考资料
    https://blog.csdn.net/z50l2o08e2u4aftor9a/article/details/80276522

    首先介绍一下具体的功能设计。
    整个系统由服务端、客户端组成。客户端包含前端(浏览器端、pyqt桌面端)和后台(用于添加要推送的消息)。

    因为浏览器端不容易注意到实时提醒,所以我将实时提醒功能放到pyqt桌面端中实现(如托盘区闪烁等醒目的方式)。
    浏览器端中只对消息进行拉取操作(刷新网页时),而不采取实时推的方式。
    pyqt桌面端主要接收新消息提示即可,通知用户到网页端查看最新消息。(登录时主动拉取一次消息判断有无新消息,或者服务器主动推送所有新消息)

    实时推送消息和主动拉取消息两个功能实际上是完全分离的,可以独立行使各自的职能。

    不过下面我就不写浏览器端的代码了, 直接把相关的逻辑也放到pyqt里实现。

    然后再来讲一下拉取服务端消息的逻辑。
    每个用户拥有自己的消息库。当消息库为空时,一次性拉取服务端所有消息入用户库;当消息库内有消息时,每次拉取只拉取用户库内最新的消息发布时间之后的消息(将未入用户库的消息入库后再从用户库拉取)。

    代码共5部分
    1.鉴权服务器:用于服务和服务之间对用户提供的token做权限检查
    2.消息数据服务器:用于读取和处理用户消息在数据库中的状态。
    3.消息在线推送服务器:将管理员发布的消息在线实时推送给所有在线用户(若使用发布器的在线推送功能)
    4.管理员消息发布器:分为在线推送和离线入库两个功能,可独立或组合使用。在线推送消息不会入消息库。
    5.用户消息监听器:登陆后可接收管理员推送的在线消息,也可主动拉取用户对应消息库中的所有已读、未读消息,并标记消息的已读、未读状态。

    下面是代码部分(为了简化代码逻辑,后面代码中的token即用户名。)

    1.鉴权服务器

    from flask import Flask, request, jsonify
    
    app = Flask(__name__)
    
    
    class Lib:
        @staticmethod
        def auth_admin_token(token):
            return True
    
        @staticmethod
        def auth_user_token(token):
            return True
    
    
    @app.route('/admin', methods = ['POST'])
    def admin_auth_token():
        data = request.json
        if Lib.auth_admin_token(data['token']):
            return jsonify({'code': 200, 'msg': '鉴权成功'})
        return jsonify({'code': 400, 'msg': '鉴权失败'})
    
    
    @app.route('/user', methods = ['POST'])
    def user_auth_token():
        data = request.json
        if Lib.auth_user_token(data['token']):
            return jsonify({'code': 200, 'msg': '鉴权成功'})
        return jsonify({'code':400, 'msg': '鉴权失败'})
    
    
    if __name__ == '__main__':
        app.run(host = '0.0.0.0', port = 5009)
    
    

    2.消息数据服务器

    #         coding : utf-8
    #         author : ['Wang Suyin', ]
    #           data : 2020/2/21 14:14
    #       software : PyCharm
    # python_version : '3.5.3 64bit'
    #           file : 2.消息数据服务器.py
    """
    说明文档:
    
    """
    import datetime
    import requests
    import json
    
    from flask import Flask,request,jsonify
    app=Flask(__name__)
    
    # 虚拟数据
    db = {
        'message_table': [
            {'id': 1, 'message': '消息1', 'publish_time': (2020, 1, 20, 3, 4, 5)},
            {'id': 2, 'message': '消息2', 'publish_time': (2020, 1, 20, 3, 4, 6)},
            {'id': 3, 'message': '消息3', 'publish_time': (2020, 1, 20, 3, 4, 7)},
        ],
        'user_to_messages_table': [
            {'username': 'tester', 'message_id': 1, 'read': True},
            {'username': 'tester', 'message_id': 2, 'read': False},
        ],
    }
    
    class Utils:
        @staticmethod
        def auth_admin_token(token):
            data = {'token': token}
            r = requests.post(url = 'http://127.0.0.1:5009/admin', json = data)
            r = json.loads(r.text)
            if r['code'] == 200:
                return True
            return False
    
        @staticmethod
        def add_message(message):
            t = datetime.datetime.now()
            publish_time = (t.year, t.month, t.day, t.hour, t.minute, t.second)
            db['message_table'].append({'id':len(db['message_table'])+1,'message': message, 'publish_time': publish_time})
    
        @staticmethod
        def get_user_messages(username):
            print(db['message_table'])
            # 拉取新消息入用户库
            # 原先考虑的是拉取最新时间后的消息,目前看来求消息ID表的差集更简单些,也更保险些(防止遗漏)。当然,要根据具体场景优化,代码也要更换为数据库代码,这里仅为示意。
            for message_id in list(
                    set([e['id'] for e in db['message_table']])
                    - set([e['message_id'] for e in [e_ for e_ in db['user_to_messages_table'] if e_['username']==username]])
            ):
                db['user_to_messages_table'].append({'username': username, 'message_id': message_id, 'read': False})
    
            # 在通知量少的情况下,可一次性返回所有内容。
            # 在通知量多的情况下,可分页返回,也可只返回未读消息,已读消息由客户端本地保存。
            # 简单起见,以下一次性返回所有内容
            res = []
            for e in db['user_to_messages_table']:
                if e['username'] == username:
                    d = [e_ for e_ in db['message_table'] if e_['id'] == e['message_id']][0]
                    d.update({'read': e['read']})
                    res.append(d)
            return res
        @staticmethod
        def mark_as_read(username, message_id,read_status):
            #将消息标记为已读、未读状态,比较简单,就不具体谢写了
            pass
    
    
    
    
    
    
    @app.route('/add_message', methods=['POST'])
    def add_message():
        data = request.json
        token = data['token']
    
        r = requests.post(url='http://127.0.0.1:5009/admin',json={  'token':token  } )
        r =  json.loads(r.text)
        if not r['code'] == 200:
            return jsonify({'code':400,'msg':'鉴权失败'})
        Utils.add_message(data['message'])
        print('新的消息入库成功')
        return jsonify({'code': 200, 'msg': '消息发布成功'})
    
    @app.route('/get_user_messages', methods=['POST'])
    def get_user_messages():
        data = request.json
        token = data['token']
    
        if not Utils.auth_admin_token(token):
            return jsonify({'code':400,'msg':'鉴权失败'})
    
        username = token
        data = Utils.get_user_messages(username)
        return jsonify({'code': 200, 'msg': '拉取用户消息成功','data':data})
    
    
    @app.route('/mark_as_read', methods=['POST'])
    def mark_as_read():
        data = request.json
        token = data['token']
    
        if not Utils.auth_admin_token(token):
            return jsonify({'code':400,'msg':'鉴权失败'})
        username = token
        message_id =  data['message_id']
        read_status = data['read_status']
        data = Utils.mark_as_read(username, message_id,read_status)
        return jsonify({'code': 200, 'msg': '拉取用户消息成功','data':data})
    
    
    if __name__ == '__main__':
        app.run(host = '0.0.0.0', port = 5008)
    
    

    3.消息在线推送服务器

    #         coding : utf-8
    #         author : ['Wang Suyin', ]
    #           data : 2020/2/20 16:48
    #       software : PyCharm
    # python_version : '3.5.3 64bit'
    #           file : 3.消息在线推送服务器.py
    """
    说明文档:
    
    """
    import requests
    
    import json
    from flask import Flask
    from flask_sockets import Sockets
    
    
    app = Flask(__name__)
    sockets = Sockets(app)
    
    ws_pool = []  # 推送目标池
    
    
    
    def auth_admin_token(token):
        data = {  'token':token  }
        r = requests.post(url='http://127.0.0.1:5009/admin',json=data )
        r =  json.loads(r.text)
        if  r['code'] == 200:
            return True
        return False
    
    def auth_user_token(token):
        data = {  'token':token  }
        r = requests.post(url='http://127.0.0.1:5009/user',json=data )
        r =  json.loads(r.text)
        if  r['code'] == 200:
            return True
        return False
    
    #新消息的插入可以通过ws也可以通过http
    @sockets.route('/admin')
    def admin_socket(ws):
        print('admin接入')
        r_data = ws.receive()
        r_data = json.loads(r_data)
        token = r_data['data']['token']
        if not r_data['type']=='init' or not  auth_admin_token(token):
            ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
            ws.close()
            return
        ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))
    
        while not ws.closed:
            r_data = ws.receive()
            if not r_data:
                break
            ws.send(json.dumps({'type':'message_r','code':200, 'msg':'发布成功'}))
    
            data = json.loads(r_data)
            if data['type'] == 'message':
                print('将消息推送给{}'.format(ws_pool))
                print(ws_pool)
                #推送给在线用户
                for e in ws_pool:
                    try:
                        e.send(json.dumps({'type':'message','data':{'message':data['data']['message']}}))
                    except:
                        ws_pool.remove(e)
        try:
            ws.close()
        except:
            pass
    
    @sockets.route('/listener')
    def listener_socket(ws):
        print('listener接入')
        r_data = ws.receive()
        r_data = json.loads(r_data)
        token = r_data['data']['token']
        if not r_data['type']=='init' or not  auth_user_token(token):
            ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
            ws.close()
            return
        ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))
    
        ws_pool.append(ws)
        while not ws.closed:
            r_data = ws.receive()
            if not r_data:
                break
            #这里阻塞住就可以了,因为消息监听器只接收消息
        try:
            ws.close()
        except:
            pass
        finally:
            ws_pool.remove(ws)
    
    
    if __name__ == '__main__':
        from gevent import pywsgi
        from geventwebsocket.handler import WebSocketHandler
        from gevent import monkey
        monkey.patch_all()
    
        server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)
        print('web server start ... ')
        server.serve_forever()
    
    

    4.管理员消息发布器

    import json
    import requests
    
    import websocket
    
    websocket.enableTrace(True)
    
    
    # 这里就不写界面了,要推送的消息一并写在on_message里
    
    
    class OnlinePublisher:
        _message = ''
    
        def __init__(self, url = 'ws://127.0.0.1:5003/admin'):
    
            self.__ws = ws = websocket.WebSocketApp(url)
            ws.on_open = self.on_open
            ws.on_message = self.on_message
            ws.on_error = self.on_error
            ws.on_close = self.on_close
    
        def publish(self, message):
            OnlinePublisher._message = message
            self.__ws.run_forever()
    
        @staticmethod
        def on_open(ws):
            token = 'admin'
            data = json.dumps({'type': 'init', 'data': {'token': token}})
            ws.send(data)
    
        @staticmethod
        def on_message(ws, r_data):
            print('接收到消息:{}'.format(r_data))
            data = json.loads(r_data)
            if data['type'] == 'init_r':
                if data['code'] != 200:
                    ws.close()
                    raise Exception('鉴权失败')
                ws.send(json.dumps({'type': 'message', 'data': {'message': OnlinePublisher._message}}))
    
    
            elif data['type'] == 'message_r':
                if data['code'] == 200:
                    print('发布成功')
                else:
                    raise Exception('发布失败')
                ws.close()
    
        @staticmethod
        def on_error(ws):
            print('连接异常')
    
        @staticmethod
        def on_close(ws):
            print('连接关闭')
    
    
    def online_publish(message):
        OnlinePublisher().publish(message)
    
    
    def offline_publish(message):
        data = {'token': 'admin', 'message': message}
        r = requests.post(url = 'http://127.0.0.1:5008/add_message', json = data)
        print(r.text)
        r = json.loads(r.text)
        if r['code'] == 400:
            raise Exception('鉴权失败')
        print('消息入库成功')
    
    
    if __name__ == '__main__':
        message = '测试消息,这是一条公告'
        online_publish(message)
        offline_publish(message)
    
    

    5.用户消息监听器

    import json
    import sys
    import datetime
    import threading
    import requests
    
    from PyQt5.QtWidgets import *
    from PyQt5.QtGui import *
    from PyQt5.QtCore import *
    
    import websocket
    
    websocket.enableTrace(True)
    
    
    class ActionSet:
        @staticmethod
        def on_open(ws):
            token = 'user'
            data = json.dumps({'type': 'init', 'data': {'token': token}})
            ws.send(data)
    
        @staticmethod
        def on_message(ws, r_data):
            print('接收到消息:{}'.format(r_data))
            data = json.loads(r_data)
            if data['type'] == 'init_r':
                if data['code'] != 200:
                    ws.close()
                    print('鉴权失败')
                    return
            print('鉴权成功')
            if data['type'] == 'message':
                print(data['data']['message'])
                MainWindow._instance.new_message_got.emit(data['data']['message'])
    
        @staticmethod
        def on_error(ws):
            print('连接异常')
    
        @staticmethod
        def on_close(ws):
            print('连接关闭')
    
    
    class MainWindow(QWidget):
        _instance = None
        new_message_got = pyqtSignal(str)
    
        def __init__(self, parent = None):
            super().__init__(parent)
            MainWindow._instance = self
            self.setWindowTitle('消息监听器')
            self.resize(800, 600)
    
            self.__btn_online_connect = QPushButton('连接')
            self.__lw_online_message = QListWidget()
    
            self.__le_username = QLineEdit()
            self.__le_username.setPlaceholderText('要拉取消息的用户名')
            self.__btn_pull_all_messages = QPushButton('拉取所有消息/刷新')
            self.__lw_all_messages = QListWidget()
    
    
            main_layout = QVBoxLayout()
            main_layout.setSpacing(0)
            main_layout.setContentsMargins(0, 0, 0, 0)
            self.setLayout(main_layout)
            main_layout.addWidget(self.__btn_online_connect)
            main_layout.addWidget(self.__lw_online_message)
            main_layout.addWidget(self.__le_username)
            main_layout.addWidget(self.__btn_pull_all_messages)
            main_layout.addWidget(self.__lw_all_messages)
            self.__btn_online_connect.clicked.connect(self.__on_btn_online_connect_clicked)
            self.new_message_got.connect(self.__show_new_message)
    
            self.__btn_pull_all_messages.clicked.connect(self.__on_btn_pull_all_messages_clicked)
    
    
        def __on_btn_online_connect_clicked(self):
            url = 'ws://127.0.0.1:5003/listener'
            ws = websocket.WebSocketApp(url)
    
            ws.on_open = ActionSet.on_open
            ws.on_message = ActionSet.on_message
            ws.on_error = ActionSet.on_error
            ws.on_close = ActionSet.on_close
    
            wst = threading.Thread(target = ws.run_forever)
            wst.setDaemon(True)
            wst.start()
    
        def __on_btn_pull_all_messages_clicked(self):
            data = {'token': self.__le_username.text()}
            r = requests.post(url = 'http://127.0.0.1:5008/get_user_messages', json = data)
            r = json.loads(r.text)
            if r['code'] == 400:
                raise Exception('鉴权失败')
            print('消息拉取成功')
            message_datas = sorted(r['data'][:],key = lambda x:-x['id'])
            self.__lw_all_messages.clear()
            for  d in message_datas:
                self.__lw_all_messages.addItem('【{}】{}'.format({True:'已读',False:'未读'}[d['read']],d['message'] ) )
    
        def __show_new_message(self, text):
            time_ = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            text = '{}   {}'.format(time_, text)
            self.__lw_online_message.insertItem(0, text)
    
    
    if __name__ == '__main__':
    
        app = QApplication(sys.argv)
        w = MainWindow()
        w.show()
        app.exec_()
    
    

    相关文章

      网友评论

        本文标题:在线消息推送和离线消息推送(3)

        本文链接:https://www.haomeiwen.com/subject/gzdcqhtx.html