Skip to content

Commit c7ca40a

Browse files
authored
fix: Function call syntax error (#251)
1 parent ac2b513 commit c7ca40a

File tree

2 files changed

+8
-17
lines changed

2 files changed

+8
-17
lines changed

pynumaflow/sourcer/servicer/async_servicer.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import asyncio
2-
from collections.abc import AsyncIterable
2+
from collections.abc import AsyncIterator
33

44
from google.protobuf import timestamp_pb2 as _timestamp_pb2
55
from google.protobuf import empty_pb2 as _empty_pb2
@@ -80,9 +80,9 @@ def __initialize_handlers(self):
8080

8181
async def ReadFn(
8282
self,
83-
request_iterator: AsyncIterable[source_pb2.ReadRequest],
83+
request_iterator: AsyncIterator[source_pb2.ReadRequest],
8484
context: NumaflowServicerContext,
85-
) -> AsyncIterable[source_pb2.ReadResponse]:
85+
) -> AsyncIterator[source_pb2.ReadResponse]:
8686
"""
8787
Handles the Read function, processing incoming requests and sending responses.
8888
"""
@@ -108,7 +108,7 @@ async def ReadFn(
108108

109109
async for resp in riter:
110110
if isinstance(resp, BaseException):
111-
await handle_async_error(context, resp)
111+
await handle_async_error(context, resp, ERR_UDF_EXCEPTION_STRING)
112112
return
113113

114114
yield _create_read_response(resp)
@@ -139,9 +139,9 @@ async def __invoke_read(self, req, niter):
139139

140140
async def AckFn(
141141
self,
142-
request_iterator: AsyncIterable[source_pb2.AckRequest],
142+
request_iterator: AsyncIterator[source_pb2.AckRequest],
143143
context: NumaflowServicerContext,
144-
) -> AsyncIterable[source_pb2.AckResponse]:
144+
) -> AsyncIterator[source_pb2.AckResponse]:
145145
"""
146146
Handles the Ack function for user-defined source.
147147
"""

tests/source/test_async_source_err.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@
99
from grpc.aio._server import Server
1010

1111
from pynumaflow import setup_logging
12-
from pynumaflow._constants import ERR_UDF_EXCEPTION_STRING
13-
from pynumaflow.sourcer import SourceAsyncServer
1412
from pynumaflow.proto.sourcer import source_pb2_grpc
1513
from google.protobuf import empty_pb2 as _empty_pb2
1614

15+
from pynumaflow.sourcer import SourceAsyncServer
1716
from tests.source.test_async_source import request_generator
1817
from tests.source.utils import (
1918
read_req_source_fn,
@@ -92,20 +91,12 @@ def test_read_error(self) -> None:
9291
)
9392
for _ in generator_response:
9493
pass
95-
except BaseException as e:
96-
self.assertTrue(
97-
f"{ERR_UDF_EXCEPTION_STRING}: TypeError("
98-
'"handle_async_error() missing 1 required positional argument: '
99-
"'exception_type'\")" in e.__str__()
100-
)
101-
return
10294
except grpc.RpcError as e:
10395
grpc_exception = e
104-
self.assertEqual(grpc.StatusCode.UNKNOWN, e.code())
96+
self.assertEqual(grpc.StatusCode.INTERNAL, e.code())
10597
print(e.details())
10698

10799
self.assertIsNotNone(grpc_exception)
108-
self.fail("Expected an exception.")
109100

110101
def test_read_handshake_error(self) -> None:
111102
grpc_exception = None

0 commit comments

Comments
 (0)