起因,爬虫群有人询问 requests 抓取结果打印是乱码怎么解决,于是有其他人指点按照官方的文档,用r.encoding
查看默认的解码方式,然后通过r.encoding = 'utf-8'
修改,成功解码并打印。
但 requests 是怎么获得编码方式的?搜索到《Python + Requests 编码问题》和《Python+Requests编码识别Bug》两篇文章,得知 requests 有三种获取编码的方法:get_encodings_from_content
从响应的内容中获取编码,get_encoding_from_headers
从 HTTP 响应头部获取编码,chardet.detect
使用工具从响应内容自动检测。
get_encodings_from_content
utils.py
中定义,通过正则表达式,从回复的内容中获取编码方式。比如从 HTML head 的 meta 中。
def get_encodings_from_content(content):
    """Collect every encoding name declared inside ``content``.

    Three declaration styles are searched, and their matches are returned
    in this order: HTML ``<meta ... charset=...>``, HTML pragma
    ``<meta ... content="...;charset=...">`` and the XML declaration
    ``<?xml ... encoding=...>`` at the very start of the text.

    :param content: markup text to scan.
    :return: list of declared encoding names; empty when none is found.
    """
    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )

    declared = []
    for pattern in patterns:
        declared.extend(pattern.findall(content))
    return declared
