respodns/respodns/ip_util.py

118 lines
3.3 KiB
Python

import re
# Loose dotted-quad pattern: four runs of ASCII digits separated by dots, each
# captured as its own group. It does not limit the size of a segment, so
# callers (see addr_to_int) have to range-check the captured numbers themselves.
ipv4_pattern = re.compile(r"(\d+)\.(\d+)\.(\d+)\.(\d+)", re.ASCII)
# Strict pattern for finding addresses embedded in free text (see read_ips).
# Each of the four captured octets must be 0-255 written without leading
# zeros: "0", 100-199, 200-249, 250-255, or 1-99. The first and last lines of
# the pattern require that the address is not directly preceded or followed by
# a digit or a dot, so pieces of longer dotted numbers are never matched.
# re.VERBOSE makes the line breaks inside the literal insignificant.
ipv4_pattern_strict = re.compile(r"""
(?:^|(?<=[^\d.]))
(0|1\d\d|2[0-4]\d|25[0-5]|[1-9]\d?)
\.
(0|1\d\d|2[0-4]\d|25[0-5]|[1-9]\d?)
\.
(0|1\d\d|2[0-4]\d|25[0-5]|[1-9]\d?)
\.
(0|1\d\d|2[0-4]\d|25[0-5]|[1-9]\d?)
(?:$|(?=[^\d.]))
""", re.ASCII | re.VERBOSE)
def read_ips(f):
    """Yield every IPv4 address found in the lines of *f*.

    Args:
        f: An iterable of text lines, typically an open text-mode file.

    Yields:
        Each substring matched by ``ipv4_pattern_strict``, in the order it
        appears. Everything from the first ``#`` on a line is treated as a
        comment and skipped.
    """
    # Iterate the handle directly rather than calling readlines(): lines are
    # consumed one at a time, so large inputs are never held in memory as a
    # whole and addresses become available before end-of-file is reached.
    for line in f:
        content, _, _ = line.partition("#")  # ignore comments
        for match in ipv4_pattern_strict.finditer(content):
            yield match.group()  # yield the entire string
def addr_to_int(ip):
    """Convert a dotted-quad IPv4 string to its 32-bit integer value.

    Args:
        ip: Text that must consist, in its entirety, of four dot-separated
            runs of ASCII digits, e.g. ``"192.168.0.1"``.

    Returns:
        The address as an int, with the first segment in the most
        significant byte.

    Raises:
        AssertionError: If *ip* does not match ``ipv4_pattern`` as a whole,
            or if any segment lies outside 0..255. The exception is raised
            explicitly instead of through an ``assert`` statement so that
            the validation stays active when Python runs with ``-O``.
    """
    match = ipv4_pattern.fullmatch(ip)
    if match is None:
        raise AssertionError(ip)
    segs = [int(group) for group in match.groups()]
    if not all(0 <= seg <= 255 for seg in segs):
        raise AssertionError(match.group(0))
    return (segs[0] << 24) | (segs[1] << 16) | (segs[2] << 8) | segs[3]
def ipkey(ip_string):
    """Return a numeric sort key for an IP-like string.

    Colons are treated like dots, every resulting piece is parsed with
    ``int``, and the piece at index ``i`` is weighted by ``256 ** (3 - i)``.
    This is more lenient than addr_to_int: neither the number of pieces nor
    their range is checked. Pieces past the fourth get a negative exponent,
    which makes the result a float.
    """
    pieces = ip_string.replace(":", ".").split(".")
    values = list(map(int, pieces))
    weighted = (
        256 ** (3 - position) * value
        for position, value in enumerate(values)
    )
    return sum(weighted)
def ip_reader_worker(fp, queue):
    """Push every address read from *fp* onto *queue*.

    *fp* may be an already-open handle (any ``io.IOBase`` instance), which is
    read but left open for its owner, or anything else, which is passed to
    ``open(fp, "r")`` and closed again once reading ends or fails.
    """
    from io import IOBase

    def pump(stream):
        # Forward each address found by read_ips to the consumer's queue.
        for address in read_ips(stream):
            queue.put(address)

    if isinstance(fp, IOBase):
        pump(fp)
    else:
        with open(fp, "r") as stream:
            pump(stream)
class IpReader:
    """Read IP addresses from several sources on background threads.

    Every positional argument is handed unchanged to ``ip_reader_worker`` on
    its own thread once the reader is entered as a context manager; all
    workers push into one shared queue. Iterating the reader, with ``for`` or
    ``async for``, yields items from that queue until no worker is alive and
    the queue is empty.

    ``total`` counts the items taken off the shared queue so far.
    """

    def __init__(self, *paths_and_handles):
        from queue import SimpleQueue

        self.fps = paths_and_handles
        self.queue = SimpleQueue()
        self.threads = []
        self.total = 0

    def is_running(self):
        """Return True while at least one worker thread is still alive."""
        for worker in self.threads:
            if worker.is_alive():
                return True
        return False

    def _drain_shared_queue(self, pending):
        """Move everything currently queued into *pending*, counting each item."""
        while not self.queue.empty():
            pending.put(self.queue.get())
            self.total += 1

    def __iter__(self):
        from queue import SimpleQueue, Empty

        def produce():
            pending = SimpleQueue()
            while self.is_running() or not self.queue.empty():
                if self.queue.empty() and pending.empty():
                    # Nothing buffered anywhere: block briefly for the next
                    # item instead of spinning.
                    try:
                        item = self.queue.get(timeout=1.0)
                    except Empty:
                        pass
                    else:
                        pending.put(item)
                        self.total += 1
                self._drain_shared_queue(pending)
                if not pending.empty():
                    yield pending.get()
            # Workers are done and the shared queue is empty; hand out
            # whatever is still buffered locally.
            while not pending.empty():
                yield pending.get()

        return produce()

    def __aiter__(self):
        from asyncio import sleep
        from queue import SimpleQueue

        async def produce():
            pending = SimpleQueue()
            while self.is_running() or not self.queue.empty():
                if self.queue.empty() and pending.empty():
                    await sleep(0.1)  # this incurs some latency, but alas...
                self._drain_shared_queue(pending)
                if not pending.empty():
                    yield pending.get()
            while not pending.empty():
                yield pending.get()

        return produce()

    def __enter__(self):
        """Start one worker thread per source and return the reader itself."""
        from threading import Thread

        for source in self.fps:
            worker = Thread(target=ip_reader_worker, args=(source, self.queue))
            self.threads.append(worker)
            worker.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; worker threads are neither joined nor stopped."""