Skip to content

Commit 930bba8

Browse files
authored
Remove risk of getting "coroutine 'Queue.get' was never awaited" warning. (#687)
This could happen when the coroutine object returned from asyncio.Queue.get() was handed over to a newly created task which was then immediately cancelled so that the coroutine object was never awaited. Instead, let the newly created task itself make the call to asyncio.Queue.get() before immediately awaiting it. Resolves #646. Signed-off-by: Knut Aksel Røysland <[email protected]>
1 parent 30b529b commit 930bba8

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

nats/aio/subscription.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:
172172
msg = await sub.next_msg(timeout=1)
173173
174174
"""
175+
176+
async def timed_get() -> Msg:
177+
return await asyncio.wait_for(self._pending_queue.get(), timeout)
178+
175179
if self._conn.is_closed:
176180
raise errors.ConnectionClosedError
177181

@@ -182,9 +186,7 @@ async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:
182186

183187
task_name = str(uuid4())
184188
try:
185-
future = asyncio.create_task(
186-
asyncio.wait_for(self._pending_queue.get(), timeout)
187-
)
189+
future = asyncio.create_task(timed_get())
188190
self._pending_next_msgs_calls[task_name] = future
189191
msg = await future
190192
except asyncio.TimeoutError:

tests/test_js.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,28 @@ async def wait_for_mock(future, _):
829829

830830
await nc.close()
831831

832+
@async_test
833+
async def test_fetch_being_cancelled_early(self):
834+
nc = NATS()
835+
await nc.connect()
836+
837+
js = nc.jetstream()
838+
await js.add_stream(name="test", subjects=["test.a"])
839+
sub = await js.pull_subscribe("test.a", "test", stream="test")
840+
841+
task = asyncio.create_task(sub.fetch())
842+
843+
# Allow "task" to reach the point where it has just finished calling
844+
# asyncio.create_task() inside nats.client.Subscription.next_msg().
845+
await asyncio.sleep(0)
846+
847+
# Cancel the call to sub.fetch() to provoke issue #646.
848+
task.cancel()
849+
with self.assertRaises(asyncio.CancelledError):
850+
await task
851+
852+
await nc.close()
853+
832854
@async_test
833855
async def test_ephemeral_pull_subscribe(self):
834856
nc = NATS()

0 commit comments

Comments
 (0)