even more reorganization

Connor Olding 2020-08-29 15:13:59 +02:00
parent 85045f397f
commit 2a00288a9c
3 changed files with 210 additions and 199 deletions

View file

@@ -1,205 +1,9 @@
#!/usr/bin/env python3
from .db import RespoDB
from .dns import getaddrs, detect_gfw
from .ip_util import addr_to_int, read_ips
from .ips import blocks, is_bogon
from .structs import Options, Entry
from .util import right_now, make_pooler
from asyncio import run, sleep
from sys import argv, stdin, stderr, exit
import respodns.checks as chk

def process_result(res, ip, check, opts: Options):
    # TODO: get more accurate times by inserting start-end into getaddrs.
    now = right_now()
    assert len(res) > 0
    reason = None
    if "Timeout" in res:
        reason = "timeout"
    elif check.kind.startswith("bad"):
        reason = "okay" if "NXDOMAIN" in res else "redirect"
    elif any(is_bogon(r) for r in res):
        reason = "block"
    elif any(blocked in res for blocked in blocks):
        reason = "block"
    elif not any(len(r) > 0 and r[0].isdigit() for r in res):
        # TODO: check for no alias on common.
        reason = "missing"
    else:
        for r in res:
            if len(r) == 0 or not r[0].isdigit():
                continue
            if detect_gfw(r, ip, check):
                reason = "gfw"
                break
        else:
            reason = "okay"
    assert reason is not None, (res, ip, check)
    addrs = list(filter(lambda r: len(r) > 0 and r[0].isdigit(), res))
    exception = res[0] if len(addrs) == 0 else None
    return Entry(
        date=now,
        success=reason == "okay",
        server=ip,
        kind=check.kind,
        domain=check.domain,
        exception=exception,
        addrs=addrs,
        reason=reason,
        execution=opts.execution,
    )

async def try_ip(db, server_ip, checks, opts: Options):
    entries = []
    success = True

    def finisher(done, pending):
        nonlocal success
        for task in done:
            res, ip, check = task.result()
            entry = process_result(res, ip, check, opts)
            entries.append(entry)
            if not entry.success:
                if opts.early_stopping and success:  # only cancel once
                    for pend in pending:
                        #print("CANCEL", file=stderr)
                        # FIXME: this can still, somehow, cancel the main function.
                        pend.cancel()
                success = False

    pooler = make_pooler(opts.domain_simul, finisher)

    async def getaddrs_wrapper(ip, check):
        # NOTE: could put right_now() stuff here!
        # TODO: add duration field given in milliseconds (integer)
        #       by subtracting start and end datetimes.
        res = await getaddrs(ip, check.domain, opts)
        return res, ip, check

    for i, check in enumerate(checks):
        first = i == 0
        if not first:
            await sleep(opts.domain_wait)
        await pooler(getaddrs_wrapper(server_ip, check))
        if first:
            # limit to one connection for the first check.
            await pooler()
        if not success:
            if opts.early_stopping or first:
                break
    else:
        await pooler()

    if not opts.dry:
        for entry in entries:
            db.push_entry(entry)
        db.commit()

    if not success:
        first_failure = None
        assert len(entries) > 0
        for entry in entries:
            #print(entry, file=stderr)
            if not entry.success:
                first_failure = entry
                break
        else:
            assert 0, ("no failures found:", entries)
        return server_ip, first_failure
    return server_ip, None

async def main(db, filepath, checks, opts: Options):
    def finisher(done, pending):
        for task in done:
            ip, first_failure = task.result()
            if first_failure is None:
                print(ip)
            elif opts.dry:
                ff = first_failure
                if ff.kind in ("shock", "adware"):
                    print(ip, ff.reason, ff.kind, sep="\t")
                else:
                    print(ip, ff.reason, ff.kind, ff.domain, sep="\t")

    pooler = make_pooler(opts.ip_simul, finisher)

    f = stdin if filepath == "" else open(filepath, "r")
    for i, ip in enumerate(read_ips(f)):
        first = i == 0
        if opts.progress:
            print(f"#{i}: {ip}", file=stderr)
            stderr.flush()
        if not first:
            await sleep(opts.ip_wait)
        await pooler(try_ip(db, ip, checks, opts))
    if f != stdin:
        f.close()
    await pooler()

def ui(program, args):
    from argparse import ArgumentParser

    name = "respodns6"
    parser = ArgumentParser(name,
                            description=name + ": test and log DNS records")
    # TODO: support multiple paths. nargs="+", iterate with pooling?
    parser.add_argument(
        "path", metavar="file-path",
        help="a path to a file containing IPv4 addresses which host DNS servers")
    parser.add_argument(
        "--database",
        help="specify database for logging")

    a = parser.parse_args(args)

    checks = []
    checks += chk.first
    #checks += chk.new
    checks += chk.likely
    #checks += chk.unlikely
    #checks += chk.top100

    opts = Options()
    opts.dry = a.database is None
    opts.early_stopping = opts.dry

    if a.database is not None:
        if a.database.startswith("sqlite:"):
            uri = a.database
        else:
            uri = "sqlite:///" + a.database

    def runwrap(db, debug=False):
        if debug:
            import logging
            logging.basicConfig(level=logging.DEBUG)
            run(main(db, a.path, checks, opts), debug=True)
        else:
            run(main(db, a.path, checks, opts))

    if opts.dry:
        runwrap(None)
    else:
        # log to a database.
        db = RespoDB(uri, create=True)
        with db:  # TODO: .open and .close methods for manual invocation.
            with db.execution as execution:  # TODO: clean up this interface.
                opts.execution = execution
                runwrap(db)

if __name__ == "__main__":
    from sys import argv, stderr
    from .ui import ui
    if len(argv) == 0:
        print("You've met with a terrible fate.", file=stderr)
        ret = 2
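
Pieced together from the added lines above, the trimmed nine-line entry file now does little more than delegate to the new respodns/ui.py module. The else branch and the final exit are assumptions here, since they fall outside the visible diff context:

#!/usr/bin/env python3
if __name__ == "__main__":
    from sys import argv, stderr
    from .ui import ui
    if len(argv) == 0:
        print("You've met with a terrible fate.", file=stderr)
        ret = 2
    else:
        ui(argv[0], argv[1:])  # assumed: hand argv over to the relocated UI.
        ret = 0
    raise SystemExit(ret)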

View file

@@ -1,3 +1,5 @@
from .structs import Options

def detect_gfw(r, ip, check):
    # attempt to detect interference from the Great Firewall of China.
    #from .ips import china
@@ -51,3 +53,151 @@ async def getaddrs(server, domain, opts):
    except Timeout:
        return ["Timeout"]
    return sorted(set(rr.address for rr in ans.rrset), key=ipkey)

def process_result(res, ip, check, opts: Options):
    from .ips import is_bogon, blocks
    from .util import right_now
    from .structs import Entry

    # TODO: get more accurate times by inserting start-end into getaddrs.
    now = right_now()
    assert len(res) > 0
    reason = None
    if "Timeout" in res:
        reason = "timeout"
    elif check.kind.startswith("bad"):
        reason = "okay" if "NXDOMAIN" in res else "redirect"
    elif any(is_bogon(r) for r in res):
        reason = "block"
    elif any(blocked in res for blocked in blocks):
        reason = "block"
    elif not any(len(r) > 0 and r[0].isdigit() for r in res):
        # TODO: check for no alias on common.
        reason = "missing"
    else:
        for r in res:
            if len(r) == 0 or not r[0].isdigit():
                continue
            if detect_gfw(r, ip, check):
                reason = "gfw"
                break
        else:
            reason = "okay"
    assert reason is not None, (res, ip, check)
    addrs = list(filter(lambda r: len(r) > 0 and r[0].isdigit(), res))
    exception = res[0] if len(addrs) == 0 else None
    return Entry(
        date=now,
        success=reason == "okay",
        server=ip,
        kind=check.kind,
        domain=check.domain,
        exception=exception,
        addrs=addrs,
        reason=reason,
        execution=opts.execution,
    )

async def try_ip(db, server_ip, checks, opts: Options):
    from .util import make_pooler
    from asyncio import sleep

    entries = []
    success = True

    def finisher(done, pending):
        nonlocal success
        for task in done:
            res, ip, check = task.result()
            entry = process_result(res, ip, check, opts)
            entries.append(entry)
            if not entry.success:
                if opts.early_stopping and success:  # only cancel once
                    for pend in pending:
                        #print("CANCEL", file=stderr)
                        # FIXME: this can still, somehow, cancel the main function.
                        pend.cancel()
                success = False

    pooler = make_pooler(opts.domain_simul, finisher)

    async def getaddrs_wrapper(ip, check):
        # NOTE: could put right_now() stuff here!
        # TODO: add duration field given in milliseconds (integer)
        #       by subtracting start and end datetimes.
        res = await getaddrs(ip, check.domain, opts)
        return res, ip, check

    for i, check in enumerate(checks):
        first = i == 0
        if not first:
            await sleep(opts.domain_wait)
        await pooler(getaddrs_wrapper(server_ip, check))
        if first:
            # limit to one connection for the first check.
            await pooler()
        if not success:
            if opts.early_stopping or first:
                break
    else:
        await pooler()

    if not opts.dry:
        for entry in entries:
            db.push_entry(entry)
        db.commit()

    if not success:
        first_failure = None
        assert len(entries) > 0
        for entry in entries:
            #print(entry, file=stderr)
            if not entry.success:
                first_failure = entry
                break
        else:
            assert 0, ("no failures found:", entries)
        return server_ip, first_failure
    return server_ip, None

async def main(db, filepath, checks, opts: Options):
    from .ip_util import read_ips
    from .util import make_pooler
    from asyncio import sleep
    from sys import stdin, stderr  # stderr: needed by the progress output below.

    def finisher(done, pending):
        for task in done:
            ip, first_failure = task.result()
            if first_failure is None:
                print(ip)
            elif opts.dry:
                ff = first_failure
                if ff.kind in ("shock", "adware"):
                    print(ip, ff.reason, ff.kind, sep="\t")
                else:
                    print(ip, ff.reason, ff.kind, ff.domain, sep="\t")

    pooler = make_pooler(opts.ip_simul, finisher)

    f = stdin if filepath == "" else open(filepath, "r")
    for i, ip in enumerate(read_ips(f)):
        first = i == 0
        if opts.progress:
            print(f"#{i}: {ip}", file=stderr)
            stderr.flush()
        if not first:
            await sleep(opts.ip_wait)
        await pooler(try_ip(db, ip, checks, opts))
    if f != stdin:
        f.close()
    await pooler()
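
make_pooler is imported from .util in both functions above but never shown in this diff. A minimal sketch of the interface the callers assume, schedule up to pool_size concurrent tasks, hand (done, pending) task sets to a finisher callback, and drain everything when called with no argument, could look like the following; the implementation details are assumptions, not the real respodns.util code:

from asyncio import FIRST_COMPLETED, ensure_future, wait

def make_pooler(pool_size, finisher):
    tasks = set()

    async def pooler(coro=None):
        nonlocal tasks
        if coro is not None:
            tasks.add(ensure_future(coro))
        # With no argument, drain every remaining task;
        # otherwise, only block while the pool is over capacity.
        limit = 0 if coro is None else pool_size
        while len(tasks) > limit:
            done, tasks = await wait(tasks, return_when=FIRST_COMPLETED)
            finisher(done, tasks)

    return pooler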

respodns/ui.py Normal file (57 lines)
View file

@@ -0,0 +1,57 @@
def ui(program, args):
    from .db import RespoDB
    from .dns import main
    from .structs import Options
    from argparse import ArgumentParser
    from asyncio import run
    import respodns.checks as chk

    name = "respodns6"
    parser = ArgumentParser(name,
                            description=name + ": test and log DNS records")
    # TODO: support multiple paths. nargs="+", iterate with pooling?
    parser.add_argument(
        "path", metavar="file-path",
        help="a path to a file containing IPv4 addresses which host DNS servers")
    parser.add_argument(
        "--database",
        help="specify database for logging")

    a = parser.parse_args(args)

    checks = []
    checks += chk.first
    #checks += chk.new
    checks += chk.likely
    #checks += chk.unlikely
    #checks += chk.top100

    opts = Options()
    opts.dry = a.database is None
    opts.early_stopping = opts.dry

    if a.database is not None:
        if a.database.startswith("sqlite:"):
            uri = a.database
        else:
            uri = "sqlite:///" + a.database

    def runwrap(db, debug=False):
        if debug:
            import logging
            logging.basicConfig(level=logging.DEBUG)
            run(main(db, a.path, checks, opts), debug=True)
        else:
            run(main(db, a.path, checks, opts))

    if opts.dry:
        runwrap(None)
    else:
        # log to a database.
        db = RespoDB(uri, create=True)
        with db:  # TODO: .open and .close methods for manual invocation.
            with db.execution as execution:  # TODO: clean up this interface.
                opts.execution = execution
                runwrap(db)
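
Given the argparse setup above, the reorganized package would be driven along these lines; the module name on the command line is a guess from the import paths, since nothing in the diff names the entry module:

python3 -m respodns servers.txt                        # dry run: print results, stop early on failure
python3 -m respodns servers.txt --database results.db  # log every check into an SQLite database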