Skip to content

Commit a2a28c7

Browse files
committed
support metadata in sink
Signed-off-by: Sreekanth <[email protected]>
1 parent 44be545 commit a2a28c7

File tree

6 files changed: 80 additions and 13 deletions.

packages/pynumaflow/pynumaflow/sinker/_dtypes.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from collections.abc import Sequence, Iterator
77
from warnings import warn
88

9+
from pynumaflow._metadata import SystemMetadata, UserMetadata
10+
911
R = TypeVar("R", bound="Response")
1012
Rs = TypeVar("Rs", bound="Responses")
1113

@@ -120,14 +122,25 @@ class Datum:
120122
... )
121123
"""
122124

123-
__slots__ = ("_keys", "_id", "_value", "_event_time", "_watermark", "_headers")
125+
__slots__ = (
126+
"_keys",
127+
"_id",
128+
"_value",
129+
"_event_time",
130+
"_watermark",
131+
"_headers",
132+
"_user_metadata",
133+
"_system_metadata",
134+
)
124135

125136
_keys: list[str]
126137
_id: str
127138
_value: bytes
128139
_event_time: datetime
129140
_watermark: datetime
130141
_headers: dict[str, str]
142+
_user_metadata: UserMetadata
143+
_system_metadata: SystemMetadata
131144

132145
def __init__(
133146
self,
@@ -137,6 +150,8 @@ def __init__(
137150
event_time: datetime,
138151
watermark: datetime,
139152
headers: Optional[dict[str, str]] = None,
153+
user_metadata: Optional[UserMetadata] = None,
154+
system_metadata: Optional[SystemMetadata] = None,
140155
):
141156
self._keys = keys
142157
self._id = sink_msg_id or ""
@@ -148,6 +163,8 @@ def __init__(
148163
raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
149164
self._watermark = watermark
150165
self._headers = headers or {}
166+
self._user_metadata = user_metadata or UserMetadata()
167+
self._system_metadata = system_metadata or SystemMetadata()
151168

152169
def __str__(self):
153170
value_string = self._value.decode("utf-8")
@@ -192,6 +209,16 @@ def headers(self) -> dict[str, str]:
192209
"""Returns the headers of the event."""
193210
return self._headers.copy()
194211

212+
@property
213+
def user_metadata(self) -> UserMetadata:
214+
"""Returns the user metadata of the event."""
215+
return self._user_metadata
216+
217+
@property
218+
def system_metadata(self) -> SystemMetadata:
219+
"""Returns the system metadata of the event."""
220+
return self._system_metadata
221+
195222

196223
class Sinker(metaclass=ABCMeta):
197224
"""

packages/pynumaflow/pynumaflow/sinker/servicer/async_servicer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import asyncio
2-
from collections.abc import AsyncIterable
2+
from collections.abc import AsyncIterator
33

