Abstract producer/consumer model #191

@tomschr

Description

Situation

We want to apply the producer/consumer model in several places. It would be helpful to keep the code DRY.

Use Case

Avoid boilerplate code

Possible Implementation

In most cases, it's the same boilerplate code:

# src/docbuild/utils/concurrency.py
import asyncio
import logging
from collections.abc import Awaitable, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")  # Input type
R = TypeVar("R")  # Result type

log = logging.getLogger(__name__)

async def parallel_process(
    items: Iterable[T],
    worker_fn: Callable[[T], Awaitable[R]],
    limit: int,
    return_exceptions: bool = False,
) -> list[R | Exception]:
    """
    Process items concurrently using a fixed number of worker tasks.

    :param items: An iterable of items to process.
    :param worker_fn: An async function that processes a single item.
    :param limit: The maximum number of concurrent workers.
    :param return_exceptions: If True, exceptions are appended to the
                              results; if False, they are logged and the
                              failed item is left out of the results.
    :return: A list of results in completion order, not input order.
    """
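    # Without at least one worker, queue.join() below would never return.
    if limit < 1:
        raise ValueError("limit must be at least 1")

    queue: asyncio.Queue[T] = asyncio.Queue()
    results: list[R | Exception] = []
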
    # 1. Populate Queue
    for item in items:
        queue.put_nowait(item)
    
    # 2. Define Worker
    async def worker() -> None:
        while True:
            try:
                item = await queue.get()
            except asyncio.CancelledError:
                return

            try:
                res = await worker_fn(item)
                results.append(res)
            except Exception as e:
                if return_exceptions:
                    results.append(e)
                else:
                    log.error("Worker failed: %s", e)
                    # Optional: Cancel all other workers here if you want fail-fast
            finally:
                queue.task_done()

    # 3. Start Workers
    workers = [asyncio.create_task(worker()) for _ in range(limit)]

    # 4. Wait & Cleanup
    await queue.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

    return results

Metadata

Labels

kind:refactor (Code cleanup without logic change.)
