From ecb259609089c3a10a8bbacfc96b6d51689e09b2 Mon Sep 17 00:00:00 2001 From: Connor Olding Date: Sat, 29 Aug 2020 14:54:07 +0200 Subject: [PATCH] begin reorganizing code --- .gitignore | 1 + respodns/__main__.py | 10 +-- respodns/checks.py | 27 +-------- respodns/db.py | 141 +------------------------------------------ respodns/ips.py | 4 -- respodns/nonsense.py | 8 --- respodns/pooler.py | 48 --------------- respodns/sql.py | 114 ++++++++++++++++++++++++++++++++++ respodns/structs.py | 5 +- respodns/top1m.py | 6 +- respodns/util.py | 84 +++++++++++++++++++++++++- 11 files changed, 213 insertions(+), 235 deletions(-) delete mode 100644 respodns/nonsense.py delete mode 100644 respodns/pooler.py create mode 100644 respodns/sql.py diff --git a/.gitignore b/.gitignore index cce4cbb..54e2467 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__/ build/ dist/ +orig/ diff --git a/respodns/__main__.py b/respodns/__main__.py index a810776..67b253c 100644 --- a/respodns/__main__.py +++ b/respodns/__main__.py @@ -2,10 +2,8 @@ from .db import RespoDB from .ips import blocks, is_bogon -from .pooler import make_simple_pooler from .structs import Options, Entry -from .util import right_now, read_ips, getaddrs, detect_gfw -from argparse import ArgumentParser +from .util import right_now, read_ips, getaddrs, detect_gfw, make_pooler from asyncio import run, sleep from sys import argv, stdin, stderr, exit import respodns.checks as chk @@ -76,7 +74,7 @@ async def try_ip(db, server_ip, checks, opts: Options): pend.cancel() success = False - pooler = make_simple_pooler(opts.domain_simul, finisher) + pooler = make_pooler(opts.domain_simul, finisher) async def getaddrs_wrapper(ip, check): # NOTE: could put right_now() stuff here! @@ -130,7 +128,7 @@ async def main(db, filepath, checks, opts: Options): else: print(ip, ff.reason, ff.kind, ff.domain, sep="\t") - pooler = make_simple_pooler(opts.ip_simul, finisher) + pooler = make_pooler(opts.ip_simul, finisher) f = stdin if filepath == "" else open(filepath, "r") for i, ip in enumerate(read_ips(f)): @@ -147,6 +145,8 @@ async def main(db, filepath, checks, opts: Options): await pooler() def ui(program, args): + from argparse import ArgumentParser + name = "respodns6" parser = ArgumentParser(name, description=name + ": test and log DNS records") diff --git a/respodns/checks.py b/respodns/checks.py index fbb6749..571c6d9 100644 --- a/respodns/checks.py +++ b/respodns/checks.py @@ -1,29 +1,6 @@ -from collections import namedtuple -from .nonsense import nonsense_consistent from .top1m import retrieve_top1m_entries - -rot13_mapping = {} -for a, b, c, d in zip("anAN05", "mzMZ49", "naNA50", "zmZM94"): - rot13_mapping.update(dict((chr(k), chr(v)) - for k, v in zip(range(ord(a), ord(b) + 1), - range(ord(c), ord(d) + 1)))) - -def rot13(s): - return "".join(rot13_mapping.get(c, c) for c in s) - -def concat_nonsense(domain): - return nonsense_consistent(domain) + "." + domain - -def head(n, it): - res = [] - try: - while len(res) < n: - res.append(next(it)) - except StopIteration: - pass - return res - -Check = namedtuple("Check", ("kind", "domain")) +from .util import concat_nonsense, rot13, head +from .structs import Check first = [ Check("common", "baidu.com"), # this avoids issues with chinese censorship: https://www.bortzmeyer.org/sichuan-pepper.html diff --git a/respodns/db.py b/respodns/db.py index 291e1fe..550db9c 100644 --- a/respodns/db.py +++ b/respodns/db.py @@ -1,130 +1,7 @@ +from .sql import create_table_statements, create_view_statements +from .sql import table_triggers +from .util import addr_to_int, AttrCheck import storm.locals as rain -import re - -ipv4_pattern = re.compile("(\d+)\.(\d+)\.(\d+)\.(\d+)", re.ASCII) - -def addr_to_int(ip): - match = ipv4_pattern.fullmatch(ip) - assert match is not None, row - segs = list(map(int, match.group(1, 2, 3, 4))) - assert all(0 <= seg <= 255 for seg in segs), match.group(0) - numeric = segs[0] << 24 | segs[1] << 16 | segs[2] << 8 | segs[3] - return numeric - -create_table_statements = dict( - # TODO: Duration REAL GENERATED ALWAYS AS etc.? - executions=""" -CREATE TABLE IF NOT EXISTS Executions ( - ExecutionId INTEGER PRIMARY KEY, - StartDate DATE NOT NULL, - FinishDate DATE, - Completed BOOLEAN DEFAULT 0 NOT NULL) - """, - - exceptions=""" -CREATE TABLE IF NOT EXISTS Exceptions ( - ExceptionId INTEGER PRIMARY KEY, - Name TEXT NOT NULL, - Fail BOOLEAN NOT NULL) - """, - - ips=""" -CREATE TABLE IF NOT EXISTS Ips ( - IpId INTEGER PRIMARY KEY, - AsStr TEXT GENERATED ALWAYS AS ( - Cast(AsInt >> 24 & 255 AS TEXT) || '.' || - Cast(AsInt >> 16 & 255 AS TEXT) || '.' || - Cast(AsInt >> 8 & 255 AS TEXT) || '.' || - Cast(AsInt & 255 AS TEXT) - ) STORED NOT NULL, - AsInt INTEGER UNIQUE CHECK(AsInt >= 0 AND AsInt < 1 << 32) NOT NULL, - China BOOLEAN DEFAULT 0 NOT NULL, - BlockTarget BOOLEAN DEFAULT 0 NOT NULL, - Server BOOLEAN DEFAULT 0 NOT NULL, - RedirectTarget BOOLEAN DEFAULT 0 NOT NULL, - GfwTarget BOOLEAN DEFAULT 0 NOT NULL) - """, - - kinds=""" -CREATE TABLE IF NOT EXISTS Kinds ( - KindId INTEGER PRIMARY KEY, - Name TEXT UNIQUE NOT NULL, - ExpectExceptionId INTEGER, - FOREIGN KEY(ExpectExceptionId) REFERENCES Exceptions(ExceptionId)) - """, - - domains=""" -CREATE TABLE IF NOT EXISTS Domains ( - DomainId INTEGER PRIMARY KEY, - Name TEXT UNIQUE NOT NULL, - KindId INTEGER, - FOREIGN KEY(KindId) REFERENCES Kinds(KindId)) - """, - - # NOTE: that RecordId is *not* the rowid here - # since records can contain multiple IPs, - # and thereby span multiple rows. - # TODO: indexing stuff, cascade deletion stuff. - records=""" -CREATE TABLE IF NOT EXISTS Records ( - RecordId INTEGER NOT NULL, - IpId INTEGER, - FOREIGN KEY(IpId) REFERENCES Ips(IpId)) - """, - - messages=""" -CREATE TABLE IF NOT EXISTS Messages ( - MessageId INTEGER PRIMARY KEY, - ExecutionId INTEGER, - ServerId INTEGER NOT NULL, - DomainId INTEGER NOT NULL, - RecordId INTEGER, - ExceptionId INTEGER, - FOREIGN KEY(ServerId) REFERENCES Ips(IpId), - FOREIGN KEY(ExecutionId) REFERENCES Executions(ExecutionId), - FOREIGN KEY(DomainId) REFERENCES Domains(DomainId), - FOREIGN KEY(ExceptionId) REFERENCES Exceptions(ExceptionId)) - """, - # this fails because RecordId is not UNIQUE: - # FOREIGN KEY(RecordId) REFERENCES Records(RecordId) -) - -create_view_statements = [ - """ -CREATE VIEW Results AS -SELECT - Messages.ExecutionId, - ServerIps.AsStr as Server, - Kinds.Name as Kind, - Domains.Name as Name, - RecordIps.AsStr as Address, - Exceptions.Name as Exception -FROM Messages -LEFT JOIN Domains ON Messages.DomainId = Domains.DomainId -LEFT JOIN Kinds ON Domains.KindId = Kinds.KindId -LEFT JOIN Ips AS ServerIps ON Messages.ServerId = ServerIps.IpId -LEFT JOIN Records ON Messages.RecordId = Records.RecordId -LEFT JOIN Ips as RecordIps ON Records.IpId = RecordIps.IpId -LEFT JOIN Exceptions ON Messages.ExceptionId = Exceptions.ExceptionId --- GROUP BY Records.IpId - """, -] - -table_triggers = dict( - messages=[ - # TODO: more triggers. (before update, and also for Records table) - """ -CREATE TRIGGER IF NOT EXISTS RecordExists -BEFORE INSERT -ON Messages -BEGIN - SELECT CASE - WHEN NEW.RecordId NOTNULL AND NOT EXISTS(SELECT 1 FROM Records WHERE Records.RecordID = NEW.RecordId) - THEN raise(FAIL, "RecordId does not exist") - END; -END - """, - ]) class Execution: def __init__(self, db): @@ -141,18 +18,6 @@ class Execution: completed = exc_type is None self.db.finish_execution(self.execution, right_now(), completed) -class AttrCheck: - """ - Inheriting AttrCheck prevents accidentally setting attributes - that don't already exist. - """ - def __setattr__(self, name, value): - # NOTE: hasattr doesn't do what we want here. dir does. - if name.startswith("_") or name in dir(self): - super().__setattr__(name, value) - else: - raise AttributeError(name) - class TException(rain.Storm, AttrCheck): __storm_table__ = "Exceptions" exception_id = rain.Int("ExceptionId", primary=True) diff --git a/respodns/ips.py b/respodns/ips.py index 81dccde..baeee01 100644 --- a/respodns/ips.py +++ b/respodns/ips.py @@ -61,7 +61,3 @@ bogon_checks = [ def is_bogon(ip): return any(ip.startswith(check) for check in bogon_checks) - -def ipkey(ip_string): - segs = [int(s) for s in ip_string.replace(":", ".").split(".")] - return sum(256**(3 - i) * seg for i, seg in enumerate(segs)) diff --git a/respodns/nonsense.py b/respodns/nonsense.py deleted file mode 100644 index c6df281..0000000 --- a/respodns/nonsense.py +++ /dev/null @@ -1,8 +0,0 @@ -from random import choice, choices, Random -from string import ascii_lowercase -from zlib import crc32 - -def nonsense_consistent(domain): - rng = Random(crc32(domain.encode("utf-8"))) - length = rng.choices((9, 10, 11, 12), (4, 5, 3, 2))[0] - return "".join(rng.choice(ascii_lowercase) for i in range(length)) diff --git a/respodns/pooler.py b/respodns/pooler.py deleted file mode 100644 index daeb4dc..0000000 --- a/respodns/pooler.py +++ /dev/null @@ -1,48 +0,0 @@ -from types import CoroutineType -import asyncio - -# TODO: write a less confusing interface that allows the code to be written more flatly. -# maybe like: async for done in apply(doit, [tuple_of_args]): - -def make_pooler(pool_size, finisher=None): - aws = set() - async def pooler(item=None): - nonlocal aws - finish = item is None - if not finish: - if isinstance(item, CoroutineType): - assert not isinstance(item, asyncio.Task) - item = asyncio.create_task(item) - aws.add(item) - # TODO: don't wait until all completed, just first completed in loop. - # that way we can handle each done task ASAP. - condition = asyncio.ALL_COMPLETED if finish else asyncio.FIRST_COMPLETED - if len(aws) == 0: - return None - if finish or len(aws) >= pool_size: - done, pending = await asyncio.wait(aws, return_when=condition) - #pending = set(task for task in pending if task in aws) # ??? - ret = None if finisher is None else finisher(done, pending) - #aws = set(task for task in pending if not task.cancelled()) - aws = pending - if ret is not None: - return ret - return None - return pooler - -def make_simple_pooler(pool_size, finisher=None): - condition = asyncio.FIRST_COMPLETED - pending = set() - async def pooler(item=None): - nonlocal pending - finish = item is None - if not finish: - if isinstance(item, CoroutineType): - assert not isinstance(item, asyncio.Task) - item = asyncio.create_task(item) - pending.add(item) - desired_size = 0 if finish else pool_size - 1 - while len(pending) > desired_size: - done, pending = await asyncio.wait(pending, return_when=condition) - finisher(done, pending) - return pooler diff --git a/respodns/sql.py b/respodns/sql.py new file mode 100644 index 0000000..76faaa2 --- /dev/null +++ b/respodns/sql.py @@ -0,0 +1,114 @@ +create_table_statements = dict( + # TODO: Duration REAL GENERATED ALWAYS AS etc.? + executions=""" +CREATE TABLE IF NOT EXISTS Executions ( + ExecutionId INTEGER PRIMARY KEY, + StartDate DATE NOT NULL, + FinishDate DATE, + Completed BOOLEAN DEFAULT 0 NOT NULL) + """, + + exceptions=""" +CREATE TABLE IF NOT EXISTS Exceptions ( + ExceptionId INTEGER PRIMARY KEY, + Name TEXT NOT NULL, + Fail BOOLEAN NOT NULL) + """, + + ips=""" +CREATE TABLE IF NOT EXISTS Ips ( + IpId INTEGER PRIMARY KEY, + AsStr TEXT GENERATED ALWAYS AS ( + Cast(AsInt >> 24 & 255 AS TEXT) || '.' || + Cast(AsInt >> 16 & 255 AS TEXT) || '.' || + Cast(AsInt >> 8 & 255 AS TEXT) || '.' || + Cast(AsInt & 255 AS TEXT) + ) STORED NOT NULL, + AsInt INTEGER UNIQUE CHECK(AsInt >= 0 AND AsInt < 1 << 32) NOT NULL, + China BOOLEAN DEFAULT 0 NOT NULL, + BlockTarget BOOLEAN DEFAULT 0 NOT NULL, + Server BOOLEAN DEFAULT 0 NOT NULL, + RedirectTarget BOOLEAN DEFAULT 0 NOT NULL, + GfwTarget BOOLEAN DEFAULT 0 NOT NULL) + """, + + kinds=""" +CREATE TABLE IF NOT EXISTS Kinds ( + KindId INTEGER PRIMARY KEY, + Name TEXT UNIQUE NOT NULL, + ExpectExceptionId INTEGER, + FOREIGN KEY(ExpectExceptionId) REFERENCES Exceptions(ExceptionId)) + """, + + domains=""" +CREATE TABLE IF NOT EXISTS Domains ( + DomainId INTEGER PRIMARY KEY, + Name TEXT UNIQUE NOT NULL, + KindId INTEGER, + FOREIGN KEY(KindId) REFERENCES Kinds(KindId)) + """, + + # NOTE: that RecordId is *not* the rowid here + # since records can contain multiple IPs, + # and thereby span multiple rows. + # TODO: indexing stuff, cascade deletion stuff. + records=""" +CREATE TABLE IF NOT EXISTS Records ( + RecordId INTEGER NOT NULL, + IpId INTEGER, + FOREIGN KEY(IpId) REFERENCES Ips(IpId)) + """, + + messages=""" +CREATE TABLE IF NOT EXISTS Messages ( + MessageId INTEGER PRIMARY KEY, + ExecutionId INTEGER, + ServerId INTEGER NOT NULL, + DomainId INTEGER NOT NULL, + RecordId INTEGER, + ExceptionId INTEGER, + FOREIGN KEY(ServerId) REFERENCES Ips(IpId), + FOREIGN KEY(ExecutionId) REFERENCES Executions(ExecutionId), + FOREIGN KEY(DomainId) REFERENCES Domains(DomainId), + FOREIGN KEY(ExceptionId) REFERENCES Exceptions(ExceptionId)) + """, + # this fails because RecordId is not UNIQUE: + # FOREIGN KEY(RecordId) REFERENCES Records(RecordId) +) + +create_view_statements = [ + """ +CREATE VIEW Results AS +SELECT + Messages.ExecutionId, + ServerIps.AsStr as Server, + Kinds.Name as Kind, + Domains.Name as Name, + RecordIps.AsStr as Address, + Exceptions.Name as Exception +FROM Messages +LEFT JOIN Domains ON Messages.DomainId = Domains.DomainId +LEFT JOIN Kinds ON Domains.KindId = Kinds.KindId +LEFT JOIN Ips AS ServerIps ON Messages.ServerId = ServerIps.IpId +LEFT JOIN Records ON Messages.RecordId = Records.RecordId +LEFT JOIN Ips as RecordIps ON Records.IpId = RecordIps.IpId +LEFT JOIN Exceptions ON Messages.ExceptionId = Exceptions.ExceptionId +-- GROUP BY Records.IpId + """, +] + +table_triggers = dict( + messages=[ + # TODO: more triggers. (before update, and also for Records table) + """ +CREATE TRIGGER IF NOT EXISTS RecordExists +BEFORE INSERT +ON Messages +BEGIN + SELECT CASE + WHEN NEW.RecordId NOTNULL AND NOT EXISTS(SELECT 1 FROM Records WHERE Records.RecordID = NEW.RecordId) + THEN raise(FAIL, "RecordId does not exist") + END; +END + """, + ]) diff --git a/respodns/structs.py b/respodns/structs.py index bbbfa23..7ddf772 100644 --- a/respodns/structs.py +++ b/respodns/structs.py @@ -1,8 +1,8 @@ +from collections import namedtuple from dataclasses import dataclass @dataclass class Options: - #exec_id: int = -1 execution: object = None ip_simul: int = 10 # how many IPs to connect to at once @@ -28,5 +28,6 @@ class Entry: exception: str addrs: list # list of strings reason: str - #exec_id: int execution: object + +Check = namedtuple("Check", ("kind", "domain")) diff --git a/respodns/top1m.py b/respodns/top1m.py index a10b1e1..ac30a49 100644 --- a/respodns/top1m.py +++ b/respodns/top1m.py @@ -3,11 +3,11 @@ csvfn_default = "top-1m.csv" one_week = 7 * 24 * 60 * 60 # in seconds -def alive(fp, expire): +def alive(fp, expiry): from os.path import exists, getmtime, getsize from time import time - return exists(fp) and time() < getmtime(fp) + one_week and getsize(fp) > 2 + return exists(fp) and time() < getmtime(fp) + expiry and getsize(fp) > 2 def download_top1m(urltop=None, csvfn=None): from io import BytesIO @@ -48,5 +48,3 @@ def retrieve_top1m_entries(csv_fp="top-1m.csv"): entries = [(lambda a: (int(a[0]), a[2]))(line.partition(",")) for line in lines] return entries - -top1m = download_top1m diff --git a/respodns/util.py b/respodns/util.py index 45ac7f0..5990e5d 100644 --- a/respodns/util.py +++ b/respodns/util.py @@ -1,7 +1,24 @@ +import re +ipv4_pattern = re.compile("(\d+)\.(\d+)\.(\d+)\.(\d+)", re.ASCII) + +rot13_mapping = {} +for a, b, c, d in zip("anAN05", "mzMZ49", "naNA50", "zmZM94"): + rot13_mapping.update(dict((chr(k), chr(v)) + for k, v in zip(range(ord(a), ord(b) + 1), + range(ord(c), ord(d) + 1)))) + def right_now(): from datetime import datetime, timezone return datetime.now(timezone.utc) +def nonsense_consistent(domain): + from random import Random + from string import ascii_lowercase + from zlib import crc32 + rng = Random(crc32(domain.encode("utf-8"))) + length = rng.choices((9, 10, 11, 12), (4, 5, 3, 2))[0] + return "".join(rng.choice(ascii_lowercase) for i in range(length)) + def detect_gfw(r, ip, check): # attempt to detect interference from the Great Firewall of China. #from .ips import china @@ -38,7 +55,6 @@ async def getaddrs(server, domain, opts): from dns.exception import Timeout from dns.resolver import NXDOMAIN, NoAnswer, NoNameservers #from dns.resolver import Resolver - from .ips import ipkey res = Resolver(configure=False) if opts.impatient: @@ -69,3 +85,69 @@ def read_ips(f): if ip.count(".") != 3: continue yield ip + +def rot13(s): + return "".join(rot13_mapping.get(c, c) for c in s) + +def concat_nonsense(domain): + return nonsense_consistent(domain) + "." + domain + +def head(n, it): + res = [] + try: + while len(res) < n: + res.append(next(it)) + except StopIteration: + pass + return res + +def addr_to_int(ip): + match = ipv4_pattern.fullmatch(ip) + assert match is not None, row + segs = list(map(int, match.group(1, 2, 3, 4))) + assert all(0 <= seg <= 255 for seg in segs), match.group(0) + numeric = segs[0] << 24 | segs[1] << 16 | segs[2] << 8 | segs[3] + return numeric + +def ipkey(ip_string): + # this is more lenient than addr_to_int. + segs = [int(s) for s in ip_string.replace(":", ".").split(".")] + return sum(256**(3 - i) * seg for i, seg in enumerate(segs)) + +def taskize(item): + from types import CoroutineType + from asyncio import Task, create_task + + if isinstance(item, CoroutineType): + assert not isinstance(item, Task) # TODO: paranoid? + item = create_task(item) + return item + +def make_pooler(pool_size, finisher=None): + # TODO: write a less confusing interface that allows the code to be written more flatly. + # maybe like: async for done in apply(doit, [tuple_of_args]): + from asyncio import wait, FIRST_COMPLETED + + pending = set() + async def pooler(item=None): + nonlocal pending + finish = item is None + if not finish: + pending.add(taskize(item)) + desired_size = 0 if finish else pool_size - 1 + while len(pending) > desired_size: + done, pending = await wait(pending, return_when=FIRST_COMPLETED) + finisher(done, pending) + return pooler + +class AttrCheck: + """ + Inheriting AttrCheck prevents accidentally setting attributes + that don't already exist. + """ + def __setattr__(self, name, value): + # NOTE: hasattr doesn't do what we want here. dir does. + if name.startswith("_") or name in dir(self): + super().__setattr__(name, value) + else: + raise AttributeError(name)