#!/usr/bin/env python
"""Nagios/Icinga compatible check for DNS, DoT, DoH and DoQ resolvers.

The script sends a single A, AAAA or TXT query to a server over the selected
transport, prints one Nagios status line and exits with the matching status
code (0 = OK, 1 = WARNING, 2 = CRITICAL, 3 = UNKNOWN).
"""
import argparse
import asyncio
import ipaddress
from sys import exit

import dns.exception
import dns.message
import dns.name
import dns.query
import dns.rdatatype
import dns.resolver
import httpx

# Seconds to wait for a reply on any transport. Without a limit a lost UDP
# packet would block the check forever.
TIMEOUT = 10.0

# Standard port for DNS over QUIC (RFC 9250).
DOQ_PORT = 853

parser = argparse.ArgumentParser(
    epilog="Nagios/Icinga compatible check for DNS, DOT (DNS over TLS), DOH (DNS over HTTPS)",
    formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(
    "-m", "--mode", choices=["dns", "dot", "doh", "doq"],
    default="dns",
    help="Check Mode: dns = normal DNS query\n dot = DNS over TLS\n doh = DNS over HTTPs\n doq = DNS over QUIC\ndefault: %(default)s")
parser.add_argument(
    "-t", "--type", choices=["A", "AAAA", "TXT"],
    default="A",
    help="Type to check\ndefault: %(default)s")
parser.add_argument(
    "-s", "--server",
    default="dns.google",
    help="Server to query\ndefault: dns.google\nNote: dot & dns mode requires ip address as server\n if you enter a hostname it will be resolved by system resolvers ")
parser.add_argument(
    "--expect",
    default=None,
    help="expected response (f.e. ip-address, servername, a.s.o.)")
parser.add_argument(
    "--expect-mode", choices=["in", "is"],
    default="in",
    help="How to parse response, looking for expect-paramter\nin = expect-parameter is in the response (if multitple rrsets)\nis = expect-parameter is the only response\ndefault: in")
parser.add_argument(
    "--expect-fail", choices=["warning", "critical"],
    default="warning",
    help="Nagios status if expect is not valid\ndefault: warning")
parser.add_argument("address", metavar="address", help="address to query")
args = parser.parse_args()


def unknown(message):
    """Print a Nagios UNKNOWN status line and exit with code 3."""
    print("UNKNOWN - " + message)
    exit(3)


def critical(message):
    """Print a Nagios CRITICAL status line and exit with code 2."""
    print("CRITICAL - " + message)
    exit(2)


def warning(message):
    """Print a Nagios WARNING status line and exit with code 1."""
    print("WARNING - " + message)
    exit(1)


def ok(message):
    """Print a Nagios OK status line and exit with code 0."""
    print("OK - " + message)
    exit(0)


def get_ip(address):
    """Return *address* as an IP address string.

    An IP literal is returned in its canonical text form. Anything else is
    treated as a hostname and resolved through the system resolvers; the
    first A record is returned.

    Raises:
        dns.exception.DNSException: if the hostname cannot be resolved.
    """
    try:
        # Always hand back a string: the dns.query functions take the
        # destination as text, not as an ipaddress object.
        return str(ipaddress.ip_address(address))
    except ValueError:
        answer = dns.resolver.resolve(qname=address, rdtype=dns.rdatatype.A)
        return answer.rrset[0].address


def doh(query, server):
    """Send *query* to *server* using DNS over HTTPS.

    Returns the answer section of the reply. Exits with CRITICAL when the
    HTTPS connection cannot be established.
    """
    url = "https://" + server + "/dns-query"
    with httpx.Client() as client:
        try:
            reply = dns.query.https(query, url, timeout=TIMEOUT, session=client)
            return reply.answer
        except httpx.ConnectError as err:
            critical("Cannot connect to " + url + ": " + str(err))


def doq(query, server):
    """Send *query* to *server* using DNS over QUIC.

    Returns the answer section of the reply.
    """
    reply = dns.query.quic(query, get_ip(server), timeout=TIMEOUT, port=DOQ_PORT)
    return reply.answer


def dot(query, server):
    """Send *query* to *server* using DNS over TLS.

    Returns the answer section of the reply.
    """
    reply = dns.query.tls(q=query, where=get_ip(server), timeout=TIMEOUT)
    return reply.answer


def dns53(query, server):
    """Send *query* to *server* using plain DNS over UDP.

    Returns the answer section of the reply.
    """
    reply = dns.query.udp(query, get_ip(server), timeout=TIMEOUT)
    return reply.answer


# Maps each --mode value to the function implementing that transport.
_TRANSPORTS = {
    "dns": dns53,
    "dot": dot,
    "doh": doh,
    "doq": doq,
}


def query(mode, address, type, server):
    """Query *server* for *address* and report the result.

    Args:
        mode: transport to use, one of "dns", "dot", "doh", "doq".
        address: name to look up.
        type: record type as text ("A", "AAAA" or "TXT").
        server: server to query (hostname or IP address).

    Does not return normally: analyze() ends the process with a status code.
    """
    # NOTE: the parameter name ``type`` shadows the builtin; it is kept so
    # existing keyword callers keep working.
    transport = _TRANSPORTS.get(mode)
    if transport is None:
        unknown("Unsupported mode " + str(mode))
    qname = dns.name.from_text(address)
    request = dns.message.make_query(
        qname=qname, rdtype=dns.rdatatype.from_text(type))
    response = transport(request, server)
    analyze(qname, type, server, response)


def _canonical(value):
    """Return *value* in canonical IP text form, or unchanged if not an IP."""
    try:
        return str(ipaddress.ip_address(value))
    except ValueError:
        return value


def analyze(address, type, server, response):
    """Evaluate the answer section *response* and exit with a Nagios status.

    Records of the requested *type* are collected from *response*. With no
    matching record the check is CRITICAL. Without ``--expect`` any record
    is OK. With ``--expect`` the records are compared according to
    ``--expect-mode``; a mismatch exits with the status selected by
    ``--expect-fail``.

    Args:
        address: queried name (used for the status message).
        type: record type as text ("A", "AAAA" or "TXT").
        server: queried server (used for the status message).
        response: iterable of rrsets, i.e. the answer section of a reply.
    """
    shown = []   # records as printed in the status line
    values = []  # records in the form compared against --expect
    for rrset in response:
        if ((rrset.rdtype == dns.rdatatype.A and type == "A")
                or (rrset.rdtype == dns.rdatatype.AAAA and type == "AAAA")):
            for rdata in rrset:
                shown.append(rdata.address)
                values.append(_canonical(rdata.address))
        elif rrset.rdtype == dns.rdatatype.TXT and type == "TXT":
            for rdata in rrset:
                # A TXT record may be split into several character-strings;
                # join them so long records are not truncated.
                text = b"".join(rdata.strings).decode("utf_8", errors="replace")
                shown.append("\"" + text + "\"")
                values.append(text)

    target = type + ":" + str(address) + "@" + server
    if not shown:
        critical("No response for " + target)

    resps = ','.join(shown)
    if args.expect is None:
        ok("Found " + resps + " for " + target)

    expected = args.expect
    if type == "TXT":
        # Tolerate an expectation written with the quotes shown in the output.
        if len(expected) >= 2 and expected[0] == expected[-1] == "\"":
            expected = expected[1:-1]
    else:
        expected = _canonical(expected)

    if args.expect_mode == "is":
        matched = set(values) == {expected}
    else:
        matched = expected in values

    if matched:
        ok("Found " + resps + " for " + target)

    message = ("Expected " + args.expect + " but found " + resps
               + " for " + target)
    if args.expect_fail == "critical":
        critical(message)
    warning(message)


def main():
    """Run the check described by the command-line arguments."""
    try:
        query(args.mode, args.address, args.type, args.server)
    except (dns.exception.DNSException, httpx.HTTPError, OSError,
            ValueError) as err:
        # An uncaught exception would exit with code 1, which Nagios reads
        # as WARNING; report query failures explicitly instead.
        critical("Query for " + args.type + ":" + args.address + "@"
                 + args.server + " failed: "
                 + type(err).__name__ + ": " + str(err))


if __name__ == "__main__":
    main()