diff --git a/README.md b/README.md index 54894de..09bc9d3 100644 --- a/README.md +++ b/README.md @@ -23,9 +23,9 @@ from luno_python.client import Client c = Client(api_key_id='key_id', api_key_secret='key_secret') try: res = c.get_ticker(pair='XBTZAR') - print res + print(res) except Exception as e: - print e + print(e) ``` ### License diff --git a/examples/stream.py b/examples/stream.py new file mode 100644 index 0000000..7e669f8 --- /dev/null +++ b/examples/stream.py @@ -0,0 +1,21 @@ +import asyncio + +from luno_python.stream_client import stream_market +from luno_python.api_types import format_state + +def handle_update(pair, state, update): + print(format_state(pair, state)) + if update is not None: + print(update) + +async def main(): + await stream_market( + pair="XBTZAR", + api_key_id="", # API Key goes here + api_key_secret="", # and API Secret goes here + update_callback=handle_update, + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/luno_python/api_types.py b/luno_python/api_types.py new file mode 100644 index 0000000..736b916 --- /dev/null +++ b/luno_python/api_types.py @@ -0,0 +1,26 @@ +from collections import namedtuple +from decimal import Decimal +from typing import List + +DEC_0 = Decimal("0") + +Order = namedtuple("Order", "order_id price volume") + +MarketState = namedtuple("MarketState", "sequence asks bids status") + +Pair = namedtuple("Pair", "base counter") + +def format_orderbook(pair: Pair, asks: List[Order], bids: List[Order]): + if not bids or not asks: + return "Empty Orderbook" + + bid_sum = sum((o.price * o.volume for o in bids), DEC_0) + ask_sum = sum((o.volume for o in asks), DEC_0) + + mid = (asks[0].price + bids[0].price) / 2 + + return f"{bid_sum} {pair.counter} - {mid} - {ask_sum} {pair.base}" + +def format_state(pair: Pair, state: MarketState): + orderbook = format_orderbook(pair, state.asks, state.bids) + return f"[{state.sequence}] {orderbook}" diff --git a/luno_python/client.py b/luno_python/client.py index 5798ee1..cb08f64 100644 --- a/luno_python/client.py +++ b/luno_python/client.py @@ -13,9 +13,9 @@ class Client(BaseClient): c = Client(api_key_id='key_id', api_key_secret='key_secret') try: res = c.get_ticker(pair='XBTZAR') - print res + print(res) except Exception as e: - print e + print(e) """ def cancel_withdrawal(self, id): diff --git a/luno_python/stream_client.py b/luno_python/stream_client.py new file mode 100644 index 0000000..a59a1c5 --- /dev/null +++ b/luno_python/stream_client.py @@ -0,0 +1,250 @@ +""" +The stream client can be used to receive live updates to the orderbook. +It also maintains a representation of the Luno orderbook correctly updated for each event. + +For example usage see examples/stream.py +""" + +import asyncio +from decimal import Decimal, InvalidOperation +import json +from typing import Callable, Dict, List, Optional +import websockets + +from .api_types import DEC_0, Order, MarketState, Pair + +DEFAULT_URL = "wss://ws.luno.com" + +StateUpdate = Callable[[Pair, MarketState, Optional[dict]], None] + +class OutOfOrderMessageException(Exception): + pass + +class MarketInitialisationException(Exception): + pass + + +_FOUR_CHAR_CURRENCIES = {"USDC", "USDT"} + +def _parse_pair(pair: str) -> 'Pair': + """Parse a pair string like "XBTZAR" or "USDCZAR" into a Pair(base, counter). + + Uses known 4-character currency codes to disambiguate variable-length pairs. + """ + pair = pair.upper() + if not pair.isalpha() or len(pair) < 6: + raise ValueError(f"Invalid pair: '{pair}'.") + if pair[:4] in _FOUR_CHAR_CURRENCIES and len(pair[4:]) in (3, 4): + return Pair(pair[:4], pair[4:]) + if pair[-4:] in _FOUR_CHAR_CURRENCIES and len(pair[:-4]) in (3, 4): + return Pair(pair[:-4], pair[-4:]) + if len(pair) == 6: + return Pair(pair[:3], pair[3:]) + raise ValueError(f"Invalid pair: '{pair}'.") + + +def _flatten_orders(orders, reverse): + return sorted(orders.values(), key=lambda o: o.price, reverse=reverse) + + +def _decrement_trade(orders: Dict[str, Order], order_id: str, volume: Decimal): + order = orders.pop(order_id, None) + if order is None: + return + + new_order = order._replace(volume=order.volume - volume) + if new_order.volume > DEC_0: + orders[order_id] = new_order + + +class _MarketStreamState: + def __init__(self, first: dict): + if first is None: + raise MarketInitialisationException("Unable to initialise market state from an empty message") + + def conv_message(msg): + return Order( + msg['id'], + Decimal(msg['price']), + Decimal(msg['volume']), + ) + + try: + bids = [conv_message(m) for m in first['bids']] + asks = [conv_message(m) for m in first['asks']] + self._bids = {b.order_id: b for b in bids} + self._asks = {a.order_id: a for a in asks} + self._sequence = first['sequence'] + self._status = first['status'] + except (KeyError, TypeError, InvalidOperation) as exc: + raise MarketInitialisationException( + "Unable to initialise market state from the initial snapshot" + ) from exc + self._trades = [] + + def get_asks(self): + return _flatten_orders(self._asks, False) + + def get_bids(self): + return _flatten_orders(self._bids, True) + + def get_status(self): + return self._status + + def get_sequence(self): + return self._sequence + + def get_snapshot(self): + return MarketState( + sequence=self.get_sequence(), + asks=self.get_asks(), + bids=self.get_bids(), + status=self.get_status(), + ) + + def process_update(self, update: dict): + if update is None: + return + + seq = update['sequence'] + if int(seq) != int(self._sequence)+1: + raise OutOfOrderMessageException() + + trades = update.get('trade_updates') + if trades: + self._process_trades(trades) + + create = update.get('create_update') + if create: + self._process_create(create) + + delete_upd = update.get('delete_update') + if delete_upd: + self._process_delete(delete_upd) + + status_upd = update.get('status_update') + if status_upd: + self._process_status(status_upd) + + self._sequence = seq + + def _process_trades(self, trade_updates: List[dict]): + for t in trade_updates: + maker_id = t['maker_order_id'] + volume = Decimal(t['base']) + + _decrement_trade(self._asks, maker_id, volume) + _decrement_trade(self._bids, maker_id, volume) + + def _process_create(self, create_update: dict): + o = Order( + create_update['order_id'], + Decimal(create_update['price']), + Decimal(create_update['volume']), + ) + if create_update['type'] == "ASK": + self._asks[o.order_id] = o + elif create_update['type'] == "BID": + self._bids[o.order_id] = o + + def _process_delete(self, delete_update: dict): + order_id = delete_update['order_id'] + self._asks.pop(order_id, None) + self._bids.pop(order_id, None) + + def _process_status(self, status_update: dict): + self._status = status_update['status'] + + +async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate): + state = None + is_first = True + + async for message in ws: + try: + body = json.loads(message) + except ValueError as e: + raise ValueError(f"Invalid JSON received: {message}") from e + + if body == "": # Empty update, used as keepalive + body = None + + if body is None: + continue + + if is_first: + is_first = False + state = _MarketStreamState(body) + update_f(pair, state.get_snapshot(), None) + continue + + #could raise OutOfOrderMessageException + state.process_update(body) + + update_f(pair, state.get_snapshot(), body) + + if is_first: + raise MarketInitialisationException( + "Stream closed before the initial snapshot was received" + ) + + +async def _write_keep_alive(ws): + while True: + await ws.send('""') + await asyncio.sleep(60) + + +async def stream_market( + pair: str, + api_key_id: str, + api_key_secret: str, + update_callback: StateUpdate, + base_url: str = DEFAULT_URL, +): + """Opens a stream to /api/1/stream/... + + Stream orderbook information and maintain an orderbook state. + + :param pair: str Currency pair code (for example, "XBTZAR"). + :param api_key_id: str + :param api_key_secret: str + :param update_callback: a StateUpdate function that will be called with new updates. + """ + p = _parse_pair(pair) + url = '/'.join([base_url, 'api/1/stream', p.base + p.counter]) + + async with websockets.connect( + url, + origin='http://localhost/', + ping_interval=None, + max_size=2**21, + ) as websocket: + + auth = json.dumps({ + 'api_key_id': api_key_id, + 'api_key_secret': api_key_secret, + }) + await websocket.send(auth) + + reader = asyncio.create_task(_read_from_websocket(websocket, p, update_callback)) + keepalive = asyncio.create_task(_write_keep_alive(websocket)) + tasks = (reader, keepalive) + + try: + done, pending = await asyncio.wait( + set(tasks), + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + if reader in done: + reader.result() + if keepalive in done: + keepalive.result() + finally: + for task in tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) \ No newline at end of file