from types import CoroutineType import asyncio # TODO: write a less confusing interface that allows the code to be written more flatly. # maybe like: async for done in apply(doit, [tuple_of_args]): def make_pooler(pool_size, finisher=None): aws = set() async def pooler(item=None): nonlocal aws finish = item is None if not finish: if isinstance(item, CoroutineType): assert not isinstance(item, asyncio.Task) item = asyncio.create_task(item) aws.add(item) # TODO: don't wait until all completed, just first completed in loop. # that way we can handle each done task ASAP. condition = asyncio.ALL_COMPLETED if finish else asyncio.FIRST_COMPLETED if len(aws) == 0: return None if finish or len(aws) >= pool_size: done, pending = await asyncio.wait(aws, return_when=condition) #pending = set(task for task in pending if task in aws) # ??? ret = None if finisher is None else finisher(done, pending) #aws = set(task for task in pending if not task.cancelled()) aws = pending if ret is not None: return ret return None return pooler def make_simple_pooler(pool_size, finisher=None): condition = asyncio.FIRST_COMPLETED pending = set() async def pooler(item=None): nonlocal pending finish = item is None if not finish: if isinstance(item, CoroutineType): assert not isinstance(item, asyncio.Task) item = asyncio.create_task(item) pending.add(item) desired_size = 0 if finish else pool_size - 1 while len(pending) > desired_size: done, pending = await asyncio.wait(pending, return_when=condition) finisher(done, pending) return pooler