Skip to content

Conversation

@kohlisid
Copy link
Contributor

@kohlisid kohlisid commented Sep 2, 2025

This PR updates the Mapstreamer implementation to process multiple requests concurrently. Previously, requests were handled sequentially within a single async loop. With this change, each incoming request is dispatched to its own background task, allowing responses from different requests to be streamed back in parallel.

Testing code

class FlatMapStream(MapStreamer):
    async def handler(self, keys: list[str], datum: Datum) -> AsyncIterable[Message]:
        """
        A handler that splits the input datum value into multiple strings by `,` separator and
        emits them as a stream.
        """
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark
        strs = val.decode("utf-8").split(",")
        current_time = time.time()
        print(f"Starting task {current_time}")
        yield Message(str.encode(f"Starting task {current_time}"))
        await asyncio.sleep(5)

        if len(strs) == 0:
            yield Message.to_drop()
            return
        for s in strs:
            yield Message(str.encode(s))
        current_time = time.time()
        print(f"Ended task {current_time}")
        yield Message(str.encode(f"Ended task {current_time}"))

Old Implementation

2025-09-02T20:31:24.999748Z  INFO sink::log: Payload - Starting task 1756845083.886833 Keys - ... ID - out-2-0-0
...
2025-09-02T20:31:30.007232Z  INFO sink::log: Payload - Ended task 1756845088.8932385 Keys - ... ID - out-6-0-0

2025-09-02T20:31:30.007236Z  INFO sink::log: Payload - Starting task 1756845088.8945782 Keys - ... ID - out-7-0-0
...
2025-09-02T20:31:35.012723Z  INFO sink::log: Payload - Ended task 1756845093.8964927 Keys - ... ID - out-11-0-0

old

New Implementation
2025-09-02T20:36:44.213891Z INFO sink::log: Payload - Starting task 1756845404.1992428 Keys - ... ID - out-1-0-0
2025-09-02T20:36:44.213908Z INFO sink::log: Payload - Starting task 1756845404.2001758 Keys - ... ID - out-2-0-0
2025-09-02T20:36:44.213911Z INFO sink::log: Payload - Starting task 1756845404.201107 Keys - ... ID - out-3-0-0
2025-09-02T20:36:44.213914Z INFO sink::log: Payload - Starting task 1756845404.2018268 Keys - ... ID - out-4-0-0
...
2025-09-02T20:36:49.217125Z INFO sink::log: Payload - Ended task 1756845409.2048292 Keys - ... ID - out-24-0-0
2025-09-02T20:36:49.217133Z INFO sink::log: Payload - Ended task 1756845409.2049541 Keys - ... ID - out-27-0-0
2025-09-02T20:36:49.217139Z INFO sink::log: Payload - Ended task 1756845409.2050297 Keys - ... ID - out-30-0-0

new

Log files

old_logs.txt
new_logs.txt

@codecov
Copy link

codecov bot commented Sep 2, 2025

Codecov Report

❌ Patch coverage is 83.33333% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.66%. Comparing base (b944a5d) to head (485a21b).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
pynumaflow/mapstreamer/servicer/async_servicer.py 83.33% 3 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #242      +/-   ##
==========================================
- Coverage   93.71%   93.66%   -0.06%     
==========================================
  Files          65       65              
  Lines        2866     2888      +22     
  Branches      150      152       +2     
==========================================
+ Hits         2686     2705      +19     
- Misses        130      133       +3     
  Partials       50       50              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: kohlisid <[email protected]>
@vigith
Copy link
Member

vigith commented Sep 2, 2025

Previously, requests were handled sequentially within a single async loop.

Is this because users are having blocking async calls?

@kohlisid kohlisid marked this pull request as ready for review September 4, 2025 22:43
@kohlisid kohlisid merged commit bf00b2e into main Sep 15, 2025
8 checks passed
@kohlisid kohlisid deleted the mpstream branch September 15, 2025 17:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants