diff --git a/respodns/db.py b/respodns/db.py index 0ddbb4c..7db2782 100644 --- a/respodns/db.py +++ b/respodns/db.py @@ -260,3 +260,27 @@ class RespoDB: record_id=record_id, exception=exception, failed=failed) self.flush() + + def country_code(self, ip, code=None): + numeric = addr_to_int(ip) + address = self.find_one(TAddress, TAddress.ip == numeric) + if code is None: + if address is not None: + return address.country_code + else: + # NOTE: can't set code to null here since None was ruled out. + if address is None: + self.new_address(ip=numeric, country_code=code) + else: + address.country_code = code + self.flush() + return None + + def modify_address(self, ip, **kwargs): + numeric = addr_to_int(ip) + address = self.find_one(TAddress, TAddress.ip == numeric) + if address is None: + self.new_address(ip=numeric, **kwargs) + else: + apply_properties(address, kwargs) + self.flush() diff --git a/respodns/dns.py b/respodns/dns.py index 9b4d0c4..7f8771c 100644 --- a/respodns/dns.py +++ b/respodns/dns.py @@ -1,9 +1,10 @@ from .structs import Options -from .ips import gfw_ips +from .ip_info import find_country, flush def detect_gfw(r, ip, check): # attempt to detect interference from the Great Firewall of China. + from .ips import gfw_ips def rs(prefix): return r.startswith(prefix) @@ -148,6 +149,12 @@ async def try_ip(db, server_ip, checks, opts: Options): else: await pooler() + await find_country(server_ip, db) + for entry in entries: + for addr in entry.addrs: + await find_country(addr, db) + flush() + if not opts.dry: for entry in entries: db.push_entry(entry) @@ -166,12 +173,27 @@ async def try_ip(db, server_ip, checks, opts: Options): return server_ip, None +async def sync_database(db): + # TODO: handle addresses that were removed from respodns.ips.china. + from .ips import china, blocks + for ip in china: + code = await find_country(ip) + db.modify_address(ip, china=True, country_code=code) + for ip in blocks: + code = await find_country(ip) + db.modify_address(ip, block_target=True, country_code=code) + flush() + + async def main(db, filepath, checks, opts: Options): from .ip_util import read_ips from .util import make_pooler from asyncio import sleep from sys import stdin + if db is not None: + await sync_database(db) + def finisher(done, pending): for task in done: ip, first_failure = task.result() diff --git a/respodns/ip_info.py b/respodns/ip_info.py new file mode 100644 index 0000000..96b1ab0 --- /dev/null +++ b/respodns/ip_info.py @@ -0,0 +1,200 @@ +from asyncio import open_connection, sleep +from collections import namedtuple +from socket import gaierror +from sys import stderr +from time import time + +CacheLine = namedtuple("CacheLine", ("time", "code")) +header = ["ip", "time", "code"] + +one_month = 30 * 24 * 60 * 60 # in seconds +encoding = "latin-1" + +cache_filepath = "ipinfo_cache.csv" + +http_cooldown = 0 +give_up = False +prepared = False +stored = None + + +async def http_lookup(ip): + global http_cooldown + + host = "ip-api.com" + path = f"/csv/{ip}?fields=2" + + err = None + + # Quote: + # Your implementation should always check the value of the X-Rl header, + # and if its is 0 you must not send any more requests + # for the duration of X-Ttl in seconds. + while time() < http_cooldown: + wait = http_cooldown - time() + wait = max(wait, 0.1) # wait at least a little bit + await sleep(wait) + + query_lines = ( + f"GET {path} HTTP/1.1", + f"Host: {host}", + f"Connection: close", + ) + query = "\r\n".join(query_lines) + "\r\n\r\n" + + reader, writer = await open_connection(host, 80) + + writer.write(query.encode(encoding, "strict")) + response = await reader.read() + lines = response.splitlines() + + it = iter(lines) + line = next(it) + if line != b"HTTP/1.1 200 OK": + http_cooldown = time() + 60 + err = "not ok" + it = () # exhaust iterator (not really) + + x_cooldown = None + x_remaining = None + + for line in it: + if line == b"": + break + + head, _, tail = line.partition(b":") + + # do some very basic validation. + if tail[0:1] == b" ": + tail = tail[1:] + else: + err = "bad tail" + break + if head in (b"Date", b"Content-Type", b"Content-Length", + b"Access-Control-Allow-Origin"): + pass + elif head == b"X-Ttl": + if tail.isdigit(): + x_cooldown = int(tail) + else: + err = "X-Ttl not integer" + break + elif head == b"X-Rl": + if tail.isdigit(): + x_remaining = int(tail) + else: + err = "X-Rl not integer" + break + + for i, line in enumerate(it): + if i == 0: + code = line + else: + err = "too many lines" + break + + writer.close() + + if x_remaining == 0: + http_cooldown = time() + x_cooldown + http_cooldown += 1.0 # still too frequent according to them + + if err: + return None, err + else: + return code, None + + +async def lookup(ip, timestamp): + global give_up + if give_up: + return None + + try: + code, err = await http_lookup(ip) + if err: + # retry once in case of rate-limiting + code, err = await http_lookup(ip) + if err: + return None + except gaierror: + give_up = True + except OSError: + give_up = True + + code = code.decode(encoding, "ignore") + if code == "": + code = "--" + if len(code) != 2: + return None + info = CacheLine(timestamp, code) + return info + + +def prepare(): + from csv import reader, writer + from os.path import exists + + global stored + stored = dict() + + if not exists(cache_filepath): + with open(cache_filepath, "w") as f: + handle = writer(f) + handle.writerow(header) + return + + with open(cache_filepath, "r", newline="", encoding="utf-8") as f: + for i, row in enumerate(reader(f)): + if i == 0: + assert row == header, row + continue + ip, time, code = row[0], float(row[1]), row[2] + info = CacheLine(time, code) + stored[ip] = info + + +def flush(): + from csv import writer + if not stored: + return + with open(cache_filepath, "w", newline="", encoding="utf-8") as f: + handle = writer(f) + handle.writerow(header) + for ip, info in stored.items(): + timestr = "{:.2f}".format(info.time) + handle.writerow([ip, timestr, info.code]) + + +def cache(ip, info=None, timestamp=None, expiry=one_month): + global stored + if stored is None: + prepare() + + now = time() if timestamp is None else timestamp + + if info is None: + cached = stored.get(ip, None) + if cached is None: + return None + if now > cached.time + expiry: + return None + return cached + else: + assert isinstance(info, CacheLine) + stored[ip] = info + + +async def find_country(ip, db=None): + now = time() + info = cache(ip, timestamp=now) + if info is None: + info = await lookup(ip, now) + if info is None: + return None + cache(ip, info) + if db is not None: + if db.country_code(ip) != info.code: + assert info.code is not None + db.country_code(ip, info.code) + return info.code diff --git a/respodns/sql.py b/respodns/sql.py index 5ffd709..01663ff 100644 --- a/respodns/sql.py +++ b/respodns/sql.py @@ -29,7 +29,8 @@ CREATE TABLE IF NOT EXISTS Ips ( BlockTarget BOOLEAN DEFAULT 0 NOT NULL, Server BOOLEAN DEFAULT 0 NOT NULL, RedirectTarget BOOLEAN DEFAULT 0 NOT NULL, - GfwTarget BOOLEAN DEFAULT 0 NOT NULL) + GfwTarget BOOLEAN DEFAULT 0 NOT NULL, + CountryCode TEXT) """, kinds=""" diff --git a/respodns/tables.py b/respodns/tables.py index f8b06cc..22fda74 100644 --- a/respodns/tables.py +++ b/respodns/tables.py @@ -27,6 +27,7 @@ class TAddress(rain.Storm, AttrCheck): server = rain.Bool("Server") redirect_target = rain.Bool("RedirectTarget") gfw_target = rain.Bool("GfwTarget") + country_code = rain.Unicode("CountryCode") class TKind(rain.Storm, AttrCheck):