Skip to content

Commit c3df3ad

Browse files
committed
fix: fix batch processor
The change to BasePartialProcessor.async_process() in commit d08711f causes the method to create a new loop on every invocation. This is because asyncio.get_running_loop() raises RuntimeError also when the thread has an event loop, since the loop is no longer running after loop.run_until_complete() terminates.
1 parent 9e18800 commit c3df3ad

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

aws_lambda_powertools/utilities/batch/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async def async_process_closure():
135135
# Python 3.14+ will raise RuntimeError if get_event_loop() is called when there's no running loop
136136
# We need to handle both cases: existing loop (container reuse) and no loop (cold start)
137137
try:
138-
loop = asyncio.get_running_loop()
138+
loop = asyncio.get_event_loop()
139139
except RuntimeError:
140140
# No running loop, create a new one
141141
loop = asyncio.new_event_loop()

tests/functional/batch/required_dependencies/test_utilities_batch.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,37 @@ async def simple_async_handler(record: SQSRecord):
804804
assert result == {"batchItemFailures": []}
805805

806806

807+
def test_async_batch_processor_lambda_warm_start_reuses_existing_loop(sqs_event_factory, monkeypatch):
808+
"""Test async processing reuses an existing event loop in Lambda warm start"""
809+
import asyncio
810+
811+
# GIVEN Lambda environment is set (warm start scenario)
812+
monkeypatch.setenv("LAMBDA_TASK_ROOT", "/var/task")
813+
814+
# Simple async handler without external dependencies
815+
async def simple_async_handler(record: SQSRecord):
816+
await asyncio.sleep(0.001) # Yield control to event loop
817+
return {"processed": record.body}
818+
819+
records = [sqs_event_factory("success"), sqs_event_factory("success")]
820+
event = {"Records": records}
821+
processor = AsyncBatchProcessor(event_type=EventType.SQS)
822+
823+
loop = asyncio.new_event_loop()
824+
asyncio.set_event_loop(loop)
825+
# WHEN calling async_process_partial_response synchronously (like Lambda handler does)
826+
result = async_process_partial_response(
827+
event=event,
828+
record_handler=simple_async_handler,
829+
processor=processor,
830+
)
831+
832+
assert asyncio.get_event_loop() == loop
833+
834+
# THEN all records are processed successfully with new event loop created
835+
assert result == {"batchItemFailures": []}
836+
837+
807838
def test_async_batch_processor_non_lambda_uses_asyncio_run(sqs_event_factory, monkeypatch):
808839
"""Test async processing uses asyncio.run outside Lambda environment"""
809840
import asyncio

0 commit comments

Comments
 (0)