python+phantomjs+celery crawler

Author: jojo1313 | Published 2017-11-03 11:48

    Requirement: crawl every URL of an entire target domain.
    Approach: use Celery as the task queue, so each domain to crawl is handed off to a background worker and fetched concurrently and asynchronously (a Celery task sketch appears at the end of this post); PhantomJS is wrapped as a server that accepts URL requests submitted from Python and returns the rendered result, which Python then parses with lxml.

    /usr/local/bin/phantomjs server.js 9090  # listen on localhost:9090
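
    Once server.js (listed below) is running, a quick smoke test from Python shows the request/response contract (a minimal sketch; example.com is just a placeholder target):

    import json
    import urllib2

    fetch = {'method': 'GET', 'url': 'http://example.com/', 'headers': {}, 'timeout': 10}
    # a Request with a data body is sent as POST; the server rejects GET
    req = urllib2.Request('http://127.0.0.1:9090/', data=json.dumps(fetch))
    result = json.load(urllib2.urlopen(req))
    print result['status_code'], result['url']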

    // server.js
    
    
    var port, server, service,
      wait_before_end = 2000,
      system = require('system'),
      webpage = require('webpage');
    
    function lengthInUtf8Bytes(str) {
        // Matches only the 10.. bytes that are non-initial characters in a multi-byte sequence.
        var m = encodeURIComponent(str).match(/%[89ABab]/g);
        return str.length + (m ? m.length : 0);
    }
    
    function byteLength(s) {
      // return bytes length of s in utf8 encoding
      return ~-encodeURI(s).split(/%..|./).length
    }
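
    // Worked example: byteLength('中') === 3 while '中'.length === 1; the server
    // uses byteLength() below so Content-Length matches the UTF-8 bytes actually
    // sent. (lengthInUtf8Bytes above computes the same thing and is otherwise unused.)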
    
    if (system.args.length < 2) {
      console.log('Usage: phantomjs server.js <portnumber>');
      phantom.exit(1);
    } else {
      port = system.args[1];
      server = require('webserver').create();
      console.debug = function(){};  // silence the console.debug tracing below
      service = server.listen(port, {
        'keepAlive': true
      }, function (request, response) {
        phantom.clearCookies();
        console.debug(JSON.stringify(request, null, 4));
        // check method
        if (request.method === 'GET') {
          var body = "method not allowed!";
          response.statusCode = 403;
          response.headers = {
            'Cache': 'no-cache',
            'Content-Length': body.length
          };
          response.write(body);
          response.closeGracefully();
          return;
        }
    
        var first_response = null,
            finished = false,
            page_loaded = false,
            start_time = Date.now(),
            end_time = null,
            script_executed = false,
            script_result = null;
    
        var fetch = JSON.parse(request.postRaw);
        console.debug(JSON.stringify(fetch, null, 2));
    
        // create and set page
        var page = webpage.create();
        page.onConsoleMessage = function(msg) {
            // console.log('console: ' + msg);
        };
        page.viewportSize = {
          width: fetch.js_viewport_width || 1024,
          height: fetch.js_viewport_height || 768*3
        };
        if (fetch.headers) {
          fetch.headers['Accept-Encoding'] = undefined;
          fetch.headers['Connection'] = undefined;
          fetch.headers['Content-Length'] = undefined;
        }
        if (fetch.headers && fetch.headers['User-Agent']) {
          page.settings.userAgent = fetch.headers['User-Agent'];
        }
        // this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
        // page.settings.loadImages = fetch.load_images === undefined ? false : fetch.load_images;
        page.settings.resourceTimeout = fetch.timeout ? fetch.timeout * 1000 : 120*1000;
        if (fetch.headers) {
          page.customHeaders = fetch.headers;
        }
    
        // add callbacks
        page.onInitialized = function() {
          if (!script_executed && fetch.js_script && fetch.js_run_at === "document-start") {
            script_executed = true;
            console.log('running document-start script.');
            script_result = page.evaluateJavaScript(fetch.js_script);
          }
        };
        page.onLoadFinished = function(status) {
          page_loaded = true;
          if (!script_executed && fetch.js_script && fetch.js_run_at !== "document-start") {
            script_executed = true;
            console.log('running document-end script.');
            script_result = page.evaluateJavaScript(fetch.js_script);
          }
          console.debug("waiting "+wait_before_end+"ms before finished.");
          end_time = Date.now() + wait_before_end;
          setTimeout(make_result, wait_before_end+10, page);
        };
        page.onResourceRequested = function(requestData, request) {
          var url = requestData['url'];
          if ((/\.(jpg|jpeg|png|gif|tif|tiff|mov|swf|icon)$/gi.test(url) ||
              (/\.(doubleclick|googleads|bdstatic|allyes)\./gi).test(url) ||
              requestData['Content-Type'] == 'text/css')) {
            console.debug('The url of the request is matching. Aborting: ' + url);
            request.cancel();
          } else {
            console.debug("Starting request: #"+request.id+" ["+request.method+"]"+request.url);
            end_time = null;
          }
        };
    
        page.onResourceReceived = function(response) {
          console.debug("Request finished: #"+response.id+" ["+response.status+"]"+response.url);
          if (first_response === null && response.status != 301 && response.status != 302) {
            first_response = response;
          }
          if (page_loaded) {
            console.debug("waiting "+wait_before_end+"ms before finished.");
            end_time = Date.now() + wait_before_end;
            setTimeout(make_result, wait_before_end+10, page);
          }
        };
        page.onResourceError = page.onResourceTimeout = function(response) {
          if (response.errorCode) {
            console.info("Request error: #"+response.id+" ["+response.errorCode+"="+response.errorString+"]"+response.url);
          }
          if (first_response === null) {
            first_response = response;
          }
          if (page_loaded) {
            console.debug("waiting "+wait_before_end+"ms before finished.");
            end_time = Date.now() + wait_before_end;
            setTimeout(make_result, wait_before_end+10, page);
          }
        };
    
        // make sure the request always finishes, even if no load event fires
        setTimeout(function(page) {
          make_result(page);
        }, page.settings.resourceTimeout + 100, page);
    
        // send request
        page.open(fetch.url, {
          operation: fetch.method,
          data: fetch.data,
        });
    
        // make response
        function make_result(page) {
          if (finished) {
            return;
          }
          if (Date.now() - start_time < page.settings.resourceTimeout) {
            if (!end_time) {
              return;
            }
            if (end_time > Date.now()) {
              // quiet period after the last resource hasn't elapsed yet; retry then
              setTimeout(make_result, end_time - Date.now(), page);
              return;
            }
          }
    
          var result = {};
          try {
            result = _make_result(page);
          } catch (e) {
            result = {
              orig_url: fetch.url,
              status_code: 599,
              error: e.toString(),
              content:  '',
              headers: {},
              url: page.url,
              cookies: {},
              time: (Date.now() - start_time) / 1000,
            }
          }
    
          page.close();
          finished = true;
          console.log("["+result.status_code+"] "+result.orig_url+" "+result.time)
    
          var body = JSON.stringify(result, null, 2);
          response.writeHead(200, {
            'Cache': 'no-cache',
            'Content-Type': 'application/json',
            'Content-Length': byteLength(body),
          });
          response.write(body);
          response.closeGracefully();
        }
    
        function _make_result(page) {
          if (first_response === null) {
            throw "No response received!";
          }
    
          var cookies = {};
          page.cookies.forEach(function(e) {
            cookies[e.name] = e.value;
          });
    
          var headers = {};
          if (first_response.headers) {
            first_response.headers.forEach(function(e) {
              headers[e.name] = e.value;
            });
          }
    
          return {
            orig_url: fetch.url,
            status_code: first_response.status || 599,
            error: first_response.errorString,
            content: page.content,
            headers: headers,
            url: page.url,
            cookies: cookies,
            time: (Date.now() - start_time) / 1000,
            js_script_result: script_result,
          }
        }
      });
    
      if (service) {
        console.log('Web server running on port ' + port);
      } else {
        console.log('Error: Could not create web server listening on port ' + port);
        phantom.exit();
      }
    }
    

    Python request:

    # -*- coding: utf-8 -*-
    import json
    import Queue
    import threading
    import urllib2

    phantom_server = '127.0.0.1:9090'  # the PhantomJS server started above

    def main(url):
        lock = threading.Lock()
        crawl_queue = Queue.Queue()
        crawl_queue.put({'method': 'GET', 'url': url, 'referer': url, 'data': None})

        fetch = {'method': 'GET', 'headers': {}, 'use_gzip': True, 'timeout': 10}
        urlset = set()
        links = []
        try:
            while not crawl_queue.empty():
                urldata = crawl_queue.get()
                if urldata:
                    fetch['url'] = urldata.get('url')
                    fetch['data'] = urldata.get('data', None)
                    fetch['method'] = urldata.get('method')
                print fetch['url']
                with lock:
                    # POST the fetch spec as JSON; the server reads it from request.postRaw
                    req = urllib2.Request(url='http://{}/'.format(phantom_server),
                                          data=json.dumps(fetch))
                    result = json.load(urllib2.urlopen(req))
                    if result.get('status_code', 0) != 200:
                        continue
                    parse_response(url, result, crawl_queue, urlset, links)
        except Exception:
            pass
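
    Calling it is just main('http://example.com/') with a placeholder URL. The lock only matters once several worker threads share crawl_queue; in this single-threaded loop it is effectively a no-op.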
    
    

    Python parsing:

    # -*- coding: utf-8 -*-
    import re
    import lxml.etree
    import lxml.html
    from urllib import urlencode
    from urlparse import urlparse

    HTML_HEADER_PATTERN = re.compile('<html>', re.IGNORECASE)
    IGNORED_FORM_FIELDS = {'__VIEWSTATE'}

    def parse_response(link, response, crawl_queue, urlset, links):  # extract URLs from a fetched page
        html_parser = lxml.html.HTMLParser(collect_ids=False)
        page_url = response.get('url')
        body = response.get('content')
        domain = urlparse(link).netloc
        filter_url_type = re.compile(r'.*(\.jpg|\.css)', re.I)  # skip obvious static assets

        try:
            doc = lxml.html.fromstring(body, parser=html_parser)
        except lxml.etree.ParserError:
            # some pages carry junk before <html>; retry from that tag if present
            m = HTML_HEADER_PATTERN.search(body)
            if m is None:
                return
            body = body[m.start():]
            doc = lxml.html.fromstring(body, parser=html_parser)

        for node in doc.xpath('//input[@name="__VIEWSTATE"]'):
            node.value = ''  # drop bulky ASP.NET view-state blobs
        doc.make_links_absolute(page_url, resolve_base_href=True)

        # plain links (<a href>, <img src>, <script src>, ...)
        for element, attribute, url, pos in doc.iterlinks():
            if not is_internal_link(domain, url) or element.tag == 'form':
                continue
            if element.tag == 'img' and url.startswith('data:'):
                continue
            if not is_internal_link(domain, url, check_ext=False):
                continue
            urldata = {'method': 'GET', 'url': url, 'referer': page_url}
            if url not in urlset and not filter_url_type.match(url):
                urlset.add(url)
                # crawl_queue.put(urldata)
                links.append(urldata)

        # forms: build a submittable URL and body for each one
        for form in doc.forms:
            action = form.action or page_url
            if not is_internal_link(domain, action):
                continue
            method = form.method.upper()
            # fill every field with a dummy value ('8') so the form can be submitted
            body = urlencode([(f, v or '8') for f, v in form.fields.items()
                              if f not in IGNORED_FORM_FIELDS])
            if method == 'GET' and body:
                sep = '&' if urlparse(action).query else '?'
                action = '{}{}{}'.format(action, sep, body)
            urldata = {'method': method, 'url': action, 'referer': page_url, 'data': body}
            if action not in urlset and not filter_url_type.match(action):
                urlset.add(action)
                # crawl_queue.put(urldata)
                links.append(urldata)
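
    The is_internal_link() helper is called above but never listed in the post. A minimal sketch of what it plausibly does, inferred only from its call sites (a same-host test plus an extension filter that check_ext=False bypasses), might be:

    from urlparse import urlparse

    STATIC_EXTS = ('.jpg', '.jpeg', '.png', '.gif', '.css', '.js', '.ico')  # assumed list

    def is_internal_link(domain, url, check_ext=True):
        # Treat only http(s) URLs on the crawled host as internal.
        parts = urlparse(url)
        if parts.scheme not in ('', 'http', 'https'):
            return False
        if parts.netloc and parts.netloc != domain:
            return False
        if check_ext and parts.path.lower().endswith(STATIC_EXTS):
            return False
        return True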
    
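    Finally, the Celery piece of the title. The post never shows it, but wiring main() into a Celery task, so each domain crawl runs as a concurrent background job as described in the intro, could look like this sketch (the broker URL, module name, and task name are all assumptions):

    # tasks.py -- hypothetical wiring
    from celery import Celery

    app = Celery('crawler', broker='redis://127.0.0.1:6379/0')  # assumed Redis broker

    @app.task
    def crawl_domain(url):
        main(url)  # the request loop defined above

    # Enqueue domains from anywhere, e.g.:
    #   crawl_domain.delay('http://example.com/')

    Workers would then be started with something like "celery -A tasks worker", and every queued domain gets fetched in the background.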