get_encoding_from_headers
utils.py
中定义,根据 RFC 2616,如果 HTTP 头部 Content-Type
中的 MIME 是 text/*,且没有设置 charset,假设编码方式为ISO-8859-1
。如果有 charset,直接获取 编码方式。
def get_encoding_from_headers(headers):
    """Return the encoding announced by the HTTP ``Content-Type`` header.

    :param headers: dictionary-like object holding the response headers;
        the value is looked up under the key ``'content-type'``.
    :return: the ``charset`` parameter with surrounding quotes removed;
        ``'ISO-8859-1'`` when there is no charset but the media type
        contains ``text``; ``None`` when the header is missing or neither
        rule applies.
    """
    content_type = headers.get('content-type')
    if not content_type:
        return None

    # Split "type/subtype; key=value; ..." by hand instead of relying on
    # cgi.parse_header: the cgi module is deprecated (PEP 594) and is no
    # longer available from Python 3.13 on.
    media_type, _, raw_params = content_type.partition(';')
    params = {}
    for raw_param in raw_params.split(';'):
        key, separator, value = raw_param.strip().partition('=')
        if separator:
            params[key.strip().lower()] = value.strip()

    if 'charset' in params:
        return params['charset'].strip("'\"")

    # Text media types without an explicit charset fall back to ISO-8859-1.
    if 'text' in media_type:
        return 'ISO-8859-1'

    return None
chardet.detect
在requests.packages
包里,chardet/__init__.py
,猜测编码
def detect(aBuf):
    """Guess the character encoding of a byte string.

    :param aBuf: the raw bytes to analyse.
    :return: the ``result`` attribute of the detector after the whole
        buffer has been fed to it and the detector has been closed.
    :raises ValueError: if ``aBuf`` is a ``unicode`` object on Python 2,
        or anything other than ``bytes`` on Python 3.
    """
    if version_info < (3, 0):
        wrong_type = isinstance(aBuf, unicode)
    else:
        wrong_type = not isinstance(aBuf, bytes)
    if wrong_type:
        raise ValueError('Expected a bytes object, not a unicode object')

    # Imported here, after the argument check, exactly as in the original.
    from . import universaldetector

    detector = universaldetector.UniversalDetector()
    detector.reset()
    detector.feed(aBuf)
    detector.close()
    return detector.result
universaldetector.py
,编码自动检测工具,但看了一下,检测功能并不完全。原本想拿来检测文件编码,但不支持gbk
。
######################## BEGIN LICENSE BLOCK ########################
# The Original Code is Mozilla Universal charset detector code.
#
# The Initial Developer of the Original Code is
# Netscape Communications Corporation.
# Portions created by the Initial Developer are Copyright (C) 2001
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Mark Pilgrim - port to Python
# Shy Shalom - original C code
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301 USA
######################### END LICENSE BLOCK #########################
from . import constants
import sys
import codecs
from .latin1prober import Latin1Prober # windows-1252
from .mbcsgroupprober import MBCSGroupProber # multi-byte character sets
from .sbcsgroupprober import SBCSGroupProber # single-byte character sets
from .escprober import EscCharSetProber # ISO-2122, etc.
import re
# A prober's confidence must exceed this value for close() to accept it.
MINIMUM_THRESHOLD = 0.20

# Input states tracked by UniversalDetector while data is being fed.
ePureAscii = 0  # neither high-bit bytes nor escape sequences seen so far
eEscAscii = 1  # an escape sequence (ESC or "~{") has been seen
eHighbyte = 2  # at least one byte in the range 0x80-0xFF has been seen


class UniversalDetector:
    """Incremental character-encoding detector.

    Feed byte chunks with :meth:`feed` and finish with :meth:`close`.
    The outcome is stored in ``self.result``, a dict with the keys
    ``'encoding'`` and ``'confidence'``; ``self.done`` becomes true as
    soon as a final answer is known.
    """

    def __init__(self):
        self._highBitDetector = re.compile(b'[\x80-\xFF]')
        self._escDetector = re.compile(b'(\033|~{)')
        self._mEscCharSetProber = None
        self._mCharSetProbers = []
        self.reset()

    def reset(self):
        """Return the detector, and every prober created so far, to the
        initial state so that a new document can be analysed."""
        self.result = {'encoding': None, 'confidence': 0.0}
        self.done = False
        self._mStart = True
        self._mGotData = False
        self._mInputState = ePureAscii
        self._mLastChar = b''
        if self._mEscCharSetProber:
            self._mEscCharSetProber.reset()
        for prober in self._mCharSetProbers:
            prober.reset()

    def feed(self, aBuf):
        """Analyse the next chunk of bytes.

        Does nothing when detection is already finished or when ``aBuf``
        is empty.

        :param aBuf: chunk of the document, as bytes.
        """
        if self.done:
            return

        aLen = len(aBuf)
        if not aLen:
            return

        if not self._mGotData:
            # If the data starts with BOM, we know it is UTF.  The 4-byte
            # marks are tested before the 2-byte ones because the UTF-32
            # marks begin with the same bytes as the UTF-16 marks.
            if aBuf[:3] == codecs.BOM_UTF8:
                # EF BB BF  UTF-8 with BOM
                self.result = {'encoding': "UTF-8-SIG", 'confidence': 1.0}
            elif aBuf[:4] == codecs.BOM_UTF32_LE:
                # FF FE 00 00  UTF-32, little-endian BOM
                self.result = {'encoding': "UTF-32LE", 'confidence': 1.0}
            elif aBuf[:4] == codecs.BOM_UTF32_BE:
                # 00 00 FE FF  UTF-32, big-endian BOM
                self.result = {'encoding': "UTF-32BE", 'confidence': 1.0}
            elif aBuf[:4] == b'\xFE\xFF\x00\x00':
                # FE FF 00 00  UCS-4, unusual octet order BOM (3412)
                self.result = {
                    'encoding': "X-ISO-10646-UCS-4-3412",
                    'confidence': 1.0
                }
            elif aBuf[:4] == b'\x00\x00\xFF\xFE':
                # 00 00 FF FE  UCS-4, unusual octet order BOM (2143)
                self.result = {
                    'encoding': "X-ISO-10646-UCS-4-2143",
                    'confidence': 1.0
                }
            elif aBuf[:2] == codecs.BOM_LE:
                # FF FE  UTF-16, little endian BOM
                self.result = {'encoding': "UTF-16LE", 'confidence': 1.0}
            elif aBuf[:2] == codecs.BOM_BE:
                # FE FF  UTF-16, big endian BOM
                self.result = {'encoding': "UTF-16BE", 'confidence': 1.0}

        self._mGotData = True
        if self.result['encoding'] and (self.result['confidence'] > 0.0):
            self.done = True
            return

        if self._mInputState == ePureAscii:
            if self._highBitDetector.search(aBuf):
                self._mInputState = eHighbyte
            elif self._escDetector.search(self._mLastChar + aBuf):
                # The last byte of the previous chunk is prepended so that
                # a "~{" split across two chunks is still recognised.
                self._mInputState = eEscAscii

        self._mLastChar = aBuf[-1:]

        if self._mInputState == eEscAscii:
            if not self._mEscCharSetProber:
                self._mEscCharSetProber = EscCharSetProber()
            if self._mEscCharSetProber.feed(aBuf) == constants.eFoundIt:
                self.result = {
                    'encoding': self._mEscCharSetProber.get_charset_name(),
                    'confidence': self._mEscCharSetProber.get_confidence()
                }
                self.done = True
        elif self._mInputState == eHighbyte:
            if not self._mCharSetProbers:
                self._mCharSetProbers = [MBCSGroupProber(), SBCSGroupProber(),
                                         Latin1Prober()]
            for prober in self._mCharSetProbers:
                if prober.feed(aBuf) == constants.eFoundIt:
                    self.result = {'encoding': prober.get_charset_name(),
                                   'confidence': prober.get_confidence()}
                    self.done = True
                    break

    def close(self):
        """Finish detection and settle on the final answer.

        Pure-ASCII input is reported as ``'ascii'``.  For input with
        high-bit bytes the prober with the highest confidence is chosen,
        provided that confidence exceeds ``MINIMUM_THRESHOLD``.

        :return: ``self.result`` on every path, including when detection
            had already finished or no data was ever fed.
        """
        if self.done:
            return self.result
        if not self._mGotData:
            if constants._debug:
                sys.stderr.write('no data received!\n')
            return self.result
        self.done = True

        if self._mInputState == ePureAscii:
            self.result = {'encoding': 'ascii', 'confidence': 1.0}
            return self.result

        if self._mInputState == eHighbyte:
            maxProberConfidence = 0.0
            maxProber = None
            for prober in self._mCharSetProbers:
                if not prober:
                    continue
                proberConfidence = prober.get_confidence()
                if proberConfidence > maxProberConfidence:
                    maxProberConfidence = proberConfidence
                    maxProber = prober
            if maxProber and (maxProberConfidence > MINIMUM_THRESHOLD):
                self.result = {'encoding': maxProber.get_charset_name(),
                               'confidence': maxProber.get_confidence()}
                return self.result

        if constants._debug:
            sys.stderr.write('no probers hit minimum threshhold\n')
            # The prober list is empty when only escape sequences were
            # seen, so guard the index access below.
            if self._mCharSetProbers:
                for prober in self._mCharSetProbers[0].mProbers:
                    if not prober:
                        continue
                    sys.stderr.write('%s confidence = %s\n' %
                                     (prober.get_charset_name(),
                                      prober.get_confidence()))
        return self.result
These constants define various encodings of the Unicode byte order mark (BOM) used in UTF-16 and UTF-32 data streams to indicate the byte order used in the stream or file and in UTF-8 as a Unicode signature.
三种方式的调用
查看源代码,梳理了一下 requests 对这三种获取编码方法的使用。
- requests 得到响应后,自动调用
utils.py
中定义的get_encoding_from_headers
获取编码方式。
- 当我们访问
r.text
属性时,会检查encoding
是否为空,如果为空,使用chardet
自动检测。也就是说,对于 Content-Type 为 text/* 的响应,chardet
通常不会被使用,因为encoding 总会被设置,没有 charset 时默认是ISO-8859-1
。
- utils.py
中定义的get_encodings_from_content
工具则没有出现在响应处理过程中。
如何手动调用?
import requests

r = requests.get('https://www.python.org')

# Encoding derived from the HTTP Content-Type header.
print(r.encoding)
# utf-8

# Encodings declared inside the document itself.  The helper searches with
# text patterns, so the raw bytes are decoded first (the declarations being
# looked for are plain ASCII).
print(requests.utils.get_encodings_from_content(r.content.decode('ascii', 'ignore')))
# ['utf-8']

# Encoding guessed by chardet from the raw bytes.
print(r.apparent_encoding)
# ISO-8859-2
可以在获取r.text
前,将手动调用的结果赋值给r.encoding
。或者r.encoding = 'utf-8'
直接设置解码方式。
requests 编码相关的完整调用过程
这里是定义的顺序,调用的顺序建议从最后向前看
models.py
定义主要的对象
# 导入了自动检测的工具
from .compat import (chardet)
# Definition of the Request class.
class Request(RequestHooksMixin):
    """A user-created :class:`Request <Request>` object.

    Used to prepare a :class:`PreparedRequest <PreparedRequest>`, which is
    sent to the server.

    Usage::

      >>> import requests
      >>> req = requests.Request('GET', 'http://httpbin.org/get')
      >>> req.prepare()
      <PreparedRequest [GET]>
    """

    def __init__(self, method=None, url=None, headers=None, files=None,
                 data=None, params=None, auth=None, cookies=None, hooks=None,
                 json=None):
        # Replace missing containers with fresh empty ones: lists for the
        # body-like arguments, dicts for the mapping-like arguments.
        if data is None:
            data = []
        if files is None:
            files = []
        if headers is None:
            headers = {}
        if params is None:
            params = {}
        if hooks is None:
            hooks = {}

        # Start from the default hooks, then register the caller's hooks.
        self.hooks = default_hooks()
        for event, hook in list(hooks.items()):
            self.register_hook(event=event, hook=hook)

        self.method = method
        self.url = url
        self.headers = headers
        self.files = files
        self.data = data
        self.json = json
        self.params = params
        self.auth = auth
        self.cookies = cookies

    def __repr__(self):
        return '<Request [%s]>' % (self.method)

    def prepare(self):
        """Build a :class:`PreparedRequest <PreparedRequest>` from this
        request's attributes and return it."""
        prepared = PreparedRequest()
        prepared.prepare(
            method=self.method,
            url=self.url,
            headers=self.headers,
            files=self.files,
            data=self.data,
            json=self.json,
            params=self.params,
            auth=self.auth,
            cookies=self.cookies,
            hooks=self.hooks,
        )
        return prepared
# Definition of the Response class; in the end callers use its instance
# properties such as `text` and `apparent_encoding`.
class Response(object):
    """The :class:`Response <Response>` object, which contains a
    server's response to an HTTP request.
    """

    __attrs__ = [
        '_content', 'status_code', 'headers', 'url', 'history',
        'encoding', 'reason', 'cookies', 'elapsed', 'request'
    ]

    def __init__(self):
        super(Response, self).__init__()

        #: Encoding to decode with when accessing r.text.
        self.encoding = None

    @property
    def apparent_encoding(self):
        """The encoding guessed by ``chardet`` from ``self.content``."""
        guess = chardet.detect(self.content)
        return guess['encoding']

    @property
    def text(self):
        """Content of the response, decoded to text.

        ``self.encoding`` is used when it is set; when it is ``None`` the
        encoding guessed by ``chardet`` is used instead.  Undecodable
        bytes are replaced rather than raising.  If you know the correct
        encoding, set ``r.encoding`` before accessing this property.

        :return: the decoded text, or an empty string when there is no
            content.
        """
        codec = self.encoding

        if not self.content:
            return str('')

        # No encoding configured: fall back to the auto-detected one.
        if self.encoding is None:
            codec = self.apparent_encoding

        try:
            return str(self.content, codec, errors='replace')
        except (LookupError, TypeError):
            # LookupError: the encoding name is unknown (e.g. misspelled).
            # TypeError: the encoding is None.
            # Either way, decode without naming an encoding.
            return str(self.content, errors='replace')
utils.py
,定义了一些工具,包括get_encodings_from_content
从 HTML 页面获取编码,get_encoding_from_headers
从 HTTP 头部获取编码
def get_encodings_from_content(content):
    """List the encodings declared in the given content string.

    A :class:`DeprecationWarning` is issued on every call.  Matches are
    returned for HTML ``<meta ... charset=...>`` declarations first, then
    HTML pragma ``content="...;charset=..."`` declarations, then an XML
    declaration at the very start of the text.

    :param content: string to extract encodings from.
    :return: list of encoding names; empty when nothing is declared.
    """
    deprecation_notice = (
        'In requests 3.0, get_encodings_from_content will be removed. For '
        'more information, please see the discussion on issue #2266. (This'
        ' warning should only appear once.)')
    warnings.warn(deprecation_notice, DeprecationWarning)

    matchers = [
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    ]
    return [name for matcher in matchers for name in matcher.findall(content)]
def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from; the value is
        looked up under the key ``'content-type'``.
    :return: the ``charset`` parameter with surrounding quotes removed;
        ``'ISO-8859-1'`` when there is no charset but the media type
        contains ``text``; ``None`` when the header is missing or neither
        rule applies.
    """
    content_type = headers.get('content-type')
    if not content_type:
        return None

    # Parse "type/subtype; key=value; ..." without cgi.parse_header: the
    # cgi module is deprecated (PEP 594) and is gone from Python 3.13 on.
    media_type, _, raw_params = content_type.partition(';')
    params = {}
    for raw_param in raw_params.split(';'):
        key, separator, value = raw_param.strip().partition('=')
        if separator:
            params[key.strip().lower()] = value.strip()

    if 'charset' in params:
        return params['charset'].strip("'\"")

    # Text media types without an explicit charset fall back to ISO-8859-1.
    if 'text' in media_type:
        return 'ISO-8859-1'

    return None
adapters.py
,定义发送 request 并构造 response 的适配器
from .utils import (get_encoding_from_headers,)
class HTTPAdapter(BaseAdapter):
    """The built-in HTTP Adapter for urllib3.

    Provides a general-case interface for Requests sessions to contact HTTP and
    HTTPS urls by implementing the Transport Adapter interface. This class will
    usually be created by the :class:`Session <Session>` class under the
    covers.

    Usage::

      >>> import requests
      >>> s = requests.Session()
      >>> a = requests.adapters.HTTPAdapter(max_retries=3)
      >>> s.mount('http://', a)
    """

    __attrs__ = ['max_retries', 'config', '_pool_connections', '_pool_maxsize',
                 '_pool_block']

    # Builds a response object from the returned reply and a fresh
    # Response() instance; this is where `get_encoding_from_headers` from
    # `utils.py` is called to set the encoding.
    def build_response(self, req, resp):
        """Builds a :class:`Response <requests.Response>` object from a urllib3
        response. This should not be called from user code, and is only exposed
        for use when subclassing the
        :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`

        :param req: The :class:`PreparedRequest <PreparedRequest>` used to generate the response.
        :param resp: The urllib3 response object.
        :return: the populated :class:`Response <requests.Response>`.
        """
        response = Response()

        # Fallback to None if there's no status_code, for whatever reason.
        response.status_code = getattr(resp, 'status', None)

        # Make headers case-insensitive.
        response.headers = CaseInsensitiveDict(getattr(resp, 'headers', {}))

        # Set encoding.
        response.encoding = get_encoding_from_headers(response.headers)
        response.raw = resp
        response.reason = response.raw.reason

        if isinstance(req.url, bytes):
            response.url = req.url.decode('utf-8')
        else:
            response.url = req.url

        # Add new cookies from the server.
        extract_cookies_to_jar(response.cookies, req, resp)

        # Give the Response some context.
        response.request = req
        response.connection = self

        return response

    # Sends the request and returns the response object that was built.
    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        """Sends PreparedRequest object. Returns Response object.

        :param request: the prepared request to send.
        :param stream: not used by this method body.
        :param timeout: a single value applied to both connect and read,
            or a ``(connect, read)`` tuple.
        :param verify: passed to ``cert_verify``.
        :param cert: passed to ``cert_verify``.
        :param proxies: passed to ``get_connection`` and ``request_url``.
        :raises ValueError: if ``timeout`` is a tuple that cannot be
            unpacked into exactly ``(connect, read)``.
        """
        conn = self.get_connection(request.url, proxies)

        self.cert_verify(conn, request.url, verify, cert)
        url = self.request_url(request, proxies)
        self.add_headers(request)

        # A body without an explicit Content-Length is sent chunked.
        chunked = not (request.body is None or 'Content-Length' in request.headers)

        if isinstance(timeout, tuple):
            try:
                connect, read = timeout
                timeout = TimeoutSauce(connect=connect, read=read)
            except ValueError:
                # this may raise a string formatting error.
                err = ("Invalid timeout {0}. Pass a (connect, read) "
                       "timeout tuple, or a single float to set "
                       "both timeouts to the same value".format(timeout))
                raise ValueError(err)
        else:
            timeout = TimeoutSauce(connect=timeout, read=timeout)

        if not chunked:
            resp = conn.urlopen(
                method=request.method,
                url=url,
                body=request.body,
                headers=request.headers,
                redirect=False,
                assert_same_host=False,
                preload_content=False,
                decode_content=False,
                retries=self.max_retries,
                timeout=timeout
            )

        # Send the request.
        else:
            if hasattr(conn, 'proxy_pool'):
                conn = conn.proxy_pool

            low_conn = conn._get_conn(timeout=DEFAULT_POOL_TIMEOUT)

            try:
                low_conn.putrequest(request.method,
                                    url,
                                    skip_accept_encoding=True)

                for header, value in request.headers.items():
                    low_conn.putheader(header, value)

                low_conn.endheaders()

                # Each piece is framed as: hex length, CRLF, data, CRLF.
                for i in request.body:
                    low_conn.send(hex(len(i))[2:].encode('utf-8'))
                    low_conn.send(b'\r\n')
                    low_conn.send(i)
                    low_conn.send(b'\r\n')
                # A zero-length chunk terminates the body.
                low_conn.send(b'0\r\n\r\n')

                # Receive the response from the server
                try:
                    # For Python 2.7+ versions, use buffering of HTTP
                    # responses
                    r = low_conn.getresponse(buffering=True)
                except TypeError:
                    # For compatibility with Python 2.6 versions and back
                    r = low_conn.getresponse()

                resp = HTTPResponse.from_httplib(
                    r,
                    pool=conn,
                    connection=low_conn,
                    preload_content=False,
                    decode_content=False
                )
            except:
                # If we hit any problems here, clean up the connection.
                # Then, reraise so that we can handle the actual exception.
                low_conn.close()
                raise

        return self.build_response(request, resp)
sessions.py
from .adapters import HTTPAdapter # 导入 HTTP 适配器
class Session(SessionRedirectMixin):
    """A Requests session.

    Provides cookie persistence, connection-pooling, and configuration.

    Basic Usage::

      >>> import requests
      >>> s = requests.Session()
      >>> s.get('http://httpbin.org/get')
      <Response [200]>

    Or as a context manager::

      >>> with requests.Session() as s:
      ...     s.get('http://httpbin.org/get')
      <Response [200]>
    """

    __attrs__ = [
        'headers', 'cookies', 'auth', 'proxies', 'hooks', 'params', 'verify',
        'cert', 'prefetch', 'adapters', 'stream', 'trust_env',
        'max_redirects',
    ]

    def __init__(self):
        #: A case-insensitive dictionary of headers to be sent on each
        #: :class:`Request <Request>` sent from this
        #: :class:`Session <Session>`.
        self.headers = default_headers()

        # Default connection adapters.
        self.adapters = OrderedDict()
        self.mount('https://', HTTPAdapter())  # adapter for https URLs
        self.mount('http://', HTTPAdapter())  # adapter for http URLs

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def prepare_request(self, request):
        """Constructs a :class:`PreparedRequest <PreparedRequest>` for
        transmission and returns it. The :class:`PreparedRequest` has settings
        merged from the :class:`Request <Request>` instance and those of the
        :class:`Session`.

        :param request: :class:`Request` instance to prepare with this
            session's settings.
        """
        cookies = request.cookies or {}

        # Bootstrap CookieJar.
        if not isinstance(cookies, cookielib.CookieJar):
            cookies = cookiejar_from_dict(cookies)

        # Merge with session cookies
        merged_cookies = merge_cookies(
            merge_cookies(RequestsCookieJar(), self.cookies), cookies)

        # Set environment's basic authentication if not explicitly set.
        auth = request.auth
        if self.trust_env and not auth and not self.auth:
            auth = get_netrc_auth(request.url)

        p = PreparedRequest()
        p.prepare(
            method=request.method.upper(),
            url=request.url,
            files=request.files,
            data=request.data,
            json=request.json,
            headers=merge_setting(request.headers, self.headers, dict_class=CaseInsensitiveDict),
            params=merge_setting(request.params, self.params),
            auth=merge_setting(auth, self.auth),
            cookies=merged_cookies,
            hooks=merge_hooks(request.hooks, self.hooks),
        )
        return p

    # Creates the request, calls self.send and returns the reply.
    def request(self, method, url,
                params=None,
                data=None,
                headers=None,
                cookies=None,
                files=None,
                auth=None,
                timeout=None,
                allow_redirects=True,
                proxies=None,
                hooks=None,
                stream=None,
                verify=None,
                cert=None,
                json=None):
        """Constructs a :class:`Request <Request>`, prepares it and sends it.
        Returns :class:`Response <Response>` object.
        """
        # Create the Request.
        req = Request(
            method=method.upper(),
            url=url,
            headers=headers,
            files=files,
            data=data or {},
            json=json,
            params=params or {},
            auth=auth,
            cookies=cookies,
            hooks=hooks,
        )
        prep = self.prepare_request(req)

        proxies = proxies or {}

        settings = self.merge_environment_settings(
            prep.url, proxies, stream, verify, cert
        )

        # Send the request.
        send_kwargs = {
            'timeout': timeout,
            'allow_redirects': allow_redirects,
        }
        send_kwargs.update(settings)
        resp = self.send(prep, **send_kwargs)

        return resp

    # Looks up the adapter and calls its send method to transmit the
    # request and obtain the reply.
    def send(self, request, **kwargs):
        """Send a given PreparedRequest."""
        # Get the appropriate adapter to use
        adapter = self.get_adapter(url=request.url)

        # Send the request
        r = adapter.send(request, **kwargs)

        return r

    def get_adapter(self, url):
        """Returns the appropriate connection adapter for the given URL.

        :raises InvalidSchema: if no mounted prefix matches ``url``.
        """
        # Lower-case once; the value does not change between iterations.
        lowered_url = url.lower()
        for (prefix, adapter) in self.adapters.items():
            if lowered_url.startswith(prefix):
                return adapter

        # Nothing matches :-/
        raise InvalidSchema("No connection adapters were found for '%s'" % url)

    def close(self):
        """Closes all adapters and as such the session"""
        for v in self.adapters.values():
            v.close()

    def mount(self, prefix, adapter):
        """Registers a connection adapter to a prefix.

        Adapters are sorted in descending order by key length."""
        self.adapters[prefix] = adapter

        # Re-insert every shorter prefix so that it ends up after the new
        # one in the ordered mapping.
        keys_to_move = [k for k in self.adapters if len(k) < len(prefix)]
        for key in keys_to_move:
            self.adapters[key] = self.adapters.pop(key)
api.py
,提供供用户调用的 API
# -*- coding: utf-8 -*-
"""
requests.api
~~~~~~~~~~~~
This module implements the Requests API.
:copyright: (c) 2012 by Kenneth Reitz.
:license: Apache2, see LICENSE for more details.
"""
from . import sessions
def request(method, url, **kwargs):
    """Construct and send a :class:`Request <Request>` using a session
    that lives only for this call.

    :param method: method for the new :class:`Request` object.
    :param url: URL for the new :class:`Request` object.
    :param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`.
    :param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
    :return: :class:`Response <Response>` object
    :rtype: requests.Response

    Usage::

      >>> import requests
      >>> req = requests.request('GET', 'http://httpbin.org/get')
      <Response [200]>
    """
    # The session is used as a context manager so that it is closed as
    # soon as the call finishes; this avoids leaving sockets open, which
    # can trigger a ResourceWarning in some cases and look like a memory
    # leak in others.
    with sessions.Session() as one_shot_session:
        response = one_shot_session.request(method=method, url=url, **kwargs)
        return response
def get(url, params=None, **kwargs):
    r"""Sends a GET request.

    ``allow_redirects`` defaults to ``True`` unless the caller supplies it.

    :param url: URL for the new :class:`Request` object.
    :param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`.
    :param \*\*kwargs: Optional arguments that ``request`` takes.
    :return: :class:`Response <Response>` object
    :rtype: requests.Response
    """
    # The docstring is a raw string because ``\*`` is not a valid escape
    # sequence in an ordinary string literal.
    kwargs.setdefault('allow_redirects', True)
    return request('get', url, params=params, **kwargs)
requests/__init__.py
"""
Requests HTTP library
~~~~~~~~~~~~~~~~~~~~~
Requests is an HTTP library, written in Python, for human beings. Basic GET
usage:
>>> import requests
>>> r = requests.get('https://www.python.org')
>>> r.status_code
200
>>> 'Python is a programming language' in r.content
True
... or POST:
>>> payload = dict(key1='value1', key2='value2')
>>> r = requests.post('http://httpbin.org/post', data=payload)
>>> print(r.text)
{
...
"form": {
"key2": "value2",
"key1": "value1"
},
...
}
"""
# Package metadata exposed as module-level attributes.
__title__ = 'requests'
__version__ = '2.10.0'
# NOTE(review): looks like __version__ packed as hex digits (02.10.00) — confirm.
__build__ = 0x021000
__author__ = 'Kenneth Reitz'
__license__ = 'Apache 2.0'
__copyright__ = 'Copyright 2016 Kenneth Reitz'
from . import utils
from .models import Request, Response, PreparedRequest
from .api import request, get, head, post, patch, put, delete, options
网友评论