美文网首页
三、wss连接B站弹幕

三、wss连接B站弹幕

作者: ouczbs | 来源:发表于2018-09-29 21:29 被阅读0次

    环境 ws4py+

    pip install ws4py
    from ws4py.client.threadedclient import WebSocketClient
    

    一、websocket协议

    1. 先与wss://broadcastlv.chat.bilibili.com/sub建立连接
    2. 发送登录包
      {
      "uid": 0表示未登录,否则为用户ID,
      "roomid": 房间ID,
      "protover": 1,
      "platform": "web",
      "clientver": "1.4.0"
      }
    3. 每隔一段时间发送心跳包(30秒)
      Python只有延迟功能threading.Timer(delay,fun)
      需要自定义一个不断循环的定时器
    4. 接收响应
      响应由头部和数据组成


      图片.png
      图片.png
    5. 解析响应得到数据
      这里有个细节 b站可能一次返回了好几帧数据
      先解析操作码再由操作码解析数据段
    6. 把数据交给主程序去处理
      需要自定义一个事件类
      用来实现事件的注册on和分发功能emit
      二、工具层 utils.py
    7. 定时器类
      可取消
    8. 事件类
      注册事件 (可重复)
      分发事件
      取消事件
    class Timer():
        def __init__(self,delay,fun):
            self.delay,self.f=delay,fun
            self.t=threading.Timer(self.delay,self.fun)
            self.t.start()
        def fun(self):
            if self.f:self.f()
            self.t=threading.Timer(self.delay,self.fun)
            self.t.start()
        def cancel(self):
            self.t.cancel()
            print("threading cancel")
    
    class Event():
        def __init__(self):
            self.map=[]
            self.keys=[]
        def index(self,k):
            i=-1
            for key in self.keys:
                i+=1
                if key==k:return i
            return -1
        def on(self,key,fun):
            i=self.index(key)
            if i==-1:
                self.map.append({"key":key,"funs":[fun]})
                self.keys.append(key)
            else:
                self.map[i]["funs"].append(fun)
        def emit(self,key,data=None):
            i=self.index(key)
            if i==-1:
                print("no regist event:"+str(key))
                return
            for f in self.map[i]["funs"]:f(data)
        def rm(self,key,fun):
            i=self.index(key)
            if i==-1:
                print("no regist event:"+str(key))
                return
            funs=self.map[i]["funs"]
            for j in range(len(funs)):
                if funs[j]==fun:funs[j]=None
            self.map[i]["funs"]=list(filter(None,funs))
    

    三、服务层 DanmuWS.py
    opened(self) 连接建立后父类会自动调用
    closed(self,code,reason) 连接关闭后父类会自动调用
    需要进行断线重连
    received_message(self,message) 接收到数据时会自动调用
    在这里解析数据并分发给主程序处理
    send(self,data)父类发送数据的方法
    sendLoginPacket 发送登录包
    sendHeartBeatPacket 发送心跳包
    bind 绑定主程序处理数据和事件的函数

    import threading
    import json
    import struct
    from ws4py.client.threadedclient import WebSocketClient
    from utils import Event,Timer
    event=Event()
    class DanmuWebSocket(WebSocketClient):
        def __init__(self,info,serveraddress='wss://broadcastlv.chat.bilibili.com/sub'):
            self.serveraddress=serveraddress
            WebSocketClient.__init__(self,serveraddress)
            DanmuWebSocket.event=event
            DanmuWebSocket.headerLength=16
            self.Info=info
        def opened(self):
            self.sendLoginPacket(self.Info['uid'],self.Info['roomid'],self.Info['protover'],self.Info['platform'],self.Info['clientver'])
            self.sendHeartBeatPacket();
            self.heartBeatHandler = Timer(20,self.sendHeartBeatPacket)
            print("opened")
        def delay_close(self):
            dws=DanmuWebSocket(self.Info,self.serveraddress)
            event.emit('reconnect',dws);
        def closed(self, code, reason=None):
            print("Closed", code, reason)
            if hasattr(self,"heartBeatHandler"):self.heartBeatHandler.cancel();
            if code == 1000: return
            threading.Timer(5,self.delay_close).start()
            print("Closed", code, reason)
        def received_message(self, message):
            position,length=0,len(message.data)-1
            while position<length:
                header_pack=struct.unpack(">IHHII",message.data[position:position+16])
                length_pack=header_pack[0]
                operation=header_pack[3]
                if operation==3:
                    num=header_pack[1]+position
                    num=struct.unpack(">I",message.data[num:num+4])[0]
                    event.emit('heartbeat',num)
                elif operation==5:
                    data=json.loads(message.data[position+16:position+length_pack])
                    event.emit('cmd',data)
                    #print("recv:"+data["cmd"])
                else:
                    event.emit('login');
                position+=length_pack
        def sendData(self,data, protover = 1, operation = 2, sequence = 1):
            if type(data)==dict:
                data=json.dumps(data).encode()
            elif type(data)==str:
                data=data.encode()
            header=struct.pack(">IHHII",DanmuWebSocket.headerLength+len(data),DanmuWebSocket.headerLength,protover,operation,sequence)
            self.send(header+data)
        def sendLoginPacket(self,uid, roomid, protover = 1, platform = 'web', clientver = '1.4.6'):
            # Uint(4byte) + 00 10 + 00 01 + 00 00 00 07 + 00 00 00 01 + Data 登录数据包
            data = {
                'uid': int(uid),
                'roomid': int(roomid),
                'protover': protover,
                'platform': platform,
                'clientver': clientver
            }
            print("sendLoginPacket")
            data=json.dumps(data)
            data=data.replace(' ','')
            self.sendData(data.encode(),1,7,1)
        def sendHeartBeatPacket(self):
            # Uint(4byte) + 00 10 + 00 01 + 00 00 00 02 + 00 00 00 01 + Data 心跳数据包
            self.sendData(b'[object Object]', 1, 2, 1);
        def bind(self,onreconnect=None,onlogin=None,onheartbeat=None,oncmd=None,onreceive =None):
            if "cmd" in event.keys:return
            if hasattr(onreconnect,"__call__"):event.on("reconnect",onreconnect)
            if hasattr(onlogin,"__call__"):event.on("login",onlogin)
            if hasattr(onheartbeat,"__call__"):event.on("heartbeat",onheartbeat)
            if hasattr(oncmd,"__call__"):event.on("cmd",oncmd)
            if hasattr(onreceive,"__call__"):event.on("receive",onreceive)
    

    四、测试代码
    利用前面写的二维码登录代码
    登录上Bilibili获取个人uid
    oncmd 处理弹幕数据
    onlogin 连接成功时的处理函数
    onreconnect 断线重连时调用更新ws
    onheartbeat 处理服务器发送的直播间人气值

    from server import Login
    headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
        'Accept-Encoding': 'gzip, deflate, br',
        'Referer': 'https://live.bilibili.com/',
        'Origin': 'https://live.bilibili.com',
        'Connection': 'keep-alive'
        }
    s=session(headers,'cookie.txt')
    login=Login(s)
    while not login.isLogin():
        login.get_vdcode()
        login.loop_vdcode()
    info={
      "uid": login.info['uid'],
      "roomid": 7603080,
      "protover": 1,
      "platform": "web",
      "clientver": "1.4.0"
    }
    
    from DanmuWS import DanmuWebSocket
    def oncmd(data):
        cmd=data["cmd"]
        if cmd=="SYS_MSG":
            print(data)
        elif cmd=="SPECIAL_GIFT":
            print(data)
        else:
            print(data)
    def onlogin(data):
        print("login success")
    def onreconnect(dws):
        global ws
        ws=dws
    def onheartbeat(num):
        print(num)
    try:
        ws = DanmuWebSocket(info,'wss://broadcastlv.chat.bilibili.com/sub')
        ws.connect()
        ws.bind(None,onlogin,onheartbeat,oncmd)
        ws.run_forever()
    except:
        ws.close()
    
    图片.png

    相关文章

      网友评论

          本文标题:三、wss连接B站弹幕

          本文链接:https://www.haomeiwen.com/subject/jogboftx.html