respodns/respodns/ip_util.py
Connor Olding 70308230e6 write and use async iterator for IpReader
also fixes the counter in the blocking version when progress is enabled
2020-09-04 14:17:04 +02:00

113 lines
3.2 KiB
Python

import re
# Dotted-quad matcher: four groups of decimal digits separated by literal dots.
# re.ASCII restricts \d to 0-9. The pattern itself does not bound each group
# to 0-255; any range check has to be done by the code using the match.
ipv4_pattern = re.compile(r"(\d+)\.(\d+)\.(\d+)\.(\d+)", re.ASCII)
def read_ips(f):
    """Yield candidate IPv4 address strings from an iterable of text lines.

    *f* is typically an open text file, but any iterable of strings works.
    Lines are consumed lazily, one at a time, so large inputs are never
    loaded into memory as a whole.

    For every line, anything from the first ``#`` onwards is dropped as a
    comment and surrounding whitespace is stripped.  Only entries containing
    exactly three dots are yielded; everything else (blank lines, pure
    comments, malformed entries) is skipped silently.  No further validation
    is done here, so a string such as ``"a.b.c.d"`` is still yielded.
    """
    # TODO: make more robust. (regex pls)
    for line in f:
        # partition() returns the whole line as the head when there is no "#",
        # so no separate membership test is needed.
        candidate = line.partition("#")[0].strip()
        if candidate.count(".") == 3:
            yield candidate
def addr_to_int(ip):
    """Convert a dotted-quad IPv4 string into its 32-bit integer value.

    The whole string must match ``ipv4_pattern`` (four groups of decimal
    digits separated by dots) and every group must lie in 0..255.  The first
    group ends up in the most significant byte of the result.

    Raises:
        AssertionError: if *ip* does not match the pattern or a group is out
            of range.  The offending string is attached as the message.

    NOTE: the checks are ``assert`` statements, so they are not executed
    when Python runs with ``-O``.
    """
    match = ipv4_pattern.fullmatch(ip)
    # Report the rejected input itself as the assertion message.
    assert match is not None, ip
    segs = [int(seg) for seg in match.group(1, 2, 3, 4)]
    assert all(0 <= seg <= 255 for seg in segs), match.group(0)
    return segs[0] << 24 | segs[1] << 16 | segs[2] << 8 | segs[3]
def ipkey(ip_string):
    """Return a numeric key computed from a dot- or colon-separated address.

    Colons are treated like dots.  Each separated part is parsed with
    ``int()`` and weighted by ``256 ** (3 - position)``, so the first part
    carries the highest weight.  Neither the number of parts nor their range
    is checked; a part that is not a decimal integer raises ``ValueError``.
    """
    # this is more lenient than addr_to_int.
    dotted = ip_string.replace(":", ".")
    key = 0
    for position, part in enumerate(dotted.split(".")):
        key += 256 ** (3 - position) * int(part)
    return key
def ip_reader_worker(fp, queue):
    """Feed every address that ``read_ips`` yields for *fp* into *queue*.

    *fp* is either an already-open handle (an ``io.IOBase`` instance) or
    something ``open()`` accepts.  A handle passed in by the caller is left
    open; a file opened here is always closed again, even when reading fails.
    """
    from io import IOBase

    if isinstance(fp, IOBase):
        # The caller owns the handle, so it is not closed here.
        for address in read_ips(fp):
            queue.put(address)
    else:
        with open(fp, "r") as handle:
            for address in read_ips(handle):
                queue.put(address)
class IpReader:
    """Read addresses from several files or handles through worker threads.

    Entering the context manager starts one ``ip_reader_worker`` thread per
    constructor argument; all of them push into one shared queue.  The
    instance can then be consumed with ``for`` or ``async for``.  Leaving the
    context does nothing: the threads are neither joined nor stopped.
    """

    def __init__(self, *paths_and_handles):
        from queue import SimpleQueue

        self.fps = paths_and_handles  # inputs handed to the worker threads
        self.queue = SimpleQueue()  # filled by the worker threads
        self.threads = []  # populated by __enter__
        self.total = 0  # number of items taken off self.queue so far

    def is_running(self):
        """Return True while at least one worker thread is still alive."""
        return any(thread.is_alive() for thread in self.threads)

    def _drain_into(self, results):
        """Move everything currently queued into *results*, counting items."""
        while not self.queue.empty():
            results.put(self.queue.get())
            self.total += 1

    def __iter__(self):
        """Return a generator yielding queued items until the workers finish.

        Items are moved to a local buffer in batches before being yielded
        one at a time, so ``self.total`` may run ahead of the number of
        items handed out so far.
        """
        from queue import SimpleQueue, Empty

        def _next():
            results = SimpleQueue()
            while self.is_running() or not self.queue.empty():
                if self.queue.empty() and results.empty():
                    # Nothing buffered: block briefly for the next item
                    # instead of spinning.
                    try:
                        item = self.queue.get(timeout=1.0)
                    except Empty:
                        # Timed out; loop around and re-check the workers.
                        pass
                    else:
                        results.put(item)
                        self.total += 1
                self._drain_into(results)
                if not results.empty():
                    yield results.get()
            # Workers are done and the queue is empty: flush the buffer.
            while not results.empty():
                yield results.get()

        return _next()

    def __aiter__(self):
        """Return an async generator equivalent to ``__iter__``.

        Instead of blocking on the queue, it sleeps for 0.1 s whenever
        nothing is available.
        """
        from asyncio import sleep
        from queue import SimpleQueue

        async def _next():
            results = SimpleQueue()
            while self.is_running() or not self.queue.empty():
                if self.queue.empty() and results.empty():
                    await sleep(0.1)  # this incurs some latency, but alas...
                self._drain_into(results)
                if not results.empty():
                    yield results.get()
            while not results.empty():
                yield results.get()

        return _next()

    def __enter__(self):
        """Start one worker thread per input and return ``self``."""
        from threading import Thread

        for fp in self.fps:
            thread = Thread(target=ip_reader_worker, args=(fp, self.queue))
            self.threads.append(thread)
            thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing; exceptions raised inside the ``with`` body propagate."""
        pass