40 lines
		
	
	
	
		
			1,017 B
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			40 lines
		
	
	
	
		
			1,017 B
		
	
	
	
		
			Python
		
	
	
	
	
	
| from itertools import chain
 | |
| 
 | |
| from dns.resolver import resolve
 | |
| from dns.rdatatype import to_text
 | |
| 
 | |
| from base import BaseJob, Data
 | |
| 
 | |
| 
 | |
def get_addr(data: Data) -> Data:
    """Look up the A and AAAA records of ``data['domain']``.

    Both queries are issued up front with ``raise_on_no_answer=False``
    (per dnspython, a name lacking records of one type then yields an
    empty answer instead of raising).  The ``address`` attribute of every
    returned record whose type text is ``A`` or ``AAAA`` is collected, A
    results first, and stored under ``data['addresses']``.

    Returns the same ``data`` object that was passed in.
    """
    wanted = ('A', 'AAAA')

    # Run every query before touching any record, so a failing lookup
    # leaves ``data`` unmodified.
    answers = [
        resolve(data['domain'], rtype, raise_on_no_answer=False)
        for rtype in wanted
    ]

    data['addresses'] = [
        record.address
        for record in chain.from_iterable(answers)
        if to_text(record.rdtype) in wanted
    ]

    return data
 | |
| 
 | |
| 
 | |
class CustomQuery(BaseJob):
    """Job that performs one DNS query and records its result in ``data``.

    ``name`` is a ``str.format`` template that is filled in with
    ``domain=data['domain']`` at run time; ``rtype`` is the record type
    passed to the resolver.
    """

    def __init__(self, name: str, rtype: str) -> None:
        """Store the query-name template and the record type."""
        self.name, self.rtype = name, rtype

    def run(self, data: Data) -> Data:
        """Resolve the formatted name and append the outcome to ``data['queries']``.

        The ``'queries'`` list is created on ``data`` if it is missing.  Each
        entry is a dict with the keys ``'qname'``, ``'qtype'`` and
        ``'answer'`` (the text form of every returned record).

        Returns the same ``data`` object that was passed in.
        """
        qname = self.name.format(domain=data['domain'])

        # Make sure the list exists before the lookup, so the key is present
        # on ``data`` even if the query below raises.
        history = data.setdefault('queries', [])

        response = resolve(qname, self.rtype, raise_on_no_answer=False)
        answer_texts = [record.to_text() for record in response]

        history.append({
            'qname': qname,
            'qtype': self.rtype,
            'answer': answer_texts,
        })

        return data
 |