diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 5e478edec7e..ce2f6d52314 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -135,7 +135,7 @@ async def async_process_closure(): # Python 3.14+ will raise RuntimeError if get_event_loop() is called when there's no running loop # We need to handle both cases: existing loop (container reuse) and no loop (cold start) try: - loop = asyncio.get_running_loop() + loop = asyncio.get_event_loop() except RuntimeError: # No running loop, create a new one loop = asyncio.new_event_loop() diff --git a/tests/functional/batch/required_dependencies/test_utilities_batch.py b/tests/functional/batch/required_dependencies/test_utilities_batch.py index 2e53d20592c..43c2aa16191 100644 --- a/tests/functional/batch/required_dependencies/test_utilities_batch.py +++ b/tests/functional/batch/required_dependencies/test_utilities_batch.py @@ -804,6 +804,37 @@ async def simple_async_handler(record: SQSRecord): assert result == {"batchItemFailures": []} +def test_async_batch_processor_lambda_warm_start_reuses_existing_loop(sqs_event_factory, monkeypatch): + """Test async processing reuses an existing event loop in Lambda warm start""" + import asyncio + + # GIVEN Lambda environment is set (warm start scenario) + monkeypatch.setenv("LAMBDA_TASK_ROOT", "/var/task") + + # Simple async handler without external dependencies + async def simple_async_handler(record: SQSRecord): + await asyncio.sleep(0.001) # Yield control to event loop + return {"processed": record.body} + + records = [sqs_event_factory("success"), sqs_event_factory("success")] + event = {"Records": records} + processor = AsyncBatchProcessor(event_type=EventType.SQS) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + # WHEN calling async_process_partial_response synchronously (like Lambda handler does) + result = async_process_partial_response( + event=event, + record_handler=simple_async_handler, + processor=processor, + ) + + assert asyncio.get_event_loop() == loop + + # THEN all records are processed successfully with new event loop created + assert result == {"batchItemFailures": []} + + def test_async_batch_processor_non_lambda_uses_asyncio_run(sqs_event_factory, monkeypatch): """Test async processing uses asyncio.run outside Lambda environment""" import asyncio