CrawlSpider Source Code Analysis


Viewing the available Scrapy templates

  • Enter the project directory and run the following command

    scrapy genspider --list

  • The available templates

    Available templates:
      basic      # default template
      crawl      # whole-site crawling
      csvfeed    # CSV feed template
      xmlfeed    # XML feed template

  • basic
  • crawl

    CrawlSpider is the usual choice for crawling sites whose URLs follow regular patterns; it builds on Spider and adds a few attributes of its own

    • Based on the Spider class, with further encapsulation

    • rules: a collection of Rule objects used to match the target links and filter out noise

    • parse_start_url: parses the responses of the start URLs; it must return an Item, a Request, or an iterable of them

    • _parse_response: the core method of CrawlSpider

    Create a CrawlSpider

    scrapy genspider -t crawl spider_name spider_url

    Note: in a CrawlSpider you must not override the parse method, because CrawlSpider already uses it internally; override parse_start_url instead (see the sketch below).
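
    A minimal sketch (the spider name and URLs are hypothetical) of overriding parse_start_url instead of parse:

    from scrapy.spiders import CrawlSpider, Rule
    from scrapy.linkextractors import LinkExtractor

    class DemoSpider(CrawlSpider):
        name = 'demo'
        allowed_domains = ['example.com']
        start_urls = ['https://example.com/']

        rules = (
            Rule(LinkExtractor(allow=r'/detail/'), callback='parse_detail', follow=True),
        )

        # do NOT define parse() here; CrawlSpider.parse drives the rules
        def parse_start_url(self, response):
            # called for each start-URL response through CrawlSpider.parse
            yield {'start_page_title': response.css('title::text').get()}

        def parse_detail(self, response):
            yield {'url': response.url}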

  • csvfeed
  • xmlfeed


  • Add the source root in the settings configuration

    import os
    import sys
    # add the containing directory to the Python path
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
  • Overview of the CrawlSpider source logic

    class LagouSpider(CrawlSpider):
        name = 'lagou'
        allowed_domains = ['www.lagou.com']
        start_urls = ['http://www.lagou.com/']
    
        rules = (
            # rules must be an iterable of Rule instances
            # each Rule wraps a LinkExtractor (link extractor) instance
            # The domain can also be varied: large sites usually sit behind load balancers
            # with per-city CDNs, so every city has its own URL/IP. After collecting the
            # URLs of several cities you can rotate between them, which lowers the chance
            # of your IP being flagged.
            Rule(LinkExtractor(allow=r'Items/'), callback='parse_job', follow=True),
        )
    
        def parse_job(self, response):
            """Parse a Lagou job posting."""
            item = {}
            #item['domain_id'] = response.xpath('//input[@id="sid"]/@value').get()
            #item['name'] = response.xpath('//div[@id="name"]').get()
            #item['description'] = response.xpath('//div[@id="description"]').get()
            return item
    
    class CrawlSpider(Spider):
    
        rules = ()
    
        # _compile_rules is called when the CrawlSpider is initialised
        def __init__(self, *a, **kw):
            super(CrawlSpider, self).__init__(*a, **kw)
            self._compile_rules()
    
        def parse(self, response):
            return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True)
    
        def parse_start_url(self, response):
            return []
    
        def process_results(self, response, results):
            return results
    
        def _build_request(self, rule_index, link):
            return Request(
                url=link.url,
                callback=self._callback,
                errback=self._errback,
                meta=dict(rule=rule_index, link_text=link.text),
            )
    
        def _requests_to_follow(self, response):
            """要求遵循"""
            # 判断是否为 HTMLResponse
            if not isinstance(response, HtmlResponse):
                return
            # a local set used to de-duplicate the links extracted from this response
            seen = set()
            # enumerate pairs each rule with its index; the index is stored in request.meta
            for rule_index, rule in enumerate(self._rules):
                # let the rule's link_extractor.extract_links pull the concrete links out of the response
                links = [lnk for lnk in rule.link_extractor.extract_links(response)
                         if lnk not in seen]
                # run the links through the rule's process_links hook (user-definable), recording each link in the seen set
                for link in rule.process_links(links):
                    seen.add(link)
                    request = self._build_request(rule_index, link)
                    yield rule._process_request(request, response)
    
        def _callback(self, response):
            rule = self._rules[response.meta['rule']]
            return self._parse_response(response, rule.callback, rule.cb_kwargs, rule.follow)
    
        def _errback(self, failure):
            rule = self._rules[failure.request.meta['rule']]
            return self._handle_failure(failure, rule.errback)
    
        def _parse_response(self, response, callback, cb_kwargs, follow=True):
            """
            CrawlSpider 中核心方法
            :param response:
            :param callback: 回调方法名 self.parse_start_url
            :param cb_kwargs: 获取 parse_start_url 返回的参数
            :param follow:
            :return:
            """
            if callback:
                cb_res = callback(response, **cb_kwargs) or ()
                # hand the callback results to process_results
                cb_res = self.process_results(response, cb_res)
                # iterate over the results (yielding items/requests for Scrapy to pass on)
                for request_or_item in iterate_spider_output(cb_res):
                    yield request_or_item
            # The heart of CrawlSpider: links are followed by default; the follow / _follow_links booleans decide whether to keep following
            if follow and self._follow_links:
                for request_or_item in self._requests_to_follow(response):
                    yield request_or_item
    
        def _handle_failure(self, failure, errback):
            if errback:
                results = errback(failure) or ()
                for request_or_item in iterate_spider_output(results):
                    yield request_or_item
    
        def _compile_rules(self):
            """制定规则"""
            # 生成实例变量
            self._rules = []
            for rule in self.rules:
                # shallow-copy each rule
                self._rules.append(copy.copy(rule))
                self._rules[-1]._compile(self)
    
        @classmethod
        def from_crawler(cls, crawler, *args, **kwargs):
            spider = super(CrawlSpider, cls).from_crawler(crawler, *args, **kwargs)
            # read the 'CRAWLSPIDER_FOLLOW_LINKS' setting (user-defined); defaults to True if not set
            # if 'CRAWLSPIDER_FOLLOW_LINKS' is set to False, the rules are not followed
            spider._follow_links = crawler.settings.getbool('CRAWLSPIDER_FOLLOW_LINKS', True)
            return spider
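
    A sketch of how the pieces above interact (the spider name, domain and URL patterns are hypothetical): the rules feed _requests_to_follow, each request carries its rule index in meta, and the CRAWLSPIDER_FOLLOW_LINKS setting controls the _follow_links flag read in from_crawler.

    from scrapy.spiders import CrawlSpider, Rule
    from scrapy.linkextractors import LinkExtractor

    class FollowDemoSpider(CrawlSpider):
        name = 'follow_demo'
        allowed_domains = ['example.com']
        start_urls = ['https://example.com/']

        # setting this to False (e.g. in settings.py) turns _follow_links off,
        # so _parse_response never calls _requests_to_follow
        custom_settings = {'CRAWLSPIDER_FOLLOW_LINKS': True}

        rules = (
            Rule(
                LinkExtractor(allow=r'/detail/'),
                callback='parse_detail',
                follow=True,
                process_links='drop_print_links',  # invoked inside _requests_to_follow
            ),
        )

        def drop_print_links(self, links):
            # process_links hook: drop print-view duplicates before requests are built
            return [link for link in links if 'print=1' not in link.url]

        def parse_detail(self, response):
            # the rule callback, reached via _callback -> _parse_response;
            # response.meta['rule'] holds the index of the rule that produced this request
            yield {'url': response.url, 'rule_index': response.meta['rule']}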
    

  • The Rule class

    class Rule:
    
        def __init__(self, link_extractor=None, callback=None, cb_kwargs=None, follow=None,
                     process_links=None, process_request=None, errback=None):
            self.link_extractor = link_extractor or _default_link_extractor
            self.callback = callback
            self.errback = errback
            self.cb_kwargs = cb_kwargs or {}
            self.process_links = process_links or _identity
            self.process_request = process_request or _identity_process_request
            self.process_request_argcount = None
            self.follow = follow if follow is not None else not callback
    • link_extractor: a concrete extractor instance that performs the URL extraction
    • callback: the callback function
    • cb_kwargs: keyword arguments passed to the callback
    • follow: whether links matched by this rule should themselves be followed
    • process_links: an optional pre-processing hook for the extracted links (a callable or method name)
    • process_request: a hook applied to each generated request (a callable or method name)
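
    A hedged sketch of constructing a Rule directly (the URL pattern, callback name and hooks below are assumptions for illustration):

    from scrapy.spiders import Rule
    from scrapy.linkextractors import LinkExtractor

    def tag_request(request, response):
        # process_request hook: remember which page each request was discovered on
        request.meta['found_on'] = response.url
        return request

    job_rule = Rule(
        LinkExtractor(allow=r'/jobs/\d+\.html'),
        callback='parse_job',          # resolved to a spider method by Rule._compile
        cb_kwargs={'source': 'list'},  # extra keyword arguments for the callback
        follow=False,                  # note: follow defaults to "not callback"
        process_request=tag_request,   # applied to every request built from this rule
    )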

  • The LinkExtractor class

    class LxmlLinkExtractor(FilteringLinkExtractor):
    
        def __init__(self, allow=(), deny=(), allow_domains=(), deny_domains=(), restrict_xpaths=(),
                     tags=('a', 'area'), attrs=('href',), canonicalize=False,
                     unique=True, process_value=None, deny_extensions=None, restrict_css=(),
                     strip=True, restrict_text=None):
            tags, attrs = set(arg_to_iter(tags)), set(arg_to_iter(attrs))
            lx = LxmlParserLinkExtractor(
                tag=lambda x: x in tags,
                attr=lambda x: x in attrs,
                unique=unique,
                process=process_value,
                strip=strip,
                canonicalized=canonicalize
            )
    
            super(LxmlLinkExtractor, self).__init__(lx, allow=allow, deny=deny,
                                                    allow_domains=allow_domains, deny_domains=deny_domains,
                                                    restrict_xpaths=restrict_xpaths, restrict_css=restrict_css,
                                                    canonicalize=canonicalize, deny_extensions=deny_extensions,
                                                    restrict_text=restrict_text)
    
        def extract_links(self, response):
            """Returns a list of :class:`~scrapy.link.Link` objects from the
            specified :class:`response <scrapy.http.Response>`.
    
            Only links that match the settings passed to the ``__init__`` method of
            the link extractor are returned.
    
            Duplicate links are omitted.
            """
            base_url = get_base_url(response)
            if self.restrict_xpaths:
                # restrict the document to the regions matched by restrict_xpaths
                docs = [subdoc
                        for x in self.restrict_xpaths
                        for subdoc in response.xpath(x)]
            else:
                docs = [response.selector]
            all_links = []
            for doc in docs:
                links = self._extract_links(doc, response.url, response.encoding, base_url)
                all_links.extend(self._process_links(links))
            return unique_list(all_links)
    • allow: regex(es); only matching URLs are processed

    • deny: regex(es); matching URLs are excluded

    • allow_domains: only URLs under these domains are processed

    • deny_domains: URLs under these domains are excluded

    • restrict_xpaths: XPath expressions; links are only extracted from the matched regions

    • tags: tags to extract links from; defaults to a and area

    • attrs (attributes): attributes whose values are extracted; defaults to href

    • restrict_css: CSS selectors restricting extraction (they are converted to XPath internally)

      P.S.: CSS selectors apply to HTML, not XML; XPath was originally designed for querying XML.
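
    A small, self-contained sketch of extract_links against an in-memory response (the markup and URLs are made up for illustration):

    from scrapy.http import HtmlResponse
    from scrapy.linkextractors import LinkExtractor

    html = b'''<html><body><div id="jobs">
      <a href="/jobs/1.html">Job 1</a>
      <a href="/jobs/1.html">Job 1 (duplicate)</a>
      <a href="/logout">Logout</a>
    </div></body></html>'''
    response = HtmlResponse(url='https://example.com/list', body=html, encoding='utf-8')

    extractor = LinkExtractor(
        allow=r'/jobs/\d+\.html',  # regex filter applied in _link_allowed
        restrict_css='#jobs',      # translated to XPath via HTMLTranslator
    )
    for link in extractor.extract_links(response):
        # the duplicate link is removed (unique=True); /logout fails the allow regex
        print(link.url, link.text)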

  • The parent class FilteringLinkExtractor

    class FilteringLinkExtractor:
    
        _csstranslator = HTMLTranslator()
    
        def __new__(cls, *args, **kwargs):
            from scrapy.linkextractors.lxmlhtml import LxmlLinkExtractor
            if (issubclass(cls, FilteringLinkExtractor) and
                    not issubclass(cls, LxmlLinkExtractor)):
                warn('scrapy.linkextractors.FilteringLinkExtractor is deprecated, '
                     'please use scrapy.linkextractors.LinkExtractor instead',
                     ScrapyDeprecationWarning, stacklevel=2)
            return super(FilteringLinkExtractor, cls).__new__(cls)
    
        def __init__(self, link_extractor, allow, deny, allow_domains, deny_domains,
                     restrict_xpaths, canonicalize, deny_extensions, restrict_css, restrict_text):
    
            self.link_extractor = link_extractor
    
            self.allow_res = [x if isinstance(x, _re_type) else re.compile(x)
                              for x in arg_to_iter(allow)]
            self.deny_res = [x if isinstance(x, _re_type) else re.compile(x)
                             for x in arg_to_iter(deny)]
    
            self.allow_domains = set(arg_to_iter(allow_domains))
            self.deny_domains = set(arg_to_iter(deny_domains))
    
            self.restrict_xpaths = tuple(arg_to_iter(restrict_xpaths))
            self.restrict_xpaths += tuple(map(self._csstranslator.css_to_xpath,
                                              arg_to_iter(restrict_css)))
    
            self.canonicalize = canonicalize
            if deny_extensions is None:
                deny_extensions = IGNORED_EXTENSIONS
            self.deny_extensions = {'.' + e for e in arg_to_iter(deny_extensions)}
            self.restrict_text = [x if isinstance(x, _re_type) else re.compile(x)
                                  for x in arg_to_iter(restrict_text)]
    
        def _link_allowed(self, link):
            if not _is_valid_url(link.url):
                return False
            if self.allow_res and not _matches(link.url, self.allow_res):
                return False
            if self.deny_res and _matches(link.url, self.deny_res):
                return False
            parsed_url = urlparse(link.url)
            if self.allow_domains and not url_is_from_any_domain(parsed_url, self.allow_domains):
                return False
            if self.deny_domains and url_is_from_any_domain(parsed_url, self.deny_domains):
                return False
            if self.deny_extensions and url_has_any_extension(parsed_url, self.deny_extensions):
                return False
            if self.restrict_text and not _matches(link.text, self.restrict_text):
                return False
            return True
    
        def matches(self, url):
    
            if self.allow_domains and not url_is_from_any_domain(url, self.allow_domains):
                return False
            if self.deny_domains and url_is_from_any_domain(url, self.deny_domains):
                return False
    
            allowed = (regex.search(url) for regex in self.allow_res) if self.allow_res else [True]
            denied = (regex.search(url) for regex in self.deny_res) if self.deny_res else []
            return any(allowed) and not any(denied)
    
        def _process_links(self, links):
            links = [x for x in links if self._link_allowed(x)]
            if self.canonicalize:
                for link in links:
                    link.url = canonicalize_url(link.url)
            links = self.link_extractor._process_links(links)
            return links
    
        def _extract_links(self, *args, **kwargs):
            return self.link_extractor._extract_links(*args, **kwargs)
    # Top-level imports
    from scrapy.linkextractors.lxmlhtml import LxmlLinkExtractor as LinkExtractor  
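
    The allow/deny and domain filters above can be exercised directly through matches() (the URLs below are made up):

    from scrapy.linkextractors import LinkExtractor

    lx = LinkExtractor(allow=r'/jobs/', deny=r'\?print=1', deny_domains=['ads.example.com'])

    # matches() applies the same regex and domain filters used by _link_allowed
    print(lx.matches('https://example.com/jobs/42.html'))          # True
    print(lx.matches('https://example.com/jobs/42.html?print=1'))  # False: hits the deny regex
    print(lx.matches('https://ads.example.com/jobs/1.html'))       # False: hits deny_domains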
  • The referenced HTMLTranslator class

    class HTMLTranslator(TranslatorMixin, OriginalHTMLTranslator):
        @lru_cache(maxsize=256)
        def css_to_xpath(self, css, prefix='descendant-or-self::'):
            return super(HTMLTranslator, self).css_to_xpath(css, prefix)
    • Converts CSS selectors to XPath
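
    A quick illustration of the conversion (in current Scrapy versions this class lives in parsel; the selector is an arbitrary example):

    from parsel.csstranslator import HTMLTranslator

    translator = HTMLTranslator()
    # restrict_css values pass through exactly this call in FilteringLinkExtractor.__init__
    print(translator.css_to_xpath('#jobs a'))
    # roughly: descendant-or-self::*[@id = 'jobs']/descendant-or-self::*/a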
