Skip to content

feat: Add Streaming Client#77

Merged
echarrod merged 20 commits intoluno:mainfrom
echarrod:fix/pr76-review-comments
Mar 30, 2026
Merged

feat: Add Streaming Client#77
echarrod merged 20 commits intoluno:mainfrom
echarrod:fix/pr76-review-comments

Conversation

@echarrod
Copy link
Copy Markdown
Contributor

@echarrod echarrod commented Mar 29, 2026

Summary

  • Fix StateUpdate type alias to accept Optional[dict] for the callback's third parameter, since None is passed on initial snapshot
  • Skip keepalive frames received before the initial snapshot to prevent MarketInitialisationException on startup
  • Correct the pair parameter docstring from describing an amount to describing the currency pair code
  • Fixes Streaming API #75

Addresses the outstanding CodeRabbit review comments from #76.

Summary by CodeRabbit

  • New Features

    • Real-time market data streaming via WebSocket for live orderbook updates and incremental state changes
    • Executable async example demonstrating subscription and processing of live market streams
    • Human-friendly orderbook and market-state formatting for clearer console output
  • Documentation

    • Updated examples and docs to use Python 3-compatible print(...) syntax

adamhicks and others added 16 commits July 8, 2019 16:09
As recommended by reviewer comment
improved general exceptions to be more specific
- Fix StateUpdate type alias to accept Optional[dict] for the callback's
  third parameter, since None is passed on initial snapshot
- Skip keepalive frames received before the initial snapshot to prevent
  MarketInitialisationException on startup
- Correct the pair parameter docstring from describing an amount to
  describing the currency pair code
@echarrod echarrod force-pushed the fix/pr76-review-comments branch from be105ba to 9880927 Compare March 29, 2026 18:42
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 29, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR updates README and docstring examples to Python 3 print(...) syntax, adds an asyncio example at examples/stream.py, introduces luno_python/api_types.py with DEC_0, Order, MarketState, and Pair plus format_orderbook and format_state helpers, and implements luno_python/stream_client.py — an async websocket stream client exposing stream_market(...), StateUpdate type alias, DEFAULT_URL, two custom exceptions (OutOfOrderMessageException, MarketInitialisationException), pair parsing, in-memory orderbook state handling (initial snapshot, sequential updates: create/trade/delete/status), snapshot emission via callback, and keep‑alive writes.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇 I hopped through websockets with a curious nose,

