import re

# Dotted-quad IPv4 address; re.ASCII restricts \d to the ASCII digits 0-9.
ipv4_pattern = re.compile(r"(\d+)\.(\d+)\.(\d+)\.(\d+)", re.ASCII)


def read_ips(f):
    """Yield candidate IP strings from the lines of *f*.

    Everything from the first ``#`` on a line is treated as a comment and
    dropped, then surrounding whitespace is stripped. A line is yielded only
    if what remains contains exactly three dots; no further validation is
    done here (use ``addr_to_int`` for strict checking).

    *f* is iterated line by line, so the input is streamed rather than read
    into memory all at once.
    """
    # TODO: make more robust. (regex pls)
    for line in f:
        ip = line.partition("#")[0].strip()
        if ip.count(".") == 3:
            yield ip


def addr_to_int(ip: str) -> int:
    """Convert a dotted-quad IPv4 string to its 32-bit integer value.

    Raises:
        AssertionError: if *ip* is not four dot-separated decimal numbers,
            or if any of them is outside 0..255. The offending text is the
            exception message. Note that these checks are ``assert``
            statements and are therefore skipped under ``python -O``.
    """
    match = ipv4_pattern.fullmatch(ip)
    # The message must be the argument itself; there is no other name in
    # scope describing the input.
    assert match is not None, ip
    segs = [int(group) for group in match.groups()]
    assert all(0 <= seg <= 255 for seg in segs), match.group(0)
    # First octet is the most significant byte.
    return segs[0] << 24 | segs[1] << 16 | segs[2] << 8 | segs[3]


def ipkey(ip_string):
    """Return a numeric sort key for an IP-like string.

    Colons are treated like dots, then each segment is weighted by
    ``256 ** (3 - position)``. Segments are not range-checked, and a string
    with more than four segments produces negative exponents and hence a
    float result.

    Raises:
        ValueError: if any segment is not parseable by ``int``.
    """
    # this is more lenient than addr_to_int.
    segs = [int(s) for s in ip_string.replace(":", ".").split(".")]
    return sum(256 ** (3 - i) * seg for i, seg in enumerate(segs))


def ip_reader_worker(fp, queue):
    """Read IPs from *fp* with ``read_ips`` and put each one on *queue*.

    *fp* is used as-is when it is an ``io.IOBase`` instance; anything else is
    passed to ``open(fp, "r")`` and the resulting file is closed when reading
    finishes or fails. A handle supplied by the caller is never closed here.
    """
    from io import IOBase

    needs_closing = not isinstance(fp, IOBase)
    f = open(fp, "r") if needs_closing else fp
    try:
        for ip in read_ips(f):
            queue.put(ip)
    finally:
        if needs_closing:
            f.close()


class IpReader:
    """Read IPs from several paths or handles, one thread per source.

    Entering the context starts one ``ip_reader_worker`` thread per source;
    iterating the reader yields the IPs that the workers put on a shared
    queue. ``total`` counts the items taken off the queue so far.
    """

    def __init__(self, *paths_and_handles):
        from queue import Queue

        self.fps = paths_and_handles
        self.queue = Queue()
        self.threads = []
        self.total = 0

    def is_running(self):
        """Return True while at least one worker thread is alive."""
        return any(thread.is_alive() for thread in self.threads)

    def __iter__(self):
        """Yield queued IPs until every worker is done and the queue is empty.

        When the queue is empty while a worker is still alive, wait up to
        one second for an item and write a note to stderr on timeout.
        """
        from queue import Empty
        from sys import stderr

        while self.is_running() or not self.queue.empty():
            results = []
            if self.queue.empty():
                try:
                    results.append(self.queue.get(timeout=1.0))
                except Empty:
                    print("blocking on IpReader", file=stderr)
            else:
                # Take everything currently available in one batch.
                while not self.queue.empty():
                    results.append(self.queue.get())
            self.total += len(results)
            yield from results

    def __enter__(self):
        """Start one worker thread per source and return the reader."""
        from threading import Thread

        for fp in self.fps:
            thread = Thread(target=ip_reader_worker, args=(fp, self.queue))
            self.threads.append(thread)
            thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing: workers are not joined and exceptions propagate."""
        pass