97 lines
2.8 KiB
Python
97 lines
2.8 KiB
Python
rot13_mapping = {}
|
|
for a, b, c, d in zip("anAN05", "mzMZ49", "naNA50", "zmZM94"):
|
|
rot13_mapping.update({chr(k): chr(v)
|
|
for k, v in zip(range(ord(a), ord(b) + 1),
|
|
range(ord(c), ord(d) + 1))})
|
|
|
|
|
|
def rot13(s):
|
|
return "".join(rot13_mapping.get(c, c) for c in s)
|
|
|
|
|
|
def right_now():
|
|
from datetime import datetime, timezone
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
def nonsense_consistent(domain, weekly=True):
|
|
from datetime import datetime, timezone
|
|
from random import Random
|
|
from string import ascii_lowercase
|
|
from zlib import crc32
|
|
if weekly:
|
|
week = datetime.now(timezone.utc).isocalendar().week
|
|
week_bytes = bytes((week, week, week, week))
|
|
seed = crc32(week_bytes + domain.encode("utf-8"))
|
|
else:
|
|
seed = crc32(domain.encode("utf-8"))
|
|
rng = Random(seed)
|
|
length = rng.choices((9, 10, 11, 12), (4, 5, 3, 2))[0]
|
|
return "".join(rng.choice(ascii_lowercase) for i in range(length))
|
|
|
|
|
|
def concat_nonsense(domain, weekly=True):
|
|
return nonsense_consistent(domain, weekly=weekly) + "." + domain
|
|
|
|
|
|
def head(n, it):
|
|
# TODO: maybe just do return [a for _, a in zip(range(n), it)]
|
|
res = []
|
|
try:
|
|
while len(res) < n:
|
|
res.append(next(it))
|
|
except StopIteration:
|
|
pass
|
|
return res
|
|
|
|
|
|
class AttrCheck:
|
|
"""
|
|
Inheriting AttrCheck prevents accidentally setting attributes
|
|
that don't already exist.
|
|
"""
|
|
def __setattr__(self, name, value):
|
|
# NOTE: hasattr doesn't do what we want here. dir does.
|
|
if name.startswith("_") or name in dir(self):
|
|
super().__setattr__(name, value)
|
|
else:
|
|
raise AttributeError(name)
|
|
|
|
|
|
def _present():
|
|
from time import time
|
|
return time()
|
|
|
|
|
|
class RateLimiter:
|
|
def __init__(self, limit):
|
|
from asyncio import Lock
|
|
|
|
if type(limit) is not int:
|
|
raise ValueError("limit must be int")
|
|
assert limit > 0, limit
|
|
|
|
self.unit = 1.0 # TODO: allow window length to be configured.
|
|
self.limit = limit
|
|
self.times = []
|
|
self.lock = Lock()
|
|
self.eps = self.unit * 0.01 # to wait a tiny bit longer than specified
|
|
|
|
def eta(self):
|
|
past = _present() - self.unit
|
|
self.times = [time for time in self.times if time > past]
|
|
ind = len(self.times) - self.limit
|
|
return 0.0 if ind < 0 else self.times[ind] - past
|
|
|
|
async def __aenter__(self):
|
|
from asyncio import sleep
|
|
|
|
async with self.lock:
|
|
while (wait := self.eta()) > 0.0:
|
|
# this is done in a loop in case sleep ends early (it can).
|
|
await sleep(wait + self.eps)
|
|
self.times.append(_present())
|
|
|
|
async def __aexit__(self, exc_type, exc_value, traceback):
|
|
pass
|