dns-lg/DNSLG/Formatter.py

1194 lines
68 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
import dns
import dns.dnssec
import dns.version as dnspythonversion
import base64
import platform
import pkg_resources
import time
import sys
import struct
import io
# TODO: Accept explicit requests for DNAME?
# TODO: DANE/TLSA record type. Not yet in DNS Python release
# (committed in the upstream git repository) so not easy...
from . import Answer
def to_hexstring(str):
result = ""
for char in str:
result += ("%x" % char)
return result.upper()
def keylength(alg, key):
""" Returns the length in bits """
if alg == 5 or alg == 7 or alg == 8 or alg == 10:
# RSA, RFC 3110
firstbyte = key[0].to_bytes(1, byteorder=sys.byteorder)[0]
if firstbyte > 0:
exponentlength = firstbyte + 1
else:
exponentlength = struct.unpack(">H", key[1:3])[0] + 3
return (len(key)-exponentlength)*8
else:
# Unknown, best guess. Read the RFCs 6605 or 5933 to find out
# the format of ECDSA or GOST keys.
return len(key)*8
class Formatter(object):
""" This ia the base class for the various Formatters. A formatter
takes a "DNS answer" object and format it for a given output
format (JSON, XML, etc). Implementing a new format means deriving
this class and providing the required methods."""
def __init__(self, domain):
try:
self.myversion = pkg_resources.require("DNS-LG")[0].version
except pkg_resources.DistributionNotFound:
self.myversion = "VERSION UNKNOWN"
self.domain = domain
def format(self, answer, qtype, qclass, flags, querier):
""" Parameter "answer" must be of type
Answer.ExtendedAnswer. "qtype" and "qclass" are strings, flags an integer
and querier a DNSLG.Querier. This method changes the internal
state of the Formatter, it returns nothing."""
pass
def result(self, querier):
""" Returns the state of the Formatter, to be sent to the client."""
return "NOT IMPLEMENTED IN THE BASE CLASS"
# TEXT
class TextFormatter(Formatter):
def format(self, answer, qtype, qclass, flags, querier):
# TODO: it would be nice one day to have a short format to
# have only data, not headers. Or may be several short
# formats, suitable for typical Unix text parsing tools. In
# the mean time, use "zone" for that.
self.output = ""
if qclass != 'IN':
qclass_text = ", class %s" % qclass
else:
qclass_text = ""
self.output += "Query for: %s, type %s%s\n" % (self.domain,
qtype, qclass_text)
str_flags = ""
if flags & dns.flags.AD:
str_flags += "/ Authentic Data "
if flags & dns.flags.AA:
str_flags += "/ Authoritative Answer "
if flags & dns.flags.TC:
str_flags += "/ Truncated Answer "
self.output += "Flags: %s\n" % str_flags
if answer is None:
self.output += "[No data for this query type]\n"
else:
for rrset in answer.answer:
for rdata in rrset:
if rdata.rdtype == dns.rdatatype.A or rdata.rdtype == dns.rdatatype.AAAA:
self.output += "IP address: %s\n" % rdata.address
elif rdata.rdtype == dns.rdatatype.MX:
self.output += "Mail exchanger: %s (preference %i)\n" % \
(rdata.exchange, rdata.preference)
elif rdata.rdtype == dns.rdatatype.TXT:
self.output += "Text: %s\n" % (b" ".join(rdata.strings)).decode()
elif rdata.rdtype == dns.rdatatype.SPF:
self.output += "SPF policy: %s\n" % " ".join(rdata.strings)
elif rdata.rdtype == dns.rdatatype.SOA:
self.output += "Start of zone authority: serial number %i, zone administrator %s, master nameserver %s\n" % \
(rdata.serial, rdata.rname, rdata.mname)
elif rdata.rdtype == dns.rdatatype.NS:
self.output += "Name server: %s\n" % rdata.target
elif rdata.rdtype == dns.rdatatype.DS:
key_algo_text = dns.dnssec.algorithm_to_text(rdata.algorithm)
if key_algo_text != "%i" % rdata.algorithm:
key_algo_text = " (%s)" % key_algo_text
else:
key_algo_text = ""
self.output += "Delegation of signature: key %i%s, hash type %i, hash %s\n" % \
(rdata.key_tag, key_algo_text,
rdata.digest_type,
to_hexstring(rdata.digest))
elif rdata.rdtype == dns.rdatatype.DLV:
self.output += "Delegation of signature: key %i, hash type %i\n" % \
(rdata.key_tag, rdata.digest_type)
elif rdata.rdtype == dns.rdatatype.RRSIG:
pass # Should we show signatures?
elif rdata.rdtype == dns.rdatatype.NSEC or rdata.rdtype == dns.rdatatype.NSEC3:
pass # Should we show NSEC*?
elif rdata.rdtype == dns.rdatatype.LOC:
self.output += "Location: latitude %i degrees %i' %i\" longitude %i degrees %i' %i\" altitude %f meters\n" % \
(rdata.latitude[0], rdata.latitude[1], rdata.latitude[2],
rdata.longitude[0], rdata.longitude[1], rdata.longitude[2],
float(rdata.altitude)/100)
elif rdata.rdtype == dns.rdatatype.URI:
self.output += "URI: %s\n" % (rdata.target) # TODO display priority and weight?
elif rdata.rdtype == dns.rdatatype.SRV:
self.output += "Service location: server %s, port %i, priority %i, weight %i\n" % \
(rdata.target, rdata.port, rdata.priority, rdata.weight)
elif rdata.rdtype == dns.rdatatype.PTR:
self.output += "Target: %s\n" % rdata.target
elif rdata.rdtype == dns.rdatatype.CNAME:
self.output += "Canonical name: %s\n" % rdata.target
elif rdata.rdtype == dns.rdatatype.DNSKEY:
self.output += "DNSSEC key: "
try:
key_tag = dns.dnssec.key_id(rdata)
self.output += "tag %i " % key_tag
except AttributeError:
# key_id appeared only in dnspython 1.9. Not
# always available on 2012-05-17
pass
algo_text = dns.dnssec.algorithm_to_text(rdata.algorithm)
if algo_text == "%i" % rdata.algorithm:
algo_text = "Unknown algorithm"
flags_text = dns.rdtypes.dnskeybase.flags_to_text_set(rdata.flags)
self.output += "algorithm %i (%s), length %i bits, flags %i (%s)\n" % \
(rdata.algorithm, algo_text,
keylength(rdata.algorithm, rdata.key),
rdata.flags, flags_text)
elif rdata.rdtype == dns.rdatatype.NSEC3PARAM:
self.output += "NSEC3PARAM: algorithm %i, iterations %i, salt %s\n" % (rdata.algorithm, rdata.iterations, to_hexstring(rdata.salt))
elif rdata.rdtype == dns.rdatatype.SSHFP:
self.output += "SSH fingerprint: algorithm %i, digest type %i, fingerprint %s\n" % \
(rdata.algorithm, rdata.fp_type, to_hexstring(rdata.fingerprint))
elif rdata.rdtype == dns.rdatatype.NAPTR:
self.output += ("Naming Authority Pointer: flags \"%s\", order %i, " + \
"preference %i, rexegp \"%s\" -> replacement \"%s\", " + \
"services \"%s\"\n") % \
(rdata.flags, rdata.order, rdata.preference,
rdata.regexp, str(rdata.replacement), rdata.service)
else:
self.output += "Unknown record type %i: (DATA)\n" % rdata.rdtype
if rdata.rdtype != dns.rdatatype.RRSIG and \
rdata.rdtype != dns.rdatatype.NSEC and \
rdata.rdtype != dns.rdatatype.NSEC3:
self.output += "TTL: %i\n" % rrset.ttl
self.output += "Resolver queried: %s\n" % answer.nameserver
self.output += "Query done at: %s\n" % time.strftime("%Y-%m-%d %H:%M:%SZ",
time.gmtime(time.time()))
self.output += "Query duration: %s\n" % querier.delay
if querier.description:
self.output += "Service description: %s\n" % querier.description
self.output += "DNS Looking Glass %s, DNSpython version %s, Python version %s %s on %s\n" % \
(self.myversion,
dnspythonversion.version, platform.python_implementation(),
platform.python_version(), platform.system())
def result(self, querier):
return self.output.encode()
# ZONE FILE
class ZoneFormatter(Formatter):
def format(self, answer, qtype, qclass, flags, querier):
self.output = ""
if qclass != 'IN':
qclass_text = ", class %s" % qclass
else:
qclass_text = ""
self.output += "; Question: %s, type %s%s\n" % (self.domain,
qtype, qclass_text)
str_flags = ""
if flags & dns.flags.AD:
str_flags += " ad "
if flags & dns.flags.AA:
str_flags += " aa "
if flags & dns.flags.TC:
str_flags += " tc "
if str_flags != "":
self.output += "; Flags:" + str_flags + "\n"
self.output += "\n"
if answer is None:
self.output += "; No data for this type\n"
else:
for rrset in answer.answer:
for rdata in rrset:
# TODO: do not hardwire the class
if rdata.rdtype != dns.rdatatype.RRSIG:
self.output += "%s\tIN\t" % answer.qname # TODO: do not repeat the name if there is a RRset
# TODO: it could use some refactoring: most (but _not all_) of types
# use the same code.
if rdata.rdtype == dns.rdatatype.A:
self.output += "A\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.AAAA:
self.output += "AAAA\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.MX:
self.output += "MX\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.SPF:
self.output += "SPF\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.TXT:
self.output += "TXT\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.SOA:
self.output += "SOA\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.NS:
self.output += "NS\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.PTR:
self.output += "PTR\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.CNAME:
self.output += "CNAME\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.LOC:
self.output += "LOC\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.URI:
self.output += "URI\t%s\n" % rdata.to_text().encode(querier.encoding)
elif rdata.rdtype == dns.rdatatype.DNSKEY:
self.output += "DNSKEY\t%s" % rdata.to_text()
try:
key_tag = dns.dnssec.key_id(rdata)
self.output += " ; key ID = %i" % key_tag
except AttributeError:
# key_id appeared only in dnspython 1.9. Not
# always available on 2012-05-17
pass
algo_text = dns.dnssec.algorithm_to_text(rdata.algorithm)
if algo_text != "%i" % rdata.algorithm:
self.output += " ; Algorithm %s" % algo_text
self.output += "\n"
elif rdata.rdtype == dns.rdatatype.NSEC3PARAM:
self.output += "NSEC3PARAM\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.DS:
self.output += "DS\t%s " % rdata.to_text()
algo_text = dns.dnssec.algorithm_to_text(rdata.algorithm)
if algo_text != "%i" % rdata.algorithm:
self.output += " ; Key algorithm %s" % algo_text
self.output += '\n'
elif rdata.rdtype == dns.rdatatype.DLV:
self.output += "DLV\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.SSHFP:
self.output += "SSHFP\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.NAPTR:
self.output += "NAPTR\t%s\n" % rdata.to_text()
elif rdata.rdtype == dns.rdatatype.RRSIG:
pass # Should we show signatures?
elif rdata.rdtype == dns.rdatatype.SRV:
self.output += "SRV\t%s\n" % rdata.to_text()
else:
# dnspython dumps the types it knows. TODO: uses that?
self.output += "TYPE%i ; DATA %s\n" % (rdata.rdtype, rdata.to_text())
if rdata.rdtype != dns.rdatatype.RRSIG:
self.output += "; TTL: %i\n\n" % rrset.ttl # TODO: put it in the zone, not as a comment
self.output += "\n; Server: %s\n" % answer.nameserver
self.output += "; When: %s\n" % time.strftime("%Y-%m-%d %H:%M:%SZ",
time.gmtime(time.time()))
self.output += "; Query duration: %s\n" % querier.delay
if querier.description:
self.output += "; Service description: %s\n" % querier.description
self.output += "; DNS Looking Glass %s, DNSpython version %s, Python version %s %s on %s\n" % \
(self.myversion, dnspythonversion.version,
platform.python_implementation(),
platform.python_version(), platform.system())
def result(self, querier):
return self.output.encode()
# JSON
import json
# http://docs.python.org/library/json.html
class JsonFormatter(Formatter):
def format(self, answer, qtype, qclass, flags, querier):
self.object = {}
self.object['ReturnCode'] = "NOERROR"
self.object['QuestionSection'] = {'Qname': self.domain, 'Qtype': qtype, 'Qclass': qclass}
if flags & dns.flags.AD:
self.object['AD'] = True
if flags & dns.flags.AA:
self.object['AA'] = True
if flags & dns.flags.TC:
self.object['TC'] = True
self.object['AnswerSection'] = []
if answer.answer is not None:
for rrset in answer.answer:
for rdata in rrset: # TODO: sort them? For instance by preference for MX?
if rdata.rdtype == dns.rdatatype.A:
self.object['AnswerSection'].append({'Type': 'A', 'Address': rdata.address})
elif rdata.rdtype == dns.rdatatype.AAAA:
self.object['AnswerSection'].append({'Type': 'AAAA', 'Address': rdata.address})
elif rdata.rdtype == dns.rdatatype.LOC:
self.object['AnswerSection'].append({'Type': 'LOC',
'Latitude': '%f' % rdata.float_latitude,
'Longitude': '%f' % rdata.float_longitude,
'Altitude': '%f' % (float(rdata.altitude)/100)})
elif rdata.rdtype == dns.rdatatype.URI:
self.object['AnswerSection'].append({'Type': 'URI',
'Target': '%s' % rdata.target,
'Priority': '%i' % rdata.priority,
'Weight': '%i' % rdata.weight})
elif rdata.rdtype == dns.rdatatype.PTR:
self.object['AnswerSection'].append({'Type': 'PTR',
'Target': str(rdata.target)})
elif rdata.rdtype == dns.rdatatype.CNAME:
self.object['AnswerSection'].append({'Type': 'CNAME',
'Target': str(rdata.target)})
elif rdata.rdtype == dns.rdatatype.MX:
self.object['AnswerSection'].append({'Type': 'MX',
'MailExchanger': str(rdata.exchange),
'Preference': rdata.preference})
elif rdata.rdtype == dns.rdatatype.TXT:
self.object['AnswerSection'].append({'Type': 'TXT', 'Text': (b" ".join(rdata.strings)).decode()})
elif rdata.rdtype == dns.rdatatype.SPF:
self.object['AnswerSection'].append({'Type': 'SPF', 'Text': " ".join(rdata.strings)})
elif rdata.rdtype == dns.rdatatype.SOA:
self.object['AnswerSection'].append({'Type': 'SOA', 'Serial': rdata.serial,
'MasterServerName': str(rdata.mname),
'MaintainerName': str(rdata.rname),
'Refresh': rdata.refresh,
'Retry': rdata.retry,
'Expire': rdata.expire,
'Minimum': rdata.minimum,
})
elif rdata.rdtype == dns.rdatatype.NS:
self.object['AnswerSection'].append({'Type': 'NS', 'Target': str(rdata.target)})
elif rdata.rdtype == dns.rdatatype.DNSKEY:
returned_object = {'Type': 'DNSKEY',
'Length': keylength(rdata.algorithm, rdata.key),
'Algorithm': rdata.algorithm,
'Flags': rdata.flags}
try:
key_tag = dns.dnssec.key_id(rdata)
returned_object['Tag'] = key_tag
except AttributeError:
# key_id appeared only in dnspython 1.9. Not
# always available on 2012-05-17
pass
self.object['AnswerSection'].append(returned_object)
elif rdata.rdtype == dns.rdatatype.NSEC3PARAM:
self.object['AnswerSection'].append({'Type': 'NSEC3PARAM', 'Algorithm': rdata.algorithm, 'Iterations': rdata.iterations, 'Salt': to_hexstring(rdata.salt), 'Flags': rdata.flags})
elif rdata.rdtype == dns.rdatatype.DS:
self.object['AnswerSection'].append({'Type': 'DS', 'DelegationKey': rdata.key_tag,
'DigestType': rdata.digest_type})
elif rdata.rdtype == dns.rdatatype.DLV:
self.object['AnswerSection'].append({'Type': 'DLV', 'DelegationKey': rdata.key_tag,
'DigestType': rdata.digest_type})
elif rdata.rdtype == dns.rdatatype.RRSIG:
pass # Should we show signatures?
elif rdata.rdtype == dns.rdatatype.SSHFP:
self.object['AnswerSection'].append({'Type': 'SSHFP',
'Algorithm': rdata.algorithm,
'DigestType': rdata.fp_type,
'Fingerprint': to_hexstring(rdata.fingerprint)})
elif rdata.rdtype == dns.rdatatype.NAPTR:
self.object['AnswerSection'].append({'Type': 'NAPTR',
'Flags': str(rdata.flags),
'Services': str(rdata.service),
'Order': rdata.order,
'Preference': str(rdata.preference),
'Regexp': str(rdata.regexp),
'Replacement': str(rdata.replacement)})
elif rdata.rdtype == dns.rdatatype.SRV:
self.object['AnswerSection'].append({'Type': 'SRV', 'Server': str(rdata.target),
'Port': rdata.port,
'Priority': rdata.priority,
'Weight': rdata.weight})
else:
self.object['AnswerSection'].append({'Type': "unknown %i" % rdata.rdtype})
if rdata.rdtype != dns.rdatatype.RRSIG:
self.object['AnswerSection'][-1]['TTL'] = rrset.ttl
self.object['AnswerSection'][-1]['Name'] = str(rrset.name)
try:
duration = querier.delay.total_seconds()
except AttributeError: # total_seconds appeared only with Python 2.7
delay = querier.delay
duration = (delay.days*86400) + delay.seconds + \
(float(delay.microseconds)/1000000.0)
self.object['Query'] = {'Server': answer.nameserver,
'Time': time.strftime("%Y-%m-%d %H:%M:%SZ",
time.gmtime(time.time())),
'Duration': duration}
if querier.description:
self.object['Query']['Description'] = querier.description
self.object['Query']['Versions'] = "DNS Looking Glass %s, DNSpython version %s, Python version %s %s on %s" % \
(self.myversion, dnspythonversion.version,
platform.python_implementation(),
platform.python_version(), platform.system())
def result(self, querier):
return (json.dumps(self.object, indent=True) + "\n").encode()
from prometheus_client.core import InfoMetricFamily, GaugeMetricFamily
from prometheus_client import CollectorRegistry
from prometheus_client.openmetrics.exposition import generate_latest
class OpenMetricsFormatter(Formatter):
class DnsLgCollector(object):
def __init__(self, json_data, version):
self.json_data = json_data
self.version = version
def collect(self):
for record in self.json_data['AnswerSection']:
yield InfoMetricFamily('dns_lg_answer_section_rr', 'Record values', value={
k: str(v) for k, v in record.items()
})
yield InfoMetricFamily('dns_lg_build', 'Build information', value={
'dns_lg_version': self.version,
'dnspython_version': dnspythonversion.version,
'python_implementation': platform.python_implementation(),
'python_version': platform.python_version(),
'system': platform.system(),
})
query_labels = dict(self.json_data['QuestionSection'], Server=self.json_data['Query']['Server'])
duration = GaugeMetricFamily('dns_lg_query_duration_seconds', 'Query duration in seconds', labels=query_labels.keys())
duration.add_metric(query_labels.values(), self.json_data['Query']['Duration'])
yield duration
rrs_count = GaugeMetricFamily('dns_lg_answer_rrs_count', 'Number of records in the answer section', labels=query_labels.keys())
rrs_count.add_metric(query_labels.values(), len(self.json_data['AnswerSection']))
yield rrs_count
def format(self, answer, qtype, qclass, flags, querier):
json_formatter = JsonFormatter(self.domain)
json_formatter.format(answer, qtype, qclass, flags, querier)
json_data = json_formatter.object
self.registry = CollectorRegistry()
self.registry.register(self.DnsLgCollector(json_data, self.myversion))
def result(self, querier):
return generate_latest(self.registry)
# XML
# http://www.owlfish.com/software/simpleTAL/
from simpletal import simpleTAL, simpleTALES, simpleTALUtils
xml_template = """
<result>
<query>
<question><qname tal:content="qname"/><qtype tal:content="qtype"/></question>
<server><resolver tal:content="resolver"/><duration tal:content="duration"/><time tal:content="time"/><description tal:condition="description" tal:content="description"/><versions tal:condition="version" tal:content="version"/></server>
</query>
<response>
<!-- TODO: query ID -->
<ad tal:condition="ad" tal:content="ad"/><tc tal:condition="tc" tal:content="tc"/><aa tal:condition="aa" tal:content="aa"/>
<!-- No <anscount>, it is useless in XML. -->
<answers tal:condition="rrsets">
<rrset tal:replace="structure rrset" tal:repeat="rrset rrsets"/>
</answers>
</response>
</result>
"""
set_xml_template = """
<RRSet tal:condition="records" class="IN" tal:attributes="owner ownername; type type; ttl ttl"><record tal:repeat="record records" tal:replace="structure record"/></RRSet>
"""
a_xml_template = """
<A tal:attributes="address address"/>
"""
aaaa_xml_template = """
<AAAA tal:attributes="ip6address address"/>
"""
mx_xml_template = """
<MX tal:attributes="preference preference; exchange exchange"/>
"""
ns_xml_template = """
<NS tal:attributes="nsdname name"/>
"""
srv_xml_template = """
<SRV tal:attributes="priority priority; weight weight; port port; target name"/>
"""
txt_xml_template = """
<TXT tal:attributes="rdata text"/>
"""
spf_xml_template = """
<SPF tal:attributes="rdata text"/>
"""
loc_xml_template = """
<LOC tal:attributes="latitude latitude; longitude longitude; altitude altitude"/>
"""
uri_xml_template = """
<URI tal:attributes="target target; priority priority; weight weight"/>
"""
ptr_xml_template = """
<PTR tal:attributes="ptrdname name"/>
"""
cname_xml_template = """
<CNAME tal:attributes="host target"/>
"""
# TODO: NSEC3PARAM not in draft-daley-dns-schema-00
nsec3param_xml_template = """
<NSEC3PARAM tal:attributes="algorithm algorithm; flags flags; iterations iterations"/>
"""
ds_xml_template = """
<DS tal:attributes="keytag keytag; algorithm algorithm; digesttype digesttype; digest digest"/>
"""
dlv_xml_template = """
<DLV tal:attributes="keytag keytag; algorithm algorithm; digesttype digesttype; digest digest"/>
"""
# TODO: keytag is an extension to the Internet-Draft
dnskey_xml_template = """
<DNSKEY tal:attributes="flags flags; protocol protocol; algorithm algorithm; length length; publickey key; keytag keytag"/>
"""
sshfp_xml_template = """
<SSHFP tal:attributes="algorithm algorithm; fptype fptype; fingerprint fingerprint"/>
"""
naptr_xml_template = """
<NAPTR tal:attributes="flags flags; order order; preference preference; services services; regexp regexp; replacement replacement"/>
"""
soa_xml_template = """
<SOA tal:attributes="mname mname; rname rname; serial serial; refresh refresh; retry retry; expire expire; minimum minimum"/>
"""
# TODO: how to keep the comments of a template in TAL's output?
unknown_xml_template = """
<binaryRR tal:attributes="rtype rtype; rdlength rdlength; rdata rdata"/> <!-- Unknown type -->
"""
# TODO: Why is there a rdlength when you can deduce it from the rdata?
# That's strange in a non-binary format like XML.
class XmlFormatter(Formatter):
def format(self, answer, qtype, qclass, flags, querier):
self.xml_template = simpleTAL.compileXMLTemplate (xml_template)
self.set_template = simpleTAL.compileXMLTemplate (set_xml_template)
self.a_template = simpleTAL.compileXMLTemplate (a_xml_template)
self.aaaa_template = simpleTAL.compileXMLTemplate (aaaa_xml_template)
self.mx_template = simpleTAL.compileXMLTemplate (mx_xml_template)
self.srv_template = simpleTAL.compileXMLTemplate (srv_xml_template)
self.txt_template = simpleTAL.compileXMLTemplate (txt_xml_template)
self.spf_template = simpleTAL.compileXMLTemplate (spf_xml_template)
self.loc_template = simpleTAL.compileXMLTemplate (loc_xml_template)
self.uri_template = simpleTAL.compileXMLTemplate (uri_xml_template)
self.ns_template = simpleTAL.compileXMLTemplate (ns_xml_template)
self.ptr_template = simpleTAL.compileXMLTemplate (ptr_xml_template)
self.cname_template = simpleTAL.compileXMLTemplate (cname_xml_template)
self.soa_template = simpleTAL.compileXMLTemplate (soa_xml_template)
self.ds_template = simpleTAL.compileXMLTemplate (ds_xml_template)
self.nsec3param_template = simpleTAL.compileXMLTemplate (nsec3param_xml_template)
self.dlv_template = simpleTAL.compileXMLTemplate (dlv_xml_template)
self.dnskey_template = simpleTAL.compileXMLTemplate (dnskey_xml_template)
self.sshfp_template = simpleTAL.compileXMLTemplate (sshfp_xml_template)
self.naptr_template = simpleTAL.compileXMLTemplate (naptr_xml_template)
self.unknown_template = simpleTAL.compileXMLTemplate (unknown_xml_template)
self.context = simpleTALES.Context(allowPythonPath=False)
self.acontext = simpleTALES.Context(allowPythonPath=False)
self.rcontext = simpleTALES.Context(allowPythonPath=False)
self.context.addGlobal ("qname", self.domain)
self.context.addGlobal ("qtype", qtype)
self.context.addGlobal ("resolver", answer.nameserver)
try:
duration = querier.delay.total_seconds()
except AttributeError: # total_seconds appeared only with Python 2.7
delay = querier.delay
duration = (delay.days*86400) + delay.seconds + \
(float(delay.microseconds),1000000.0)
self.context.addGlobal ("duration", duration)
self.context.addGlobal ("time", time.strftime("%Y-%m-%d %H:%M:%SZ",
time.gmtime(time.time())))
self.context.addGlobal ("description", querier.description)
self.context.addGlobal ("version",
"DNS Looking Glass %s, DNSpython version %s, Python version %s %s on %s\n" % \
(self.myversion, dnspythonversion.version,
platform.python_implementation(),
platform.python_version(), platform.system()))
addresses = []
if answer is not None:
self.rrsets = []
if flags & dns.flags.AD:
ad = 1
else:
ad = 0
self.context.addGlobal ("ad", ad)
if flags & dns.flags.TC:
tc = 1
else:
tc = 0
self.context.addGlobal ("tc", tc)
if flags & dns.flags.AA:
aa = 1
else:
aa = 0
self.context.addGlobal ("aa", aa)
# TODO: class
for rrset in answer.answer:
records = []
self.acontext.addGlobal ("ttl", rrset.ttl)
self.acontext.addGlobal ("type", dns.rdatatype.to_text(rrset.rdtype))
for rdata in rrset:
icontext = simpleTALES.Context(allowPythonPath=False)
iresult = io.StringIO()
if rdata.rdtype == dns.rdatatype.A or rdata.rdtype == dns.rdatatype.AAAA:
icontext.addGlobal ("address", rdata.address)
if rdata.rdtype == dns.rdatatype.A:
self.a_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
else:
self.aaaa_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.SRV:
icontext.addGlobal ("priority", rdata.priority)
icontext.addGlobal ("weight", rdata.weight)
icontext.addGlobal ("port", rdata.port)
icontext.addGlobal ("name", rdata.target)
self.srv_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.MX:
icontext.addGlobal ("preference", rdata.preference)
icontext.addGlobal ("exchange", rdata.exchange)
self.mx_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.NSEC3PARAM:
icontext.addGlobal ("algorithm", rdata.algorithm)
icontext.addGlobal ("flags", rdata.flags)
icontext.addGlobal ("iterations", rdata.iterations)
self.nsec3param_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.DS:
icontext.addGlobal ("keytag", rdata.key_tag)
icontext.addGlobal ("digesttype", rdata.digest_type)
icontext.addGlobal ("algorithm", rdata.algorithm)
icontext.addGlobal ("digest", to_hexstring(rdata.digest))
self.ds_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.DLV:
icontext.addGlobal ("keytag", rdata.key_tag)
icontext.addGlobal ("digesttype", rdata.digest_type)
icontext.addGlobal ("algorithm", rdata.algorithm)
icontext.addGlobal ("digest", to_hexstring(rdata.digest))
self.dlv_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.DNSKEY:
try:
key_tag = dns.dnssec.key_id(rdata)
icontext.addGlobal ("keytag", key_tag)
except AttributeError:
# key_id appeared only in dnspython 1.9. Not
# always available on 2012-05-17
pass
icontext.addGlobal ("protocol", rdata.protocol)
icontext.addGlobal ("flags", rdata.flags)
icontext.addGlobal ("algorithm", rdata.algorithm)
icontext.addGlobal ("length", keylength(rdata.algorithm, rdata.key))
icontext.addGlobal ("key", to_hexstring(rdata.key))
self.dnskey_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.SSHFP:
icontext.addGlobal ("algorithm", rdata.algorithm)
icontext.addGlobal ("fptype", rdata.fp_type)
icontext.addGlobal ("fingerprint", to_hexstring(rdata.fingerprint))
self.sshfp_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.NAPTR:
icontext.addGlobal ("flags", rdata.flags)
icontext.addGlobal ("services", rdata.service)
icontext.addGlobal ("order", rdata.order)
icontext.addGlobal ("preference", rdata.preference)
regexp = str(rdata.regexp, "UTF-8")
icontext.addGlobal ("regexp",
regexp)
# Yes, there is Unicode in NAPTRs, see
# mailclub.tel for instance. We assume it will
# always be UTF-8
icontext.addGlobal ("replacement", rdata.replacement)
self.naptr_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.TXT:
# Yes, some people add Unicode in TXT records,
# see mailclub.tel for instance. We assume
# UTF-8
text = str((b" ".join(rdata.strings)).decode())
icontext.addGlobal ("text", text)
self.txt_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.SPF:
icontext.addGlobal ("text", " ".join(rdata.strings))
self.spf_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.PTR:
icontext.addGlobal ("name", rdata.target)
self.ptr_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.CNAME:
icontext.addGlobal ("target", rdata.target)
self.cname_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.LOC:
icontext.addGlobal ("latitude", rdata.float_latitude)
icontext.addGlobal ("longitude", rdata.float_longitude)
icontext.addGlobal ("altitude", float(rdata.altitude)/100)
self.loc_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.URI:
icontext.addGlobal ("target", rdata.target)
icontext.addGlobal ("weight", rdata.weight)
icontext.addGlobal ("priority", rdata.priority)
self.uri_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.NS:
icontext.addGlobal ("name", rdata.target)
# TODO: translate Punycode domain names back to Unicode?
self.ns_template.expand (icontext, iresult,
suppressXMLDeclaration=True)
elif rdata.rdtype == dns.rdatatype.SOA:
icontext.addGlobal ("rname", rdata.rname)
icontext.addGlobal ("mname", rdata.mname)
icontext.addGlobal ("serial", rdata.serial)
icontext.addGlobal ("refresh", rdata.refresh)
icontext.addGlobal ("retry", rdata.retry)
icontext.addGlobal ("expire", rdata.expire)
icontext.addGlobal ("minimum", rdata.minimum)
self.soa_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
else:
icontext.addGlobal ("rtype", rdata.rdtype)
icontext.addGlobal ("rdlength", 0) # TODO: useless, anyway (and
# no easy way to compute it in dnspython)
# TODO: rdata
self.unknown_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
records.append(iresult.getvalue())
else:
pass # TODO what to send back when no data for this QTYPE?
if records:
self.acontext.addGlobal ("records", records)
self.acontext.addGlobal ("ttl", rrset.ttl)
iresult = io.StringIO()
self.set_template.expand (self.acontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
self.rrsets.append(iresult.getvalue())
else:
self.rrsets = None
def result(self, querier):
result = io.StringIO()
self.context.addGlobal("rrsets", self.rrsets)
self.xml_template.expand (self.context, result,
outputEncoding=querier.encoding)
return result.getvalue().encode()
# HTML
html_template = """<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title tal:content="title"/>
<link tal:condition="css" rel="stylesheet" type="text/css" tal:attributes="href css"/>
<link rel="author" href="http://www.bortzmeyer.org/static/moi.html"/>
<link rel="service-doc" href="https://www.bortzmeyer.org/dns-lg-usage.html"/>
<link tal:condition="opensearch" rel="search"
type="application/opensearchdescription+xml"
tal:attributes="href opensearch"
title="DNS Looking Glass" />
<meta http-equiv="Content-Type" tal:attributes="content contenttype"/>
<meta name="robots" content="noindex,nofollow"/>
</head>
<body>
<h1 tal:content="title"/>
<div class="body">
<p tal:condition="distinctowner">Response is name <span class="hostname" tal:content="ownername"/>.</p><p tal:condition="flags">Response flags are: <span tal:replace="flags"/>.</p>
<div class="rrsets" tal:repeat="rrset rrsets">
<ul tal:condition="rrset/records">
<li tal:repeat="record rrset/records" tal:content="structure record"/>
<li tal:condition="rrset/ttl">(Time-to-Live of this answer is <span tal:replace="rrset/ttl"/>)</li>
</ul>
</div>
<p tal:condition="not: rrsets">No data was found.</p>
<p>Result obtained from resolver(s) <span class="hostname" tal:content="resolver"/> at <span tal:replace="datetime"/>. Query took <span tal:replace="duration"/>.</p>
</div>
<hr class="endsep"/>
<p><span tal:condition="email">Service managed by <span class="email" tal:content="email"/>. </span><span tal:condition="doc"> See <a tal:attributes="href doc">details and documentation</a>.</span><span tal:condition="description_html" tal:content="structure description_html"/><span tal:condition="description" tal:content="description"/> / <span tal:condition="versions" tal:content="structure versions"/></p>
</body>
</html>
"""
version_html_template = """
<span>DNS Looking Glass "<span tal:replace="myversion"/>", <a href="http://www.dnspython.org/">DNSpython</a> version <span tal:replace="dnsversion"/>, <a href="http://www.python.org/">Python</a> version <span tal:replace="pyversion"/></span>
"""
address_html_template = """
<span>IP address: <a class="address" tal:attributes="href path" tal:content="address"/></span>
"""
mx_html_template = """
<span>Mail Exchanger: <a class="hostname" tal:attributes="href path" tal:content="hostname"/> (preference <span tal:replace="pref"/>)</span>
"""
# TODO: better presentation of "admin" (replacement of . by @ and mailto: URL). See nic.in for an example of a broken zone, in that respect.
# TODO: better presentation of intervals? (Weeks, days, etc)
soa_html_template = """
<span>Start Of Authority: Zone administrator <span tal:replace="admin"/>, master server <a class="hostname" tal:attributes="href path" tal:content="master"/>, serial number <span tal:replace="serial"/>, refresh interval <span tal:replace="refresh"/> s, retry interval <span tal:replace="retry"/> s, expiration delay <span tal:replace="expire"/> s, negative reply TTL <span tal:replace="minimum"/> s</span>
"""
ns_html_template = """
<span>Name Servers: <a class="hostname" tal:attributes="href path" tal:content="hostname"/></span>
"""
ptr_html_template = """
<span>Associated name: <a class="hostname" tal:attributes="href path" tal:content="hostname"/></span>
"""
cname_html_template = """
<span>Canonical name: <a class="hostname" tal:attributes="href path" tal:content="target"/></span>
"""
srv_html_template = """
<span>Service: Priority <span tal:content="priority"/>, weight <span tal:content="weight"/>, host <a class="hostname" tal:attributes="href path" tal:content="hostname"/>, port <span tal:content="port"/>,</span>
"""
txt_html_template = """
<span>Text: <span tal:content="text"/></span>
"""
spf_html_template = """
<span>SPF record: <span tal:content="text"/></span>
"""
nsec3param_html_template = """
<span>NSEC3 parameters: hash type <span tal:replace="algorithm"/>, <span tal:replace="iterations"/> iterations, flags <span tal:replace="flags"/></span>
"""
ds_html_template = """
<span>Secure Delegation: Key <span tal:replace="keytag"/> (key algorithm <span tal:replace="keyalgo"/>, hash type <span tal:replace="digesttype"/>)</span>
"""
dlv_html_template = """
<span>Lookaside Secure Delegation: Key <span tal:replace="keytag"/> (hash type <span tal:replace="digesttype"/>)</span>
"""
dnskey_html_template = """
<span>DNSSEC key: <span tal:condition="keytag"><span tal:replace="keytag"/>, </span>algorithm <span tal:replace="algorithm"/>, length <span tal:replace="length"/> bits, flags <span tal:replace="flags"/></span>
"""
# TODO display the key tag, the inception and expiration time?
rrsig_html_template = """
<span>DNSSEC signature</span>
"""
nsec_html_template = """
<span>NSEC or NSEC3 record</span>
"""
sshfp_html_template = """
<span>SSH fingerprint: Algorithm <span tal:replace="algorithm"/>, Fingerprint type <span tal:replace="fptype"/>, fingerprint <span tal:replace="fingerprint"/></span>
"""
naptr_html_template = """
<span>Naming Authority Pointer: Flags "<span tal:replace="flags"/>", Service(s) "<span tal:replace="services"/>", order <span tal:replace="order"/> and preference <span tal:replace="preference"/>, regular expression <span class="naptr_regexp" tal:content="regexp"/>, replacement <span class="domainname" tal:content="replacement"/></span>
"""
# TODO: link to Open Street Map
loc_html_template = """
<span>Location: Latitude <span tal:replace="latitude"/> degrees / Longitude <span tal:replace="longitude"/> degrees (altitude <span tal:replace="altitude"/> m)</span>
"""
uri_html_template = """
<span>Priority: <span tal:replace="priority"/>, <span tal:replace="weight"/>, <a class="hostname" tal:attributes="href target" tal:content="target"/></span>
"""
unknown_html_template = """
<span>Unknown record type (<span tal:replace="rrtype"/>)</span>
"""
class HtmlFormatter(Formatter):
def link_of(self, host, querier, reverse=False):
if querier.base_url == "":
url = '/'
else:
url = querier.base_url
base = url + str(host)
if not reverse:
base += '/ADDR'
base += '?format=HTML'
if reverse:
base += '&reverse=1'
return base
def pretty_duration(self, duration):
""" duration is in seconds """
weeks = duration // (86400*7)
days = (duration-(86400*7*weeks)) // 86400
hours = (duration-(86400*7*weeks)-(86400*days)) // 3600
minutes = (duration-(86400*7*weeks)-(86400*days)-(3600*hours)) // 60
seconds = duration-(86400*7*weeks)-(86400*days)-(3600*hours)-(60*minutes)
result = ""
empty_result = True
if weeks != 0:
if weeks > 1:
plural = "s"
else:
plural = ""
result += "%i week%s" % (weeks, plural)
empty_result = False
if days != 0:
if not empty_result:
result += ", "
if days > 1:
plural = "s"
else:
plural = ""
result += "%i day%s" % (days, plural)
empty_result = False
if hours != 0:
if not empty_result:
result += ", "
if hours > 1:
plural = "s"
else:
plural = ""
result += "%i hour%s" % (hours, plural)
empty_result = False
if minutes != 0:
if not empty_result:
result += ", "
if minutes > 1:
plural = "s"
else:
plural = ""
result += "%i minute%s" % (minutes, plural)
empty_result = False
if not empty_result:
result += ", "
if seconds > 1:
plural = "s"
else:
plural = ""
result += "%i second%s" % (seconds, plural)
return result
def format(self, answer, qtype, qclass, flags, querier):
self.template = simpleTAL.compileXMLTemplate (html_template)
self.address_template = simpleTAL.compileXMLTemplate (address_html_template)
self.version_template = simpleTAL.compileXMLTemplate (version_html_template)
self.mx_template = simpleTAL.compileXMLTemplate (mx_html_template)
self.soa_template = simpleTAL.compileXMLTemplate (soa_html_template)
self.ns_template = simpleTAL.compileXMLTemplate (ns_html_template)
self.ptr_template = simpleTAL.compileXMLTemplate (ptr_html_template)
self.cname_template = simpleTAL.compileXMLTemplate (cname_html_template)
self.srv_template = simpleTAL.compileXMLTemplate (srv_html_template)
self.txt_template = simpleTAL.compileXMLTemplate (txt_html_template)
self.spf_template = simpleTAL.compileXMLTemplate (spf_html_template)
self.loc_template = simpleTAL.compileXMLTemplate (loc_html_template)
self.uri_template = simpleTAL.compileXMLTemplate (uri_html_template)
self.nsec3param_template = simpleTAL.compileXMLTemplate (nsec3param_html_template)
self.ds_template = simpleTAL.compileXMLTemplate (ds_html_template)
self.dlv_template = simpleTAL.compileXMLTemplate (dlv_html_template)
self.dnskey_template = simpleTAL.compileXMLTemplate (dnskey_html_template)
self.rrsig_template = simpleTAL.compileXMLTemplate (rrsig_html_template)
self.nsec_template = simpleTAL.compileXMLTemplate (nsec_html_template)
self.sshfp_template = simpleTAL.compileXMLTemplate (sshfp_html_template)
self.naptr_template = simpleTAL.compileXMLTemplate (naptr_html_template)
self.unknown_template = simpleTAL.compileXMLTemplate (unknown_html_template)
self.context = simpleTALES.Context(allowPythonPath=False)
self.context.addGlobal ("title", "Query for domain %s, type %s" % \
(self.domain, qtype))
self.context.addGlobal ("resolver", answer.nameserver)
self.context.addGlobal ("email", querier.email_admin)
self.context.addGlobal ("doc", querier.url_doc)
self.context.addGlobal("contenttype",
"text/html; charset=%s" % querier.encoding)
self.context.addGlobal ("css", querier.url_css)
self.context.addGlobal ("opensearch", querier.url_opensearch)
self.context.addGlobal ("datetime", time.strftime("%Y-%m-%d %H:%M:%SZ",
time.gmtime(time.time())))
self.context.addGlobal("duration", str(querier.delay))
if querier.description_html:
self.context.addGlobal("description_html", querier.description_html)
elif querier.description:
self.context.addGlobal("description", querier.description)
iresult = io.StringIO()
icontext = simpleTALES.Context(allowPythonPath=False)
icontext.addGlobal("pyversion", platform.python_implementation() + " " +
platform.python_version() + " on " + platform.system())
icontext.addGlobal("dnsversion", dnspythonversion.version)
icontext.addGlobal("myversion", self.myversion)
self.version_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
self.context.addGlobal("versions", iresult.getvalue())
str_flags = ""
if flags & dns.flags.AD:
str_flags += "/ Authentic Data "
if flags & dns.flags.AA:
str_flags += "/ Authoritative Answer "
if flags & dns.flags.TC:
str_flags += "/ Truncated Answer "
if str_flags != "":
self.context.addGlobal ("flags", str_flags)
if answer is not None:
self.rrsets = []
icontext = simpleTALES.Context(allowPythonPath=False)
for rrset in answer.answer:
records = []
for rdata in rrset:
iresult = io.StringIO()
if rdata.rdtype == dns.rdatatype.A or rdata.rdtype == dns.rdatatype.AAAA:
icontext.addGlobal ("address", rdata.address)
icontext.addGlobal ("path", self.link_of(rdata.address,
querier,
reverse=True))
self.address_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.SOA:
icontext.addGlobal ("master", rdata.mname)
icontext.addGlobal ("path", self.link_of(rdata.mname, querier))
icontext.addGlobal("admin", rdata.rname) # TODO: replace first point by @
icontext.addGlobal("serial", rdata.serial)
icontext.addGlobal("refresh", rdata.refresh)
icontext.addGlobal("retry", rdata.retry)
icontext.addGlobal("expire", rdata.expire)
icontext.addGlobal("minimum", rdata.minimum)
self.soa_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.MX:
icontext.addGlobal ("hostname", rdata.exchange)
icontext.addGlobal ("path", self.link_of(rdata.exchange, querier))
icontext.addGlobal("pref", rdata.preference)
self.mx_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.NS:
icontext.addGlobal ("hostname", rdata.target)
# TODO: translate back the Punycode name
# servers to Unicode with
# encodings.idna.ToUnicode?
icontext.addGlobal ("path", self.link_of(rdata.target, querier))
self.ns_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.PTR:
icontext.addGlobal ("hostname", rdata.target)
icontext.addGlobal ("path", self.link_of(rdata.target, querier))
self.ptr_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.CNAME:
icontext.addGlobal ("target", rdata.target)
icontext.addGlobal ("path", self.link_of(rdata.target, querier))
self.cname_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.SRV:
icontext.addGlobal ("hostname", rdata.target)
icontext.addGlobal ("path", self.link_of(rdata.target, querier))
icontext.addGlobal ("priority", rdata.priority)
icontext.addGlobal ("weight", rdata.weight)
icontext.addGlobal ("port", rdata.port)
self.srv_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.TXT:
text = b" ".join(rdata.strings)
text = text.decode() # May break if the content of TXT is not UTF-8? TODO
icontext.addGlobal ("text", text)
self.txt_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.SPF:
icontext.addGlobal ("text", "\n".join(rdata.strings))
self.spf_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.LOC:
# TODO: expanded longitude and latitude instead of floats?
icontext.addGlobal ("latitude", rdata.float_latitude)
icontext.addGlobal ("longitude", rdata.float_longitude)
icontext.addGlobal ("altitude", float(rdata.altitude)/100)
self.loc_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.URI:
icontext.addGlobal ("target", rdata.target)
icontext.addGlobal ("weight", rdata.weight)
icontext.addGlobal ("priority", rdata.priority)
self.uri_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.NSEC3PARAM:
icontext.addGlobal ("algorithm", rdata.algorithm)
icontext.addGlobal ("iterations", rdata.iterations)
icontext.addGlobal ("flags", rdata.flags)
self.nsec3param_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.DS:
key_algo_text = dns.dnssec.algorithm_to_text(rdata.algorithm)
if key_algo_text == "%i" % rdata.algorithm:
key_algo_text = "Unknown algorithm"
icontext.addGlobal ("keyalgo", "%s - %s" % (rdata.algorithm, key_algo_text))
icontext.addGlobal ("digesttype", rdata.digest_type)
icontext.addGlobal ("digest", rdata.digest)
icontext.addGlobal ("keytag", rdata.key_tag)
self.ds_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
print(iresult.getvalue())
elif rdata.rdtype == dns.rdatatype.DLV:
icontext.addGlobal ("algorithm", rdata.algorithm)
icontext.addGlobal ("digesttype", rdata.digest_type)
icontext.addGlobal ("digest", rdata.digest)
icontext.addGlobal ("keytag", rdata.key_tag)
self.dlv_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.DNSKEY:
algo_text = dns.dnssec.algorithm_to_text(rdata.algorithm)
if algo_text == "%i" % rdata.algorithm:
algo_text = "Unknown algorithm"
icontext.addGlobal ("algorithm", "%s (%s)" % (rdata.algorithm, algo_text))
icontext.addGlobal ("length", keylength(rdata.algorithm, rdata.key))
icontext.addGlobal ("protocol", rdata.protocol)
icontext.addGlobal ("flags", "%i %s" % (rdata.flags, dns.rdtypes.dnskeybase.flags_to_text_set(rdata.flags)))
try:
key_tag = dns.dnssec.key_id(rdata)
icontext.addGlobal ("keytag", key_tag)
except AttributeError:
# key_id appeared only in dnspython 1.9. Not
# always available on 2012-05-17
pass
self.dnskey_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.RRSIG:
self.rrsig_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.NSEC or rdata.rdtype == dns.rdatatype.NSEC3:
# It can happen with QTYPE=ANY
self.nsec_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.SSHFP:
icontext.addGlobal ("algorithm", rdata.algorithm)
icontext.addGlobal ("fptype", rdata.fp_type)
icontext.addGlobal ("fingerprint", to_hexstring(rdata.fingerprint))
self.sshfp_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
elif rdata.rdtype == dns.rdatatype.NAPTR:
icontext.addGlobal ("flags", rdata.flags)
icontext.addGlobal ("order", rdata.order)
icontext.addGlobal ("preference", rdata.preference)
icontext.addGlobal ("services", rdata.service)
icontext.addGlobal ("regexp", str(rdata.regexp,
"UTF-8")) # UTF-8 rdata is found in the wild
icontext.addGlobal ("replacement", rdata.replacement)
self.naptr_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
else:
icontext.addGlobal ("rrtype", rdata.rdtype)
self.unknown_template.expand (icontext, iresult,
suppressXMLDeclaration=True,
outputEncoding=querier.encoding)
records.append(iresult.getvalue())
self.rrsets.append({'ttl': self.pretty_duration(rrset.ttl),
'records': records})
else:
self.rrsets = None
def result(self, querier):
result = io.StringIO()
self.context.addGlobal("rrsets", self.rrsets)
self.template.expand (self.context, result,
outputEncoding=querier.encoding,
docType='<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">')
return ((result.getvalue() + "\n").encode())