pip install asyncio-extensions

asyncio-extensions provides a cancellable version of asyncio's TaskGroup.

import asyncio

from asyncio_extensions import TaskGroup

queue = asyncio.Queue()

async with TaskGroup() as tg:
    for _ in range(10):
        tg.create_task(consume_from_queue(queue))

    await add_to_queue(queue)
    await queue.join()
    tg.cancel()

LimitedTaskGroup is a version of TaskGroup that limits the number of concurrently running tasks.

import asyncio

from asyncio_extensions import LimitedTaskGroup

queue = asyncio.Queue()

async with LimitedTaskGroup(3) as tg:
    for _ in range(50):
        tg.create_task(some_expensive_operation(queue))

    await add_to_queue(queue)
    await queue.join()
    tg.cancel()

TerminateTaskGroup and force_terminate_task_group implement the "terminating a task group" pattern from the Python docs. Schedule force_terminate_task_group() as a task to stop the entire group early; catch TerminateTaskGroup with except* to suppress it.
When using asyncio_extensions.TaskGroup, suppression is automatic — no except* block needed.

import asyncio

from asyncio_extensions import TerminateTaskGroup, force_terminate_task_group

async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            task = tg.create_task(do_work())
            tg.create_task(force_terminate_task_group())
    except* TerminateTaskGroup:
        pass

The checkpoint function yields control to the event loop, giving other tasks a chance to run. It is a more elegant way to implement do-nothing coroutines.
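
To see what yielding control means in practice, here is a small standard-library sketch. It assumes a checkpoint behaves like `await asyncio.sleep(0)`, which may differ from the library's actual implementation; `checkpoint_sketch` and `worker` are hypothetical names.

```python
import asyncio


async def checkpoint_sketch() -> None:
    # asyncio.sleep(0) suspends the current task for one event-loop iteration.
    await asyncio.sleep(0)


async def worker(name: str, log: list[str]) -> None:
    for step in range(2):
        log.append(f"{name}{step}")
        await checkpoint_sketch()  # give the other worker a turn


async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


log = asyncio.run(main())
print(log)
```

Without the checkpoint, each worker would finish all of its steps before the other one got to run.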

from asyncio_extensions import checkpoint

class DummyChannel:
    async def send_message(self, message):
        await checkpoint()

The sleep_forever function never returns. It simply keeps yielding control to the event loop.

from asyncio_extensions import sleep_forever

class DummyChannel:
    async def receive_message(self):
        await sleep_forever()

The heartbeat function runs a given callable at a regular interval.
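
As a rough mental model, a heartbeat can be written as a loop that calls the callback and then sleeps. The sketch below is an assumption about the general shape, not the library's code: whether the real heartbeat calls first or sleeps first, and how it treats callback errors, is not stated here. `heartbeat_sketch` is a hypothetical name.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def heartbeat_sketch(interval: float, callback: Callable[[], Awaitable[None]]) -> None:
    # Loop until cancelled: run the callback, then wait for the next beat.
    while True:
        await callback()
        await asyncio.sleep(interval)


async def main() -> int:
    beats = 0

    async def ping() -> None:
        nonlocal beats
        beats += 1

    task = asyncio.create_task(heartbeat_sketch(0.01, ping))
    await asyncio.sleep(0.05)  # stand-in for some long-running process
    task.cancel()  # the loop never ends on its own
    try:
        await task
    except asyncio.CancelledError:
        pass
    return beats


beats = asyncio.run(main())
print(beats)
```

Because the loop never returns, the surrounding code has to cancel it once the main work is finished.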

from asyncio_extensions import TaskGroup, heartbeat

async def ping():
    pass

async with TaskGroup() as tg:
    tg.create_task(heartbeat(5, ping))
    await some_long_running_process()

The identity function yields control back to the event loop once, and then returns the value that was passed in. It is useful when you already have a cached result and want to create a task that behaves like any other asynchronous operation.

from asyncio_extensions import TaskGroup, identity

async def get_product(product_id: int):
    cached_product = await product_cache.get(product_id)

    async with TaskGroup() as tg:
        if cached_product is not None:
            task = tg.create_task(identity(cached_product))
        else:
            task = tg.create_task(fetch_product_from_api(product_id))

        tg.create_task(update_search_metrics(product_id))

    product = task.result()
    return product

The asyncify function ensures a callable can be awaited. If the callable is already a coroutine function, it is returned as-is. Otherwise, it is wrapped so that calls run in a separate thread.
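
The behaviour described above can be approximated with inspect.iscoroutinefunction and asyncio.to_thread. This is a sketch under that assumption, not the library's source; `asyncify_sketch`, `blocking_thread_name` and `already_async` are hypothetical names.

```python
import asyncio
import functools
import inspect
import threading
from collections.abc import Callable
from typing import Any


def asyncify_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    # Coroutine functions can already be awaited: return them unchanged.
    if inspect.iscoroutinefunction(func):
        return func

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Run the blocking call in a worker thread so the event loop stays free.
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def blocking_thread_name() -> str:
    return threading.current_thread().name


async def already_async() -> str:
    return "async"


async def main() -> tuple[str, str]:
    worker_thread = await asyncify_sketch(blocking_thread_name)()
    return worker_thread, threading.current_thread().name


worker_thread, loop_thread = asyncio.run(main())
print(worker_thread, loop_thread)
```

The two printed names differ, showing that the sync function ran outside the event loop's thread.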

from asyncio_extensions import asyncify

def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await asyncify(blocking_read)("data.txt")

It can also be used as a decorator:

from asyncio_extensions import asyncify

@asyncify
def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await blocking_read("data.txt")

The asyncify_iterable function converts any sync or async iterable into an AsyncIterable. If the input is already async, it is returned unchanged. Otherwise it is wrapped in an async generator that yields each item and calls checkpoint() between items to avoid monopolising the event loop on large inputs.

from asyncio_extensions import asyncify_iterable

async def process(items):
    async for item in asyncify_iterable(items):
        await handle(item)

The iterate_queue async generator wraps an asyncio.Queue so it can be consumed with a plain async for loop. It calls task_done() automatically after each item and stops when the queue is shut down (Python 3.13+) or when a sentinel value is dequeued.

import asyncio

from asyncio_extensions import iterate_queue

async def process_items(queue: asyncio.Queue[str]) -> None:
    async for item in iterate_queue(queue):
        print(item)

To signal the end of the stream, call queue.shutdown() from the producer (Python 3.13+):

async def producer(queue: asyncio.Queue[str]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    queue.shutdown()

On older Python versions, put the STOP sentinel in the queue when done:

import asyncio

from asyncio_extensions import iterate_queue, STOP

async def producer(queue: asyncio.Queue[object]) -> None:
    for item in ["a", "b", "c"]:
        await queue.put(item)
    await queue.put(STOP)

async def consumer(queue: asyncio.Queue[object]) -> None:
    async for item in iterate_queue(queue):
        print(item)

Note: STOP is deprecated on Python 3.13+. Prefer queue.shutdown() instead.
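
The sentinel-based variant can be sketched with the standard library as follows. Only the sentinel path is shown, not the Python 3.13+ shutdown path, and this is an illustration rather than the library's implementation; `_STOP` and `iterate_queue_sketch` are hypothetical stand-ins.

```python
import asyncio
from collections.abc import AsyncIterator

_STOP = object()  # hypothetical sentinel, standing in for the library's STOP


async def iterate_queue_sketch(queue: asyncio.Queue) -> AsyncIterator[object]:
    while True:
        item = await queue.get()
        if item is _STOP:
            queue.task_done()  # account for the sentinel itself
            return
        try:
            yield item
        finally:
            queue.task_done()  # runs once the consumer is done with the item


async def main() -> list[object]:
    queue: asyncio.Queue = asyncio.Queue()
    for item in ["a", "b", "c", _STOP]:
        await queue.put(item)

    items = [item async for item in iterate_queue_sketch(queue)]
    await queue.join()  # returns at once: every get() was matched by task_done()
    return items


items = asyncio.run(main())
print(items)
```

Marking each item as done inside the generator is what lets a producer rely on queue.join() without the consumer calling task_done() itself.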
The fill_queue coroutine fills an asyncio.Queue from any sync or async iterable. It accepts both Iterable and AsyncIterable sources and blocks if the queue is full until space becomes available.

import asyncio

from asyncio_extensions import fill_queue

async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue()
    await fill_queue(range(10), queue)

It also works with async iterables:

async def source():
    for item in fetch_from_api():
        yield item

await fill_queue(source(), queue)

The drain coroutine consumes and discards the remaining items from an async iterator or async iterable. This is useful when you want to stop processing early but must ensure the source is exhausted so producers can finish and resources can be released.
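
Conceptually, draining is an async for loop with an empty body. The sketch below shows that idea with the standard library only; it is an assumption about the behaviour, not the library's code, and `drain_sketch` and `numbers` are hypothetical names.

```python
import asyncio
from collections.abc import AsyncIterable


async def drain_sketch(source: AsyncIterable[object]) -> None:
    # Pull every remaining item and throw it away.
    async for _ in source:
        pass


async def main() -> tuple[list[int], list[int]]:
    produced: list[int] = []

    async def numbers():
        for n in range(10):
            produced.append(n)
            yield n

    stream = numbers()
    seen: list[int] = []
    async for n in stream:
        seen.append(n)
        if len(seen) == 3:
            break  # the generator stays suspended, it is not closed

    await drain_sketch(stream)  # resumes where the loop stopped
    return seen, produced


seen, produced = asyncio.run(main())
print(seen, produced)
```

Only three items are processed, yet the generator still runs to completion, which is the point of draining.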
Example — process only the first N items, then discard the rest so the producer can complete:

from asyncio_extensions import drain

async def iter_items():
    ...  # yields items that need processing

stream = iter_items()
count = 0
async for item in stream:  # enumerate() does not accept async iterables
    print(item)
    count += 1
    if count >= 5:
        break

# ensure the remaining items are consumed and the producer can finish
await drain(stream)

The merge_iterables async context manager merges multiple sync or async iterables into a single interleaved stream, feeding all sources into a shared queue concurrently.

import asyncio

from asyncio_extensions import merge_iterables

async def main() -> None:
    async with merge_iterables([1, 2, 3], [4, 5, 6]) as stream:
        async for item in stream:
            print(item)

It also accepts async iterables, which can produce items in parallel:

async def fetch_page(page: int):
    ...  # yields items from a remote page

async with merge_iterables(fetch_page(1), fetch_page(2)) as stream:
    async for item in stream:
        process(item)

ManagedStream[T] is the type alias for an async context manager that yields an AsyncIterator[T]. It is the return type of safe_gen and merge_iterables, and the accepted parameter type of flatten_stream. Use it to annotate your own functions that expose a context-managed async stream:

from asyncio_extensions import ManagedStream

def my_stream() -> ManagedStream[int]:
    ...

merge_streams is the managed-stream equivalent of merge_iterables. It accepts one or more ManagedStream objects (context managers that yield AsyncIterators) and returns a single managed stream that interleaves items from all sources. Use it when each source requires context-managed setup/cleanup (for example, generators wrapped with safe_gen or asynccontextmanager-based streams).
Example — merging two managed streams produced by safe_gen:

from asyncio_extensions import merge_streams, safe_gen, checkpoint

@safe_gen
async def source(name: str):
    for i in range(3):
        yield f"{name}-{i}"
        await checkpoint()

async def main():
    async with merge_streams(source("a"), source("b")) as stream:
        async for item in stream:
            print(item)

Each input managed stream is guaranteed to be closed when the merged context exits, ensuring proper cleanup of resources.

The safe_gen decorator converts an async generator function into a context manager, enforcing correct handling of early exits. A plain async generator that is abandoned before exhaustion can leak resources, because its cleanup code does not run until the generator is finalized, and nothing tells the caller that aclose() needs to be called. By returning a context manager instead, safe_gen makes cleanup syntactically mandatory: callers must use async with, which guarantees aclose() is called on exit regardless of how iteration ends. It also suppresses GeneratorExit raised inside an exception group, so it composes safely with TaskGroup.
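
The core of that idea can be sketched with contextlib.asynccontextmanager. This is a simplified illustration, not the library's implementation: it leaves out the GeneratorExit handling for exception groups, and `safe_gen_sketch`, `numbers` and `closed` are hypothetical names.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator, Callable
from contextlib import asynccontextmanager
from typing import Any


def safe_gen_sketch(func: Callable[..., AsyncGenerator[Any, None]]):
    @asynccontextmanager
    async def wrapper(*args: Any, **kwargs: Any) -> AsyncIterator[AsyncGenerator[Any, None]]:
        gen = func(*args, **kwargs)
        try:
            yield gen
        finally:
            await gen.aclose()  # always runs, even after break or an exception

    return wrapper


closed: list[str] = []


@safe_gen_sketch
async def numbers() -> AsyncGenerator[int, None]:
    try:
        for n in range(100):
            yield n
    finally:
        closed.append("closed")  # stand-in for releasing a resource


async def main() -> list[int]:
    seen: list[int] = []
    async with numbers() as stream:
        async for n in stream:
            seen.append(n)
            if n == 2:
                break  # early exit; cleanup happens when the block ends
    return seen


seen = asyncio.run(main())
print(seen, closed)
```

The generator's finally block has run by the time the async with block exits, even though iteration stopped after three items.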

from collections.abc import AsyncGenerator

from asyncio_extensions import safe_gen

@safe_gen
async def paginate(url: str) -> AsyncGenerator[dict, None]:
    while url:
        response = await fetch(url)
        for item in response["results"]:
            yield item
        url = response.get("next")

async with paginate("https://api.example.com/items") as stream:
    async for item in stream:
        if should_stop(item):
            break  # generator is closed automatically

The flatten_stream async generator enters a ManagedStream context manager and yields its items directly, without requiring an explicit async with block. This is particularly useful when a plain async for loop is the only interface available, such as Django's StreamingHttpResponse.
The recommended pattern is to keep the async with block until the last possible moment, and only apply flatten_stream at the interface boundary:

from contextlib import asynccontextmanager

from django.http import StreamingHttpResponse

from asyncio_extensions import flatten_stream, merge_iterables

async def export_view(request):
    async def serialized(stream):
        async for row in stream:
            yield serialize(row)

    @asynccontextmanager
    async def response_body():
        async with merge_iterables(fetch_orders(), fetch_invoices()) as stream:
            yield serialized(stream)

    return StreamingHttpResponse(flatten_stream(response_body()), content_type="text/csv")

For early-exit scenarios, wrap the result in aclosing to ensure proper cleanup:

from contextlib import aclosing

from asyncio_extensions import flatten_stream, merge_iterables

async with aclosing(flatten_stream(merge_iterables(source_a, source_b))) as stream:
    async for item in stream:
        if done(item):
            break

The iscoroutinefunction helper checks whether a callable is already a coroutine function. It is re-exported from inspect on newer Python versions and from asyncio on older versions, depending on the runtime.
The markcoroutinefunction helper marks a normal sync callable as a coroutine function. On Python 3.12+ this is inspect.markcoroutinefunction, but with the return type annotated so the function can be treated as a coroutine function in type-checked code.
is_awaitable is a typed variant of iscoroutinefunction that returns a TypeIs guard, allowing type checkers to narrow the callable's return type to Awaitable in the True branch.

from collections.abc import Awaitable, Callable

from asyncio_extensions import iscoroutinefunction, is_awaitable, markcoroutinefunction

async def main():
    def sync_task() -> int:
        return 42

    assert iscoroutinefunction(sync_task) is False
    marked = markcoroutinefunction(sync_task)
    assert iscoroutinefunction(marked) is True

async def call(fn: Callable[[], int] | Callable[[], Awaitable[int]]) -> int:
    if is_awaitable(fn):
        return await fn()  # type checker knows fn() returns Awaitable[int] here
    return fn()

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.
asyncio-extensions is distributed under the terms of the MIT license.