113 lines
4.4 KiB
Python
113 lines
4.4 KiB
Python
#!/bin/env python
|
|
import argparse
|
|
import dns.message
|
|
import dns.name
|
|
import dns.query
|
|
import dns.rdatatype
|
|
import dns.resolver
|
|
import httpx
|
|
import ipaddress
|
|
import asyncio
|
|
from sys import exit
|
|
|
|
|
|
parser = argparse.ArgumentParser(
|
|
epilog="Nagios/Icinga compatible check for DNS, DOT (DNS over TLS), DOH (DNS over HTTPS)",
|
|
formatter_class=argparse.RawTextHelpFormatter)
|
|
parser.add_argument("-m", "--mode", choices=["dns", "dot", "doh","doq"], \
|
|
default="dns", help="Check Mode: dns = normal DNS query\n dot = DNS over TLS\n doh = DNS over HTTPs\ndefault: %(default)s")
|
|
parser.add_argument("-t", "--type", choices=["A", "AAAA", "TXT"], \
|
|
default="A", help="Type to check\ndefault: %(default)s")
|
|
parser.add_argument("-s", "--server", \
|
|
default="dns.google", help="Server to query\ndefault: dns.google\nNote: dot & dns mode requires ip address as server\n if you enter a hostname it will be resolved by system resolvers ")
|
|
parser.add_argument("--expect", \
|
|
default=None, help="expected response (f.e. ip-address, servername, a.s.o.)")
|
|
parser.add_argument("--expect-mode", choices=["in", "is"], \
|
|
default="in", help="How to parse response, looking for expect-paramter\nin = expect-parameter is in the response (if multitple rrsets)\nis = expect-parameter is the only response\ndefault: in")
|
|
parser.add_argument("--expect-fail", choices=["warning", "critical"], \
|
|
default="warning", help="Nagios status if expect is not valid\ndefault: warning")
|
|
parser.add_argument("address", metavar="address",help="address to query")
|
|
args = parser.parse_args()
|
|
|
|
def get_ip(address):
|
|
try:
|
|
ip = ipaddress.ip_address(address)
|
|
except ValueError as e:
|
|
ip = dns.resolver.resolve(qname=address, rdtype=dns.rdatatype.A).rrset[0].address
|
|
return(ip)
|
|
|
|
def doh(query,server):
|
|
server = "https://"+ server +"/dns-query"
|
|
try:
|
|
r = dns.query.https(query, server)
|
|
return r.answer
|
|
except httpx.ConnectError as e:
|
|
print(e)
|
|
|
|
def doq(query,server):
|
|
async def amulti(query):
|
|
global response
|
|
async with dns.quic.AsyncioQuicManager() as manager:
|
|
connection = manager.connect(get_ip(server), 853)
|
|
r = await dns.asyncquery.quic(query, get_ip(server), port=443, connection=connection)
|
|
print(r)
|
|
response = r
|
|
|
|
asyncio.run(amulti(query))
|
|
return response
|
|
|
|
def dot(query,server):
|
|
server = get_ip(server)
|
|
r = dns.query.tls(q=query, where=server)
|
|
return(r.answer)
|
|
|
|
def dns53(query,server):
|
|
server = get_ip(server)
|
|
r = dns.query.udp(query, server)
|
|
return r.answer
|
|
|
|
def query(mode,address,type,server):
|
|
address = dns.name.from_text(address)
|
|
if type == "A": query = dns.message.make_query(qname=address, rdtype=dns.rdatatype.A)
|
|
if type == "AAAA": query = dns.message.make_query(qname=address, rdtype=dns.rdatatype.AAAA)
|
|
if type == "TXT": query = dns.message.make_query(qname=address, rdtype=dns.rdatatype.TXT)
|
|
if(mode == "dns"): response = dns53(query,server)
|
|
if(mode == "dot"): response = dot(query,server)
|
|
if(mode == "doh"): response = doh(query,server)
|
|
if(mode == "doq"): response = doq(query,server)
|
|
analyze(address,type,server,response)
|
|
|
|
def analyze(address,type,server,response):
|
|
addresses = []
|
|
for subset in response:
|
|
if ((subset.rdtype == dns.rdatatype.A and type == "A") or (subset.rdtype == dns.rdatatype.AAAA and type == "AAAA")):
|
|
for rrset in subset:
|
|
addresses.append(rrset.address)
|
|
if ((subset.rdtype == dns.rdatatype.TXT and type == "TXT")):
|
|
for rrset in subset:
|
|
addresses.append("\""+rrset.strings[0].decode('utf_8')+"\"")
|
|
if(len(addresses) == 0):
|
|
raise critical("No response for " + type + ":"+ str(address) +"@"+ server)
|
|
elif(args.expect == None):
|
|
resps = ','.join(addresses)
|
|
raise ok("Found "+ resps + " for " + type + ":"+ str(address) +"@"+ server)
|
|
|
|
def unknown(message):
|
|
print("UNKNOWN - " + message)
|
|
exit(3)
|
|
|
|
def critical(message):
|
|
print("CRITICAL - " + message)
|
|
exit(2)
|
|
|
|
def warning(message):
|
|
print("WARNING - " + message)
|
|
exit(1)
|
|
|
|
def ok(message):
|
|
print("OK - " + message)
|
|
exit(0)
|
|
|
|
if __name__ == "__main__":
|
|
query(args.mode, args.address, args.type, args.server)
|