rot13_mapping = {} for a, b, c, d in zip("anAN05", "mzMZ49", "naNA50", "zmZM94"): rot13_mapping.update({chr(k): chr(v) for k, v in zip(range(ord(a), ord(b) + 1), range(ord(c), ord(d) + 1))}) def rot13(s): return "".join(rot13_mapping.get(c, c) for c in s) def right_now(): from datetime import datetime, timezone return datetime.now(timezone.utc) def nonsense_consistent(domain, weekly=True): from datetime import datetime, timezone from random import Random from string import ascii_lowercase from zlib import crc32 if weekly: week = datetime.now(timezone.utc).isocalendar().week week_bytes = bytes((week, week, week, week)) seed = crc32(week_bytes + domain.encode("utf-8")) else: seed = crc32(domain.encode("utf-8")) rng = Random(seed) length = rng.choices((9, 10, 11, 12), (4, 5, 3, 2))[0] return "".join(rng.choice(ascii_lowercase) for i in range(length)) def concat_nonsense(domain, weekly=True): return nonsense_consistent(domain, weekly=weekly) + "." + domain def head(n, it): # TODO: maybe just do return [a for _, a in zip(range(n), it)] res = [] try: while len(res) < n: res.append(next(it)) except StopIteration: pass return res def taskize(item): from types import CoroutineType from asyncio import Task, create_task if isinstance(item, CoroutineType): assert not isinstance(item, Task) # TODO: paranoid? item = create_task(item) return item def make_pooler(pool_size, finisher=None): # TODO: write a less confusing interface # that allows the code to be written more flatly. # maybe like: async for done in apply(doit, [tuple_of_args]): from asyncio import wait, FIRST_COMPLETED pending = set() async def pooler(item=None): nonlocal pending finish = item is None if not finish: pending.add(taskize(item)) desired_size = 0 if finish else pool_size - 1 while len(pending) > desired_size: done, pending = await wait(pending, return_when=FIRST_COMPLETED) if finisher is not None: finisher(done, pending) return pooler class AttrCheck: """ Inheriting AttrCheck prevents accidentally setting attributes that don't already exist. """ def __setattr__(self, name, value): # NOTE: hasattr doesn't do what we want here. dir does. if name.startswith("_") or name in dir(self): super().__setattr__(name, value) else: raise AttributeError(name) async def _release_later(sem, time=1): from asyncio import sleep await sleep(time) sem.release() class LimitPerSecond: def __init__(self, limit): from asyncio import BoundedSemaphore if type(limit) is not int: raise ValueError("limit must be int") assert limit > 0, limit self.limit = limit self.tasks = [] self.sem = BoundedSemaphore(limit) async def __aenter__(self): #if self.sem.locked: # from sys import stderr # print("THROTTLING", file=stderr) await self.sem.acquire() async def __aexit__(self, exc_type, exc_value, traceback): from asyncio import create_task task = create_task(_release_later(self.sem)) self.tasks.append(task) async def finish(self): for task in self.tasks: await task