-
Notifications
You must be signed in to change notification settings - Fork 41
feat: Add Streaming Client #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
c69c8f7
Implement Streaming API client for python 3
adamhicks 1481a99
Refactor quotes
adamhicks 346d5f4
Fix runnable auth creds
adamhicks b2b051d
Return stream errors
adamhicks fccc48c
Bug Fix, out of order messages are now caught. And raised
27efc6b
ws.connect max_size increased to handle initial transfer
f30b9bd
More concise sequence check
YinYin-blip af6d000
Merge pull request #2 from YinYin-blip/streamingapi
adamhicks c2393a5
Merge pull request #1 from YinYin-blip/streamingapi
YinYin-blip 0b10568
improved general exceptions to be more specific
dd6ef14
Merge pull request #2 from YinYin-blip/streamingapi
YinYin-blip 2173352
Clean coroutines
3d9d82b
resolved indentation error
546298b
remove debug line
8be5204
Merge pull request #3 from YinYin-blip/streamingapi
YinYin-blip 9880927
Address CodeRabbit review comments on PR #76
echarrod d99ca9b
Address CodeRabbit review comments on PR #77
echarrod daa6c42
Fix grammar in docstring and handle parent cancellation
echarrod d945ac9
Tighten pair parsing and improve init error handling
echarrod 900abf6
Use explicit task checks instead of iterating unordered done set
echarrod File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| import asyncio | ||
|
|
||
| from luno_python.stream_client import stream_market | ||
| from luno_python.api_types import format_state | ||
|
|
||
| def handle_update(pair, state, update): | ||
| print(format_state(pair, state)) | ||
| if update is not None: | ||
| print(update) | ||
|
|
||
| async def main(): | ||
| await stream_market( | ||
| pair="XBTZAR", | ||
| api_key_id="", # API Key goes here | ||
| api_key_secret="", # and API Secret goes here | ||
| update_callback=handle_update, | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| from collections import namedtuple | ||
| from decimal import Decimal | ||
| from typing import List | ||
|
|
||
| DEC_0 = Decimal("0") | ||
|
|
||
| Order = namedtuple("Order", "order_id price volume") | ||
|
|
||
| MarketState = namedtuple("MarketState", "sequence asks bids status") | ||
|
|
||
| Pair = namedtuple("Pair", "base counter") | ||
|
|
||
| def format_orderbook(pair: Pair, asks: List[Order], bids: List[Order]): | ||
| if not bids or not asks: | ||
| return "Empty Orderbook" | ||
|
|
||
| bid_sum = sum((o.price * o.volume for o in bids), DEC_0) | ||
| ask_sum = sum((o.volume for o in asks), DEC_0) | ||
|
|
||
| mid = (asks[0].price + bids[0].price) / 2 | ||
|
|
||
| return f"{bid_sum} {pair.counter} - {mid} - {ask_sum} {pair.base}" | ||
|
|
||
| def format_state(pair: Pair, state: MarketState): | ||
| orderbook = format_orderbook(pair, state.asks, state.bids) | ||
| return f"[{state.sequence}] {orderbook}" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,250 @@ | ||
| """ | ||
| The stream client can be used to receive live updates to the orderbook. | ||
| It also maintains a representation of the Luno orderbook correctly updated for each event. | ||
|
|
||
| For example usage see examples/stream.py | ||
| """ | ||
|
|
||
| import asyncio | ||
| from decimal import Decimal, InvalidOperation | ||
| import json | ||
| from typing import Callable, Dict, List, Optional | ||
| import websockets | ||
|
|
||
| from .api_types import DEC_0, Order, MarketState, Pair | ||
|
|
||
| DEFAULT_URL = "wss://ws.luno.com" | ||
|
|
||
| StateUpdate = Callable[[Pair, MarketState, Optional[dict]], None] | ||
|
|
||
| class OutOfOrderMessageException(Exception): | ||
| pass | ||
|
|
||
| class MarketInitialisationException(Exception): | ||
| pass | ||
|
|
||
|
|
||
| _FOUR_CHAR_CURRENCIES = {"USDC", "USDT"} | ||
|
|
||
| def _parse_pair(pair: str) -> 'Pair': | ||
| """Parse a pair string like "XBTZAR" or "USDCZAR" into a Pair(base, counter). | ||
|
|
||
| Uses known 4-character currency codes to disambiguate variable-length pairs. | ||
| """ | ||
| pair = pair.upper() | ||
| if not pair.isalpha() or len(pair) < 6: | ||
| raise ValueError(f"Invalid pair: '{pair}'.") | ||
| if pair[:4] in _FOUR_CHAR_CURRENCIES and len(pair[4:]) in (3, 4): | ||
| return Pair(pair[:4], pair[4:]) | ||
| if pair[-4:] in _FOUR_CHAR_CURRENCIES and len(pair[:-4]) in (3, 4): | ||
| return Pair(pair[:-4], pair[-4:]) | ||
| if len(pair) == 6: | ||
| return Pair(pair[:3], pair[3:]) | ||
| raise ValueError(f"Invalid pair: '{pair}'.") | ||
|
|
||
|
|
||
| def _flatten_orders(orders, reverse): | ||
| return sorted(orders.values(), key=lambda o: o.price, reverse=reverse) | ||
|
|
||
|
|
||
| def _decrement_trade(orders: Dict[str, Order], order_id: str, volume: Decimal): | ||
| order = orders.pop(order_id, None) | ||
| if order is None: | ||
| return | ||
|
|
||
| new_order = order._replace(volume=order.volume - volume) | ||
| if new_order.volume > DEC_0: | ||
| orders[order_id] = new_order | ||
|
|
||
|
|
||
| class _MarketStreamState: | ||
| def __init__(self, first: dict): | ||
| if first is None: | ||
| raise MarketInitialisationException("Unable to initialise market state from an empty message") | ||
|
|
||
| def conv_message(msg): | ||
| return Order( | ||
| msg['id'], | ||
| Decimal(msg['price']), | ||
| Decimal(msg['volume']), | ||
| ) | ||
|
|
||
| try: | ||
| bids = [conv_message(m) for m in first['bids']] | ||
| asks = [conv_message(m) for m in first['asks']] | ||
| self._bids = {b.order_id: b for b in bids} | ||
| self._asks = {a.order_id: a for a in asks} | ||
| self._sequence = first['sequence'] | ||
| self._status = first['status'] | ||
| except (KeyError, TypeError, InvalidOperation) as exc: | ||
| raise MarketInitialisationException( | ||
| "Unable to initialise market state from the initial snapshot" | ||
| ) from exc | ||
| self._trades = [] | ||
|
|
||
| def get_asks(self): | ||
| return _flatten_orders(self._asks, False) | ||
|
|
||
| def get_bids(self): | ||
| return _flatten_orders(self._bids, True) | ||
|
|
||
| def get_status(self): | ||
| return self._status | ||
|
|
||
| def get_sequence(self): | ||
| return self._sequence | ||
|
|
||
| def get_snapshot(self): | ||
| return MarketState( | ||
| sequence=self.get_sequence(), | ||
| asks=self.get_asks(), | ||
| bids=self.get_bids(), | ||
| status=self.get_status(), | ||
| ) | ||
|
|
||
| def process_update(self, update: dict): | ||
| if update is None: | ||
| return | ||
|
|
||
| seq = update['sequence'] | ||
| if int(seq) != int(self._sequence)+1: | ||
| raise OutOfOrderMessageException() | ||
|
|
||
| trades = update.get('trade_updates') | ||
| if trades: | ||
| self._process_trades(trades) | ||
|
|
||
| create = update.get('create_update') | ||
| if create: | ||
| self._process_create(create) | ||
|
|
||
| delete_upd = update.get('delete_update') | ||
| if delete_upd: | ||
| self._process_delete(delete_upd) | ||
|
|
||
| status_upd = update.get('status_update') | ||
| if status_upd: | ||
| self._process_status(status_upd) | ||
|
|
||
| self._sequence = seq | ||
|
|
||
| def _process_trades(self, trade_updates: List[dict]): | ||
| for t in trade_updates: | ||
| maker_id = t['maker_order_id'] | ||
| volume = Decimal(t['base']) | ||
|
|
||
| _decrement_trade(self._asks, maker_id, volume) | ||
| _decrement_trade(self._bids, maker_id, volume) | ||
|
|
||
| def _process_create(self, create_update: dict): | ||
| o = Order( | ||
| create_update['order_id'], | ||
| Decimal(create_update['price']), | ||
| Decimal(create_update['volume']), | ||
| ) | ||
| if create_update['type'] == "ASK": | ||
| self._asks[o.order_id] = o | ||
| elif create_update['type'] == "BID": | ||
| self._bids[o.order_id] = o | ||
|
|
||
| def _process_delete(self, delete_update: dict): | ||
| order_id = delete_update['order_id'] | ||
| self._asks.pop(order_id, None) | ||
| self._bids.pop(order_id, None) | ||
|
|
||
| def _process_status(self, status_update: dict): | ||
| self._status = status_update['status'] | ||
|
|
||
|
|
||
| async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate): | ||
| state = None | ||
| is_first = True | ||
|
|
||
| async for message in ws: | ||
| try: | ||
| body = json.loads(message) | ||
| except ValueError as e: | ||
| raise ValueError(f"Invalid JSON received: {message}") from e | ||
|
|
||
| if body == "": # Empty update, used as keepalive | ||
| body = None | ||
|
|
||
| if body is None: | ||
| continue | ||
|
|
||
| if is_first: | ||
| is_first = False | ||
| state = _MarketStreamState(body) | ||
| update_f(pair, state.get_snapshot(), None) | ||
| continue | ||
|
|
||
| #could raise OutOfOrderMessageException | ||
| state.process_update(body) | ||
|
|
||
| update_f(pair, state.get_snapshot(), body) | ||
|
|
||
| if is_first: | ||
| raise MarketInitialisationException( | ||
| "Stream closed before the initial snapshot was received" | ||
| ) | ||
|
|
||
|
|
||
| async def _write_keep_alive(ws): | ||
| while True: | ||
| await ws.send('""') | ||
| await asyncio.sleep(60) | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| async def stream_market( | ||
| pair: str, | ||
| api_key_id: str, | ||
| api_key_secret: str, | ||
| update_callback: StateUpdate, | ||
| base_url: str = DEFAULT_URL, | ||
| ): | ||
| """Opens a stream to /api/1/stream/... | ||
|
|
||
| Stream orderbook information and maintain an orderbook state. | ||
|
|
||
| :param pair: str Currency pair code (for example, "XBTZAR"). | ||
| :param api_key_id: str | ||
| :param api_key_secret: str | ||
| :param update_callback: a StateUpdate function that will be called with new updates. | ||
| """ | ||
| p = _parse_pair(pair) | ||
| url = '/'.join([base_url, 'api/1/stream', p.base + p.counter]) | ||
|
|
||
| async with websockets.connect( | ||
| url, | ||
| origin='http://localhost/', | ||
| ping_interval=None, | ||
| max_size=2**21, | ||
| ) as websocket: | ||
|
|
||
| auth = json.dumps({ | ||
| 'api_key_id': api_key_id, | ||
| 'api_key_secret': api_key_secret, | ||
| }) | ||
| await websocket.send(auth) | ||
|
|
||
| reader = asyncio.create_task(_read_from_websocket(websocket, p, update_callback)) | ||
| keepalive = asyncio.create_task(_write_keep_alive(websocket)) | ||
| tasks = (reader, keepalive) | ||
|
|
||
| try: | ||
| done, pending = await asyncio.wait( | ||
| set(tasks), | ||
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
| for task in pending: | ||
| task.cancel() | ||
| await asyncio.gather(*pending, return_exceptions=True) | ||
| if reader in done: | ||
| reader.result() | ||
| if keepalive in done: | ||
| keepalive.result() | ||
| finally: | ||
| for task in tasks: | ||
| if not task.done(): | ||
| task.cancel() | ||
| await asyncio.gather(*tasks, return_exceptions=True) | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.