Situation
We want to apply the producer/consumer model in several places. It would help to keep this code DRY.
Use Case
Avoid boilerplate code
Possible Implementation
In most cases, it's the same boilerplate code. A generic helper could look like this:
```python
# src/docbuild/utils/concurrency.py
import asyncio
import logging
from collections.abc import Awaitable, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")  # Input type
R = TypeVar("R")  # Result type

log = logging.getLogger(__name__)


async def parallel_process(
    items: Iterable[T],
    worker_fn: Callable[[T], Awaitable[R]],
    limit: int,
    return_exceptions: bool = False,
) -> list[R | Exception]:
    """
    Process items in parallel using a fixed number of workers.

    :param items: An iterable of items to process.
    :param worker_fn: An async function that processes a single item.
    :param limit: The maximum number of concurrent workers (must be >= 1).
    :param return_exceptions: If True, exceptions are returned in place of
        results. If False, each failure is logged and the first one is
        re-raised after all items have been processed.
    :return: A list of results in the same order as ``items``.
    """
    if limit < 1:
        # With no workers, queue.join() below would wait forever.
        raise ValueError(f"limit must be >= 1, got {limit}")

    queue: asyncio.Queue[tuple[int, T]] = asyncio.Queue()
    results: dict[int, R | Exception] = {}
    errors: list[Exception] = []

    # 1. Populate the queue, tagging each item with its input position
    for index, item in enumerate(items):
        queue.put_nowait((index, item))

    # 2. Define the worker: consume items until cancelled
    async def worker() -> None:
        while True:
            index, item = await queue.get()
            try:
                results[index] = await worker_fn(item)
            except Exception as e:
                if return_exceptions:
                    results[index] = e
                else:
                    log.error("Worker failed on %r: %s", item, e)
                    errors.append(e)
                    # Optional: cancel all other workers here if you want fail-fast
            finally:
                queue.task_done()

    # 3. Start the workers
    workers = [asyncio.create_task(worker()) for _ in range(limit)]

    # 4. Wait until every item is processed, then stop the idle workers
    try:
        await queue.join()
    finally:
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    if errors:
        raise errors[0]
    # Every index is filled at this point, so this restores input order
    return [results[i] for i in range(len(results))]
```
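The worker's comment mentions an optional fail-fast mode. A minimal sketch of such a variant follows; it is not part of the proposal as written, and the name `parallel_process_fail_fast` is hypothetical. It assumes all items are enqueued before any worker starts, so each worker can simply exit once the queue is empty instead of relying on `task_done()`/`join()`. It uses `asyncio.wait(..., return_when=asyncio.FIRST_EXCEPTION)` to stop at the first failure and cancel the remaining workers.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")  # Input type
R = TypeVar("R")  # Result type


async def parallel_process_fail_fast(
    items: Iterable[T],
    worker_fn: Callable[[T], Awaitable[R]],
    limit: int,
) -> list[R]:
    """Hypothetical fail-fast variant: the first error cancels all workers.

    Assumes the queue is fully populated up front and never refilled.
    Returns results in input order, or re-raises a worker's exception.
    """
    if limit < 1:
        raise ValueError(f"limit must be >= 1, got {limit}")

    queue: asyncio.Queue[tuple[int, T]] = asyncio.Queue()
    for index, item in enumerate(items):
        queue.put_nowait((index, item))
    results: dict[int, R] = {}

    async def worker() -> None:
        # Drain the pre-filled queue; exit normally once it is empty.
        while True:
            try:
                index, item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[index] = await worker_fn(item)

    tasks = [asyncio.create_task(worker()) for _ in range(limit)]
    try:
        # Returns as soon as one worker raises, or when all workers finish.
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    finally:
        # Cancel workers that are still running (no-op for finished ones).
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    # Re-raise a failure if one occurred
    for t in done:
        exc = t.exception()
        if exc is not None:
            raise exc
    return [results[i] for i in range(len(results))]
```

Because the workers exit on their own, no cancellation is needed on the success path. `asyncio.TaskGroup` (Python 3.11+) would be an alternative, but it reports failures wrapped in an `ExceptionGroup` rather than re-raising the original exception.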