make country code lookup a background task

This commit is contained in:
Connor Olding 2020-09-05 03:45:05 +02:00
parent 52f315b395
commit c65b465560
2 changed files with 34 additions and 17 deletions

View file

@ -111,14 +111,23 @@ async def try_ip(db, server_ip, checks, opts: Options):
from asyncio import sleep
entries = []
deferred = []
success = True
def maybe_put_ip(ip):
from asyncio import QueueFull
try:
opts.ips.put_nowait(ip)
except QueueFull:
deferred.append(ip)
def finisher(done, pending):
nonlocal success
for task in done:
res, ip, check = task.result()
entry = process_result(res, ip, check, opts)
map(maybe_put_ip, entry.addrs)
entries.append(entry)
if not entry.success:
if opts.early_stopping and success: # only cancel once
@ -151,12 +160,8 @@ async def try_ip(db, server_ip, checks, opts: Options):
else:
await pooler()
if opts.ipinfo is not None:
await opts.ipinfo.find_country(server_ip, db)
for entry in entries:
for addr in entry.addrs:
await opts.ipinfo.find_country(addr, db)
opts.ipinfo.flush()
for ip in deferred:
await opts.ips.put(ip)
if not opts.dry:
for entry in entries:
@ -179,27 +184,34 @@ async def try_ip(db, server_ip, checks, opts: Options):
async def sync_database(db, opts: Options):
from .ips import china, blocks
if db is None:
return
# TODO: handle addresses that were removed from respodns.ips.china.
for ips, kw in ((china, "china"), (blocks, "block_target")):
for ip in ips:
kwargs = dict()
kwargs[kw] = True
if opts.ipinfo is not None:
kwargs["country_code"] = await opts.ipinfo.find_country(ip)
db.modify_address(ip, **kwargs)
if opts.ipinfo is not None:
opts.ipinfo.flush()
kwargs = {kw: True}
if db is None:
db.modify_address(ip, **kwargs)
await opts.ips.put(ip)
async def locate_ips(db, opts: Options):
seen = set()
while (ip := await opts.ips.get()) is not None:
if opts.ipinfo is not None and ip not in seen:
seen.add(ip)
code = await opts.ipinfo.find_country(ip)
if db is not None:
db.modify_address(ip, country_code=code)
opts.ipinfo.flush()
async def main(db, filepaths, checks, opts: Options):
from .util import make_pooler
from asyncio import sleep, create_task
from asyncio import sleep, create_task, Queue
from sys import stdin, stderr
opts.ips = Queue()
syncing = create_task(sync_database(db, opts))
geoip = create_task(locate_ips(db, opts))
def finisher(done, pending):
for task in done:
@ -229,6 +241,7 @@ async def main(db, filepaths, checks, opts: Options):
stderr.flush()
if not first:
await sleep(opts.ip_wait)
await opts.ips.put(ip)
await pooler(try_ip(db, ip, checks, opts))
if opts.blocking_file_io:
@ -249,3 +262,5 @@ async def main(db, filepaths, checks, opts: Options):
await pooler()
await syncing
await opts.ips.put(None) # end of queue
await geoip

View file

@ -4,8 +4,10 @@ from dataclasses import dataclass
@dataclass
class Options:
# TODO: move these out of Options, since they're really not.
execution: object = None
ipinfo: object = None
ips: object = None
ip_simul: int = 15 # how many IPs to connect to at once
domain_simul: int = 3 # how many domains per IP to request at once