美文网首页
在线消息推送和离线消息推送(2.1)

在线消息推送和离线消息推送(2.1)

作者: 独步江雪 | 来源:发表于2020-02-21 13:15 被阅读0次

    先稍微拆分下服务端的逻辑,原先处理发布者和接收者都写在一个连接里,有点杂乱了

    推送服务器

    #         coding : utf-8
    #         author : ['Wang Suyin', ]
    #           data : 2020/2/20 16:48
    #       software : PyCharm
    # python_version : '3.5.3 64bit'
    #           file : 推送服务器.py
    """
    说明文档:
    
    """
    
    
    import json
    from flask import Flask
    from flask_sockets import Sockets
    
    
    app = Flask(__name__)
    sockets = Sockets(app)
    
    ws_pool = []  # 推送目标池
    
    
    
    
    def auth_admin_token(token):
        return True
    def auth_listener_token(token):
        return True
    
    @sockets.route('/admin')
    def admin_socket(ws):
        print('admin接入')
        r_data = ws.receive()
        r_data = json.loads(r_data)
        token = r_data['data']['token']
        if not r_data['type']=='init' or not  auth_admin_token(token):
            ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
            ws.close()
            return
        ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))
    
        while not ws.closed:
            r_data = ws.receive()
            if not r_data:
                break
            ws.send(json.dumps({'type':'message_r','code':200, 'msg':'发布成功'}))
    
            data = json.loads(r_data)
            if data['type'] == 'message':
                print('将消息推送给{}'.format(ws_pool))
                print(ws_pool)
                #推送给在线用户
                for e in ws_pool:
                    try:
                        e.send(json.dumps({'type':'message','data':{'message':data['data']['message']}}))
                    except:
                        ws_pool.remove(e)
        try:
            ws.close()
        except:
            pass
    
    @sockets.route('/listener')
    def listener_socket(ws):
        print('listener接入')
        r_data = ws.receive()
        r_data = json.loads(r_data)
        token = r_data['data']['token']
        if not r_data['type']=='init' or not  auth_admin_token(token):
            ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
            ws.close()
            return
        ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))
    
        ws_pool.append(ws)
        while not ws.closed:
            r_data = ws.receive()
            if not r_data:
                break
            #这里阻塞住就可以了,因为消息监听器只接收消息
        try:
            ws.close()
        except:
            pass
        finally:
            ws_pool.remove(ws)
    
    
    if __name__ == '__main__':
        from gevent import pywsgi
        from geventwebsocket.handler import WebSocketHandler
        from gevent import monkey
        monkey.patch_all()
    
        server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)
        print('web server start ... ')
        server.serve_forever()
    
    

    消息发布器

    import json
    
    import websocket
    
    
    # 这里就不写界面了,要推送的消息一并写在on_message里
    
    def on_open(ws):
        token = 'admin'
        data = json.dumps({'type': 'init', 'data': {'token': token}})
        ws.send(data)
    
    
    def on_message(ws, r_data):
        print('接收到消息:{}'.format(r_data))
        data = json.loads(r_data)
        if data['type'] == 'init_r':
            if data['code'] != 200:
                ws.close()
                print('鉴权失败')
                return
            ws.send(json.dumps({'type': 'message', 'data': {'message': '测试消息,这是一条公告'}}))
    
    
        elif data['type'] == 'message_r':
            if data['code'] == 200:
                print('发布成功')
            else:
                print('发布失败')
            ws.close()
    
    
    def on_error(ws):
        print('连接异常')
    
    
    def on_close(ws):
        print('连接关闭')
    
    
    if __name__ == '__main__':
    
        websocket.enableTrace(True)
        ws = websocket.WebSocketApp('ws://127.0.0.1:5003/admin')
        ws.on_open = on_open
        ws.on_message = on_message
        ws.on_error = on_error
        ws.on_close = on_close
        ws.run_forever()
    

    消息监听器

    import json
    import sys
    import datetime
    import threading
    
    from PyQt5.QtWidgets import *
    from PyQt5.QtGui import *
    from PyQt5.QtCore import *
    
    import websocket
    
    websocket.enableTrace(True)
    
    class ActionSet:
        @staticmethod
        def on_open( ws):
            token = '123456qwe'
            data = json.dumps({'type': 'init', 'data': {'token': token}})
            ws.send(data)
    
        @staticmethod
        def on_message(ws, r_data):
            print('接收到消息:{}'.format(r_data))
            data = json.loads(r_data)
            if data['type'] == 'init_r':
                if data['code'] != 200:
                    ws.close()
                    print('鉴权失败')
                    return
            print('鉴权成功')
            if data['type'] == 'message':
                print(data['data']['message'])
                MainWindow._instance.new_message_got.emit(data['data']['message'])
        @staticmethod
        def on_error(ws):
            print('连接异常')
    
        @staticmethod
        def on_close(ws):
            print('连接关闭')
    
    
    class MainWindow(QWidget):
        _instance=None
        new_message_got = pyqtSignal(str)
        def __init__(self, parent = None):
            super().__init__(parent)
            MainWindow._instance = self
            self.setWindowTitle('消息监听器')
            self.resize(800, 600)
    
            self.__btn = QPushButton('连接')
            self.__lw =QListWidget()
    
            main_layout = QVBoxLayout()
            main_layout.setSpacing(0)
            main_layout.setContentsMargins(0,0,0,0)
            self.setLayout(main_layout)
            main_layout.addWidget(self.__btn)
            main_layout.addWidget(self.__lw)
    
            self.__btn.clicked.connect(self.__on_btn_clicked)
            self.new_message_got.connect(self.__show_new_message)
    
        def __on_btn_clicked(self):
            url='ws://127.0.0.1:5003/listener'
            ws = websocket.WebSocketApp(url )
    
            ws.on_open = ActionSet.on_open
            ws.on_message = ActionSet.on_message
            ws.on_error = ActionSet.on_error
            ws.on_close = ActionSet.on_close
    
            wst = threading.Thread(target = ws.run_forever)
            wst.setDaemon(True)
            wst.start()
    
        def __show_new_message(self,text):
            time_ = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            text= '{}   {}'.format(time_, text)
            self.__lw.insertItem(0,text)
    
    
    if __name__ == '__main__':
    
        app = QApplication(sys.argv)
        w = MainWindow()
        w.show()
        app.exec_()
    

    相关文章

      网友评论

          本文标题:在线消息推送和离线消息推送(2.1)

          本文链接:https://www.haomeiwen.com/subject/vxdcqhtx.html