Bug: AsyncBatchProcessor recreates the loop on the warm start, ruining some module level locks #7792

@ibestuzhev-limehome

Description

Expected Behaviour

We expect to be able to use class-level or module-level `asyncio.Lock` instances, and to keep using them across warm Lambda invocations.

Current Behaviour

Once a lock has been used, it is bound to the event loop it was used in (stored in `lock._loop`). If you then try to reuse the lock from a new loop, it raises a `RuntimeError`:

RuntimeError: <asyncio.locks.Lock object at 0x76e551e05010 [locked]> is bound to a different event loop

Code snippet

I extracted the `async_process` function to experiment with several approaches. The first call always works, but with two calls (simulating a Lambda warm start) the second one fails if a new loop is used.


import asyncio

lock = asyncio.Lock()
RECORDS = [3, 5]
RUNNER = asyncio.Runner()


async def some_handler(x: int) -> int:
    async with lock:
        await asyncio.sleep(1)
    return x**2


def main_old():
    async def async_process_closure():
        return list(await asyncio.gather(*[some_handler(record) for record in RECORDS]))

    coro = async_process_closure()

    # NOTE: with no current loop set, get_event_loop() emits a DeprecationWarning
    # in Python 3.12 and raises RuntimeError in Python 3.14.
    loop = asyncio.get_event_loop()
    task_instance = loop.create_task(coro)
    data = loop.run_until_complete(task_instance)
    print(data)


def main_new():
    async def async_process_closure():
        return list(await asyncio.gather(*[some_handler(record) for record in RECORDS]))

    coro = async_process_closure()
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    task_instance = loop.create_task(coro)
    data = loop.run_until_complete(task_instance)
    print(data)


def main_new_same_loop():
    async def async_process_closure():
        return list(await asyncio.gather(*[some_handler(record) for record in RECORDS]))

    coro = async_process_closure()
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    task_instance = loop.create_task(coro)
    data = loop.run_until_complete(task_instance)
    print(data)


def main_runner():
    async def async_process_closure():
        return list(await asyncio.gather(*[some_handler(record) for record in RECORDS]))

    coro = async_process_closure()
    data = RUNNER.run(coro)
    print(data)


if __name__ == '__main__':
    main = main_new
    main()
    main()

Possible Solution

The issue was introduced in #7599

It calls `get_running_loop` from sync code, where no loop is running, so the call always raises `RuntimeError`. As a result, a new loop is created on every warm Lambda invocation.
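A small sketch of why this pattern never reuses a loop when called from sync code. `loop_for_invocation` is a hypothetical name that mirrors the `try`/`except` in `main_new` above; it is not the actual Powertools implementation.

```python
import asyncio


def loop_for_invocation() -> asyncio.AbstractEventLoop:
    # Hypothetical helper mirroring the pattern from main_new.
    try:
        # Outside of a coroutine or callback no loop is running,
        # so this raises RuntimeError on every call.
        return asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop


first_loop = loop_for_invocation()
second_loop = loop_for_invocation()
loops_differ = first_loop is not second_loop
print(loops_differ)

# Clean up the loops created for this demonstration.
first_loop.close()
second_loop.close()
```

Each call takes the `except` branch, so the two invocations end up with two different loop objects even though the first loop was set as the current one.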

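Continuing to use `get_event_loop` (with the `try`/`except` fallback from `main_new_same_loop`) produces a `DeprecationWarning` on 3.12 but works without warnings on 3.14: there it raises an exception only on cold start, when no loop has been set yet, and after that the same loop is reused.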
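A hedged sketch of the `get_event_loop` variant with a fallback, run twice against a module-level lock. The helper names (`get_or_create_loop`, `process`, `invoke`) are made up for illustration, and the sleep is shortened to keep the example fast.

```python
import asyncio

lock = asyncio.Lock()  # module-level lock shared by all invocations


def get_or_create_loop() -> asyncio.AbstractEventLoop:
    # Hypothetical helper: reuse the current loop, create one only if missing.
    try:
        # May emit a DeprecationWarning on Python 3.12 when no loop is set.
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Python 3.14 with no current loop: create and register one once.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop


async def handler(x: int) -> int:
    async with lock:
        await asyncio.sleep(0.01)
    return x**2


async def process(records: list[int]) -> list[int]:
    return list(await asyncio.gather(*(handler(r) for r in records)))


def invoke(records: list[int]) -> list[int]:
    loop = get_or_create_loop()
    return loop.run_until_complete(process(records))


first_loop = get_or_create_loop()
first = invoke([3, 5])   # "cold start"
second = invoke([3, 5])  # "warm start" on the same loop
same_loop = get_or_create_loop() is first_loop
print(first, second, same_loop)
```

Because both invocations run on the same loop, the lock stays bound to a loop that is still in use and the second call completes normally.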

Alternatively, you can try a module-level `asyncio.Runner` used without a `with` block: it should keep its loop open between `run()` calls and reuse it.

Steps to Reproduce

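Use a global asyncio synchronisation object, such as a module-level `asyncio.Lock`, inside a record handler processed by `AsyncBatchProcessor`, then invoke the function twice in the same execution environment (a warm start). The second invocation raises `RuntimeError`.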

Powertools for AWS Lambda (Python) version

latest

AWS Lambda function runtime

3.12

Packaging format used

Lambda Layers

Debugging logs

Metadata

Labels: bug (Something isn't working)

Status: Working on it