# encoding=utf-8
import asyncio
import time
import numpy, random
import requests
import sys
from datetime import datetime
from pymongo import MongoClient
from pyppeteer import launch
from pyppeteer.network_manager import Response
from apscheduler.schedulers.blocking import BlockingScheduler
mongo_url = ''
mongo_client = MongoClient(mongo_url)
database = mongo_url.split('/')[-1]
mongodb = getattr(mongo_client, database)
COOKIE = []
def random_linspace(num, length):
# 数列的起始值 、 结束值。 这里以平均值的 0.5 作为起始值,平均值的 1.5倍作为结束值。
start, end = 0.5 * (length / num), 1.5 * (length / num)
# 借助三方库生成一个标准的等差数列,主要是得出标准等差 space
origin_list = numpy.linspace(start, end, num)
space = origin_list[2] - origin_list[1]
# 在标准等差的基础上,设置上下浮动的大小,(上下浮动10%)
min_random, max_random = -(space / 10), space / 10
result = []
# 等差数列的初始值不变,就是我们设置的start
value = start
# 将等差数列添加到 list
result.append(value)
# 初始值已经添加,循环的次数 减一
for i in range(num - 1):
# 浮动的等差值 space
random_space = space + random.uniform(min_random, max_random)
value += random_space
result.append(value)
return result
def slide_list(total_length):
'''等差数列生成器,根据传入的长度,生成一个随机的,先递增后递减,不严格的等差数列'''
# 具体分成几段是随机的
total_num = random.randint(10, 15)
# 中间的拐点是随机的
mid = total_num - random.randint(3, 5)
# 第一段、第二段的分段数
first_num, second_num = mid, total_num - mid
# 第一段、第二段的长度,根据总长度,按比例分成
first_length, second_length = total_length * (first_num / total_num), total_length * (second_num / total_num)
# 调用上面的辅助函数,生成两个随机等差数列
first_result = random_linspace(first_num, first_length)
second_result = random_linspace(second_num, second_length)
# 第二段等差数列进行逆序排序
slide_result = first_result + second_result[::-1]
# 由于随机性,判断一下总长度是否满足,不满足的再补上一段
if sum(slide_result) < total_length:
slide_result.append(total_length - sum(slide_result))
return slide_result
def open_url(url):
async def get_content(response: Response):
global COOKIE
if 'mtop.taobao.detail.getdetail' in response.url:
_cookie = response.headers
COOKIE.append(_cookie['bx-x5sec'].split(';')[0])
print(COOKIE)
async def mouse_slide(page=None):
await asyncio.sleep(1)
await page.evaluate('window.scrollBy(400,0)')
frame = page.frames[1]
try:
await frame.hover('#nc_1_n1z')
except:
return 1, page
try:
await page.mouse.down()
await page.evaluateOnNewDocument('() =>{ Object.defineProperties(navigator,'
'{ webdriver:{ get: () => undefined } }) }')
steps = random.randint(30, 80)
print("steps:{}".format(steps))
slides = slide_list(300)
x = 550
for distance in slides:
time.sleep(0.1)
x += distance
await page.mouse.move(x, 0, )
await page.mouse.up()
except Exception as e:
print('{}:验证失败'.format(e))
return None, page
else:
slider_again = await page.querySelector('.nc_iconfont.icon_warn')
if slider_again != None:
print("验证失败")
return None
else:
print('验证通过')
return page
async def get_data(url):
browser = await launch(headless=False, args=['--disable-infobars',
'--no-sandbox',
'--no-default-browser-check',
'--disable-extensions',
'--hide-scrollbars',
'--disable-bundled-ppapi-flash',
'--mute-audio',
'--disable-setuid-sandbox',
'--disable-gpu',
"--window-size=1920,1080",
"--disable-infobars",
# '--proxy-server=http://http-dyn.abuyun.com:9020'
])
page = await browser.newPage()
await page.setViewport({"width": 1366, "height": 720})
await page.setJavaScriptEnabled(enabled=True)
# await page.authenticate({
# 'username': '',
# 'password': ''
# })
await page.setUserAgent(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
login_url = url
await page.goto(login_url)
await page.evaluateOnNewDocument('() =>{ Object.defineProperties(navigator,'
'{ webdriver:{ get: () => undefined } }) }')
time.sleep(2)
frame = page
await frame.waitForSelector('#fm-login-id')
await frame.type('#fm-login-id', '', {'delay': 120})
time.sleep(random.random()+0.5)
await frame.type('#fm-login-password', '!', {'delay': 120})
await frame.click("[type=submit]")
time.sleep(3)
await page.goto("https://h5.m.taobao.com/awp/core/detail.htm?id=557776930020")
time.sleep(2)
try:
# # 验证滑块
await asyncio.sleep(1)
await page.evaluate('window.scrollBy(400,0)')
frame = page.frames[1]
await frame.hover('#nc_1_n1z')
slider = True
except:
slider = False
if slider:
page = await mouse_slide(page=page)
else:
pass
await page.setRequestInterception(True)
time.sleep(6)
page.on('response', get_content)
cookies = await page.cookies()
await page.close()
await browser.close()
time.sleep(3)
return cookies
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(get_data(url))
def get_abuyun_tunnel():
# use abuyun proxy
proxyUser = ""
proxyPass = ""
proxyHost = ""
proxyPort = "9020"
proxyMeta = "http://%(user)s:%(pass)s@%(host)s:%(port)s" % {
"host": proxyHost,
"port": proxyPort,
"user": proxyUser,
"pass": proxyPass,
}
proxies = {
"http": proxyMeta,
"https": proxyMeta,
}
return proxies
def cookie_live(cookie):
headers = {
'authority': 'h5api.m.taobao.com',
'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
'accept': '*/*',
'sec-fetch-site': 'same-site',
'sec-fetch-mode': 'no-cors',
'sec-fetch-dest': 'script',
'referer': 'https://h5.m.taobao.com/',
'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
'cookie': cookie,
}
proxies = get_abuyun_tunnel()
response = requests.get('https://h5api.m.taobao.com/h5/mtop.taobao.detail.getdetail/6.0/?jsv=2.6.1&appKey=12574478&t=1606209893980&sign=e3f978ec205660e16b2a123c1d1a4e4b&api=mtop.taobao.detail.getdetail&v=6.0&isSec=0&ecode=0&AntiFlood=true&AntiCreep=true&H5Request=true&ttid=2018%40taobao_h5_9.9.9&type=jsonp&dataType=jsonp&callback=mtopjsonp1&data=%7B%22id%22%3A%22557776930020%22%2C%22itemNumId%22%3A%22557776930020%22%2C%22itemId%22%3A%22557776930020%22%2C%22exParams%22%3A%22%7B%5C%22id%5C%22%3A%5C%22557776930020%5C%22%7D%22%2C%22detail_v%22%3A%228.0.0%22%2C%22utdid%22%3A%221%22%7D', headers=headers, proxies = proxies)
if '调用成功' in response.text:
return True
else:
return False
def get_cookie():
url = "https://login.m.taobao.com/login.htm?"
cookies = open_url(url)
cookie = []
for c in cookies:
cookie.append(c['name'] + '=' + c['value'])
if COOKIE:
cookie.append(COOKIE[0])
format_cookie = ';'.join(cookie)
return format_cookie
def update_cookie():
col = mongodb.get_collection('cookie_info')
cookie = col.find_one({'store': 'Taobao'})
if not cookie:
cookie = col.insert_one({'store': 'Taobao', 'update_time': datetime.now(), 'cookie': get_cookie()})
if not cookie_live(cookie['cookie']):
col.update_one({'store': 'Taobao'}, {'$set': {'update_time': datetime.now(), 'cookie': get_cookie()}},
{'multi': 'true'})
print("cookie can't request to page, updated the cookie info")
else:
print('cookie can request to page, not need to update cookie')
if __name__ == '__main__':
if len(sys.argv) == 2 and sys.argv[1] == '--update':
update_cookie()
else:
print('task_init..........')
sched = BlockingScheduler()
sched.add_job(update_cookie, trigger='cron', hour='12,13,14,22,23', minute='*/19')
sched.start()
网友评论