44
from google.protobuf import empty_pb2 as _empty_pb2
55
from pynumaflow.shared.asynciter import NonBlockingIterator
@@ -33,7 +33,7 @@ def __init__(
3333

3434
async def SinkFn(
3535
self,
36-
request_iterator: AsyncIterable[sink_pb2.SinkRequest],
36+
request_iterator: AsyncIterator[sink_pb2.SinkRequest],
3737
context: NumaflowServicerContext,
3838
) -> sink_pb2.SinkResponse:
3939
"""
@@ -89,7 +89,7 @@ async def SinkFn(
8989
return
9090

9191
async def __invoke_sink(
92-
self, request_queue: AsyncIterable[Datum], context: NumaflowServicerContext
92+
self, request_queue: AsyncIterator[Datum], context: NumaflowServicerContext
9393
):
9494
try:
9595
# invoke the user function with the request queue

packages/pynumaflow/pynumaflow/sinker/servicer/sync_servicer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from collections.abc import Iterable
1+
from collections.abc import Iterator
22

33

44
from pynumaflow._constants import _LOGGER, STREAM_EOF
@@ -26,8 +26,8 @@ def __init__(self, handler: SinkSyncCallable):
2626
self.handler: SinkSyncCallable = handler
2727

2828
def SinkFn(
29-
self, request_iterator: Iterable[sink_pb2.SinkRequest], context: NumaflowServicerContext
30-
) -> Iterable[sink_pb2.SinkResponse]:
29+
self, request_iterator: Iterator[sink_pb2.SinkRequest], context: NumaflowServicerContext
30+
) -> Iterator[sink_pb2.SinkResponse]:
3131
"""
3232
Applies a sink function to datum elements.
3333
"""

packages/pynumaflow/pynumaflow/sinker/servicer/utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from pynumaflow._metadata import _user_and_system_metadata_from_proto
12
from pynumaflow.proto.sinker import sink_pb2
23
from pynumaflow.sinker._dtypes import Response, Datum, Responses
34

@@ -47,13 +48,16 @@ def datum_from_sink_req(d: sink_pb2.SinkRequest) -> Datum:
4748
Returns:
4849
Datum: A Datum object populated with the data from the input SinkRequest object.
4950
"""
51+
user_metadata, system_metadata = _user_and_system_metadata_from_proto(d.request.metadata)
5052
datum = Datum(
5153
keys=list(d.request.keys),
5254
sink_msg_id=d.request.id,
5355
value=d.request.value,
5456
event_time=d.request.event_time.ToDatetime(),
5557
watermark=d.request.watermark.ToDatetime(),
5658
headers=dict(d.request.headers),
59+
user_metadata=user_metadata,
60+
system_metadata=system_metadata,
5761
)
5862
return datum
5963

packages/pynumaflow/tests/sink/test_async_sink.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import grpc
99
from google.protobuf import empty_pb2 as _empty_pb2
10-
from grpc.aio._server import Server
10+
from grpc.aio import Server
1111

1212
from pynumaflow import setup_logging
1313
from pynumaflow._constants import (
@@ -16,6 +16,7 @@
1616
FALLBACK_SINK_SOCK_PATH,
1717
FALLBACK_SINK_SERVER_INFO_FILE_PATH,
1818
)
19+
from pynumaflow.proto.common import metadata_pb2
1920
from pynumaflow.sinker import (
2021
Datum,
2122
)
@@ -41,6 +42,10 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses:
4142
elif msg.value.decode("utf-8") == "test_mock_fallback_message":
4243
responses.append(Response.as_fallback(msg.id))
4344
else:
45+
if msg.user_metadata.groups() != ["custom_info"]:
46+
raise ValueError("user metadata groups do not match")
47+
if msg.system_metadata.groups() != ["numaflow_version_info"]:
48+
raise ValueError("system metadata groups do not match")
4449
responses.append(Response.as_success(msg.id))
4550
return responses
4651

@@ -55,7 +60,21 @@ def start_sink_streaming_request(_id: str, req_type) -> (Datum, tuple):
5560
value = mock_fallback_message()
5661

5762
request = sink_pb2.SinkRequest.Request(
58-
value=value, event_time=event_time_timestamp, watermark=watermark_timestamp, id=_id
63+
value=value,
64+
event_time=event_time_timestamp,
65+
watermark=watermark_timestamp,
66+
id=_id,
67+
metadata=metadata_pb2.Metadata(
68+
previous_vertex="test-source",
69+
user_metadata={
70+
"custom_info": metadata_pb2.KeyValueGroup(key_value={"version": b"1.0.0"}),
71+
},
72+
sys_metadata={
73+
"numaflow_version_info": metadata_pb2.KeyValueGroup(
74+
key_value={"version": b"1.0.0"}
75+
),
76+
},
77+
),
5978
)
6079
return sink_pb2.SinkRequest(request=request)
6180

@@ -64,10 +83,8 @@ def request_generator(count, req_type="success", session=1, handshake=True):
6483
if handshake:
6584
yield sink_pb2.SinkRequest(handshake=sink_pb2.Handshake(sot=True))
6685

67-
for j in range(session):
68-
for i in range(count):
69-
yield start_sink_streaming_request(str(i), req_type)
70-
86+
for _ in range(session):
87+
yield from (start_sink_streaming_request(str(i), req_type) for i in range(count))
7188
yield sink_pb2.SinkRequest(status=sink_pb2.TransmissionStatus(eot=True))
7289

7390

packages/pynumaflow/tests/sink/test_server.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
FALLBACK_SINK_SOCK_PATH,
1616
FALLBACK_SINK_SERVER_INFO_FILE_PATH,
1717
)
18+
from pynumaflow.proto.common import metadata_pb2
1819
from pynumaflow.proto.sinker import sink_pb2
1920
from pynumaflow.sinker import Responses, Datum, Response, SinkServer
2021
from tests.testing_utils import mock_terminate_on_stop
@@ -32,6 +33,10 @@ def udsink_handler(datums: Iterator[Datum]) -> Responses:
3233
elif "fallback" in msg.value.decode("utf-8"):
3334
results.append(Response.as_fallback(msg.id))
3435
else:
36+
if msg.user_metadata.groups() != ["custom_info"]:
37+
raise ValueError("user metadata groups do not match")
38+
if msg.system_metadata.groups() != ["numaflow_version_info"]:
39+
raise ValueError("system metadata groups do not match")
3540
results.append(Response.as_success(msg.id))
3641
return results
3742

@@ -68,6 +73,16 @@ def mock_watermark():
6873
# We are mocking the terminate function from the psutil to not exit the program during testing
6974
@patch("psutil.Process.kill", mock_terminate_on_stop)
7075
class TestServer(unittest.TestCase):
76+
metadata = metadata_pb2.Metadata(
77+
previous_vertex="test-source",
78+
user_metadata={
79+
"custom_info": metadata_pb2.KeyValueGroup(key_value={"version": b"1.0.0"}),
80+
},
81+
sys_metadata={
82+
"numaflow_version_info": metadata_pb2.KeyValueGroup(key_value={"version": b"1.0.0"}),
83+
},
84+
)
85+
7186
def setUp(self) -> None:
7287
server = SinkServer(sinker_instance=udsink_handler)
7388
my_servicer = server.servicer
@@ -147,6 +162,7 @@ def test_udsink_err(self):
147162
value=mock_message(),
148163
event_time=event_time_timestamp,
149164
watermark=watermark_timestamp,
165+
metadata=self.metadata,
150166
)
151167
),
152168
sink_pb2.SinkRequest(
@@ -155,6 +171,7 @@ def test_udsink_err(self):
155171
value=mock_err_message(),
156172
event_time=event_time_timestamp,
157173
watermark=watermark_timestamp,
174+
metadata=self.metadata,
158175
)
159176
),
160177
sink_pb2.SinkRequest(status=sink_pb2.TransmissionStatus(eot=True)),
@@ -201,6 +218,7 @@ def test_forward_message(self):
201218
value=mock_message(),
202219
event_time=event_time_timestamp,
203220
watermark=watermark_timestamp,
221+
metadata=self.metadata,
204222
)
205223
),
206224
sink_pb2.SinkRequest(
@@ -209,6 +227,7 @@ def test_forward_message(self):
209227
value=mock_err_message(),
210228
event_time=event_time_timestamp,
211229
watermark=watermark_timestamp,
230+
metadata=self.metadata,
212231
)
213232
),
214233
sink_pb2.SinkRequest(status=sink_pb2.TransmissionStatus(eot=True)),

0 commit comments

Comments
 (0)