import re

# Loose dotted-quad shape: four runs of ASCII digits separated by dots.  Octet
# values are not range-checked here; addr_to_int() does that after matching.
ipv4_pattern = re.compile(r"(\d+)\.(\d+)\.(\d+)\.(\d+)", re.ASCII)

# Strict dotted-quad for scanning free text: each octet is 0-255 without
# leading zeros, and the address must not touch further digits or dots on
# either side (so nothing inside "1.2.3.4.5" or "01.2.3.4" is matched).
ipv4_pattern_strict = re.compile(r"""
    (?:^|(?<=[^\d.]))
    (0|1\d\d|2[0-4]\d|25[0-5]|[1-9]\d?)
    \.
    (0|1\d\d|2[0-4]\d|25[0-5]|[1-9]\d?)
    \.
    (0|1\d\d|2[0-4]\d|25[0-5]|[1-9]\d?)
    \.
    (0|1\d\d|2[0-4]\d|25[0-5]|[1-9]\d?)
    (?:$|(?=[^\d.]))
""", re.ASCII | re.VERBOSE)


class InvalidIPv4Error(ValueError, AssertionError):
    """Raised by addr_to_int() for a string that is not a valid dotted quad.

    Derives from ValueError, the conventional type for a bad argument value,
    and also from AssertionError because this condition used to be reported
    by ``assert`` statements; callers catching either type keep working.
    """


def read_ips(f):
    """Yield every strict IPv4 address found in the lines of *f*.

    *f* is iterated line by line (an open text stream, ``io.StringIO``, or
    any iterable of strings).  Everything from the first ``#`` on a line is
    a comment and is ignored.  Each address is yielded as the matched text,
    in order of appearance.
    """
    for line in f:  # stream lines instead of materialising readlines()
        text, _, _ = line.partition("#")
        for match in ipv4_pattern_strict.finditer(text):
            yield match.group()  # the whole address, not a single octet


def addr_to_int(ip):
    """Convert a dotted-quad string such as ``"10.0.0.1"`` to a 32-bit int.

    Octets may carry leading zeros (``"010.0.0.1"`` is accepted), but each
    must be at most 255 and the string must contain nothing else.

    Raises:
        InvalidIPv4Error: (a ValueError and AssertionError) if *ip* is not
            a valid dotted quad; the exception message is *ip* itself.
    """
    match = ipv4_pattern.fullmatch(ip)
    # Explicit raises instead of assert: asserts vanish under ``python -O``,
    # which let out-of-range octets through and crashed on a None match.
    if match is None:
        raise InvalidIPv4Error(ip)
    segs = [int(seg) for seg in match.group(1, 2, 3, 4)]
    if any(seg > 255 for seg in segs):  # \d+ already rules out negatives
        raise InvalidIPv4Error(ip)
    return segs[0] << 24 | segs[1] << 16 | segs[2] << 8 | segs[3]


def ipkey(ip_string):
    """Return a numeric sort key for a dotted address string.

    More lenient than addr_to_int(): any number of dot- or colon-separated
    integer fields is accepted, without range checks.  Field *i* is weighted
    by ``256 ** (3 - i)``, so a plain dotted quad gives the same value as
    addr_to_int(), while a fifth field (e.g. from ``"1.2.3.4:80"``) adds
    ``field / 256`` and makes the result a float.

    NOTE(review): a fifth field of 256 or more overlaps the next address's
    key (``"1.2.3.4:300"`` sorts after ``"1.2.3.5"``) -- confirm whether
    ports are expected here.
    """
    segs = [int(s) for s in ip_string.replace(":", ".").split(".")]
    return sum(256**(3 - i) * seg for i, seg in enumerate(segs))


