反爬虫机制
标签(空格分隔): python scrapy
scrapy 架构
useragent 用户代理切换
-
fake-useragent
-
用户代理切换引用开源项目做处理
class RandomUserAgentMiddleware(object):
# useragent代理切换
def __init__(self,crawler):
super(RandomUserAgentMiddleware,self).__init__
self.ua = UserAgent()
self.ua_type = crawler.settings.get('RANDOM_UA_TYPE','random')
@classmethod
def from_crawler(cls,crawler):
return cls(crawler)
def process_request(self,request,spider):
def get_ua_type():
return getattr(self.ua,self.ua_type)
request.headers.serdefault('User-Agent',get_ua_type())
IP代理
request.meta['proxy'] =
# -*- coding: utf-8 -*-
__author__ = 'bobby'
import requests
from scrapy.selector import Selector
import MySQLdb
conn = MySQLdb.connect(host="127.0.0.1", user="root", passwd="root", db="article_spider", charset="utf8")
cursor = conn.cursor()
def crawl_ips():
#爬取西刺的免费ip代理
headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0"}
for i in range(1568):
re = requests.get("http://www.xicidaili.com/nn/{0}".format(i), headers=headers)
selector = Selector(text=re.text)
all_trs = selector.css("#ip_list tr")
ip_list = []
for tr in all_trs[1:]:
speed_str = tr.css(".bar::attr(title)").extract()[0]
if speed_str:
speed = float(speed_str.split("秒")[0])
all_texts = tr.css("td::text").extract()
ip = all_texts[0]
port = all_texts[1]
proxy_type = all_texts[5]
ip_list.append((ip, port, proxy_type, speed))
for ip_info in ip_list:
cursor.execute(
"insert proxy_ip(ip, port, speed, proxy_type) VALUES('{0}', '{1}', {2}, 'HTTP')".format(
ip_info[0], ip_info[1], ip_info[3]
)
)
conn.commit()
class GetIP(object):
def delete_ip(self, ip):
#从数据库中删除无效的ip
delete_sql = """
delete from proxy_ip where ip='{0}'
""".format(ip)
cursor.execute(delete_sql)
conn.commit()
return True
def judge_ip(self, ip, port):
#判断ip是否可用
http_url = "http://www.baidu.com"
proxy_url = "http://{0}:{1}".format(ip, port)
try:
proxy_dict = {
"http":proxy_url,
}
response = requests.get(http_url, proxies=proxy_dict)
except Exception as e:
print ("invalid ip and port")
self.delete_ip(ip)
return False
else:
code = response.status_code
if code >= 200 and code < 300:
print ("effective ip")
return True
else:
print ("invalid ip and port")
self.delete_ip(ip)
return False
def get_random_ip(self):
#从数据库中随机获取一个可用的ip
random_sql = """
SELECT ip, port FROM proxy_ip
ORDER BY RAND()
LIMIT 1
"""
result = cursor.execute(random_sql)
for ip_info in cursor.fetchall():
ip = ip_info[0]
port = ip_info[1]
judge_re = self.judge_ip(ip, port)
if judge_re:
return "http://{0}:{1}".format(ip, port)
else:
return self.get_random_ip()
# print (crawl_ips())
if __name__ == "__main__":
get_ip = GetIP()
get_ip.get_random_ip()
- 采用爬取西刺ip进行
- 采用官方scrapy-crawlera
- 采用tor+vpn
云打码
- 在线打码
import json
import requests
class YDMHttp(object):
apiurl = 'http://api.yundama.com/api.php'
username = ''
password = ''
appid = ''
appkey = ''
def __init__(self, username, password, appid, appkey):
self.username = username
self.password = password
self.appid = str(appid)
self.appkey = appkey
def balance(self):
data = {'method': 'balance', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey}
response_data = requests.post(self.apiurl, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("获取剩余积分", ret_data["balance"])
return ret_data["balance"]
else:
return None
def login(self):
data = {'method': 'login', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey}
response_data = requests.post(self.apiurl, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("登录成功", ret_data["uid"])
return ret_data["uid"]
else:
return None
def decode(self, filename, codetype, timeout):
data = {'method': 'upload', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey, 'codetype': str(codetype), 'timeout': str(timeout)}
files = {'file': open(filename, 'rb')}
response_data = requests.post(self.apiurl, files=files, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("识别成功", ret_data["text"])
return ret_data["text"]
else:
return None
if __name__ == "__main__":
# 用户名
username = 'da_ge_da1'
# 密码
password = 'da_ge_da'
# 软件ID,开发者分成必要参数。登录开发者后台【我的软件】获得!
appid = 3129
# 软件密钥,开发者分成必要参数。登录开发者后台【我的软件】获得!
appkey = '40d5ad41c047179fc797631e3b9c3025'
# 图片文件
filename = 'getimage.jpg'
# 验证码类型,# 例:1004表示4位字母数字,不同类型收费不同。请准确填写,否则影响识别率。在此查询所有类型 http://www.yundama.com/price.html
codetype = 1004
# 超时时间,秒
timeout = 60
# 检查
if (username == 'username'):
print ('请设置好相关参数再测试')
else:
# 初始化
yundama = YDMHttp(username, password, appid, appkey)
# 登陆云打码
uid = yundama.login();
print ('uid: %s' % uid)
# 查询余额
balance = yundama.balance();
print ('balance: %s' % balance)
# 开始识别,图片路径,验证码类型ID,超时时间(秒),识别结果
text = yundama.decode(filename, codetype, timeout);
- 原视频UP主慕课网(聚焦Python分布式爬虫必学框架Scrapy 打造搜索引擎)
- 本篇博客撰写人: XiaoJinZi 个人主页 转载请注明出处
- 学生能力有限 附上邮箱: 986209501@qq.com 不足以及误处请大佬指责
网友评论