From b26a4e764630edeb4014e2dec1c37a0da1a4b8d2 Mon Sep 17 00:00:00 2001 From: Connor Olding Date: Fri, 4 Sep 2020 13:36:59 +0200 Subject: [PATCH] rewrite IpReader iterator to accumulate IPs before yielding still needs work --- respodns/dns.py | 4 ++-- respodns/ip_util.py | 32 +++++++++++++++++--------------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/respodns/dns.py b/respodns/dns.py index ba33095..8f518db 100644 --- a/respodns/dns.py +++ b/respodns/dns.py @@ -225,7 +225,7 @@ async def main(db, filepaths, checks, opts: Options): for i, ip in enumerate(read_ips(f)): first = i == 0 if opts.progress: - print(f"#{i}: {ip}", file=stderr) + print(f"#{i + 1}: {ip}", file=stderr) stderr.flush() if not first: await sleep(opts.ip_wait) @@ -239,7 +239,7 @@ async def main(db, filepaths, checks, opts: Options): for i, ip in enumerate(reader): first = i == 0 if opts.progress: - print(f"#{i}/{reader.total}: {ip}", file=stderr) + print(f"#{i + 1}/{reader.total}: {ip}", file=stderr) stderr.flush() if not first: await sleep(opts.ip_wait) diff --git a/respodns/ip_util.py b/respodns/ip_util.py index f56cf81..e9ef747 100644 --- a/respodns/ip_util.py +++ b/respodns/ip_util.py @@ -52,27 +52,29 @@ class IpReader: self.threads = [] self.total = 0 - def running(self): + def is_running(self): return any(thread.is_alive() for thread in self.threads) def __iter__(self): - return self - - def __next__(self): - # TODO: rewrite such that self.total is useful. (get as many at once) from queue import Empty + from sys import stderr - while self.running() or not self.queue.empty(): - try: - res = self.queue.get(block=True, timeout=1.0) - if res is not None: - self.total += 1 - return res - except Empty: - from sys import stderr - print("blocking on IpReader", file=stderr) + def _next(): + while self.is_running() or not self.queue.empty(): + results = [] + if self.queue.empty(): + try: + results.append(self.queue.get(timeout=1.0)) + except Empty: + print("blocking on IpReader", file=stderr) + else: + while not self.queue.empty(): + results.append(self.queue.get()) + self.total += len(results) + for res in results: + yield res - raise StopIteration + return _next() def __enter__(self): from threading import Thread