From 2a00288a9c31fe8b14a2c7745b3ef91f2306517c Mon Sep 17 00:00:00 2001 From: Connor Olding Date: Sat, 29 Aug 2020 15:13:59 +0200 Subject: [PATCH] even more reorganization --- respodns/__main__.py | 202 +------------------------------------------ respodns/dns.py | 150 ++++++++++++++++++++++++++++++++ respodns/ui.py | 57 ++++++++++++ 3 files changed, 210 insertions(+), 199 deletions(-) create mode 100644 respodns/ui.py diff --git a/respodns/__main__.py b/respodns/__main__.py index 5d83e5a..7aaa6f3 100644 --- a/respodns/__main__.py +++ b/respodns/__main__.py @@ -1,205 +1,9 @@ #!/usr/bin/env python3 -from .db import RespoDB -from .dns import getaddrs, detect_gfw -from .ip_util import addr_to_int, read_ips -from .ips import blocks, is_bogon -from .structs import Options, Entry -from .util import right_now, make_pooler -from asyncio import run, sleep -from sys import argv, stdin, stderr, exit -import respodns.checks as chk - -def process_result(res, ip, check, opts: Options): - # TODO: get more accurate times by inserting start-end into getaddrs. - now = right_now() - assert len(res) > 0 - reason = None - - if "Timeout" in res: - reason = "timeout" - - elif check.kind.startswith("bad"): - reason = "okay" if "NXDOMAIN" in res else "redirect" - - elif any(is_bogon(r) for r in res): - reason = "block" - elif any(blocked in res for blocked in blocks): - reason = "block" - - elif not any(len(r) > 0 and r[0].isdigit() for r in res): - # TODO: check for no alias on common. - reason = "missing" - - else: - for r in res: - if len(r) == 0 or not r[0].isdigit(): - continue - if detect_gfw(r, ip, check): - reason = "gfw" - break - else: - reason = "okay" - - assert reason is not None, (res, ip, check) - - addrs = list(filter(lambda r: len(r) > 0 and r[0].isdigit(), res)) - exception = res[0] if len(addrs) == 0 else None - - return Entry( - date=now, - success=reason == "okay", - server=ip, - kind=check.kind, - domain=check.domain, - exception=exception, - addrs=addrs, - reason=reason, - execution=opts.execution, - ) - -async def try_ip(db, server_ip, checks, opts: Options): - entries = [] - - success = True - def finisher(done, pending): - nonlocal success - for task in done: - res, ip, check = task.result() - entry = process_result(res, ip, check, opts) - entries.append(entry) - if not entry.success: - if opts.early_stopping and success: # only cancel once - for pend in pending: - #print("CANCEL", file=stderr) - # FIXME: this can still, somehow, cancel the main function. - pend.cancel() - success = False - - pooler = make_pooler(opts.domain_simul, finisher) - - async def getaddrs_wrapper(ip, check): - # NOTE: could put right_now() stuff here! - # TODO: add duration field given in milliseconds (integer) - # by subtracting start and end datetimes. - res = await getaddrs(ip, check.domain, opts) - return res, ip, check - - for i, check in enumerate(checks): - first = i == 0 - if not first: - await sleep(opts.domain_wait) - await pooler(getaddrs_wrapper(server_ip, check)) - if first: - # limit to one connection for the first check. - await pooler() - if not success: - if opts.early_stopping or first: - break - else: - await pooler() - - if not opts.dry: - for entry in entries: - db.push_entry(entry) - db.commit() - - if not success: - first_failure = None - assert len(entries) > 0 - for entry in entries: - #print(entry, file=stderr) - if not entry.success: - first_failure = entry - break - else: - assert 0, ("no failures found:", entries) - return server_ip, first_failure - return server_ip, None - -async def main(db, filepath, checks, opts: Options): - def finisher(done, pending): - for task in done: - ip, first_failure = task.result() - if first_failure is None: - print(ip) - elif opts.dry: - ff = first_failure - if ff.kind in ("shock", "adware"): - print(ip, ff.reason, ff.kind, sep="\t") - else: - print(ip, ff.reason, ff.kind, ff.domain, sep="\t") - - pooler = make_pooler(opts.ip_simul, finisher) - - f = stdin if filepath == "" else open(filepath, "r") - for i, ip in enumerate(read_ips(f)): - first = i == 0 - if opts.progress: - print(f"#{i}: {ip}", file=stderr) - stderr.flush() - if not first: - await sleep(opts.ip_wait) - await pooler(try_ip(db, ip, checks, opts)) - if f != stdin: - f.close() - - await pooler() - -def ui(program, args): - from argparse import ArgumentParser - - name = "respodns6" - parser = ArgumentParser(name, - description=name + ": test and log DNS records") - - # TODO: support multiple paths. nargs="+", iterate with pooling? - parser.add_argument( - "path", metavar="file-path", - help="a path to a file containing IPv4 addresses which host DNS servers") - - parser.add_argument( - "--database", - help="specify database for logging") - - a = parser.parse_args(args) - - checks = [] - checks += chk.first - #checks += chk.new - checks += chk.likely - #checks += chk.unlikely - #checks += chk.top100 - - opts = Options() - opts.dry = a.database is None - opts.early_stopping = opts.dry - - if a.database is not None: - if a.database.startswith("sqlite:"): - uri = a.database - else: - uri = "sqlite:///" + a.database - - def runwrap(db, debug=False): - if debug: - import logging - logging.basicConfig(level=logging.DEBUG) - run(main(db, a.path, checks, opts), debug=True) - else: - run(main(db, a.path, checks, opts)) - - if opts.dry: - runwrap(None) - else: - # log to a database. - db = RespoDB(uri, create=True) - with db: # TODO: .open and .close methods for manual invocation. - with db.execution as execution: # TODO: clean up this interface. - opts.execution = execution - runwrap(db) - if __name__ == "__main__": + from sys import argv, stderr + from .ui import ui + if len(argv) == 0: print("You've met with a terrible fate.", file=stderr) ret = 2 diff --git a/respodns/dns.py b/respodns/dns.py index 27cdf7d..7b2d1ab 100644 --- a/respodns/dns.py +++ b/respodns/dns.py @@ -1,3 +1,5 @@ +from .structs import Options + def detect_gfw(r, ip, check): # attempt to detect interference from the Great Firewall of China. #from .ips import china @@ -51,3 +53,151 @@ async def getaddrs(server, domain, opts): except Timeout: return ["Timeout"] return sorted(set(rr.address for rr in ans.rrset), key=ipkey) + +def process_result(res, ip, check, opts: Options): + from .ips import is_bogon, blocks + from .util import right_now + from .structs import Entry + + # TODO: get more accurate times by inserting start-end into getaddrs. + now = right_now() + assert len(res) > 0 + reason = None + + if "Timeout" in res: + reason = "timeout" + + elif check.kind.startswith("bad"): + reason = "okay" if "NXDOMAIN" in res else "redirect" + + elif any(is_bogon(r) for r in res): + reason = "block" + elif any(blocked in res for blocked in blocks): + reason = "block" + + elif not any(len(r) > 0 and r[0].isdigit() for r in res): + # TODO: check for no alias on common. + reason = "missing" + + else: + for r in res: + if len(r) == 0 or not r[0].isdigit(): + continue + if detect_gfw(r, ip, check): + reason = "gfw" + break + else: + reason = "okay" + + assert reason is not None, (res, ip, check) + + addrs = list(filter(lambda r: len(r) > 0 and r[0].isdigit(), res)) + exception = res[0] if len(addrs) == 0 else None + + return Entry( + date=now, + success=reason == "okay", + server=ip, + kind=check.kind, + domain=check.domain, + exception=exception, + addrs=addrs, + reason=reason, + execution=opts.execution, + ) + +async def try_ip(db, server_ip, checks, opts: Options): + from .util import make_pooler + from asyncio import sleep + + entries = [] + + success = True + def finisher(done, pending): + nonlocal success + for task in done: + res, ip, check = task.result() + entry = process_result(res, ip, check, opts) + entries.append(entry) + if not entry.success: + if opts.early_stopping and success: # only cancel once + for pend in pending: + #print("CANCEL", file=stderr) + # FIXME: this can still, somehow, cancel the main function. + pend.cancel() + success = False + + pooler = make_pooler(opts.domain_simul, finisher) + + async def getaddrs_wrapper(ip, check): + # NOTE: could put right_now() stuff here! + # TODO: add duration field given in milliseconds (integer) + # by subtracting start and end datetimes. + res = await getaddrs(ip, check.domain, opts) + return res, ip, check + + for i, check in enumerate(checks): + first = i == 0 + if not first: + await sleep(opts.domain_wait) + await pooler(getaddrs_wrapper(server_ip, check)) + if first: + # limit to one connection for the first check. + await pooler() + if not success: + if opts.early_stopping or first: + break + else: + await pooler() + + if not opts.dry: + for entry in entries: + db.push_entry(entry) + db.commit() + + if not success: + first_failure = None + assert len(entries) > 0 + for entry in entries: + #print(entry, file=stderr) + if not entry.success: + first_failure = entry + break + else: + assert 0, ("no failures found:", entries) + return server_ip, first_failure + return server_ip, None + +async def main(db, filepath, checks, opts: Options): + from .ip_util import read_ips + from .util import make_pooler + from asyncio import sleep + from sys import stdin + + def finisher(done, pending): + for task in done: + ip, first_failure = task.result() + if first_failure is None: + print(ip) + elif opts.dry: + ff = first_failure + if ff.kind in ("shock", "adware"): + print(ip, ff.reason, ff.kind, sep="\t") + else: + print(ip, ff.reason, ff.kind, ff.domain, sep="\t") + + pooler = make_pooler(opts.ip_simul, finisher) + + f = stdin if filepath == "" else open(filepath, "r") + for i, ip in enumerate(read_ips(f)): + first = i == 0 + if opts.progress: + print(f"#{i}: {ip}", file=stderr) + stderr.flush() + if not first: + await sleep(opts.ip_wait) + await pooler(try_ip(db, ip, checks, opts)) + if f != stdin: + f.close() + + await pooler() diff --git a/respodns/ui.py b/respodns/ui.py new file mode 100644 index 0000000..598fa2c --- /dev/null +++ b/respodns/ui.py @@ -0,0 +1,57 @@ +def ui(program, args): + from .db import RespoDB + from .dns import main + from .structs import Options + from argparse import ArgumentParser + from asyncio import run + import respodns.checks as chk + + name = "respodns6" + parser = ArgumentParser(name, + description=name + ": test and log DNS records") + + # TODO: support multiple paths. nargs="+", iterate with pooling? + parser.add_argument( + "path", metavar="file-path", + help="a path to a file containing IPv4 addresses which host DNS servers") + + parser.add_argument( + "--database", + help="specify database for logging") + + a = parser.parse_args(args) + + checks = [] + checks += chk.first + #checks += chk.new + checks += chk.likely + #checks += chk.unlikely + #checks += chk.top100 + + opts = Options() + opts.dry = a.database is None + opts.early_stopping = opts.dry + + if a.database is not None: + if a.database.startswith("sqlite:"): + uri = a.database + else: + uri = "sqlite:///" + a.database + + def runwrap(db, debug=False): + if debug: + import logging + logging.basicConfig(level=logging.DEBUG) + run(main(db, a.path, checks, opts), debug=True) + else: + run(main(db, a.path, checks, opts)) + + if opts.dry: + runwrap(None) + else: + # log to a database. + db = RespoDB(uri, create=True) + with db: # TODO: .open and .close methods for manual invocation. + with db.execution as execution: # TODO: clean up this interface. + opts.execution = execution + runwrap(db)