169 lines
6.0 KiB
Python
169 lines
6.0 KiB
Python
from __future__ import absolute_import
|
|
from builtins import object
|
|
import copy
|
|
|
|
import dns.message
|
|
import dns.resolver
|
|
import dns.inet
|
|
from . import Answer
|
|
|
|
DEFAULT_EDNS_SIZE=2048
|
|
|
|
class Timeout(Exception):
|
|
pass
|
|
|
|
class NoSuchDomainName(Exception):
|
|
pass
|
|
|
|
class NoNameservers(Exception):
|
|
pass
|
|
|
|
class NoPositiveAnswer(Exception):
|
|
pass
|
|
|
|
class Refused(Exception):
|
|
pass
|
|
|
|
class Servfail(Exception):
|
|
pass
|
|
|
|
class UnknownRRtype(Exception):
|
|
pass
|
|
|
|
class UnknownClass(Exception):
|
|
pass
|
|
|
|
class UnknownError(Exception):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
def __str__(self):
|
|
return repr(self.value)
|
|
|
|
class Resolver(object):
|
|
|
|
def __init__(self, nameservers=None, maximum=3, timeout=1.0,
|
|
edns_version=0, edns_payload=DEFAULT_EDNS_SIZE, do_dnssec=False):
|
|
# TODO: ednsflags such as NSID
|
|
# TODO: the default timeout is too high for authoritative name
|
|
# servers and too low for some far-away recursive: separate
|
|
# the two cases?
|
|
""" A "None" value for the parameter nameservers means to use
|
|
the system's default resolver(s). Otherwise, this parameter
|
|
is an *array* of the IP addresses of the resolvers.
|
|
edns_version=0 means EDNS0, the original one. Use -1 for no EDNS """
|
|
self.maximum = maximum
|
|
self.timeout = timeout
|
|
self.original_edns = edns_version
|
|
self.original_payload = edns_payload
|
|
self.original_do = do_dnssec
|
|
if nameservers is None:
|
|
self.original_nameservers = dns.resolver.get_default_resolver().nameservers
|
|
else:
|
|
# TODO: test it is an iterable? And of length > 0?
|
|
self.original_nameservers = nameservers
|
|
self.edns = self.original_edns
|
|
self.payload = self.original_payload
|
|
self.nameservers = []
|
|
for i in range(0, len(self.original_nameservers)):
|
|
self.nameservers.append(self.original_nameservers[i])
|
|
self.do = self.original_do
|
|
|
|
def query(self, name, type, klass='IN', tcp=False, cd=False):
|
|
""" The returned value is a DNSLG.Answer """
|
|
if len(self.nameservers) == 0:
|
|
raise NoNameservers()
|
|
for ns in self.nameservers:
|
|
try:
|
|
message = dns.message.make_query(name, type, rdclass=klass,
|
|
use_edns=self.edns, payload=self.payload,
|
|
want_dnssec=self.do)
|
|
except TypeError: # Old DNS Python... Code here just as long as it lingers in some places
|
|
try:
|
|
message = dns.message.make_query(name, type, rdclass=klass,
|
|
use_edns=self.edns,
|
|
want_dnssec=self.do)
|
|
except dns.rdatatype.UnknownRdatatype:
|
|
raise UnknownRRtype()
|
|
except dns.rdataclass.UnknownRdataclass:
|
|
raise UnknownClass()
|
|
message.payload = self.payload
|
|
except dns.rdatatype.UnknownRdatatype:
|
|
raise UnknownRRtype()
|
|
except dns.rdataclass.UnknownRdataclass:
|
|
raise UnknownClass()
|
|
if cd:
|
|
message.flags |= dns.flags.CD
|
|
done = False
|
|
tests = 0
|
|
while not done and tests < self.maximum:
|
|
try:
|
|
if not tcp:
|
|
msg = dns.query.udp(message, ns, timeout=self.timeout)
|
|
else:
|
|
msg = dns.query.tcp(message, ns, timeout=self.timeout)
|
|
if msg.rcode() == dns.rcode.NOERROR:
|
|
done = True
|
|
elif msg.rcode() == dns.rcode.NXDOMAIN:
|
|
raise NoSuchDomainName()
|
|
elif msg.rcode() == dns.rcode.REFUSED:
|
|
if len(self.nameservers) == 1:
|
|
raise Refused()
|
|
else:
|
|
break
|
|
elif msg.rcode() == dns.rcode.SERVFAIL:
|
|
if len(self.nameservers) == 1:
|
|
raise Servfail()
|
|
else:
|
|
break
|
|
else:
|
|
raise UnknownError(msg.rcode())
|
|
except dns.exception.Timeout:
|
|
tests += 1
|
|
if done:
|
|
response = copy.copy(msg)
|
|
response.__class__ = Answer.ExtendedAnswer
|
|
response.nameserver = ns
|
|
response.qname = name
|
|
return response
|
|
elif len(self.nameservers) == 1:
|
|
raise Timeout()
|
|
# If we are still here, it means no name server answers
|
|
raise NoPositiveAnswer()
|
|
|
|
def set_edns(self, version=0, payload=DEFAULT_EDNS_SIZE, dnssec=False):
|
|
""" version=0 means EDNS0, the original one. Use -1 for no EDNS """
|
|
self.edns = version
|
|
self.payload = payload
|
|
self.do = dnssec
|
|
|
|
def set_nameservers(self, nameservers):
|
|
self.nameservers = []
|
|
for ns in nameservers:
|
|
try:
|
|
dns.inet.af_for_address(ns)
|
|
except ValueError:
|
|
try:
|
|
res = list(dns.resolver.resolve(ns, 'aaaa'))
|
|
self.nameservers.extend(
|
|
target.to_text() for target in res
|
|
)
|
|
except:
|
|
# TODO: Add error handling here
|
|
pass
|
|
|
|
try:
|
|
res = list(dns.resolver.resolve(ns, 'a'))
|
|
self.nameservers.extend(
|
|
target.to_text() for target in res
|
|
)
|
|
except:
|
|
pass
|
|
else:
|
|
self.nameservers.append(ns)
|
|
|
|
def reset(self):
|
|
self.edns = self.original_edns
|
|
self.payload = self.original_payload
|
|
self.nameservers = self.original_nameservers
|
|
self.do = self.original_do
|