def ip_reader_worker(fp, queue):
    """Put every address read_ips() finds in *fp* onto *queue*.

    *fp* is either an already-open ``io.IOBase`` stream, which is read but
    left open, or anything ``open()`` accepts (e.g. a path), which is opened
    here and closed when done.  Used as the thread target by IpReader.
    """
    from io import IOBase
    needs_closing = not isinstance(fp, IOBase)
    # Addresses are ASCII, so decode as UTF-8 and replace bad bytes instead of
    # raising: an exception would kill this worker thread and silently cut
    # the results short.  The U+FFFD replacement is neither a digit nor a
    # dot, so it can neither create nor join an address.
    f = open(fp, "r", encoding="utf-8", errors="replace") if needs_closing else fp
    try:
        for ip in read_ips(f):
            queue.put(ip)
    finally:
        if needs_closing:
            f.close()


class IpReader:
    """Scan several IP sources concurrently, one worker thread per source.

    Entering the context manager starts an ip_reader_worker() thread for
    each path or stream given to the constructor, all feeding one shared
    queue.  Iterating the reader afterwards (``for`` or ``async for``)
    yields addresses as the workers produce them, until every worker has
    finished and the queue is empty.  Addresses from one source keep their
    order; different sources interleave arbitrarily.

    NOTE(review): an exception inside a worker (e.g. a missing file) only
    ends that thread; iteration then completes normally with fewer results.

    Attributes:
        fps: the paths/streams passed to the constructor.
        queue: SimpleQueue the worker threads put addresses onto.
        threads: the started worker threads.
        total: addresses taken off ``queue`` so far by iteration (may run
            ahead of the number yielded while some are buffered).
    """

    def __init__(self, *paths_and_handles):
        from queue import SimpleQueue
        self.fps = paths_and_handles
        self.queue = SimpleQueue()
        self.threads = []
        self.total = 0

    def is_running(self):
        """Return True while at least one worker thread is still alive."""
        return any(thread.is_alive() for thread in self.threads)

    def _drain(self, buffer):
        """Move every address currently in ``self.queue`` onto *buffer*."""
        # Assumes a single consumer: then a non-empty queue cannot be emptied
        # between empty() and get(), so get() returns without blocking.
        while not self.queue.empty():
            buffer.append(self.queue.get())
            self.total += 1

    def __iter__(self):
        """Return a generator of addresses that ends once all workers are done."""
        from collections import deque
        from queue import Empty

        def _next():
            pending = deque()  # consumer-local FIFO; no locking needed
            # is_running() is checked before empty(): once every worker has
            # exited nothing more can arrive, so an empty queue means done.
            while self.is_running() or not self.queue.empty():
                if not pending and self.queue.empty():
                    # Block briefly for the next address instead of spinning.
                    try:
                        item = self.queue.get(timeout=1.0)
                    except Empty:
                        pass
                    else:
                        pending.append(item)
                        self.total += 1
                self._drain(pending)
                if pending:
                    yield pending.popleft()
            while pending:
                yield pending.popleft()
        return _next()

    def __aiter__(self):
        """Return an async generator of addresses that ends once all workers are done.

        While the shared queue is empty it is polled every 0.1 s with
        ``asyncio.sleep`` rather than blocking the event loop on ``get()``,
        at the cost of up to that much latency per wake-up.
        """
        from asyncio import sleep
        from collections import deque

        async def _next():
            pending = deque()
            while self.is_running() or not self.queue.empty():
                if not pending and self.queue.empty():
                    await sleep(0.1)  # this incurs some latency, but alas...
                self._drain(pending)
                if pending:
                    yield pending.popleft()
            while pending:
                yield pending.popleft()
        return _next()

    def __enter__(self):
        """Start one ip_reader_worker() thread per source and return self."""
        from threading import Thread
        for fp in self.fps:
            thread = Thread(target=ip_reader_worker, args=(fp, self.queue))
            self.threads.append(thread)
            thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing is cleaned up: the workers are not joined and keep running
        # until their sources are exhausted.  Returning None lets exceptions
        # from the with-body propagate.
        pass