Sequences marching where the orderbook grows,
Keep‑alive whispers and snapshots that gleam,
I nibble updates and dream the stream dream,
A little rabbit cheers the code — hip‑hop!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 13.04% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description check ✅ Passed The description is well-related to the changeset, detailing specific fixes including StateUpdate type alias correction, keepalive frame handling, and docstring correction.
Linked Issues check ✅ Passed The pull request successfully addresses the linked issue #75 by incorporating streaming API updates into the repository with correct startup handling, robust parsing, and accurate typing for callbacks.
Out of Scope Changes check ✅ Passed All changes are within scope: streaming client implementation, example code for usage, data type definitions, and documentation updates directly support the streaming API objective.
Title check ✅ Passed The title 'feat: Add Streaming Client' clearly and concisely summarizes the main change: introducing a new streaming client module with associated types, exceptions, and examples.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@echarrod echarrod changed the title Address outstanding review comments from PR #76 Streaming Client updates Mar 29, 2026
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@luno_python/stream_client.py`:
- Around line 145-160: The loop currently treats empty keepalive frames (body ==
"" => body = None) specially only when is_first is True, which lets later
keepalive frames fall through and call update_f(..., None) reusing the
initial-snapshot sentinel; fix this by skipping keepalive frames
unconditionally: after converting "" to None, if body is None then continue
(move the keepalive check before any is_first handling), so _MarketStreamState
initialization (is_first branch), state.process_update(body) and update_f(pair,
...) only run for real updates.
- Around line 163-166: The keepalive loop in _write_keep_alive never returns and
the code uses TaskGroup (Python 3.11-only), causing stream_market to hang or
break compatibility; change orchestration to create two tasks (reader task and
keepalive task) with asyncio.wait(..., return_when=FIRST_COMPLETED) or
equivalent so when either finishes you cancel the other (cancel the keepalive
task when the reader finishes), ensure _write_keep_alive handles
asyncio.CancelledError cleanly and that you use only asyncio APIs compatible
with Python 3.5/3.6 (avoid TaskGroup) so socket closes do not hang or produce
misleading shutdown errors.
- Around line 185-189: Remove the rigid length check and update parsing so Pair
can handle variable-length currency codes: stop forcing pair to be length 6 and
stop slicing as pair[:3]/pair[3:]; instead allow creating Pair from the full
pair string (change the call at p = Pair(...) to pass the raw pair) and
implement detection inside Pair (or a helper used by Pair) that tries counter
code lengths of 4 then 3 (and/or any known currency set) to find a valid
base/counter split (e.g., try last-4 as counter, then last-3), falling back to a
clear ValueError if no valid split is found; keep the url construction using
p.base + p.counter unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5faab3e8-cd6f-4052-8766-f510064f86d6

📥 Commits

Reviewing files that changed from the base of the PR and between 838df1f and 9880927.

📒 Files selected for processing (5)
  • README.md
  • examples/stream.py
  • luno_python/api_types.py
  • luno_python/client.py
  • luno_python/stream_client.py

- Skip keepalive frames unconditionally (not just before init) to avoid
  spamming callback with no-op updates
- Replace TaskGroup with asyncio.wait(FIRST_COMPLETED) for broader
  Python version compat and clean shutdown when reader finishes
- Support variable-length currency pairs (USDC, USDT) by using known
  4-char currency codes to disambiguate pair splits
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
luno_python/stream_client.py (1)

138-141: Unknown order types are silently ignored.

If create_update['type'] is neither "ASK" nor "BID", the order is silently dropped. This is defensive, but consider whether logging a warning would help detect unexpected API changes.

💡 Optional: Add logging for unknown types
+import logging
+
+_logger = logging.getLogger(__name__)
+
 # In _process_create:
         if create_update['type'] == "ASK":
             self._asks[o.order_id] = o
         elif create_update['type'] == "BID":
             self._bids[o.order_id] = o
+        else:
+            _logger.warning("Unknown order type: %s", create_update['type'])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 138 - 141, The current create
handling in stream_client (the branch checking create_update['type'] and
assigning to self._asks/self._bids using o.order_id) silently ignores any type
that is not "ASK" or "BID"; add a warning log call (using the module/class
logger used elsewhere in stream_client.py) in the else branch to record
unexpected create_update['type'] and the full create_update payload (or at least
order_id and type) so API changes or malformed messages are detectable; ensure
you reference the same logger instance used in this class and keep behavior
otherwise unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@luno_python/stream_client.py`:
- Line 200: Fix the docstring grammar for the parameter update_callback in
stream_client.py by changing "an StateUpdate" to "a StateUpdate" in the
parameter description (the docstring that reads ":param update_callback: an
StateUpdate function that will be called with new updates."). Keep everything
else the same and ensure the updated docstring still references the
update_callback parameter and StateUpdate type.

---

Nitpick comments:
In `@luno_python/stream_client.py`:
- Around line 138-141: The current create handling in stream_client (the branch
checking create_update['type'] and assigning to self._asks/self._bids using
o.order_id) silently ignores any type that is not "ASK" or "BID"; add a warning
log call (using the module/class logger used elsewhere in stream_client.py) in
the else branch to record unexpected create_update['type'] and the full
create_update payload (or at least order_id and type) so API changes or
malformed messages are detectable; ensure you reference the same logger instance
used in this class and keep behavior otherwise unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: afa41a19-107d-4b7a-82ce-dfdc8645851a

📥 Commits

Reviewing files that changed from the base of the PR and between 9880927 and d99ca9b.

📒 Files selected for processing (1)
  • luno_python/stream_client.py

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
luno_python/stream_client.py (1)

103-104: Include sequence values in out-of-order errors.

OutOfOrderMessageException() without context makes production triage harder. Include expected/actual sequence values in the exception message.

Proposed tweak
         seq = update['sequence']
         if int(seq) != int(self._sequence)+1:
-            raise OutOfOrderMessageException()
+            raise OutOfOrderMessageException(
+                f"Out-of-order message: expected {int(self._sequence) + 1}, got {seq}"
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 103 - 104, The
OutOfOrderMessageException is raised with no context in the sequence check
inside the stream client; update the raise to include the expected and actual
sequence values (use self._sequence and seq — e.g., expected =
int(self._sequence)+1 and actual = int(seq)) so the exception message contains
both values, or attach them as attributes on OutOfOrderMessageException for
better triage.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@luno_python/stream_client.py`:
- Around line 221-229: The task orchestration around asyncio.wait in
stream_market may leave reader and keepalive running if the parent is cancelled;
wrap the wait/gather logic in a try/finally inside stream_market so that in the
finally block you always cancel both reader and keepalive tasks and await them
(use asyncio.gather(..., return_exceptions=True)) to suppress/collect their
exceptions, and avoid relying only on the done/pending logic—explicitly cancel
and await {reader, keepalive} to ensure no orphaned websocket tasks remain.

---

Nitpick comments:
In `@luno_python/stream_client.py`:
- Around line 103-104: The OutOfOrderMessageException is raised with no context
in the sequence check inside the stream client; update the raise to include the
expected and actual sequence values (use self._sequence and seq — e.g., expected
= int(self._sequence)+1 and actual = int(seq)) so the exception message contains
both values, or attach them as attributes on OutOfOrderMessageException for
better triage.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9824c1bd-2a92-47b0-b3b6-a07f14fef29b

📥 Commits

Reviewing files that changed from the base of the PR and between d99ca9b and fea50a9.

📒 Files selected for processing (1)
  • luno_python/stream_client.py

- Fix "an StateUpdate" → "a StateUpdate" in docstring
- Wrap task orchestration in try/finally to ensure reader and keepalive
  tasks are always cancelled and awaited, even if stream_market itself
  is cancelled externally
@echarrod echarrod force-pushed the fix/pr76-review-comments branch from fea50a9 to daa6c42 Compare March 30, 2026 07:17
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@luno_python/stream_client.py`:
- Around line 59-76: Wrap the entire startup parsing logic in __init__ (the
conv_message/map-to-_bids/_asks/_sequence/_trades/_status block) in a try/except
and on any exception raise MarketInitialisationException instead of letting
low-level errors leak; capture the original exception message and include it in
the MarketInitialisationException so EOF, malformed frames, or parse errors are
reported as initialization failures. Apply the same try/except-and-reraise
pattern to the other snapshot parsing block with the same
conv_message/assignment logic to ensure all startup failures produce
MarketInitialisationException.
- Around line 34-41: The parser currently guesses a split for unknown-length
pair strings which can yield incorrect Pair metadata; update the logic in
stream_client.py (the block that uses pair, _FOUR_CHAR_CURRENCIES and returns
Pair(...)) to reject any shapes other than the explicit 3/3 and the known 4-char
currency splits by raising a ValueError; keep the existing checks for all-alpha
and min length, keep the two branches that detect 4-char currency on the left or
right and return Pair accordingly, but replace the final fallback (currently
returning Pair(pair[:3], pair[3:])) with a ValueError indicating unsupported
pair format so the function fails closed for any other shapes.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ef298da5-1574-4920-b452-819b0efffa52

📥 Commits

Reviewing files that changed from the base of the PR and between fea50a9 and daa6c42.

📒 Files selected for processing (1)
  • luno_python/stream_client.py

- Fail closed in _parse_pair for unrecognised pair shapes instead of
  guessing a 3+N split
- Wrap _MarketStreamState init parsing in try/except to surface
  malformed snapshots as MarketInitialisationException
- Raise MarketInitialisationException if stream closes before the
  initial snapshot is received
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@luno_python/stream_client.py`:
- Around line 50-57: The incremental-update path currently swallows errors
(unknown maker order, overfill, or unexpected side) causing silent desync;
update _decrement_trade to validate and fail loudly: if orders.pop(order_id,
None) is None raise an exception (e.g. ValueError) indicating unknown order, and
if volume > order.volume raise an exception indicating overfill instead of
silently returning; ensure any incremental-update handler that calls
_decrement_trade (and the corresponding create/update handler noted in the
comment) propagates that exception so the sequence advancement stops rather than
continuing (do not advance _sequence on exceptions); keep the existing DEC_0
check for reinserting only after subtracting volume.
- Around line 234-243: The current logic iterates the unordered done set after
asyncio.wait(FIRST_COMPLETED), which can obscure which task (reader or
keepalive) completed and hide exceptions; update the block that awaits tasks
(the variables/tasks containing reader and keepalive) to check each task
explicitly (e.g., check reader.done() and keepalive.done()), call .result() on
the completed task(s) to surface exceptions, cancel and gather pending tasks as
before, and avoid relying on iterating the done set so exception handling
remains deterministic for the reader and keepalive tasks.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1210d937-7116-431d-9ecf-7f75a30ee535

📥 Commits

Reviewing files that changed from the base of the PR and between daa6c42 and d945ac9.

📒 Files selected for processing (1)
  • luno_python/stream_client.py

Prioritise reader.result() over keepalive.result() so the more
informative exception surfaces when both tasks fail simultaneously.
@sonarqubecloud
Copy link
Copy Markdown

@echarrod echarrod changed the title Streaming Client updates feat: Add Streaming Client Mar 30, 2026
@echarrod echarrod merged commit 002cdde into luno:main Mar 30, 2026
6 checks passed
@echarrod echarrod deleted the fix/pr76-review-comments branch March 30, 2026 14:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Streaming API

4 participants