from .sql import create_table_statements, create_view_statements from .sql import table_triggers from .tables import TException, TExecution, TAddress from .tables import TKind, TDomain, TRecord, TMessage from .ip_util import addr_to_int class Execution: def __init__(self, db): self.db = db self.execution = None def __enter__(self): from .util import right_now self.execution = self.db.start_execution(right_now()) return self.execution def __exit__(self, exc_type, exc_value, traceback): from .util import right_now completed = exc_type is None self.db.finish_execution(self.execution, right_now(), completed) def is_column(ref): from storm.properties import PropertyColumn from storm.references import Reference return isinstance(ref, PropertyColumn) or isinstance(ref, Reference) def apply_properties(obj, d): for k, v in d.items(): ref = getattr(obj.__class__, k) assert ref is not None, (type(obj), k) assert is_column(ref), (type(obj), k) setattr(obj, k, v) return obj class RespoDB: def __init__(self, uri, setup=False, create=False): from storm.database import create_database self.uri = uri db_exists = self._db_exists(self.uri) self.db = create_database(self.uri) self._conn = None if setup or (create and not db_exists): with self: self.setup_executions() self.setup_exceptions() self.setup_ips() self.setup_kinds() self.setup_domains() self.setup_records() self.setup_messages() for q in create_view_statements: self._fire(q) assert setup or create or db_exists, "database was never setup" self.execution = Execution(self) @staticmethod def _db_exists(uri): from os.path import exists _, _, fp = uri.partition(":") if fp.startswith("//"): _, _, fp = fp[2:].partition("/") return fp and exists(fp) def __enter__(self): from storm.store import Store self._conn = Store(self.db) return self def __exit__(self, exc_type, exc_value, traceback): self.commit() self._conn.close() self._conn = None def find_one(self, cls_spec, *args, **kwargs): assert self._conn is not None return self._conn.find(cls_spec, *args, **kwargs).one() def flush(self): assert self._conn is not None self._conn.flush() def commit(self): assert self._conn is not None self._conn.commit() def new_exception(self, **kwargs): assert self._conn is not None return self._conn.add(apply_properties(TException(), kwargs)) def new_kind(self, **kwargs): assert self._conn is not None return self._conn.add(apply_properties(TKind(), kwargs)) def new_domain(self, **kwargs): assert self._conn is not None return self._conn.add(apply_properties(TDomain(), kwargs)) def new_address(self, **kwargs): assert self._conn is not None return self._conn.add(apply_properties(TAddress(), kwargs)) def new_record(self, **kwargs): assert self._conn is not None return self._conn.add(apply_properties(TRecord(), kwargs)) def new_message(self, **kwargs): assert self._conn is not None return self._conn.add(apply_properties(TMessage(), kwargs)) def _fire(self, statement): assert self._conn is not None self._conn.execute(statement, noresult=True) def setup_executions(self): self._fire(create_table_statements["executions"]) def setup_exceptions(self): # careful not to call them "errors" since NXDOMAIN is not an error. self._fire(create_table_statements["exceptions"]) # TODO: upsert? self.new_exception(name="NXDOMAIN", fail=False) self.new_exception(name="NoAnswer", fail=True) self.new_exception(name="NoNameservers", fail=True) self.new_exception(name="Timeout", fail=True) def setup_ips(self): from .ips import china, blocks self._fire(create_table_statements["ips"]) # TODO: upsert? self.new_address(ip=addr_to_int("0.0.0.0"), block_target=True) self.new_address(ip=addr_to_int("127.0.0.1"), block_target=True) for ip in china: self.new_address(ip=addr_to_int(ip), china=True) for ip in blocks: self.new_address(ip=addr_to_int(ip), block_target=True) def setup_kinds(self): self._fire(create_table_statements["kinds"]) # TODO: upsert? if 0: NXDOMAIN = self.find_one(TException, TException.name == "NXDOMAIN") self.new_kind(name="bad", exception=NXDOMAIN) self.new_kind(name="badsub", exception=NXDOMAIN) def setup_domains(self): self._fire(create_table_statements["domains"]) def setup_records(self): self._fire(create_table_statements["records"]) def setup_messages(self): self._fire(create_table_statements["messages"]) for trig in table_triggers["messages"]: self._conn.execute(trig) def start_execution(self, dt): execution = TExecution() execution.start_date = dt self.flush() return execution def finish_execution(self, execution, dt, completed): # TODO: fail if ExecutionId is missing? execution.finish_date = dt execution.completed = completed self.flush() def next_record_id(self): from storm.expr import Add, Max, Coalesce expr = Add(Coalesce(Max(TRecord.record_id), 0), 1) return self.find_one(expr) def find_record_id(self, addresses): address_ids = list(address.address_id for address in addresses) temp = self._conn.find(TRecord, TRecord.address_id.is_in(address_ids)) record_ids = list(temp.values(TRecord.record_id)) if not record_ids: return None unique_ids = sorted(set(record_ids)) for needle in unique_ids: if sum(1 for id in record_ids if id == needle) == len(addresses): found = True return needle return None def push_entry(self, entry): kind = self.find_one(TKind, TKind.name == entry.kind) if not kind: kind = self.new_kind(name=entry.kind) if entry.kind.startswith("bad"): exception = self.find_one(TException, TException.name == "NXDOMAIN") assert exception is not None kind.exception = exception domain = self.find_one(TDomain, TDomain.name == entry.domain) if not domain: domain = self.new_domain(name=entry.domain) domain.kind = kind addresses = [] as_ints = sorted(set(map(addr_to_int, entry.addrs))) for numeric in as_ints: address = self.find_one(TAddress, TAddress.ip == numeric) if not address: address = self.new_address(ip=numeric) addresses.append(address) for address in addresses: if entry.reason == "block": address.block_target = True elif entry.reason == "redirect": address.redirect_target = True elif entry.reason == "gfw": address.gfw_target = True if addresses: record_id = self.find_record_id(addresses) if record_id is None: record_id = self.next_record_id() for address in addresses: self.new_record(record_id=record_id, address=address) else: record_id = None numeric = addr_to_int(entry.server) server = self.find_one(TAddress, TAddress.ip == numeric) if not server: server = self.new_address(ip=numeric) self.flush() server.server = True if entry.exception: exception = self.find_one(TException, TException.name == entry.exception) assert exception is not None else: exception = None failed = not entry.success message = self.new_message( execution=entry.execution, server=server, domain=domain, record_id=record_id, exception=exception, failed=failed) self.flush()