respodns/respodns/db.py

305 lines
9.2 KiB
Python
Raw Permalink Normal View History

2020-08-29 06:04:56 -07:00
from .ip_util import addr_to_int
2021-08-06 18:14:17 -07:00
from .tables import Base, TException, TExecution, TAddress
from .tables import TKind, TDomain, TRecord, TMessage
from sqlalchemy.orm import sessionmaker
# Module-wide session factory. Autoflush is disabled, so pending changes are
# only sent to the database when flush() or commit() is called explicitly.
# RespoDB.__init__ binds this factory to its engine via Session.configure().
Session = sessionmaker(autoflush=False)
# Raw SQL statements that RespoDB.__init__ executes (through _fire) when it
# sets up a database.
#
# "Results" flattens every row of Messages into one readable row: the
# server's address string, the kind and name of the queried domain, the
# address string(s) of the answer record, the exception name, and the failed
# flag. LEFT JOINs are used throughout, so a message is still listed when a
# referenced row is absent.
create_view_statements = [
"""
CREATE VIEW Results AS
SELECT
Messages.ExecutionId,
ServerIps.AsStr as Server,
Kinds.Name as Kind,
Domains.Name as Name,
RecordIps.AsStr as Address,
Exceptions.Name as Exception,
Messages.Failed as Failed
FROM Messages
LEFT JOIN Domains ON Messages.DomainId = Domains.DomainId
LEFT JOIN Kinds ON Domains.KindId = Kinds.KindId
LEFT JOIN Ips AS ServerIps ON Messages.ServerId = ServerIps.IpId
LEFT JOIN Records ON Messages.RecordId = Records.RecordId
LEFT JOIN Ips as RecordIps ON Records.IpId = RecordIps.IpId
LEFT JOIN Exceptions ON Messages.ExceptionId = Exceptions.ExceptionId
""",
]
2020-08-29 01:16:06 -07:00
2020-08-29 06:34:46 -07:00
2020-08-29 01:16:06 -07:00
class Execution:
    """Context manager that brackets one run with start/finish bookkeeping.

    Entering asks the database wrapper to start an execution stamped with the
    current time and yields the resulting execution object. Leaving reports
    the finish time and whether the body completed without raising.
    """

    def __init__(self, db):
        """Remember the database wrapper; no execution exists until entry."""
        self.db = db
        self.execution = None

    def __enter__(self):
        """Start a new execution and return it."""
        # Imported at call time rather than at module import.
        # NOTE(review): presumably to avoid an import cycle with .util — confirm.
        from .util import right_now

        started = self.db.start_execution(right_now())
        self.execution = started
        return started

    def __exit__(self, exc_type, exc_value, traceback):
        """Record the finish time; the run is "completed" only if nothing raised.

        Returns None, so any exception from the body keeps propagating.
        """
        from .util import right_now

        self.db.finish_execution(
            self.execution,
            right_now(),
            exc_type is None,
        )
2020-08-29 06:34:46 -07:00
def is_column(ref):
    """Return True when *ref* is a SQLAlchemy ``InstrumentedAttribute``."""
    from sqlalchemy.orm import attributes

    return isinstance(ref, attributes.InstrumentedAttribute)
2020-08-29 06:34:46 -07:00
2020-08-29 01:16:06 -07:00
def apply_properties(obj, d):
    """Assign every ``key: value`` pair of *d* onto *obj* and return *obj*.

    Each key must name an attribute on the class of *obj* that passes
    ``is_column``; otherwise an AssertionError carrying ``(type(obj), key)``
    is raised before that key is assigned.
    """
    owner = obj.__class__
    for key, value in d.items():
        attribute = getattr(owner, key)
        context = (type(obj), key)
        assert attribute is not None, context
        assert is_column(attribute), context
        setattr(obj, key, value)
    return obj
