自动参加小电视抽奖
自动领取舰长、提督奖励
运行一天大概能获取1800包辣条
However. 每次凌晨B站后台都会检测账号脚本,一次抢太多很容易被封号一周。
可能是依据最大的连续领奖次数判断的吧
一、抽奖流程
1. 分析页面可知
每当有人赠送小电视之类的礼物时
系统就会发送广播弹幕(SYS_MSG)
2. 点击广播跳转到抽奖房间
F12清空网络请求
接着点击抽奖
就找到了抽奖请求
POST https://api.live.bilibili.com/gift/v3/smalltv/join
图片.png
图片.png
- 分析post参数
roomid系统广播里就有
raffleId是抽奖关键参数
type 固定为 Gift
csrf_token 来自于cookie值bili_jct (可为空)
visit_id (可为空) - 刷新抽奖页面
F12过滤得到XHR 请求
逐个分析返回值和url请求
发现raffleId请求
GET https://api.live.bilibili.com/gift/v3/smalltv/check
data={
'roomid':roomid
}
图片.png
分析结果
- 小电视抽奖和月色真美、摩天大楼
获取raffleId的请求略有不同
小电视 GET https://api.live.bilibili.com/gift/v3/smalltv/check
月色真美 GET https://api.live.bilibili.com/gift/v2/smalltv/check
参加抽奖的请求相同
二、舰长提督、总督领奖
- 每当有人开总督时,系统也会发送SYS_MSG到弹幕
- 然而舰长和提督却没有提示,只有点进房间时知道有没有
- F12分析XHR请求发现流程和小电视抽奖类似
领奖请求
https://api.live.bilibili.com/lottery/v2/Lottery/join
图片.png
图片.png
POST参数与小电视类似
提督舰长和总督的获取id的请求也不同
提督、舰长 POST https://api.live.bilibili.com/lottery/v1/Storm/check?roomid=404538
总督 POST https://api.live.bilibili.com/lottery/v1/Lottery/check_guard?roomid=404538
图片.png
图片.png
- 一般有人送礼的直播间以及排行靠前的主播
有人开通舰长和提督的概率较大
只要记录下他们的roomid到列表中
再定时检查是否有人开通
然后再领取奖励
三、代码实现
- api.py 添加Gift类
检测是否有小电视抽奖的函数
检测是否有舰长领奖的函数
参加小电视抽奖的函数
参加舰长领奖的函数
class Gift():
def check_smalltv(self,s,roomid,ver='v2'):
url='https://api.live.bilibili.com/gift/'+ver+'/smalltv/check'
data={'roomid': roomid}
return ajax(s,url,'GET',data)
def join_smalltv(self,s,roomid,raffleId,csrf_token,visit_id='', type = 'Gift'):
url= 'https://api.live.bilibili.com/gift/v3/smalltv/join'
data={
'roomid': roomid,
'raffleId': raffleId,
'type': type,
'csrf_token': csrf_token,
'visit_id': visit_id
}
return ajax(s,url,'POST',data)
def check_guard(self,s,roomid,_type='_guard'):
url='https://api.live.bilibili.com/lottery/v1/Lottery/check'+_type+'?roomid='+str(roomid)
return ajax(s,url,'GET');
def join_guard(self,s,roomid,id,csrf_token,type='guard',visit_id=None):
url= 'https://api.live.bilibili.com/lottery/v2/Lottery/join'
data= {
'roomid': roomid,
'id': id,
'type': type,
'csrf_token': csrf_token,
'visit_id': visit_id
}
return ajax(s,url,'POST',data)
- api.py 添加Room类
获取小时榜和周星榜前十的房间
class Room():
def room_rank(self,s,type="week_star_master",type_id='week'):
url='https://api.live.bilibili.com/rankdb/v1/Rank2018/getTop?type='+type+'&type_id='+type_id
return ajax(s,url)
- server.py添加抽奖类
Info包含csrf_token和visit_id
check_roomids为所有待检查舰长领奖的房间号
hour_run为每隔一个小时添加排行靠前的房间号到check_roomids
raffleId确保不会重复抽奖
num记录最大连续抽奖次数,这里每连抽50次就跳过一次
class Join:
def __init__(self,Info,s):
self.Info = Info
self.check_roomids=[Info['roomid']]
self.raffleId=0
self.hour_run=Timer(7.2e3,self.each_hour_run)
self.s=s
self.num=1
def join_smalltv(self,obj):
if self.num==50:
self.num=0
print('[跳过抽奖]['+obj['title']+']已跳过抽奖(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')')
return
self.num+=1
res=API.Gift.join_smalltv(self.s,obj['roomid'], obj['raffleId'],self.Info['csrf_token'],self.Info['visit_id'])
code=res['code']
if code==0:Logger.info('[自动抽奖]['+obj['title']+']已参加抽奖(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')')
elif code==400:Logger.warning('[自动抽奖][礼物抽奖]访问被拒绝,您的帐号可能已经被封禁,已停止')
elif code==65531:Logger.info('[自动抽奖][礼物抽奖]已参加抽奖(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')')
else:Logger.error('[自动抽奖][礼物抽奖]已参加抽奖(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')'+res['msg'])
def push_and_check_roomid(self,roomid):
self.push_roomid(roomid)
for roomid in self.check_roomids:self.check_and_join_guard(roomid)
def push_roomid(self,roomid):
if roomid in self.check_roomids:return
self.check_roomids.append(roomid)
def each_hour_run(self):
res=API.Room.room_rank(self.s,"master_realtime_hour",'areaid_realtime_hour')
for room in res['data']['list']:self.push_roomid(room['roomid'])
res=API.Room.room_rank(self.s,"week_star_master",'week')
for room in res['data']['list']:self.push_roomid(room['roomid'])
def check_and_join_smalltv(self,roomid):
res=API.Gift.check_smalltv(self.s,roomid,'v2')
if res['code']==-400:return
for data in res['data']:
if data['raffleId']<=self.raffleId:continue
self.raffleId=data['raffleId']
self.join_smalltv({'roomid':roomid,'raffleId':self.raffleId,'title':data['title']})
if len(res['data'])==0:
res=API.Gift.check_smalltv(self.s,roomid,'v3')
for data in res['data']['list']:
if data['raffleId']<=self.raffleId:continue
self.raffleId=data['raffleId']
self.join_smalltv({'roomid':roomid,'raffleId':self.raffleId,'title':data['title']})
self.push_and_check_roomid(roomid)
def join_guard(self,obj):
res=API.Gift.join_guard(self.s,obj['roomid'],obj['id'],self.Info['csrf_token'])
print(obj['roomid'])
Logger.info(res['data']['message'])
def check_and_join_guard(self,roomid):
res=API.Gift.check_guard(self.s,roomid)
if len(res['data'])==0:return
for guard in res['data']:self.join_guard({'id':guard['id'],'roomid':roomid})
res=API.Gift.check_guard(self.s,roomid,'')
if len(res['data']['guard'])==0:return
for guard in res['data']['guard']:self.join_guard({'id':guard['id'],'roomid':roomid})
- utils.py添加日志类
添加日志到指定文件中
分为五个级别
NOTSET
DEBUG
INFO
WARNING
ERROR
CRITICAL
@singleton
class Logger:
def __init__(self):
handlers = {
logging.NOTSET: "logs/notset.logs",
logging.DEBUG: "logs/debug.logs",
logging.INFO: "logs/info.logs",
logging.WARNING: "logs/warning.logs",
logging.ERROR: "logs/error.logs",
logging.CRITICAL: "logs/critical.logs"
}
self.__loggers = {}
logLevels = handlers.keys()
fmt = logging.Formatter('%(asctime)s [%(levelname)s]: %(message)s')
for level in logLevels: #创建logger
logger = logging.getLogger(str(level))
logger.setLevel(level)
#创建hander用于写日日志文件
log_path = os.path.abspath(handlers[level])
fh = logging.FileHandler(log_path) #定义日志的输出格式
fh.setFormatter(fmt)
fh.setLevel(level) #给logger添加hander
logger.addHandler(fh)
self.__loggers.update({level: logger})
def info(self, message):
self.__loggers[logging.INFO].info(message)
def error(self, message):
self.__loggers[logging.ERROR].error(message)
def warning(self, message):
self.__loggers[logging.WARNING].warning(message)
def debug(self, message):
self.__loggers[logging.DEBUG].debug(message)
def critical(self, message):
self.__loggers[logging.CRITICAL].critical(message)
- utils.py添加 保存配置文件函数
感觉json文件保存一个json值时很方便
但是放多个时需要一个个的赋值很麻烦
但是多个json放入.py文件中再引入就方便多了
然而修改.py文件需要重新写一个函数实现
def save_py(moudle):
keys,py=dir(moudle),''
for key in keys:
if '__' in key:continue
py+=key+'='+json.dumps(getattr(moudle,key),indent=4, separators=(',', ':'))+'\n'
with open(moudle.__name__+".py",'w') as f:f.write(py)
- 添加配置文件 config.py
headers 默认请求头
msg_info 包含弹幕类型的黑名单和白名单
post_info 抽奖领奖时需要用到的默认参数
ws_info 连接弹幕时的默认参数
headers={
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0",
"Accept":"application/json, text/plain, */*",
"Accept-Language":"zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Accept-Encoding":"gzip, deflate, br",
"Referer":"https://live.bilibili.com/",
"Origin":"https://live.bilibili.com",
"Connection":"keep-alive"
}
msg_info={
"reject_msg":[
"SEND_GIFT",
"WELCOME",
"WELCOME_GUARD",
"COMBO_SEND",
"ENTRY_EFFECT",
"NOTICE_MSG",
"DANMU_MSG",
"ROOM_RANK",
"WISH_BOTTLE",
"COMBO_END"
],
"receive_msg":[
"SYS_MSG",
"SPECIAL_GIFT"
]
}
post_info={
"csrf_token":"",
"visit_id":"",
"roomid":392351,
"uid":380741418
}
ws_info={
"uid":380741418,
"roomid":392351,
"protover":1,
"platform":"web",
"clientver":"1.4.0"
}
四、主程序
先登录然后再开始自动抽奖
oncmd 绑定的处理弹幕消息的函数
onlogin 绑定的登录成功的函数
onreconnect 绑定的断线重连的函数
onheartbeat 绑定的处理人气值的函数
import os
os.chdir('/data/jupyter/root/软件/Bilibili')
from jquery import http,session
from servers import Join,Login
from DanmuWS import DanmuWebSocket
from config import *
from utils import save_py
import config
%matplotlib inline
#save_py(config)
s=session(headers,'config/2685054765.txt')
login=Login(s)
join=Join(post_info,s)
ws=None
while not login.isLogin():
login.get_vdcode()
login.loop_vdcode()
ws_info['uid']=post_info['uid']=login.info['uid']
def oncmd(data):
cmd=data["cmd"]
if cmd in msg_info['reject_msg']:return
if cmd=="SYS_MSG":
if "real_roomid" in data:join.check_and_join_smalltv(data["real_roomid"])
def onlogin(data):
print("login success")
def onreconnect(code,data=None):
global ws
ws=data['dws']
room_num=0
def onheartbeat(num):
if num>3:return
close()
if len(join.check_roomids)==0:
global room_num
join.each_hour_run()
room_num+=1
if room_num>3:return join.hour_run.cancel()
ws_info['roomid']=post_info['roomid']=join.check_roomids.pop()
main()
def main():
global ws
try:
print(ws_info['roomid'])
ws = DanmuWebSocket(ws_info,'wss://broadcastlv.chat.bilibili.com/sub')
ws.connect()
ws.bind(onreconnect,onlogin,onheartbeat,oncmd)
#ws.run_forever()
except:
close()
def close():
ws.close()
save_py(config)
s.save()
main()
看完了
留个评论再走呗
网友评论