All regression tests now pass with the low-level interface

master
Stephane Bortzmeyer 10 years ago
parent 93d5fffbb9
commit 677548c9db
  1. 24
      DNSLG/Formatter.py
  2. 75
      DNSLG/Resolver.py
  3. 86
      DNSLG/__init__.py
  4. 2
      JSON.txt
  5. 3
      README
  6. 10
      tests.sh

@ -88,6 +88,8 @@ class TextFormatter(Formatter):
(rdata.key_tag, rdata.digest_type)
elif rdata.rdtype == dns.rdatatype.RRSIG:
pass # Should we show signatures?
elif rdata.rdtype == dns.rdatatype.NSEC or rdata.rdtype == dns.rdatatype.NSEC3:
pass # Should we show NSEC*?
elif rdata.rdtype == dns.rdatatype.LOC:
self.output += "Location: longitude %i degrees %i' %i\" latitude %i degrees %i' %i\" altitude %f\n" % \
(rdata.longitude[0], rdata.longitude[1], rdata.longitude[2],
@ -108,6 +110,8 @@ class TextFormatter(Formatter):
# always available on 2012-05-17
pass
self.output += "algorithm %i, flags %i\n" % (rdata.algorithm, rdata.flags)
elif rdata.rdtype == dns.rdatatype.NSEC3PARAM:
self.output += "NSEC3PARAM: algorithm %i, iterations %i\n" % (rdata.algorithm, rdata.iterations) # TODO format salt (tagged as string but actually binaty)
elif rdata.rdtype == dns.rdatatype.SSHFP:
self.output += "SSH fingerprint: algorithm %i, digest type %i, fingerprint %s\n" % \
(rdata.algorithm, rdata.fp_type, to_hexstring(rdata.fingerprint))
@ -119,7 +123,10 @@ class TextFormatter(Formatter):
rdata.regexp, str(rdata.replacement), rdata.service)
else:
self.output += "Unknown record type %i: (DATA)\n" % rdata.rdtype
self.output += "TTL: %i\n" % rrset.ttl
if rdata.rdtype != dns.rdatatype.RRSIG and \
rdata.rdtype != dns.rdatatype.NSEC and \
rdata.rdtype != dns.rdatatype.NSEC3:
self.output += "TTL: %i\n" % rrset.ttl
self.output += "Resolver queried: %s\n" % querier.resolver.nameservers[0]
self.output += "Query done at: %s\n" % time.strftime("%Y-%m-%d %H:%M:%SZ",
time.gmtime(time.time()))
@ -158,7 +165,8 @@ class ZoneFormatter(Formatter):
for rrset in answer.answer:
for rdata in rrset:
# TODO: do not hardwire the class
self.output += "%s\tIN\t" % answer.name # TODO: do not repeat the name if there is a RRset
if rdata.rdtype != dns.rdatatype.RRSIG:
self.output += "%s\tIN\t" % answer.qname # TODO: do not repeat the name if there is a RRset
# TODO: it could use some refactoring: most (but _not all_) of types
# use the same code.
if rdata.rdtype == dns.rdatatype.A:
@ -203,7 +211,8 @@ class ZoneFormatter(Formatter):
else:
# dnspython dumps the types it knows. TODO: uses that?
self.output += "TYPE%i ; DATA %s\n" % (rdata.rdtype, rdata.to_text())
self.output += "; TTL: %i\n" % rrset.ttl
if rdata.rdtype != dns.rdatatype.RRSIG:
self.output += "; TTL: %i\n\n" % rrset.ttl # TODO: put it in the zone, not as a comment
self.output += "\n; Server: %s\n" % querier.resolver.nameservers[0]
self.output += "; When: %s\n" % time.strftime("%Y-%m-%d %H:%M:%SZ",
time.gmtime(time.time()))
@ -268,7 +277,7 @@ class JsonFormatter(Formatter):
'Minimum': rdata.minimum,
})
elif rdata.rdtype == dns.rdatatype.NS:
self.object['AnswerSection'].append({'Type': 'NS', 'Name': str(rdata.target)})
self.object['AnswerSection'].append({'Type': 'NS', 'Target': str(rdata.target)})
elif rdata.rdtype == dns.rdatatype.DNSKEY:
returned_object = {'Type': 'DNSKEY',
'Algorithm': rdata.algorithm,
@ -309,8 +318,9 @@ class JsonFormatter(Formatter):
'Weight': rdata.weight})
else:
self.object['AnswerSection'].append({'Type': "unknown"}) # TODO: the type number
self.object['AnswerSection'][-1]['TTL'] = rrset.ttl
self.object['AnswerSection'][-1]['Name'] = str(rrset.name)
if rdata.rdtype != dns.rdatatype.RRSIG:
self.object['AnswerSection'][-1]['TTL'] = rrset.ttl
self.object['AnswerSection'][-1]['Name'] = str(rrset.name)
try:
duration = querier.delay.total_seconds()
except AttributeError: # total_seconds appeared only with Python 2.7
@ -328,7 +338,7 @@ class JsonFormatter(Formatter):
def result(self, querier):
return json.dumps(self.object) + "\n"
return json.dumps(self.object, indent=True) + "\n"
# XML

