From c65b4655605a54831db53f77689054dd2b047bc8 Mon Sep 17 00:00:00 2001 From: Connor Olding Date: Sat, 5 Sep 2020 03:45:05 +0200 Subject: [PATCH] make country code lookup a background task --- respodns/dns.py | 49 +++++++++++++++++++++++++++++---------------- respodns/structs.py | 2 ++ 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/respodns/dns.py b/respodns/dns.py index 25661cf..026a780 100644 --- a/respodns/dns.py +++ b/respodns/dns.py @@ -111,14 +111,23 @@ async def try_ip(db, server_ip, checks, opts: Options): from asyncio import sleep entries = [] + deferred = [] success = True + def maybe_put_ip(ip): + from asyncio import QueueFull + try: + opts.ips.put_nowait(ip) + except QueueFull: + deferred.append(ip) + def finisher(done, pending): nonlocal success for task in done: res, ip, check = task.result() entry = process_result(res, ip, check, opts) + map(maybe_put_ip, entry.addrs) entries.append(entry) if not entry.success: if opts.early_stopping and success: # only cancel once @@ -151,12 +160,8 @@ async def try_ip(db, server_ip, checks, opts: Options): else: await pooler() - if opts.ipinfo is not None: - await opts.ipinfo.find_country(server_ip, db) - for entry in entries: - for addr in entry.addrs: - await opts.ipinfo.find_country(addr, db) - opts.ipinfo.flush() + for ip in deferred: + await opts.ips.put(ip) if not opts.dry: for entry in entries: @@ -179,27 +184,34 @@ async def try_ip(db, server_ip, checks, opts: Options): async def sync_database(db, opts: Options): from .ips import china, blocks - if db is None: - return - # TODO: handle addresses that were removed from respodns.ips.china. for ips, kw in ((china, "china"), (blocks, "block_target")): for ip in ips: - kwargs = dict() - kwargs[kw] = True - if opts.ipinfo is not None: - kwargs["country_code"] = await opts.ipinfo.find_country(ip) - db.modify_address(ip, **kwargs) - if opts.ipinfo is not None: - opts.ipinfo.flush() + kwargs = {kw: True} + if db is None: + db.modify_address(ip, **kwargs) + await opts.ips.put(ip) + + +async def locate_ips(db, opts: Options): + seen = set() + while (ip := await opts.ips.get()) is not None: + if opts.ipinfo is not None and ip not in seen: + seen.add(ip) + code = await opts.ipinfo.find_country(ip) + if db is not None: + db.modify_address(ip, country_code=code) + opts.ipinfo.flush() async def main(db, filepaths, checks, opts: Options): from .util import make_pooler - from asyncio import sleep, create_task + from asyncio import sleep, create_task, Queue from sys import stdin, stderr + opts.ips = Queue() syncing = create_task(sync_database(db, opts)) + geoip = create_task(locate_ips(db, opts)) def finisher(done, pending): for task in done: @@ -229,6 +241,7 @@ async def main(db, filepaths, checks, opts: Options): stderr.flush() if not first: await sleep(opts.ip_wait) + await opts.ips.put(ip) await pooler(try_ip(db, ip, checks, opts)) if opts.blocking_file_io: @@ -249,3 +262,5 @@ async def main(db, filepaths, checks, opts: Options): await pooler() await syncing + await opts.ips.put(None) # end of queue + await geoip diff --git a/respodns/structs.py b/respodns/structs.py index bf8f937..a2d4ce8 100644 --- a/respodns/structs.py +++ b/respodns/structs.py @@ -4,8 +4,10 @@ from dataclasses import dataclass @dataclass class Options: + # TODO: move these out of Options, since they're really not. execution: object = None ipinfo: object = None + ips: object = None ip_simul: int = 15 # how many IPs to connect to at once domain_simul: int = 3 # how many domains per IP to request at once