
python+phantomjs+celery crawler

Author: jojo1313 | Published 2017-11-03 11:48

Requirement: crawl every URL on a target domain (a full-site crawl).
Approach: use a Celery task queue to push the domains to be crawled to background workers for concurrent, asynchronous execution; PhantomJS is wrapped as a small HTTP server that accepts URL requests submitted from Python and returns the rendered page, which Python then parses with lxml.
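The article never shows the Celery side, so here is a minimal sketch of the hand-off, assuming a Redis broker at redis://localhost:6379/0 and that main() (the request loop shown further down) lives in a module named crawler; the task name crawl_site, the module name, and the broker URL are illustrative, not from the original post.

# tasks.py -- hypothetical Celery wiring for the crawl (broker URL and module name assumed)
from celery import Celery

from crawler import main   # main() is the request loop defined in the "Python request" section below

app = Celery('crawler_tasks', broker='redis://localhost:6379/0')

@app.task
def crawl_site(url):
    # run the full-site crawl for one domain in a background worker
    return main(url)

# enqueue a domain for asynchronous crawling:
#   crawl_site.delay('http://www.example.com/')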

/usr/local/bin/phantomjs server.js 9090   # listen on port 9090 on localhost

//server.js file


var port, server, service,
  wait_before_end = 2000,
  system = require('system'),
  webpage = require('webpage');

function lengthInUtf8Bytes(str) {
    // Matches only the 10.. bytes that are non-initial characters in a multi-byte sequence.
    var m = encodeURIComponent(str).match(/%[89ABab]/g);
    return str.length + (m ? m.length : 0);
}

function byteLength(s) {
  // return bytes length of s in utf8 encoding
  return ~-encodeURI(s).split(/%..|./).length
}

if (system.args.length < 2) {
  console.log('Usage: phantomjs server.js <portnumber> [options]');
  phantom.exit(1);
} else {
  port = system.args[1];
  server = require('webserver').create();
  console.debug = function(){};
  service = server.listen(port, {
    'keepAlive': true
  }, function (request, response) {
    phantom.clearCookies();
    console.debug(JSON.stringify(request, null, 4));
    // check method
    if (request.method == 'GET') {
      var body = "method not allowed!";
      response.statusCode = 403;
      response.headers = {
        'Cache': 'no-cache',
        'Content-Length': body.length
      };
      response.write(body);
      response.closeGracefully();
      return;
    }

    var first_response = null,
        finished = false,
        page_loaded = false,
        start_time = Date.now(),
        end_time = null,
        script_executed = false,
        script_result = null;

    var fetch = JSON.parse(request.postRaw);
    console.debug(JSON.stringify(fetch, null, 2));

    // create and set page
    var page = webpage.create();
    page.onConsoleMessage = function(msg) {
        // console.log('console: ' + msg);
    };
    page.viewportSize = {
      width: fetch.js_viewport_width || 1024,
      height: fetch.js_viewport_height || 768*3
    }
    if (fetch.headers) {
      fetch.headers['Accept-Encoding'] = undefined;
      fetch.headers['Connection'] = undefined;
      fetch.headers['Content-Length'] = undefined;
    }
    if (fetch.headers && fetch.headers['User-Agent']) {
      page.settings.userAgent = fetch.headers['User-Agent'];
    }
    // this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
    // page.settings.loadImages = fetch.load_images === undefined ? false : fetch.load_images;
    page.settings.resourceTimeout = fetch.timeout ? fetch.timeout * 1000 : 120*1000;
    if (fetch.headers) {
      page.customHeaders = fetch.headers;
    }

    // add callbacks
    page.onInitialized = function() {
      if (!script_executed && fetch.js_script && fetch.js_run_at === "document-start") {
        script_executed = true;
        console.log('running document-start script.');
        script_result = page.evaluateJavaScript(fetch.js_script);
      }
    };
    page.onLoadFinished = function(status) {
      page_loaded = true;
      if (!script_executed && fetch.js_script && fetch.js_run_at !== "document-start") {
        script_executed = true;
        console.log('running document-end script.');
        script_result = page.evaluateJavaScript(fetch.js_script);
      }
      console.debug("waiting "+wait_before_end+"ms before finished.");
      end_time = Date.now() + wait_before_end;
      setTimeout(make_result, wait_before_end+10, page);
    };
    page.onResourceRequested = function(requestData, request) {
      var url = requestData['url'];
      if ((/\.(jpg|jpeg|png|gif|tif|tiff|mov|swf|icon)$/gi.test(url) ||
          (/\.(doubleclick|googleads|bdstatic|allyes)\./gi).test(url) ||
          requestData['Content-Type'] == 'text/css')) {
        console.debug('The url of the request is matching. Aborting: ' + url);
        request.cancel();
      } else {
        console.debug("Starting request: #"+request.id+" ["+request.method+"]"+request.url);
        end_time = null;
      }
    };

    page.onResourceReceived = function(response) {
      console.debug("Request finished: #"+response.id+" ["+response.status+"]"+response.url);
      if (first_response === null && response.status != 301 && response.status != 302) {
        first_response = response;
      }
      if (page_loaded) {
        console.debug("waiting "+wait_before_end+"ms before finished.");
        end_time = Date.now() + wait_before_end;
        setTimeout(make_result, wait_before_end+10, page);
      }
    };
    page.onResourceError = page.onResourceTimeout = function(response) {
      if (response.errorCode) {
        console.info("Request error: #"+response.id+" ["+response.errorCode+"="+response.errorString+"]"+response.url);
      }
      if (first_response === null) {
        first_response = response;
      }
      if (page_loaded) {
        console.debug("waiting "+wait_before_end+"ms before finished.");
        end_time = Date.now() + wait_before_end;
        setTimeout(make_result, wait_before_end+10, page);
      }
    }

    // make sure the request will eventually finish even if load events never fire
    setTimeout(function(page) {
      make_result(page);
    }, page.settings.resourceTimeout + 100, page);

    // send request
    page.open(fetch.url, {
      operation: fetch.method,
      data: fetch.data,
    });

    // make response
    function make_result(page) {
      if (finished) {
        return;
      }
      if (Date.now() - start_time < page.settings.resourceTimeout) {
        if (!end_time) {
          return;
        }
        if (end_time > Date.now()) {
          // re-check once the remaining quiet period (wait_before_end) has elapsed
          setTimeout(make_result, end_time - Date.now(), page);
          return;
        }
      }

      var result = {};
      try {
        result = _make_result(page);
      } catch (e) {
        result = {
          orig_url: fetch.url,
          status_code: 599,
          error: e.toString(),
          content:  '',
          headers: {},
          url: page.url,
          cookies: {},
          time: (Date.now() - start_time) / 1000,
        }
      }

      page.close();
      finished = true;
      console.log("["+result.status_code+"] "+result.orig_url+" "+result.time)

      var body = JSON.stringify(result, null, 2);
      response.writeHead(200, {
        'Cache': 'no-cache',
        'Content-Type': 'application/json',
        'Content-Length': byteLength(body),
      });
      response.write(body);
      response.closeGracefully();
    }

    function _make_result(page) {
      if (first_response === null) {
        throw "No response received!";
      }

      var cookies = {};
      page.cookies.forEach(function(e) {
        cookies[e.name] = e.value;
      });

      var headers = {};
      if (first_response.headers) {
        first_response.headers.forEach(function(e) {
          headers[e.name] = e.value;
        });
      }

      return {
        orig_url: fetch.url,
        status_code: first_response.status || 599,
        error: first_response.errorString,
        content: page.content,
        headers: headers,
        url: page.url,
        cookies: cookies,
        time: (Date.now() - start_time) / 1000,
        js_script_result: script_result,
      }
    }
  });

  if (service) {
    console.log('Web server running on port ' + port);
  } else {
    console.log('Error: Could not create web server listening on port ' + port);
    phantom.exit();
  }
}

Python request:

# -*- coding: utf-8 -*-
# Python 2: imports needed by this snippet
import json
import Queue
import threading
import urllib2

phantom_server = '127.0.0.1:9090'   # PhantomJS server started above on port 9090; adjust as needed


def main(url):
    lock = threading.Lock()
    crawl_queue = Queue.Queue()
    crawl_queue.put({'method': 'GET', 'url': url, 'referer': url, 'data': None})

    fetch = {'method': 'GET', 'headers': {}, 'use_gzip': True, 'timeout': 10}
    result = None
    urlset = set()
    links = []
    try:
        while not crawl_queue.empty():
            urldata = crawl_queue.get()
            if urldata:
                fetch['url'] = urldata.get('url')
                fetch['data'] = urldata.get('data', None)
                fetch['method'] = urldata.get('method')
            print fetch['url']
            lock.acquire()
            try:
                # POST the fetch spec as JSON to the PhantomJS server
                req = urllib2.Request(url='http://{}/'.format(phantom_server),
                                      data=json.dumps(fetch),
                                      headers={'Content-Type': 'application/json'})
                rs = urllib2.urlopen(req)
                result = json.load(rs)
                if result.get('status_code', 0) == 200:
                    # parse_response is defined in the parsing section below
                    parse_response(url, result, crawl_queue, urlset, links)
            finally:
                lock.release()
    except Exception as e:
        pass
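With the PhantomJS server above listening on port 9090, the crawl of a single domain can be started directly, or handed to the Celery task sketched near the top; the host/port in phantom_server and the example domain are placeholders.

# direct, synchronous call
main('http://www.example.com/')

# or asynchronously through a Celery worker
# crawl_site.delay('http://www.example.com/')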

Python parsing:

# -*- coding: utf-8 -*-
# Python 2: imports needed by this snippet
import re
import lxml.etree
import lxml.html
from urllib import urlencode
from urlparse import urlparse

HTML_HEADER_PATTERN = re.compile('<html>', re.IGNORECASE)
IGNORED_FORM_FIELDS = {'__VIEWSTATE'}


def parse_response(link, response, crawl_queue, urlset, links):
    """Parse a page rendered by the PhantomJS server and collect in-domain URLs."""
    html_parser = lxml.html.HTMLParser(collect_ids=False)
    page_url = response.get('url')
    body = response.get('content')
    link_parts = urlparse(link)
    domain = link_parts.netloc
    filter_url_type = re.compile(r'.*(\.jpg|\.css)', re.I)   # skip obvious static resources
    try:
        doc = lxml.html.fromstring(body, parser=html_parser)
    except lxml.etree.ParserError:
        # some pages carry junk before <html>; retry from the first <html> tag
        m = HTML_HEADER_PATTERN.search(body)
        if m is not None:
            body = body[m.start():]
            doc = lxml.html.fromstring(body, parser=html_parser)
        else:
            return
    # blank out ASP.NET view state so it does not bloat the extracted URLs
    for node in doc.xpath('//input[@name="__VIEWSTATE"]'):
        node.value = ''
    doc.make_links_absolute(page_url, resolve_base_href=True)

    # collect links from ordinary elements (a, link, script, img, ...)
    for element, attribute, url, pos in doc.iterlinks():
        if not is_internal_link(domain, url):
            continue
        if element.tag == 'form':
            continue
        if element.tag == 'img' and url.startswith('data:'):
            continue
        if not is_internal_link(domain, url, check_ext=False):
            continue
        urldata = {'method': 'GET', 'url': url, 'referer': page_url}
        if url not in urlset and not filter_url_type.match(url):
            urlset.add(url)
            # crawl_queue.put(urldata)
            links.append(urldata)

    # collect form targets separately, once per page
    for idx, form in enumerate(doc.forms):
        action = form.action or page_url
        if not is_internal_link(domain, action):
            continue
        parts = urlparse(action)
        method = form.method.upper()
        # '8' is a dummy filler value for empty form fields
        form_body = urlencode([(f, v or '8') for f, v in form.fields.items()
                               if f not in IGNORED_FORM_FIELDS])
        if method == 'GET' and form_body:
            if parts.query:
                action = '{}&{}'.format(action, form_body)
            else:
                action = '{}?{}'.format(action, form_body)
        urldata = {'method': method, 'url': action, 'referer': page_url, 'data': form_body}
        if action not in urlset and not filter_url_type.match(action):
            urlset.add(action)
            # crawl_queue.put(urldata)
            links.append(urldata)
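The helper is_internal_link is used above but never defined in the article. A minimal sketch, assuming it only checks that the URL stays on the crawl domain and, unless check_ext is disabled, skips a few static-file extensions (the extension list is illustrative); it reuses the urlparse import from the snippet above:

STATIC_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif', '.css', '.ico', '.swf')

def is_internal_link(domain, url, check_ext=True):
    # True if url points at the crawl domain (and, optionally, is not an obvious static file)
    parts = urlparse(url)
    if parts.scheme not in ('', 'http', 'https'):
        return False
    if parts.netloc and parts.netloc != domain:
        return False
    if check_ext and parts.path.lower().endswith(STATIC_EXTENSIONS):
        return False
    return True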
