1,查票结果效果
余票查询效果图.png2,本文使用到的库
- re
- os
- time
- datetime
- requests
- prettytable
3,获取验证码图片
使用Google浏览器并开启开发者模式(使用F12快捷键),在地址栏里输入如下链接:
https://kyfw.12306.cn/otn/login/init
在Network—All里查看相关请求,下拉发现了验证码图片的请求,查看Headers—Request URL如下:
https://kyfw.12306.cn/passport/captcha/captcha-image?login_site=E&module=login&rand=sjrand&0.05013721011282968
链接里最后一个参数0.05013721011282968
每次请求时都不一样。但我发现没有此参数同样能够请求到验证码图片,故可以直接使用如下链接请求验证码图片:
https://kyfw.12306.cn/passport/captcha/captcha-image?login_site=E&module=login&rand=sjrand
定义一个名称为Login的类并进行定义其
init
方法:
class Login:
def __init__(self):
self.headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36"
}
self.session = requests.session() #创建session会话
self.session.headers = self.headers
self.session.verify = False #跳过SSL验证
self.loginUrl = 'https://kyfw.12306.cn/passport/web/login' # 登录Url
self.captchaCheckUrl = 'https://kyfw.12306.cn/passport/captcha/captcha-check' #验证码验证链接
self.captchaImgUrl = 'https://kyfw.12306.cn/passport/captcha/captcha-image?login_site=E&module=login&rand=sjrand' #获取验证码图片
self.captchaFilePath = 'captcha.jpg' #验证码图片路径
定义获取验证码图片的方法,如获取失败则使用递归再次请求:
def getCaptCha(self):
response = self.session.get(self.captchaImgUrl)
if response.status_code == 200:
print('验证码图片请求成功')
with open(self.captchaFilePath, 'wb') as f:
f.write(response.content) # 验证码图片写入当前文件夹
else:
print('验证码下载失败, 正在重试...')
self.getCaptCha() # 递归
return
在登录页面输入用户名和密码后点击验证码图片,点击登录按钮。在开发者模式下查看验证码验证请求的链接为:https://kyfw.12306.cn/passport/captcha/captcha-check
验证码校验请求参数 其中login_site和rand参数值是固定的,而answer是我们点击图片时鼠标在图片上的坐标,坐标基准(0,0)点如下图红色箭头处:
坐标原点
将验证码图片按照如下的方式进行分割,按照位置标号为1-8,从每个图里定义一个固定的点坐标用于验证码验证:
分割图片
3.1 手动输入验证码位置
def captchaCheck(self):
self.getCaptcha() # 获取验证码图片
imgLocation = input("请输入验证码图片位置,以英文状态下的分号','分割:\n")
# 验证码坐标字典
coordinates = {'1':'35,35', '2':'105,35', '3':'175,35', '4':'245,35',
'5':'35,105', '6':'105,105', '7':'175,105','8':'245,105'}
rightImgCoordinates =[] # 创建数组存放正确的验证码坐标
for i in imgLocation.split(','):
rightImgCoordinates.append(coordinates[i])
answer = ','.join(rightImgCoordinates)
data = {
'login_site':'E', # 固定的
'rand': 'sjrand', # 固定的
'answer': answer # 验证码对应的坐标
}
result = self.session.post(self.captchaCheckUrl,data=data).json()
if result['result_code'] == '4':
print('验证码验证成功')
else:
print('出错啦:{}'.format(result['result_message']))
self.captchaCheck()
return
3.2 通过打码平台自动验证
所谓的打码平台自动验证是指用户给打码平台传入一张验证码图片,平台通过码工去人工识别验证码(码工有出错可能),平台再将其结果返回给用户,这个过程一般也就2-3秒时间。12306验证码是多个坐标拼接成的字符串,因此我们需要平台返回多个坐标字符串。
百度搜索打码平台关键字能够找到很多相关平台,其中包含打码兔、超级鹰等。写本文的时发现打码兔平台已经转型,不再提供打码服务,于是我只能去注册超级鹰账户。平台网站上有如何使用Python进行打码的相关文档,使用时需要注意验证码图片的类型,返回多个坐标对应的codetype
为9004,具体请参考验证码类型。
如下代码是超级鹰官网提供的,我做了一些改动,原因是平台返回的坐标是以图片的左上角为原点,这与12306坐标基准不一致。
import requests
from hashlib import md5
class Chaojiying_Client:
def __init__(self, username, password, soft_id):
#平台账号
self.username = username
#平台密码
self.password = md5(password.encode('utf-8')).hexdigest()
# 软件ID号,注册创建软件时会自动生成
self.soft_id = soft_id
self.base_params = {
'user' : self.username,
'pass2' : self.password,
'softid': self.soft_id,
}
self.headers = {
'Connection': 'Keep-Alive',
'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
}
def PostPic(self, im, codetype):
params = {
'codetype': codetype,
}
params.update(self.base_params)
files = {'userfile': ('ccc.jpg', im)}
result = requests.post('http://upload.chaojiying.net/Upload/Processing.php', data=params, files=files, headers=self.headers).json()
# 如下代码是我修改的部分,原来代码请去官网查看
answerList = result['pic_str'].replace('|',',').split(',')
for index in range(len(answerList)):
#answerList类似为 [10,135,70,200],将纵坐标减去30并返回字符串
if index % 2 != 0:
answerList[index] = str(int(answerList[index])-30)
else:
answerList[index] = str(answerList[index])
answerStr = ','.join(answerList) #用逗号拼接成'10,105,70,170'
print(answerStr)
return answerStr # 返回验证码坐标
将获取的验证码图片提交给打码平台获取验证码坐标字符串:
def getCaptchaAnswer(self):
response= self.session.get(self.captchaImgUrl)
if response.status_code ==200:
print('验证码图片请求成功')
with open(self.captchaFilePath,'wb') as f:
f.write(response.content)
else:
print('验证码下载失败, 正在重试...')
self.getCaptchaAnswer() #递归
try:
img = open(self.captchaFilePath, 'rb').read()
answerStr = self.chaoJiYing.PostPic(img, 9004)
return answerStr
except Exception as e:
print(e)
然后根据获取的坐标字符串再进行自动验证:
def captchaCheck(self):
self.getCaptcha()
answer = self.getCaptchaAnswer()
data = {
'login_site':'E', # 固定的
'rand': 'sjrand', # 固定的
'answer': answer # 验证码对应的坐标字符串
}
result = self.session.post(self.captchaCheckUrl,data=data).json()
if result['result_code'] == '4':
print('验证码验证成功')
else:
print('出错啦:{}'.format(result['result_message']))
self.captchaCheck()
return
4,登录验证
实际我们使用浏览器进行登录时会先检查用户名和密码是否为空,不为空时才进行验证码验证(为空时直接报错提示),验证码验证通过后再对用户名和密码进行验证。
登录同样采用的是Post请求,提交用户名、密码以及固定的appid 3个数据:
def login(self):
self.captchaCheck() # 先进行验证码验证
loginData = {
'username': Config.userName, # 用户名,我自己写了个Config文件,将用户名、密码以及其他常量放里面
'password': Config.password, # 密码
'appid': 'otn' # 固定
}
result = self.session.post(self.loginUrl,data=loginData).json()
if result['result_code'] == 0:
print('用户登录成功') # 实际并没有成功,登录过程很复杂,还有很多请求
else:
print('登录失败,请重试')
5,工具类
在写查询余票代码时发现有很多共用的方法在代码里,于是我自己将其抽出来放在一个类文件里。增加复用率,避免大量垃圾代码。
from datetime import datetime
import time
# 工具类
class Utilitys():
# 反转字典(key与value对调,生成新dict)
def reversalDict(self, dict):
return {v: k for k, v in dict.items()}
# 将时间字符串(如38:50)转化为小时和分钟的形式
def getDuration(self, timeStr):
duration = timeStr.replace(':', '时') + '分'
# 如果时间格式是00时25分,则只显示多少分钟
if duration.startswith('00'):
return duration[4:]
return duration
# 获取一个格式为2018-08-01的日期是周几
def getWeekDay(self, date):
weekDayDict = {
0: '周一',
1: '周二',
2: '周三',
3: '周四',
4: '周五',
5: '周六',
6: '周天',
}
day = datetime.strptime(date, '%Y-%m-%d').weekday()
return weekDayDict[day]
# 转化日期格式,返回 X月X日
def getDateFormat(self, date):
# date 格式为2018-08-08
dateList = date.split('-')
if dateList[1].startswith('0'):
month = dateList[1].replace('0', '')
else:
month = dateList[1]
if dateList[2].startswith('0'):
day = dateList[1].replace('0', '')
else:
day = dateList[2]
return '{}月{}日'.format(month, day)
# 检查购票日期是否合理
def checkDate(self, date):
'''
功能:检测乘车日期的正确性
:param trainDate: 乘车时间(2018-08-04)
:return: 返回时间是否为标准的形式的标志
'''
localTime = time.localtime()
localDate = '%04d-%02d-%02d' % (localTime.tm_year, localTime.tm_mon, localTime.tm_mday)
# 获得当前时间时间戳
currentTimeStamp = int(time.time())
# 预售时长的时间戳
deltaTimeStamp = '2505600'
# 截至日期时间戳
deadTimeStamp = currentTimeStamp + int(deltaTimeStamp)
# 获取预售票的截止日期时间
deadTime = time.localtime(deadTimeStamp)
deadDate = '%04d-%02d-%02d' % (deadTime.tm_year, deadTime.tm_mon, deadTime.tm_mday)
print('请注意合理的乘车日期范围是:{} 至 {}'.format(localDate, deadDate))
# 判断输入的乘车时间是否在合理乘车时间范围内
# 将购票日期转换为时间数组
trainTimeStruct = time.strptime(date, "%Y-%m-%d")
# 转换为时间戳:
trainTimeStamp = int(time.mktime(trainTimeStruct))
# 将购票时间修改为12306可接受格式 ,如用户输入2018-8-7则格式改为2018-08-07
trainTime = time.localtime(trainTimeStamp)
trainDate = '%04d-%02d-%02d' % (trainTime.tm_year, trainTime.tm_mon, trainTime.tm_mday)
# 比较购票日期时间戳与当前时间戳和预售截止日期时间戳
if currentTimeStamp <= trainTimeStamp and trainTimeStamp <= deadTimeStamp:
return True, trainDate
else:
print('出错啦: 您输入的乘车日期:{}, 当前系统日期:{}, 预售截止日期:{}'.format(trainDate, localDate, deadDate))
return False, None
# 一个日期转成2018-08-01这样的格式
def getDate(self,dateStr):
# date格式为20180801
year = time.strptime(dateStr,'%Y%m%d').tm_year
month = time.strptime(dateStr,'%Y%m%d').tm_mon
day = time.strptime(dateStr,'%Y%m%d').tm_mday
return '%04d-%02d-%02d' % (year,month,day)
6,余票查询与打印
首先明确一点,即用户在不登录情况下也是可以查车票信息的。打开浏览器进入12306余票查询页面查询链接,然后打开开发者模式,在页面上输入出发地为上海,目的地为成都,出发日期为2018-08-22,车票类型选择成人,点击查询按钮。我们发现只有如下的1个Get请求:
https://kyfw.12306.cn/otn/leftTicket/query?leftTicketDTO.train_date=2018-08-22&leftTicketDTO.from_station=SHH&leftTicketDTO.to_station=CDW&purpose_codes=ADULT
此请求包含leftTicketDTO.train_date,leftTicketDTO.from_station,leftTicketDTO.to_station及purpose_codes几个参数。从参数的英文含义上不难判断它们分别代表出发日期、出发地、目的地和车票类型。
但出发地怎么是SHH,目的地又怎么是CDW?这些都是什么鬼?我百度了一下,这些字符是指车站的电报码。可这些数据从何而来呢?
在开发者模式打开的情况下刷新查询页面,发现多了很多请求。仔细查看每个请求都在做些什么操作?服务器又返回了什么?Oh my god,竟然在刚打开查询页面的时候就请求到了。
我们把数据请求下来并加以保存,保存的原因是这些数据很难变动,请求一次,下次直接使用。
class Stations:
def __init__(self):
self.headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36"
}
self.stationCodeUrl = 'https://kyfw.12306.cn/otn/resources/js/framework/station_name.js'
self.queryUrl ='https://kyfw.12306.cn/otn/leftTicket/query'
self.queryPriceUrl ='https://kyfw.12306.cn/otn/leftTicket/queryTicketPrice'
self.session = requests.session()
self.session.verify = False
self.session.headers = self.headers
self.filePath = 'stations.txt' # 获取到的电报码存放文件
self.color = Colored()
self.utility = Utilitys()
从服务器上请求电报码,根据正则表达式找出符合条件的所有数据并加以保存:
def getStationCodes(self):
# 若文件存在,则直接推出,无需每次都执行请求
if os.path.exists(self.filePath):
return
res = self.session.get(self.stationNameUrl)
# [\u4e00-\u95fa5]是汉字的头和尾
stations = re.findall(r'([\u4e00-\u95fa5]+)\|([A-Z]+)',res.text)
# 注意编码格式utf-8
with open(self.filePath,'w',encoding='utf-8') as f:
# ensure_ascii = False 防止乱码
f.write(json.dumps(dict(stations),ensure_ascii = False))
本地电报码文件部分内容.png
检查用户输入的日期是否在预售期内,同时检查输入的出发地和目的地是否正确(能否在本地
stations.txt
文件里查询到)。
def queryPrompt(self):
trainDate = input('请输入购票时间,格式为2018-01-01:\n')
# 工具类文件里定义了checkDate 方法
timeFlag,trainDate = self.utility.checkDate(trainDate)
if timeFlag == False:
print('日期格式错误或者不在预售期内,请从新输入')
self.queryPrompt()
fromStation = input('请输入出发站:\n')
toStation = input('请输入到达站:\n')
#读取文件并转化为字典
with open(self.filePath, "r", encoding='utf-8') as file:
stationsDict = json.load(file)
# 判断出发站和达到站是否存在
if fromStation not in stationsDict.keys() or toStation not in stationsDict.keys():
print('出错啦: {}或{}不在站点列表中'.format(fromStation, toStation))
# 从字典里获取站点的电报码
fromStationCode = stationsDict[fromStation]
toStationCode = stationsDict[toStation]
queryData = {
'fromStation':fromStation, # 上海
'toStation':toStation, # 成都
'trainDate':trainDate, # 2018-08-22
'fromStationCode':fromStationCode, # SHH
'toStationCode':toStationCode # CDW
}
return queryData # 返回查询数据
车票查询返回的结果
请求查询数据:
def queryTickets(self):
# 获取trainDate,fromStationCode,toStationCode,fromStation和toStation
queryData = self.queryPrompt()
# print(queryData)
params = {
'leftTicketDTO.train_date':queryData['trainDate'],
'leftTicketDTO.from_station':queryData['fromStationCode'],
'leftTicketDTO.to_station':queryData['toStationCode'],
'purpose_codes' : 'ADULT'
}
res = self.session.get(self.queryUrl,params = params)
if res.status_code ==200:
self.getTrainInfo(res.json(),queryData)
def getTrainInfo(self,result,queryData):
trainDict = {} #单个车次信息字典
trains = [] # 存放车辆信息
results = result['data']['result'] #result是查询请求返回的数据(json)
maps = result['data']['map'] #车站名称与电报码
for item in results:
trainInfo = item.split('|')
# for index, item in enumerate(trainInfo, 0):
# print('{}:\t{}'.format(index, item))
# 已经开始卖票
if trainInfo[11] =='Y':
# 备注
# trainDict['remark'] = trainInfo[2]
# 车次
trainDict['trainName'] = self.color.magenta(trainInfo[3])
# 始发站
trainDict['fromStation'] = self.color.green(maps[trainInfo[6]])
# 终点站
trainDict['toStation'] = self.color.red(maps[trainInfo[7]])
# 出发时间
trainDict['departTime'] = self.color.green(trainInfo[8])
# 到达时间
trainDict['arriveTime'] = self.color.red(trainInfo[9])
# 总用时
trainDict['totalTime'] = self.utility.getDuration(trainInfo[10])
# 商务特等座
trainDict['businessSeat'] = trainInfo[32]
# 一等座
trainDict['firstClassSeat'] = trainInfo[31]
# 二等座
trainDict['secondClassSeat'] = trainInfo[30]
# 高级软卧
trainDict['superSoftBerth'] = trainInfo[21]
# 软卧
trainDict['softBerth'] = trainInfo[23]
# 动卧
trainDict['moveBerth'] = trainInfo[33]
# 硬卧
trainDict['hardBerth'] = trainInfo[26]
# 硬座
trainDict['hardSeat'] = trainInfo[28]
# 无座
trainDict['noSeat'] = trainInfo[29]
# 其他
trainDict['otherSeat'] = trainInfo[22]
# 备注
trainDict['remark'] = trainInfo[1].replace('<br/>','')
# 如果值为空,则将值修改为'--',有票则‘有’字显示为绿色,无票‘无’字红色显示
for key in trainDict.keys():
if trainDict[key] == '':
trainDict[key] = '--'
if trainDict[key] =='有':
trainDict[key] = self.color.green('有')
if trainDict[key] =='无':
trainDict[key] = self.color.red('无')
# 价格字典,可根据‘wuzuo’等 key查询车票价格
priceDict = self.getPrice(self.queryPrice(trainInfo))
# 凭2代身份证直接进站 trainInfo[18] =1 的话
train = [trainDict['trainName']+ self.color.green('[ID]') if trainInfo[18] == '1' else trainDict['trainName'], trainDict['fromStation'] + '\n' + trainDict['toStation'],
trainDict['departTime'] + '\n' + trainDict['arriveTime'],
trainDict['totalTime'], trainDict['businessSeat']+ '\n' + priceDict['shangwuzuo'] , trainDict['firstClassSeat']+ '\n' + priceDict['yidengzuo'],
trainDict['secondClassSeat']+ '\n' + priceDict['erdengzuo'], trainDict['superSoftBerth']+ '\n' + priceDict['gaojiruanwo'], trainDict['softBerth']+ '\n' + priceDict['ruanwo'],
trainDict['moveBerth']+ '\n' + priceDict['dongwo'],
trainDict['hardBerth']+ '\n' + priceDict['yingwo'], trainDict['hardSeat']+ '\n' + priceDict['yingzuo'], trainDict['noSeat']+ '\n' + priceDict['wuzuo'], trainDict['otherSeat'],
trainDict['remark']]
# 直接使用append方法将字典添加到列表中,如果需要更改字典中的数据,那么列表中的内容也会发生改变,这是因为dict在Python里是object,不属于primitive
# type(即int、float、string、None、bool)。这意味着你一般操控的是一个指向object(对象)的指针,而非object本身。下面是改善方法:使用copy()
trains.append(train) # 注意trainDict.copy()
# 打印查询到的余票信息
self.prettyPrint(trains,queryData)
def getPrice(self, priceDict):
price ={}
# 无座票价
price['wuzuo'] = self.checkPrice('WZ', priceDict)
# 一等座票价
price['yidengzuo'] = self.checkPrice('M', priceDict)
# 二等座票价
price['erdengzuo'] = self.checkPrice('O', priceDict)
# 高级软卧票价
price['gaojiruanwo'] = self.checkPrice('A6', priceDict)
# 硬卧票价
price['yingwo'] = self.checkPrice('A3', priceDict)
# 软卧票价
price['ruanwo'] = self.checkPrice('A4', priceDict)
# 动卧票价
price['dongwo'] = self.checkPrice('F', priceDict)
# 硬座票价
price['yingzuo'] = self.checkPrice('A1', priceDict)
# 商务座票价
price['shangwuzuo'] = self.checkPrice('A9', priceDict)
return price
def checkPrice(self,key,priceDict):
if key in priceDict.keys():
return self.color.green(priceDict[key])
else:
return ''
def queryPrice(self,trainInfo):
# 注意参数顺序
parameters = {
'train_no': trainInfo[2],
'from_station_no':trainInfo[16],
'to_station_no': trainInfo[17],
'seat_types' :trainInfo[35],
'train_date' : self.utility.getDate(trainInfo[13])
}
res = self.session.get(self.queryPriceUrl,params = parameters)
time.sleep(1) #此处加个延迟
# print(res.url)
# 有时候请求一次可能出现失败的情况,然后url会自动跳转到 http://www.12306.cn/mormhweb/logFiles/error.html
if res.url != 'http://www.12306.cn/mormhweb/logFiles/error.html':
result = res.json()['data']
return result
else:
self.queryPrice(trainInfo)
return
PrettyTable 在terminal 里使用才能看到效果,在IDE里可能会出现|
符号不对齐的现象。
def prettyPrint(self,trains,queryData):
header = ["车次", "车站", "时间", "历时", "商务座","一等座", "二等座",'高级软卧',"软卧", "动卧", "硬卧", "硬座", "无座",'其他','备注']
pt = PrettyTable(header)
date = queryData['trainDate']
pt.title = self.color.cyan('{}——>{}({} {}),共查询到{}个可购票的车次'.format(queryData['fromStation'],queryData['toStation'],self.utility.getDateFormat(date),self.utility.getWeekDay(date),len(trains)))
pt.align["车次"] = "l"
for train in trains:
pt.add_row(train)
print(pt)
定义一个Colored 类
from colorama import init, Fore, Back, Style
init(convert=True)
init(autoreset=False)
class Colored(object):
# 前景色:红色 背景色:默认
def red(self, s):
return Fore.RED + s + Fore.RESET
# 前景色:绿色 背景色:默认
def green(self, s):
return Fore.GREEN + s + Fore.RESET
# 前景色:黄色 背景色:默认
def yellow(self, s):
return Fore.YELLOW + s + Fore.RESET
# 前景色:蓝色 背景色:默认
def blue(self, s):
return Fore.BLUE + s + Fore.RESET
# 前景色:洋红色 背景色:默认
def magenta(self, s):
return Fore.MAGENTA + s + Fore.RESET
# 前景色:青色 背景色:默认
def cyan(self, s):
return Fore.CYAN + s + Fore.RESET
# 前景色:白色 背景色:默认
def white(self, s):
return Fore.WHITE + s + Fore.RESET
# 前景色:黑色 背景色:默认
def black(self, s):
return Fore.BLACK
# 前景色:白色 背景色:绿色
def white_green(self, s):
return Fore.WHITE + Back.GREEN + s + Fore.RESET + Back.RESET
后记
在实际项目中查询各车次不同类型座位车票价格是没有多大意义的,我们只需要查询到余票信息即可。南京到上海每天大概有280列车次,每列车次都进行一次价格请求,一共进行280次,耗时久。本想用多进程完成的,代码也写了,但在运行过程中发现请求一会儿服务器就拒绝请求了,可能是12306做了反爬的措施。
另外,登录并没有真正地成功,登录过程中有很多其他请求。后期准备把完整的登录写出来,再增加订票等内容。
网友评论