@ -4,20 +4,39 @@ import dns.message
import dns.resolver
import Answer
DEFAULT_EDNS_SIZE=2048
class Timeout(Exception):
pass
class NoSuchDomainName(Exception):
pass
class UnknownError(Exception):
class NoNameservers(Exception):
pass
class NoPositiveAnswer(Exception):
pass
class Refused(Exception):
pass
class Servfail(Exception):
pass
class UnknownRRtype(Exception):
pass
class UnknownError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class Resolver():
def __init__(self, nameservers=None, maximum=3, timeout=0.5,
edns_version=0, edns_payload=4096):
# TODO CRIT: DNSSEC
edns_version=0, edns_payload=DEFAULT_EDNS_SIZE, do_dnssec=False):
# TODO: ednsflags such as NSID
""" A "None" value for the parameter nameservers means to use
the system's default resolver(s). Otherwise, this parameter
@ -27,6 +46,7 @@ class Resolver():
self.timeout = timeout
self.original_edns = edns_version
self.original_payload = edns_payload
self.original_do = do_dnssec
if nameservers is None:
self.original_nameservers = dns.resolver.get_default_resolver().nameservers
else:
@ -35,31 +55,52 @@ class Resolver():
self.edns = self.original_edns
self.payload = self.original_payload
self.nameservers = self.original_nameservers
self.do = self.original_do
def query(self, name, type, tcp=False):
# TODO CRIT : TCP
def query(self, name, type, tcp=False, cd=False):
""" The returned value is a DNSLG.Answer """
if len(self.nameservers) == 0:
raise NoNameservers()
for ns in self.nameservers:
try:
message = dns.message.make_query(name, type,
use_edns=self.edns, payload=self.payload,
want_dnssec=True)
want_dnssec=self.do)
except TypeError: # Old DNS Python... Code here just as long as it lingers in some places
message = dns.message.make_query(name, type, use_edns=0,
want_dnssec=True)
message.payload = 4096
try:
message = dns.message.make_query(name, type, use_edns=self.edns,
want_dnssec=self.do)
except dns.rdatatype.UnknownRdatatype:
raise UnknownRRtype()
message.payload = DEFAULT_EDNS_SIZE
except dns.rdatatype.UnknownRdatatype:
raise UnknownRRtype()
if cd:
message.flags |= dns.flags.CD
done = False
tests = 0
while not done and tests < self.maximum:
try:
msg = dns.query.udp(message, ns, timeout=self.timeout)
if not tcp:
msg = dns.query.udp(message, ns, timeout=self.timeout)
else:
msg = dns.query.tcp(message, ns, timeout=self.timeout)
if msg.rcode() == dns.rcode.NOERROR:
done = True
elif msg.rcode() == dns.rcode.NXDOMAIN:
raise NoSuchDomainName()
# TODO CRIT: if REFUSED or SERVFAIL, tries the next resolver?
elif msg.rcode() == dns.rcode.REFUSED:
if len(self.nameservers) == 1:
raise Refused()
else:
break
elif msg.rcode() == dns.rcode.SERVFAIL:
if len(self.nameservers) == 1:
raise Servfail()
else:
break
else:
raise UnknownError(msg.rcode)
raise UnknownError(msg.rcode())
except dns.exception.Timeout:
tests += 1
if done:
@ -68,13 +109,16 @@ class Resolver():
response.nameserver = ns
response.qname = name
return response
elif len(self.nameservers) == 1:
raise Timeout()
# If we are still here, it means no name server answers
raise Timeout()
raise NoPositiveAnswer()
def set_edns(self, version=0, payload=4096, dnssec=False):
def set_edns(self, version=0, payload=DEFAULT_EDNS_SIZE, dnssec=False):
""" version=0 means EDNS0, the original one. Use -1 for no EDNS """
self.edns = version
self.payload = None
self.payload = payload
self.do = dnssec
def set_nameservers(self, nameservers):
self.nameservers = nameservers
@ -83,3 +127,4 @@ class Resolver():
self.edns = self.original_edns
self.payload = self.original_payload
self.nameservers = self.original_nameservers
self.do = self.original_do

@ -102,7 +102,7 @@ Disallow: /
return [output]
def query(self, start_response, path, client, format="HTML", alt_resolver=None,
do_dnssec=False, tcp=False, edns_size=default_edns_size,
do_dnssec=False, tcp=False, cd=False, edns_size=default_edns_size,
reverse=False):
""" path must starts with a /, then the domain name then an
(optional) / followed by the QTYPE """
@ -181,12 +181,6 @@ Disallow: /
output, plaintype)
return [output]
# Pseudo-qtype ADDR is handled specially later
# Alas, dnspython does not react properly to QTYPE=ANY :-(
if qtype == "ANY":
output = "dnspython does not support ANY queries, sorry\n"
send_response(start_response, '400 Bad record type', output,
plaintype)
return [output]
if not domain.endswith('.'):
domain += '.'
if domain == 'root.':
@ -210,22 +204,22 @@ Disallow: /
formatter = Formatter.XmlFormatter(domain)
self.resolver.reset()
if do_dnssec:
self.resolver.use_edns(0, edns_size)
self.resolver.set_edns(dnssec=True)
if alt_resolver:
self.resolver.set_nameservers([alt_resolver,])
query_start = datetime.now()
if qtype != "ADDR":
answer = self.resolver.query(qdomain, qtype, tcp=tcp)
answer = self.resolver.query(qdomain, qtype, tcp=tcp, cd=cd)
else:
# TODO CRIT refaire completement
try:
answers = self.resolver.query(qdomain, "A", tcp=tcp)
answer = self.resolver.query(qdomain, "A", tcp=tcp, cd=cd)
except dns.resolver.NoAnswer:
answer = None
try:
answers = self.resolver.query(qdomain, "AAAA", tcp=tcp)
if answer is not None:
answer.rrsets.append(answers.rrset)
answer_bis = self.resolver.query(qdomain, "AAAA", tcp=tcp, cd=cd)
if answer_bis is not None:
for rrset in answer_bis.answer:
answer.answer.append(rrset)
except dns.resolver.NoAnswer:
pass
# TODO: what if flags are different with A and AAAA? (Should not happen)
@ -241,42 +235,43 @@ Disallow: /
formatter.format(answer, qtype, answer.flags, self)
output = formatter.result(self)
send_response(start_response, '200 OK', output, mtype)
except dns.resolver.NXDOMAIN:
except Resolver.UnknownRRtype:
output = "Record type %s does not exist\n" % qtype
output = output.encode(self.encoding)
send_response(start_response, '400 Unknown record type', output,
plaintype)
except Resolver.NoSuchDomainName:
output = "Domain %s does not exist\n" % domain
output = output.encode(self.encoding)
# TODO send back HTML if this is the expected format
send_response(start_response, '404 No such domain', output, plaintype)
except dns.resolver.NoNameservers:
output = "No working server for domain %s\n" % domain
except Resolver.Refused:
output = "Refusal to answer for all name servers for %s\n" % domain
output = output.encode(self.encoding)
# TODO send back HTML if this is the expected format
send_response(start_response, '404 No such domain', output, plaintype)
except dns.resolver.Timeout: # dnspython seems to raise this, not only when
# there is an actual timeout, but also when all the authoritative
# servers reply REFUSED.
output = "No server reply for domain %s\n" % domain
send_response(start_response, '403 Refused', output, plaintype)
except Resolver.Servfail:
output = "Server failure for all name servers for %s (may be a DNSSEC validation error)\n" % domain
output = output.encode(self.encoding)
send_response(start_response, '504 Servfail', output, plaintype)
except Resolver.Timeout:
output = "No server replies for domain %s\n" % domain
output = output.encode(self.encoding)
# TODO send back HTML if this is the expected format. In
# that case, do not serialize output.
send_response(start_response, '504 Timeout', output,
"text/plain")
except dns.rdatatype.UnknownRdatatype:
output = "Record type %s does not exist\n" % qtype
except Resolver.NoPositiveAnswer:
output = "No server replies for domain %s\n" % domain
output = output.encode(self.encoding)
send_response(start_response, '400 Unknown record type', output,
plaintype)
except dns.resolver.NoAnswer: # TODO: use raise_on_no_answer=False in query() ?
# It appeared apparently only with dnspython 1.9.
query_end = datetime.now()
self.delay = query_end - query_start
formatter.format(None, qtype, 0, self)
output = formatter.result(self)
send_response(start_response, '200 OK', output, mtype)
# TODO: other exceptions, specially SERVFAIL (for instance
# with bogus DNSSEC like reverseddates-A.test.dnssec-tools.org
# (see issue #4). dnspython apparently returns Timeout :-(
# Fixing this will probably require to switch to the low-level
# interface of DNS Python. See issue #3.
# TODO send back HTML if this is the expected format. In
# that case, do not serialize output.
send_response(start_response, '504 No positive answer', output,
"text/plain")
except Resolver.UnknownError as code:
output = "Unknown error %s resolving %s\n" % (dns.rcode.to_text(int(str(code))), domain)
output = output.encode(self.encoding)
# TODO send back HTML if this is the expected format
send_response(start_response, '500 Unknown server error', output, plaintype)
return [output]
def application(self, environ, start_response):
@ -305,11 +300,20 @@ Disallow: /
dotcp = queries.get("tcp", '')
tcp = not(len(dotcp) == 0 or dotcp[0] == "0" or \
dotcp[0].lower() == "false" or dotcp[0] == "")
# TODO: CD bit. See issue #4
docd = queries.get("cd", '')
cd = not(len(docd) == 0 or docd[0] == "0" or \
docd[0].lower() == "false" or docd[0] == "")
doreverse = queries.get("reverse", '')
reverse = not(len(doreverse) == 0 or doreverse[0] == "0" or \
doreverse[0].lower() == "false" or doreverse[0] == "")
buffersize = int(queries.get("buffersize", [default_edns_size])[0])
if cd:
if not do_dnssec:
output = "Incompatible arguments"
send_response(start_response, '400 CD is meaningful only for DNSSEC',
output, plaintype)
return [output]
if buffersize == 0:
if do_dnssec:
output = "Buffer size = 0"
@ -342,7 +346,7 @@ Disallow: /
pure_path = path[len(self.base_url):]
# TODO: content negotiation? Find the output format from Accept headers?
return self.query(start_response, pure_path, client, format, resolver,
do_dnssec, tcp, edns_size, reverse)
do_dnssec, tcp, cd, edns_size, reverse)
else:
return self.default(start_response, path)

@ -48,7 +48,7 @@ MX:
* MailExchanger
NS:
* Name
* Target
PTR:
* Target

@ -68,6 +68,9 @@ To activate DNSSEC in the responses (to send the DO bit), use option
dodnssec=1 in the URL. This option will allow you (if the resolver
supports it) to see the AD (Authentic Data) flag.
To disable DNSSEC validation (if the resolver does it and you don't
want it), use option cd=1 (cd = Checking Disabled)
To use TCP (instead of UDP) for the request, use option tcp=1 in the
URL.

@ -175,6 +175,7 @@ echo ""
echo Options
for format in text zone xml html json; do
${WEB} ${URL}/afnic.fr/SOA?format=${format}\&dodnssec=1
${WEB} ${URL}/afnic.fr/SOA?format=${format}\&dodnssec=1\&cd=1
${WEB} ${URL}/afnic.fr/SOA?format=${format}\&tcp=1
${WEB} ${URL}/afnic.fr/SOA?format=${format}\&buffersize=0
${WEB} ${URL}/afnic.fr/SOA?format=${format}\&buffersize=512
@ -198,13 +199,14 @@ echo ""
echo "Test with invalid (DNSSEC) domains"
for domain in www.dnssec-failed.org. reverseddates-A.test.dnssec-tools.org; do
${WEB} ${URL}/${domain}/SOA?format=text
${WEB} ${URL}/${domain}/A?format=text\&dodnssec=1
${WEB} ${URL}/${domain}/A?format=text\&dodnssec=1\&cd=1
done
delay
# Various HTTP tricks
# This one requires curl, to have custom headers
echo Test methods other than GET (should be refused)
curl --head ${URL}/example.org/A
curl --data STUFF ${URL}/example.org/A
echo "Test methods other than GET (should be refused)"
${WEB} --head ${URL}/example.org/A
${WEB} --data STUFF ${URL}/example.org/A

Loading…
Cancel
Save