Requirement: crawl every URL on a given domain (a full-site crawl).
Approach: use Celery as a task queue to push each target domain to background workers that run concurrently and asynchronously; PhantomJS is wrapped as a server that accepts URL fetch requests submitted from Python and returns the rendered page; Python then parses the result with lxml.
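The Celery layer is only described here, not shown. A minimal dispatch sketch could look like the following; the app name, broker URL and task name crawl_site are my assumptions, and main(url) is the fetch/parse loop defined in the Python sections further down:

# tasks.py -- hypothetical Celery wiring (broker URL and names are assumptions)
from celery import Celery

app = Celery('site_crawler', broker='redis://127.0.0.1:6379/0')

@app.task
def crawl_site(url):
    # hand one start URL to a background worker; main() is defined below
    return main(url)

# producer side: fire-and-forget each target domain
# crawl_site.delay('http://example.com/')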
/usr/local/bin/phantomjs server.js 9090 # start the fetch server, listening on port 9090 (localhost)
//server.js file
var port, server, service,
wait_before_end = 2000,
system = require('system'),
webpage = require('webpage');
function lengthInUtf8Bytes(str) {
// Matches only the 10.. bytes that are non-initial characters in a multi-byte sequence.
var m = encodeURIComponent(str).match(/%[89ABab]/g);
return str.length + (m ? m.length : 0);
}
function byteLength(s) {
// return bytes length of s in utf8 encoding
return ~-encodeURI(s).split(/%..|./).length
}
if (system.args.length < 2) {
console.log('Usage: phantomjs server.js <portnumber>');
phantom.exit(1);
} else {
port = system.args[1];
server = require('webserver').create();
console.debug = function(){};
service = server.listen(port, {
'keepAlive': true
}, function (request, response) {
phantom.clearCookies();
console.debug(JSON.stringify(request, null, 4));
// check method
if (request.method == 'GET') {
body = "method not allowed!";
response.statusCode = 403;
response.headers = {
'Cache': 'no-cache',
'Content-Length': body.length
};
response.write(body);
response.closeGracefully();
return;
}
var first_response = null,
finished = false,
page_loaded = false,
start_time = Date.now(),
end_time = null,
script_executed = false,
script_result = null;
var fetch = JSON.parse(request.postRaw);
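// fields accepted in the posted JSON (url is required, the rest are optional):
// url, method, data, headers, timeout (seconds), js_script,
// js_run_at ("document-start" / "document-end"), js_viewport_width, js_viewport_height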
console.debug(JSON.stringify(fetch, null, 2));
// create and set page
var page = webpage.create();
page.onConsoleMessage = function(msg) {
// console.log('console: ' + msg);
};
page.viewportSize = {
width: fetch.js_viewport_width || 1024,
height: fetch.js_viewport_height || 768*3
}
if (fetch.headers) {
fetch.headers['Accept-Encoding'] = undefined;
fetch.headers['Connection'] = undefined;
fetch.headers['Content-Length'] = undefined;
}
if (fetch.headers && fetch.headers['User-Agent']) {
page.settings.userAgent = fetch.headers['User-Agent'];
}
// this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
// page.settings.loadImages = fetch.load_images === undefined ? false : fetch.load_images;
page.settings.resourceTimeout = fetch.timeout ? fetch.timeout * 1000 : 120*1000;
if (fetch.headers) {
page.customHeaders = fetch.headers;
}
// add callbacks
page.onInitialized = function() {
if (!script_executed && fetch.js_script && fetch.js_run_at === "document-start") {
script_executed = true;
console.log('running document-start script.');
script_result = page.evaluateJavaScript(fetch.js_script);
}
};
page.onLoadFinished = function(status) {
page_loaded = true;
if (!script_executed && fetch.js_script && fetch.js_run_at !== "document-start") {
script_executed = true;
console.log('running document-end script.');
script_result = page.evaluateJavaScript(fetch.js_script);
}
console.debug("waiting "+wait_before_end+"ms before finished.");
end_time = Date.now() + wait_before_end;
setTimeout(make_result, wait_before_end+10, page);
};
page.onResourceRequested = function(requestData, request) {
var url = requestData['url'];
if ((/\.(jpg|jpeg|png|gif|tif|tiff|mov|swf|icon)$/gi.test(url) ||
(/\.(doubleclick|googleads|bdstatic|allyes)\./gi).test(url) ||
requestData['Content-Type'] == 'text/css')) {
console.debug('The url of the request is matching. Aborting: ' + url);
request.cancel();
} else {
console.debug("Starting request: #"+request.id+" ["+request.method+"]"+request.url);
end_time = null;
}
};
page.onResourceReceived = function(response) {
console.debug("Request finished: #"+response.id+" ["+response.status+"]"+response.url);
if (first_response === null && response.status != 301 && response.status != 302) {
first_response = response;
}
if (page_loaded) {
console.debug("waiting "+wait_before_end+"ms before finished.");
end_time = Date.now() + wait_before_end;
setTimeout(make_result, wait_before_end+10, page);
}
};
page.onResourceError = page.onResourceTimeout = function(response) {
if (response.errorCode) {
console.info("Request error: #"+response.id+" ["+response.errorCode+"="+response.errorString+"]"+response.url);
}
if (first_response === null) {
first_response = response;
}
if (page_loaded) {
console.debug("waiting "+wait_before_end+"ms before finished.");
end_time = Date.now() + wait_before_end;
setTimeout(make_result, wait_before_end+10, page);
}
}
// make sure the request always finishes, even if onLoadFinished never fires
setTimeout(function(page) {
make_result(page);
}, page.settings.resourceTimeout + 100, page);
// send request
page.open(fetch.url, {
operation: fetch.method,
data: fetch.data,
});
// make response
function make_result(page) {
if (finished) {
return;
}
if (Date.now() - start_time < page.settings.resourceTimeout) {
if (!!!end_time) {
return;
}
if (end_time > Date.now()) {
setTimeout(make_result, end_time - Date.now(), page);
return;
}
}
var result = {};
try {
result = _make_result(page);
} catch (e) {
result = {
orig_url: fetch.url,
status_code: 599,
error: e.toString(),
content: '',
headers: {},
url: page.url,
cookies: {},
time: (Date.now() - start_time) / 1000,
}
}
page.close();
finished = true;
console.log("["+result.status_code+"] "+result.orig_url+" "+result.time)
var body = JSON.stringify(result, null, 2);
response.writeHead(200, {
'Cache': 'no-cache',
'Content-Type': 'application/json',
'Content-Length': byteLength(body),
});
response.write(body);
response.closeGracefully();
}
function _make_result(page) {
if (first_response === null) {
throw "No response received!";
}
var cookies = {};
page.cookies.forEach(function(e) {
cookies[e.name] = e.value;
});
var headers = {};
if (first_response.headers) {
first_response.headers.forEach(function(e) {
headers[e.name] = e.value;
});
}
return {
orig_url: fetch.url,
status_code: first_response.status || 599,
error: first_response.errorString,
content: page.content,
headers: headers,
url: page.url,
cookies: cookies,
time: (Date.now() - start_time) / 1000,
js_script_result: script_result,
}
}
});
if (service) {
console.log('Web server running on port ' + port);
} else {
console.log('Error: Could not create web server listening on port ' + port);
phantom.exit();
}
}
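For reference, the JSON object that _make_result builds and the server writes back is shaped roughly like this (values are illustrative); it is what the Python client below consumes:

# shape of the JSON answer from server.js (illustrative values)
result = {
    'orig_url': 'http://example.com/',   # URL that was submitted
    'status_code': 200,                  # status of the first non-redirect response, 599 on error
    'error': None,                       # errorString when the fetch failed
    'content': '<html>...</html>',       # page.content after JavaScript rendering
    'headers': {'Content-Type': 'text/html'},
    'url': 'http://example.com/',        # final URL after redirects
    'cookies': {},                       # cookie name -> value
    'time': 1.23,                        # seconds spent fetching
    'js_script_result': None,            # result of js_script, if one was supplied
}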
Python request (client side):
# Python 2 client: pops URLs off the queue, asks the PhantomJS server to render
# them, and feeds the rendered page to the parser below.
import json
import threading
import urllib2
import Queue

phantom_server = '127.0.0.1:9090'   # host:port where server.js is listening (see the start command above)


def main(url):
    lock = threading.Lock()
    crawl_queue = Queue.Queue()
    crawl_queue.put({'method': 'GET', 'url': url, 'referer': url, 'data': None})
    # template of the JSON payload that server.js expects
    fetch = {'method': 'GET', 'headers': {}, 'use_gzip': True, 'timeout': 10}
    fetch['headers'].update({})      # add custom request headers here if needed
    urlset = set()                   # URLs already seen, to avoid duplicates
    links = []                       # every internal link that was discovered
    try:
        while not crawl_queue.empty():
            urldata = crawl_queue.get()
            if not urldata:
                continue
            fetch['url'] = urldata.get('url')
            fetch['data'] = urldata.get('data', None)
            fetch['method'] = urldata.get('method')
            print fetch['url']
            lock.acquire()
            try:
                req = urllib2.Request(url='http://{}/'.format(phantom_server),
                                      data=json.dumps(fetch))
                result = json.load(urllib2.urlopen(req))
                if result.get('status_code', 0) != 200:
                    continue
                parse_response(url, result, crawl_queue, urlset, links)
            finally:
                lock.release()
    except Exception:
        pass
    return links
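main() can be called directly or used as the body of the Celery task sketched at the top, e.g.:

if __name__ == '__main__':
    # crawl one site and print every internal URL that was discovered
    for urldata in main('http://example.com/'):
        print urldata['method'], urldata['url']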
Python parsing:
# Parser: extracts internal links and form targets from the rendered page.
import re
import lxml.html
import lxml.etree
from urllib import urlencode        # Python 2
from urlparse import urlparse       # Python 2


def parse_response(link, response, crawl_queue, urlset, links):   # URL extraction
    HTML_HEADER_PATTERN = re.compile('<html>', re.IGNORECASE)
    IGNORED_FORM_FIELDS = {'__VIEWSTATE'}
    html_parser = lxml.html.HTMLParser(collect_ids=False)
    page_url = response.get('url')
    body = response.get('content')
    link_parts = urlparse(link)
    domain = link_parts.netloc
    filter_url_type = re.compile(r'.*(\.jpg|\.css)', re.I)   # skip obvious static files
    try:
        doc = lxml.html.fromstring(body, parser=html_parser)
    except lxml.etree.ParserError:
        # some pages carry junk before <html>; retry from the first <html> tag
        m = HTML_HEADER_PATTERN.search(body)
        if m is None:
            return
        body = body[m.start():]
        doc = lxml.html.fromstring(body, parser=html_parser)
    # blank out ASP.NET view state so it does not bloat extracted URLs
    for node in doc.xpath('//input[@name="__VIEWSTATE"]'):
        node.value = ''
    doc.make_links_absolute(page_url, resolve_base_href=True)
    # plain links (<a href>, <script src>, <img src>, ...)
    for element, attribute, url, pos in doc.iterlinks():
        if not is_internal_link(domain, url):
            continue
        if element.tag == 'form':
            continue                       # forms are handled below
        if element.tag == 'img' and url.startswith('data:'):
            continue
        if not is_internal_link(domain, url, check_ext=False):
            continue
        urldata = {'method': 'GET', 'url': url, 'referer': page_url}
        if url not in urlset and not filter_url_type.match(url):
            urlset.add(url)
            # crawl_queue.put(urldata)     # enable to crawl recursively
            links.append(urldata)
    # forms: build a GET/POST request with dummy values for every field
    for idx, form in enumerate(doc.forms):
        action = form.action or page_url
        if not is_internal_link(domain, action):
            continue
        parts = urlparse(action)
        method = form.method.upper()
        body = urlencode([(f, v or '8') for f, v in form.fields.items()
                          if f not in IGNORED_FORM_FIELDS])
        if method == 'GET' and body:
            if parts.query:
                action = '{}&{}'.format(action, body)
            else:
                action = '{}?{}'.format(action, body)
        urldata = {'method': method, 'url': action, 'referer': page_url, 'data': body}
        if action not in urlset and not filter_url_type.match(action):
            urlset.add(action)
            # crawl_queue.put(urldata)
            links.append(urldata)
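parse_response() depends on an is_internal_link() helper that is never shown here. Judging from the call sites above, a plausible reconstruction (purely an assumption, including the static-extension list) is:

# hypothetical reconstruction of the missing is_internal_link helper
from urlparse import urlparse   # Python 2

STATIC_EXTS = ('.jpg', '.jpeg', '.png', '.gif', '.css', '.js', '.ico', '.svg', '.swf')

def is_internal_link(domain, url, check_ext=True):
    # True if url stays on `domain`; with check_ext=True also reject static assets
    parts = urlparse(url)
    if parts.scheme not in ('', 'http', 'https'):
        return False
    netloc = parts.netloc.split(':')[0]
    host = domain.split(':')[0]
    if netloc and netloc != host and not netloc.endswith('.' + host):
        return False
    if check_ext and parts.path.lower().endswith(STATIC_EXTS):
        return False
    return True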