使用python进行EDNS Client Subnet(ECS)解析
- ECS是什么?
- python 解析域名
- 使用python进行edns解析
- ECS是什么?
ECS(EDNS Client Subnet)是EDNS的一个扩展选项,用于把客户端所在的子网信息带给DNS服务器;网上资料很多,这里不详细解释
- python 解析域名
使用dnspython中的resolver进行解析域名即可,这里不再赘述
下面是解析一个域名A记录的例子
import dns.resolver
# Resolve the A records of a domain and print every returned IPv4 address.
dns_ret = dns.resolver.query("www.qq.com", 'A')
for rrset in dns_ret.response.answer:
    # The answer section can also carry CNAME rrsets (the alias chain that
    # leads to the A records); their rdata has no ``address`` attribute,
    # so only A rrsets are printed.
    if rrset.rdtype != dns.rdatatype.A:
        continue
    for record in rrset.items:
        print(record.address)
- 使用python进行edns解析
python中的dnspython模块已经支持了edns解析,但是dns.resolver.query并没有对edns做很好的支持,这里通过继承dns.resolver.Resolver来重写query函数,来支持ecs解析,源码如下:
import time
import socket
import random
import dns
from dns.resolver import string_types, NoMetaqueries, NoAnswer, NoNameservers, YXDOMAIN, NXDOMAIN, Answer, Resolver
from dns.edns import ECSOption
class EdnsResolver(Resolver):
    """Resolver whose ``query`` can attach EDNS options (such as ECS).

    ``query`` is a re-implementation of the base resolver's query loop with
    two extra keyword arguments: ``edns_option`` (options added to the
    request via ``use_edns``) and ``name_servers`` (a per-call server list
    used instead of ``self.nameservers``).
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base resolver, then set ``self.edns`` to 0.

        Positional and keyword arguments are forwarded unchanged to
        ``Resolver.__init__``; calling with no arguments behaves exactly as
        before.
        """
        super().__init__(*args, **kwargs)
        # NOTE(review): query() below does not read self.edns (it calls
        # use_edns() with only ``options``); presumably this is kept so the
        # attribute reports EDNS version 0 -- confirm.
        self.edns = 0

    def query(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
              tcp=False, source=None, raise_on_no_answer=True, source_port=0,
              lifetime=None, edns_option=None, name_servers=None):
        """
        Query with EDNS option support.

        :param qname: same as the base ``query``
        :param rdtype: same as the base ``query``
        :param rdclass: same as the base ``query``
        :param tcp: same as the base ``query``
        :param source: same as the base ``query``
        :param raise_on_no_answer: same as the base ``query``
        :param source_port: same as the base ``query``
        :param lifetime: same as the base ``query``
        :param edns_option: list of EDNS options attached to the request,
            see ``dns.edns.Option``
        :param name_servers: name servers to use for this call instead of
            ``self.nameservers``; the list is copied, never modified
        :return: same as the base ``query``
        :raises NoMetaqueries: if ``rdtype`` or ``rdclass`` is a meta type
        :raises NoAnswer: if a cached answer has no rrset and
            ``raise_on_no_answer`` is true
        :raises NoNameservers: if every name server has been removed
        :raises YXDOMAIN: if a server answers with rcode YXDOMAIN
        :raises NXDOMAIN: if every candidate name yields NXDOMAIN
        """
        if isinstance(qname, string_types):
            qname = dns.name.from_text(qname, None)
        if isinstance(rdtype, string_types):
            rdtype = dns.rdatatype.from_text(rdtype)
        if dns.rdatatype.is_metatype(rdtype):
            raise NoMetaqueries
        if isinstance(rdclass, string_types):
            rdclass = dns.rdataclass.from_text(rdclass)
        if dns.rdataclass.is_metaclass(rdclass):
            raise NoMetaqueries
        qnames_to_try = []
        if qname.is_absolute():
            qnames_to_try.append(qname)
        else:
            if len(qname) > 1:
                qnames_to_try.append(qname.concatenate(dns.name.root))
            if self.search:
                for suffix in self.search:
                    qnames_to_try.append(qname.concatenate(suffix))
            else:
                qnames_to_try.append(qname.concatenate(self.domain))
        # The cache key is only (name, type, class).  An answer obtained
        # with a particular EDNS option (e.g. one client subnet) or from an
        # explicitly chosen server list must not be served for, or stored
        # on behalf of, a query that differs in those respects, so such
        # queries bypass the cache entirely.
        use_cache = bool(self.cache) and not edns_option and not name_servers
        all_nxdomain = True
        nxdomain_responses = {}
        start = time.time()
        _qname = None  # make pylint happy
        for _qname in qnames_to_try:
            if use_cache:
                answer = self.cache.get((_qname, rdtype, rdclass))
                if answer is not None:
                    if answer.rrset is None and raise_on_no_answer:
                        raise NoAnswer(response=answer.response)
                    return answer
            request = dns.message.make_query(_qname, rdtype, rdclass)
            if self.keyname is not None:
                request.use_tsig(self.keyring, self.keyname,
                                 algorithm=self.keyalgorithm)
            request.use_edns(options=edns_option)
            if self.flags is not None:
                request.flags = self.flags
            response = None
            #
            # make a copy of the servers list so we can alter it later.
            # A caller-supplied list is copied as well: it is shuffled and
            # pruned below and the caller's object must stay untouched.
            #
            if name_servers:
                nameservers = list(name_servers)
            else:
                nameservers = self.nameservers[:]
            errors = []
            if self.rotate:
                random.shuffle(nameservers)
            backoff = 0.10
            while response is None:
                if not nameservers:
                    raise NoNameservers(request=request, errors=errors)
                for nameserver in nameservers[:]:
                    timeout = self._compute_timeout(start, lifetime)
                    port = self.nameserver_ports.get(nameserver, self.port)
                    try:
                        tcp_attempt = tcp
                        if tcp:
                            response = dns.query.tcp(request, nameserver,
                                                     timeout, port,
                                                     source=source,
                                                     source_port=source_port)
                        else:
                            response = dns.query.udp(request, nameserver,
                                                     timeout, port,
                                                     source=source,
                                                     source_port=source_port)
                            if response.flags & dns.flags.TC:
                                # Response truncated; retry with TCP.
                                tcp_attempt = True
                                timeout = self._compute_timeout(start,
                                                                lifetime)
                                response = \
                                    dns.query.tcp(request, nameserver,
                                                  timeout, port,
                                                  source=source,
                                                  source_port=source_port)
                    except (socket.error, dns.exception.Timeout) as ex:
                        #
                        # Communication failure or timeout.  Go to the
                        # next server
                        #
                        errors.append((nameserver, tcp_attempt, port, ex,
                                       response))
                        response = None
                        continue
                    except dns.query.UnexpectedSource as ex:
                        #
                        # Who knows?  Keep going.
                        #
                        errors.append((nameserver, tcp_attempt, port, ex,
                                       response))
                        response = None
                        continue
                    except dns.exception.FormError as ex:
                        #
                        # We don't understand what this server is
                        # saying.  Take it out of the mix and
                        # continue.
                        #
                        nameservers.remove(nameserver)
                        errors.append((nameserver, tcp_attempt, port, ex,
                                       response))
                        response = None
                        continue
                    except EOFError as ex:
                        #
                        # We're using TCP and they hung up on us.
                        # Probably they don't support TCP (though
                        # they're supposed to!).  Take it out of the
                        # mix and continue.
                        #
                        nameservers.remove(nameserver)
                        errors.append((nameserver, tcp_attempt, port, ex,
                                       response))
                        response = None
                        continue
                    rcode = response.rcode()
                    if rcode == dns.rcode.YXDOMAIN:
                        ex = YXDOMAIN()
                        errors.append((nameserver, tcp_attempt, port, ex,
                                       response))
                        raise ex
                    if rcode == dns.rcode.NOERROR or \
                            rcode == dns.rcode.NXDOMAIN:
                        break
                    #
                    # We got a response, but we're not happy with the
                    # rcode in it.  Remove the server from the mix if
                    # the rcode isn't SERVFAIL.
                    #
                    if rcode != dns.rcode.SERVFAIL or not self.retry_servfail:
                        nameservers.remove(nameserver)
                    errors.append((nameserver, tcp_attempt, port,
                                   dns.rcode.to_text(rcode), response))
                    response = None
                if response is not None:
                    break
                #
                # All nameservers failed!
                #
                if nameservers:
                    #
                    # But we still have servers to try.  Sleep a bit
                    # so we don't pound them!
                    #
                    timeout = self._compute_timeout(start, lifetime)
                    sleep_time = min(timeout, backoff)
                    backoff *= 2
                    time.sleep(sleep_time)
            if response.rcode() == dns.rcode.NXDOMAIN:
                nxdomain_responses[_qname] = response
                continue
            all_nxdomain = False
            break
        if all_nxdomain:
            raise NXDOMAIN(qnames=qnames_to_try, responses=nxdomain_responses)
        answer = Answer(_qname, rdtype, rdclass, response,
                        raise_on_no_answer)
        if use_cache:
            self.cache.put((_qname, rdtype, rdclass), answer)
        return answer

    def ecs_query(self, qname, address, name_servers=None, srclen=32,
                  rdtype=dns.rdatatype.A):
        """
        Convenience wrapper around ``query`` for a simple ECS lookup.

        :param qname: domain name to resolve
        :param address: client address placed in the ECS option
        :param name_servers: name servers to use for this call
        :param srclen: source prefix length of the ECS address
        :param rdtype: record type to query
        :return: same as the base ``query``
        """
        # Author's note: the prefix length has to be passed as 32 (the
        # option's own default is 24) because some DNS servers do not
        # resolve for a /24 subnet.
        ecs = ECSOption(address, srclen)
        return self.query(qname, rdtype, edns_option=[ecs],
                          name_servers=name_servers)
使用示例
if __name__ == '__main__':
    ecs_resolver = EdnsResolver()
    # "x.x.x.x" stands for the client IP under test; the name is resolved
    # according to the region that client IP belongs to.
    ecs_answer = ecs_resolver.ecs_query(
        'www.qq.com',
        "x.x.x.x",
        name_servers=['8.8.8.8'],
    )
    # Print every record of every rrset in the answer section.
    for rrset in ecs_answer.response.answer:
        for record in rrset.items:
            print(record)
网友评论