2020-08-29 06:34:46 -07:00
2020-08-29 01:16:06 -07:00
class RespoDB:
    """Thin wrapper around a SQLAlchemy engine and session for respodns data.

    Use the instance as a context manager: entering opens a session from the
    module-level ``Session`` factory, leaving commits and closes it. Methods
    that touch the database assert that a session is currently open.
    """

    def __init__(self, uri, setup=False, create=False):
        """Create the engine for *uri* and optionally initialize the database.

        Args:
            uri: database URI handed to ``sqlalchemy.create_engine``.
            setup: when true, always create tables, seed rows and views.
            create: when true, do the same, but only if ``_db_exists`` found
                no file for *uri*.

        Raises:
            AssertionError: if neither flag is set and no database file was
                found for *uri*.
        """
        from sqlalchemy import create_engine

        self.uri = uri
        # Must be checked before the engine has a chance to create the file.
        db_exists = self._db_exists(self.uri)

        self.db = create_engine(self.uri)
        Session.configure(bind=self.db)
        self._conn = None

        if setup or (create and not db_exists):
            with self:
                Base.metadata.create_all(self.db)
                self.setup_executions()
                self.setup_exceptions()
                self.setup_ips()
                self.setup_kinds()
                self.setup_domains()
                self.setup_records()
                self.setup_messages()
                for q in create_view_statements:
                    self._fire(q)

        assert setup or create or db_exists, "database was never setup"

        self.execution = Execution(self)

    @staticmethod
    def _db_exists(uri):
        """Return True if the file path embedded in *uri* exists on disk.

        The text after the first ``:`` is taken as the path; when it starts
        with ``//`` everything up to and including the next ``/`` is dropped
        as well. An empty path (e.g. ``scheme://``) yields False.
        """
        from os.path import exists

        _, _, fp = uri.partition(":")
        if fp.startswith("//"):
            _, _, fp = fp[2:].partition("/")
        # bool() so callers always get a real boolean, never an empty string.
        return bool(fp) and exists(fp)

    def __enter__(self):
        """Open a new session and return this wrapper."""
        self._conn = Session()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Commit and close the session.

        The commit happens even when the body raised, so that work flushed
        before the failure (such as an execution marked as not completed) is
        kept. The session is closed and cleared even if the commit fails.
        """
        try:
            self.commit()
        finally:
            if self._conn is not None:
                self._conn.close()
            self._conn = None

    def find_one(self, cls_spec, **filters):
        """Return the first result of querying *cls_spec*, or None.

        Keyword arguments are applied with ``filter_by`` when given.
        """
        assert self._conn is not None
        query = self._conn.query(cls_spec)
        if filters:
            query = query.filter_by(**filters)
        return query.first()

    def flush(self):
        """Flush pending changes of the open session."""
        assert self._conn is not None
        self._conn.flush()

    def commit(self):
        """Commit the open session."""
        assert self._conn is not None
        self._conn.commit()

    def _add_new(self, cls, fields):
        """Instantiate *cls* with *fields*, add it to the session, return it."""
        assert self._conn is not None
        row = cls(**fields)
        self._conn.add(row)
        return row

    def new_exception(self, **kwargs):
        """Create a TException from *kwargs* and add it to the session."""
        return self._add_new(TException, kwargs)

    def new_kind(self, **kwargs):
        """Create a TKind from *kwargs* and add it to the session."""
        return self._add_new(TKind, kwargs)

    def new_domain(self, **kwargs):
        """Create a TDomain from *kwargs* and add it to the session."""
        return self._add_new(TDomain, kwargs)

    def new_address(self, **kwargs):
        """Create a TAddress from *kwargs* and add it to the session."""
        return self._add_new(TAddress, kwargs)

    def new_record(self, **kwargs):
        """Create a TRecord from *kwargs* and add it to the session."""
        return self._add_new(TRecord, kwargs)

    def new_message(self, **kwargs):
        """Create a TMessage from *kwargs* and add it to the session."""
        return self._add_new(TMessage, kwargs)

    def _fire(self, statement):
        """Execute *statement* on the open session and discard its result.

        Plain strings are wrapped in ``sqlalchemy.text`` because newer
        SQLAlchemy releases no longer accept raw strings in ``execute``.
        """
        from sqlalchemy import text

        assert self._conn is not None
        if isinstance(statement, str):
            statement = text(statement)
        self._conn.execute(statement).close()

    def setup_executions(self):
        """Seed hook for executions; nothing to seed."""
        pass

    def setup_exceptions(self):
        """Seed the known exception names and whether each counts as a failure."""
        # careful not to call them "errors" since NXDOMAIN is not an error.
        # TODO: upsert?
        self.new_exception(name="NXDOMAIN", fail=False)
        self.new_exception(name="NoAnswer", fail=True)
        self.new_exception(name="NoNameservers", fail=True)
        self.new_exception(name="Timeout", fail=True)

    def setup_ips(self):
        """Mark 0.0.0.0 and 127.0.0.1 as block targets."""
        self.modify_address("0.0.0.0", block_target=True)
        self.modify_address("127.0.0.1", block_target=True)

    def setup_kinds(self):
        """Seed hook for kinds; nothing to seed."""
        pass

    def setup_domains(self):
        """Seed hook for domains; nothing to seed."""
        pass

    def setup_records(self):
        """Seed hook for records; nothing to seed."""
        pass

    def setup_messages(self):
        """Seed hook for messages; nothing to seed."""
        pass

    def start_execution(self, dt):
        """Create a TExecution starting at *dt*, flush it, and return it."""
        assert self._conn is not None
        execution = TExecution()
        execution.start_date = dt
        # Without add() the object is unknown to the session and the flush
        # below would not write anything.
        self._conn.add(execution)
        self.flush()
        return execution

    def finish_execution(self, execution, dt, completed):
        """Stamp *execution* with its finish time and completion flag, then flush."""
        # TODO: fail if ExecutionId is missing?
        execution.finish_date = dt
        execution.completed = completed
        self.flush()

    def all_ips(self):
        """Return the ``str`` column of every TAddress row as a list."""
        assert self._conn is not None
        # Querying a single column yields one-element rows; unwrap them.
        return [row[0] for row in self._conn.query(TAddress.str)]

    def next_record_id(self):
        """Return one more than the largest stored record id (1 if none)."""
        from sqlalchemy.sql.expression import func

        expr = func.coalesce(func.max(TRecord.record_id), 0) + 1
        return self.find_one(expr)[0]

    def find_record_id(self, addresses):
        """Return the id of the record made up of exactly *addresses*.

        Args:
            addresses: sized collection of TAddress objects.

        Returns:
            The smallest record id whose rows reference every given address
            and no other address, or None when there is no such record.
        """
        from collections import Counter

        assert self._conn is not None
        if not addresses:
            return None

        wanted = len(addresses)
        address_ids = [address.address_id for address in addresses]
        if any(address_id is None for address_id in address_ids):
            # An address without an id has not been written yet, so no
            # stored record can reference it.
            return None

        rows = self._conn.query(TRecord.record_id).filter(
            TRecord.address_id.in_(address_ids))
        # Column queries yield one-element rows, hence the [0].
        hits = Counter(row[0] for row in rows)

        for candidate in sorted(hits):
            if hits[candidate] != wanted:
                continue
            # All requested addresses are present; make sure the record does
            # not contain additional addresses (a superset is not a match).
            total = self._conn.query(TRecord).filter(
                TRecord.record_id == candidate).count()
            if total == wanted:
                return candidate
        return None

    def push_entry(self, entry):
        """Store one lookup result, creating any missing related rows.

        Reads ``kind``, ``domain``, ``addrs``, ``reason``, ``server``,
        ``exception``, ``success`` and ``execution`` from *entry*, then adds a
        message row and flushes.

        Raises:
            AssertionError: if a required exception row is not in the database.
        """
        kind = self.find_one(TKind, name=entry.kind)
        if not kind:
            kind = self.new_kind(name=entry.kind)
            if entry.kind.startswith("bad"):
                exception = self.find_one(TException, name="NXDOMAIN")
                assert exception is not None
                kind.exception = exception

        domain = self.find_one(TDomain, name=entry.domain)
        if not domain:
            domain = self.new_domain(name=entry.domain)
            domain.kind = kind

        # Deduplicate and order the answer addresses by numeric value.
        addresses = []
        for numeric in sorted(set(map(addr_to_int, entry.addrs))):
            address = self.find_one(TAddress, ip=numeric)
            if not address:
                address = self.new_address(ip=numeric)
            addresses.append(address)

        for address in addresses:
            if entry.reason == "block":
                address.block_target = True
            elif entry.reason == "redirect":
                address.redirect_target = True
            elif entry.reason == "gfw":
                address.gfw_target = True

        record_id = None
        if addresses:
            # Reuse an existing record for this exact address set if any.
            record_id = self.find_record_id(addresses)
            if record_id is None:
                record_id = self.next_record_id()
                for address in addresses:
                    self.new_record(record_id=record_id, address=address)

        numeric = addr_to_int(entry.server)
        server = self.find_one(TAddress, ip=numeric)
        if not server:
            server = self.new_address(ip=numeric)
            self.flush()
        server.server = True

        if entry.exception:
            exception = self.find_one(TException, name=entry.exception)
            assert exception is not None
        else:
            exception = None

        failed = not entry.success
        self.new_message(
            execution=entry.execution,
            server=server, domain=domain,
            record_id=record_id, exception=exception,
            failed=failed)
        self.flush()

    def country_code(self, ip, code=None):
        """Get or set the country code stored for *ip*.

        With ``code=None`` this is a getter: it returns the stored country
        code, or None when the address is unknown. Otherwise the code is
        stored (creating the address row if needed), the session is flushed,
        and None is returned.
        """
        numeric = addr_to_int(ip)
        address = self.find_one(TAddress, ip=numeric)
        if code is None:
            if address is not None:
                return address.country_code
        else:
            # NOTE: can't set code to null here since None was ruled out.
            if address is None:
                self.new_address(ip=numeric, country_code=code)
            else:
                address.country_code = code
            self.flush()
        return None

    def modify_address(self, ip, **kwargs):
        """Create the address row for *ip* with *kwargs*, or update it in place.

        Existing rows are updated through ``apply_properties``; the session is
        flushed afterwards.
        """
        numeric = addr_to_int(ip)
        address = self.find_one(TAddress, ip=numeric)
        if address is None:
            self.new_address(ip=numeric, **kwargs)
        else:
            apply_properties(address, kwargs)
        self.flush()