美文网首页
EDNS Client Subnet(ECS)探测

EDNS Client Subnet(ECS)探测

作者: 贰爷 | 来源:发表于2021-06-07 11:37 被阅读0次

    使用python进行EDNS Client Subnet(ECS)解析

    1. ECS是什么?
    2. python 解析域名
    3. 使用python进行edns解析
    1. ECS是什么?
      EDNS的一个字段,网上资料很多,不详细解释
    2. python 解析域名
      使用dnspython中的resolver进行解析域名即可,这里不再赘述
      下面是解析一个域名A记录的例子
    import dns.resolver

    # Resolve the A records of www.qq.com and print each returned address.
    dns_ret = dns.resolver.query("www.qq.com", 'A')
    for i in dns_ret.response.answer:
        # The answer section can also carry CNAME RRsets (the name is reached
        # through a CNAME chain); their records have no ``address`` attribute,
        # so only A RRsets are printed.
        if i.rdtype != dns.rdatatype.A:
            continue
        for j in i.items:
            # print() call form: the rest of this file is Python 3, where the
            # statement form ``print j.address`` is a syntax error.
            print(j.address)
    
    3. 使用python进行edns解析
      python中的dnspython模块已经支持了edns解析,但是dns.resolver.query并没有对edns做很好的支持,这里通过继承dns.resolver.Resolver来重写query函数,来支持ecs解析,源码如下:
    import time
    import socket
    import random
    import dns
    from dns.resolver import string_types, NoMetaqueries, NoAnswer, NoNameservers, YXDOMAIN, NXDOMAIN, Answer, Resolver
    from dns.edns import ECSOption
    
    class EdnsResolver(Resolver):
        """Resolver whose ``query`` can attach EDNS options to each request.

        ``query`` is a re-implementation of the base resolver's query loop with
        two extra keyword arguments: ``edns_option`` (EDNS options added to the
        outgoing message) and ``name_servers`` (a per-call server list).
        ``ecs_query`` is a convenience wrapper that builds an EDNS Client Subnet
        option and forwards to ``query``.
        """

        def __init__(self, *args, **kwargs):
            """Initialise the base ``Resolver`` and set ``self.edns`` to 0.

            All arguments are passed through unchanged to ``Resolver.__init__``,
            so calling ``EdnsResolver()`` with no arguments behaves as before.
            Note that ``query`` below does not read ``self.edns``; it always
            calls ``request.use_edns`` itself.
            """
            super().__init__(*args, **kwargs)
            self.edns = 0

        def query(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
                  tcp=False, source=None, raise_on_no_answer=True, source_port=0,
                  lifetime=None, edns_option=None, name_servers=None):
            """Query name servers for a record, optionally with EDNS options.

            :param qname: name to query, as text or as a ``dns.name.Name``
                (same as the base ``query``)
            :param rdtype: record type, as text or numeric constant
                (same as the base ``query``)
            :param rdclass: record class, as text or numeric constant
                (same as the base ``query``)
            :param tcp: use TCP instead of UDP (same as the base ``query``)
            :param source: source address passed to the transport call
                (same as the base ``query``)
            :param raise_on_no_answer: forwarded to ``Answer``; also controls
                whether a cached empty answer raises ``NoAnswer``
            :param source_port: source port passed to the transport call
                (same as the base ``query``)
            :param lifetime: forwarded to ``self._compute_timeout``
                (same as the base ``query``)
            :param edns_option: list of EDNS options to attach to the request,
                see ``dns.edns.Option``.  When options are supplied the resolver
                cache is bypassed, because cache entries are keyed only by
                (name, type, class) and an answer obtained with one option set
                (for example one client subnet) must not be returned for
                another.
            :param name_servers: servers to use for this call instead of
                ``self.nameservers``.  The sequence is copied and never
                modified.
            :return: a ``dns.resolver.Answer`` (same as the base ``query``)
            :raises NoMetaqueries: if ``rdtype`` or ``rdclass`` is a meta value
            :raises NoAnswer: if a cached entry has no rrset and
                ``raise_on_no_answer`` is true
            :raises NoNameservers: if every server has been dropped
            :raises YXDOMAIN: if a server answers with rcode YXDOMAIN
            :raises NXDOMAIN: if every candidate name yields NXDOMAIN
            """
            if isinstance(qname, string_types):
                qname = dns.name.from_text(qname, None)
            if isinstance(rdtype, string_types):
                rdtype = dns.rdatatype.from_text(rdtype)
            if dns.rdatatype.is_metatype(rdtype):
                raise NoMetaqueries
            if isinstance(rdclass, string_types):
                rdclass = dns.rdataclass.from_text(rdclass)
            if dns.rdataclass.is_metaclass(rdclass):
                raise NoMetaqueries

            # Build the list of candidate absolute names: the name itself if it
            # is absolute, otherwise the name under the root (multi-label names
            # only) followed by the search suffixes or the default domain.
            qnames_to_try = []
            if qname.is_absolute():
                qnames_to_try.append(qname)
            else:
                if len(qname) > 1:
                    qnames_to_try.append(qname.concatenate(dns.name.root))
                if self.search:
                    for suffix in self.search:
                        qnames_to_try.append(qname.concatenate(suffix))
                else:
                    qnames_to_try.append(qname.concatenate(self.domain))

            # The cache key carries no EDNS information, so only use the cache
            # for requests without options; otherwise an answer fetched for one
            # client subnet would be served for a different one.
            use_cache = self.cache and not edns_option

            all_nxdomain = True
            nxdomain_responses = {}
            start = time.time()
            _qname = None  # make pylint happy
            for _qname in qnames_to_try:
                if use_cache:
                    answer = self.cache.get((_qname, rdtype, rdclass))
                    if answer is not None:
                        if answer.rrset is None and raise_on_no_answer:
                            raise NoAnswer(response=answer.response)
                        return answer
                request = dns.message.make_query(_qname, rdtype, rdclass)
                if self.keyname is not None:
                    request.use_tsig(self.keyring, self.keyname,
                                     algorithm=self.keyalgorithm)
                request.use_edns(options=edns_option)
                if self.flags is not None:
                    request.flags = self.flags
                response = None
                #
                # Make a copy of the servers list so we can alter it later.
                # The caller's ``name_servers`` is copied too: the list is
                # shuffled and pruned below and must not leak those changes.
                #
                if name_servers:
                    nameservers = list(name_servers)
                else:
                    nameservers = self.nameservers[:]
                errors = []
                if self.rotate:
                    random.shuffle(nameservers)
                backoff = 0.10
                while response is None:
                    if not nameservers:
                        raise NoNameservers(request=request, errors=errors)
                    for nameserver in nameservers[:]:
                        timeout = self._compute_timeout(start, lifetime)
                        port = self.nameserver_ports.get(nameserver, self.port)
                        try:
                            tcp_attempt = tcp
                            if tcp:
                                response = dns.query.tcp(request, nameserver,
                                                         timeout, port,
                                                         source=source,
                                                         source_port=source_port)
                            else:
                                response = dns.query.udp(request, nameserver,
                                                         timeout, port,
                                                         source=source,
                                                         source_port=source_port)
                                if response.flags & dns.flags.TC:
                                    # Response truncated; retry with TCP.
                                    tcp_attempt = True
                                    timeout = self._compute_timeout(start, lifetime)
                                    response = \
                                        dns.query.tcp(request, nameserver,
                                                      timeout, port,
                                                      source=source,
                                                      source_port=source_port)
                        except (socket.error, dns.exception.Timeout,
                                dns.query.UnexpectedSource) as ex:
                            #
                            # Communication failure, timeout, or a reply from
                            # an unexpected source.  Keep the server in the
                            # list and go to the next one.
                            #
                            errors.append((nameserver, tcp_attempt, port, ex,
                                           response))
                            response = None
                            continue
                        except (dns.exception.FormError, EOFError) as ex:
                            #
                            # Either we don't understand what this server is
                            # saying, or we're using TCP and it hung up on us
                            # (probably it doesn't support TCP, though it is
                            # supposed to).  Take it out of the mix and
                            # continue.
                            #
                            nameservers.remove(nameserver)
                            errors.append((nameserver, tcp_attempt, port, ex,
                                           response))
                            response = None
                            continue
                        rcode = response.rcode()
                        if rcode == dns.rcode.YXDOMAIN:
                            ex = YXDOMAIN()
                            errors.append((nameserver, tcp_attempt, port, ex,
                                           response))
                            raise ex
                        if rcode == dns.rcode.NOERROR or \
                                rcode == dns.rcode.NXDOMAIN:
                            break
                        #
                        # We got a response, but we're not happy with the
                        # rcode in it.  Remove the server from the mix if
                        # the rcode isn't SERVFAIL.
                        #
                        if rcode != dns.rcode.SERVFAIL or not self.retry_servfail:
                            nameservers.remove(nameserver)
                        errors.append((nameserver, tcp_attempt, port,
                                       dns.rcode.to_text(rcode), response))
                        response = None
                    if response is not None:
                        break
                    #
                    # All nameservers failed!
                    #
                    if nameservers:
                        #
                        # But we still have servers to try.  Sleep a bit
                        # so we don't pound them!
                        #
                        timeout = self._compute_timeout(start, lifetime)
                        sleep_time = min(timeout, backoff)
                        backoff *= 2
                        time.sleep(sleep_time)
                if response.rcode() == dns.rcode.NXDOMAIN:
                    # Remember the response and move on to the next candidate.
                    nxdomain_responses[_qname] = response
                    continue
                all_nxdomain = False
                break
            if all_nxdomain:
                raise NXDOMAIN(qnames=qnames_to_try, responses=nxdomain_responses)
            answer = Answer(_qname, rdtype, rdclass, response,
                            raise_on_no_answer)
            if use_cache:
                self.cache.put((_qname, rdtype, rdclass), answer)
            return answer

        def ecs_query(self, qname, address, name_servers=None, srclen=32, rdtype=dns.rdatatype.A):
            """Run ``query`` with an EDNS Client Subnet (ECS) option attached.

            :param qname: name to resolve
            :param address: client address placed in the ECS option
            :param name_servers: DNS servers to use for this call
            :param srclen: source prefix length of the ECS address
            :param rdtype: record type to query
            :return: same as ``query``
            """
            # Author's note: the prefix length should be 32 (the option's own
            # default is said to be 24) because some DNS servers do not resolve
            # with a /24 subnet.
            ecs = ECSOption(address, srclen)
            return self.query(qname, rdtype, edns_option=[ecs], name_servers=name_servers)
    

    使用示例

    if __name__ == '__main__':
        # "x.x.x.x" stands for the client IP under test; the name is resolved
        # as seen from the region that client IP belongs to.
        ecs_resolver = EdnsResolver()
        ecs_answer = ecs_resolver.ecs_query(
            'www.qq.com',
            "x.x.x.x",
            name_servers=['8.8.8.8'],
        )
        # Print every record of every RRset in the answer section.
        for rrset in ecs_answer.response.answer:
            for record in rrset.items:
                print(record)
    

    相关文章

      网友评论

          本文标题:EDNS Client Subnet(ECS)探测

          本文链接:https://www.haomeiwen.com/subject/fbjvcltx.html