Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ from luno_python.client import Client
c = Client(api_key_id='key_id', api_key_secret='key_secret')
try:
res = c.get_ticker(pair='XBTZAR')
print res
print(res)
except Exception as e:
print e
print(e)
```

### License
Expand Down
21 changes: 21 additions & 0 deletions examples/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio

from luno_python.stream_client import stream_market
from luno_python.api_types import format_state

def handle_update(pair, state, update):
print(format_state(pair, state))
if update is not None:
print(update)

async def main():
await stream_market(
pair="XBTZAR",
api_key_id="", # API Key goes here
api_key_secret="", # and API Secret goes here
update_callback=handle_update,
)


if __name__ == "__main__":
asyncio.run(main())
26 changes: 26 additions & 0 deletions luno_python/api_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from collections import namedtuple
from decimal import Decimal
from typing import List

DEC_0 = Decimal("0")

Order = namedtuple("Order", "order_id price volume")

MarketState = namedtuple("MarketState", "sequence asks bids status")

Pair = namedtuple("Pair", "base counter")

def format_orderbook(pair: Pair, asks: List[Order], bids: List[Order]):
if not bids or not asks:
return "Empty Orderbook"

bid_sum = sum((o.price * o.volume for o in bids), DEC_0)
ask_sum = sum((o.volume for o in asks), DEC_0)

mid = (asks[0].price + bids[0].price) / 2

return f"{bid_sum} {pair.counter} - {mid} - {ask_sum} {pair.base}"

def format_state(pair: Pair, state: MarketState):
orderbook = format_orderbook(pair, state.asks, state.bids)
return f"[{state.sequence}] {orderbook}"
4 changes: 2 additions & 2 deletions luno_python/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ class Client(BaseClient):
c = Client(api_key_id='key_id', api_key_secret='key_secret')
try:
res = c.get_ticker(pair='XBTZAR')
print res
print(res)
except Exception as e:
print e
print(e)
"""

def cancel_withdrawal(self, id):
Expand Down
250 changes: 250 additions & 0 deletions luno_python/stream_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
"""
The stream client can be used to receive live updates to the orderbook.
It also maintains a representation of the Luno orderbook correctly updated for each event.

For example usage see examples/stream.py
"""

import asyncio
from decimal import Decimal, InvalidOperation
import json
from typing import Callable, Dict, List, Optional
import websockets

from .api_types import DEC_0, Order, MarketState, Pair

DEFAULT_URL = "wss://ws.luno.com"

StateUpdate = Callable[[Pair, MarketState, Optional[dict]], None]

class OutOfOrderMessageException(Exception):
pass

class MarketInitialisationException(Exception):
pass


_FOUR_CHAR_CURRENCIES = {"USDC", "USDT"}

def _parse_pair(pair: str) -> 'Pair':
"""Parse a pair string like "XBTZAR" or "USDCZAR" into a Pair(base, counter).

Uses known 4-character currency codes to disambiguate variable-length pairs.
"""
pair = pair.upper()
if not pair.isalpha() or len(pair) < 6:
raise ValueError(f"Invalid pair: '{pair}'.")
if pair[:4] in _FOUR_CHAR_CURRENCIES and len(pair[4:]) in (3, 4):
return Pair(pair[:4], pair[4:])
if pair[-4:] in _FOUR_CHAR_CURRENCIES and len(pair[:-4]) in (3, 4):
return Pair(pair[:-4], pair[-4:])
if len(pair) == 6:
return Pair(pair[:3], pair[3:])
raise ValueError(f"Invalid pair: '{pair}'.")


def _flatten_orders(orders, reverse):
return sorted(orders.values(), key=lambda o: o.price, reverse=reverse)


def _decrement_trade(orders: Dict[str, Order], order_id: str, volume: Decimal):
order = orders.pop(order_id, None)
if order is None:
return

new_order = order._replace(volume=order.volume - volume)
if new_order.volume > DEC_0:
orders[order_id] = new_order


class _MarketStreamState:
def __init__(self, first: dict):
if first is None:
raise MarketInitialisationException("Unable to initialise market state from an empty message")

def conv_message(msg):
return Order(
msg['id'],
Decimal(msg['price']),
Decimal(msg['volume']),
)

try:
bids = [conv_message(m) for m in first['bids']]
asks = [conv_message(m) for m in first['asks']]
self._bids = {b.order_id: b for b in bids}
self._asks = {a.order_id: a for a in asks}
self._sequence = first['sequence']
self._status = first['status']
except (KeyError, TypeError, InvalidOperation) as exc:
raise MarketInitialisationException(
"Unable to initialise market state from the initial snapshot"
) from exc
self._trades = []

def get_asks(self):
return _flatten_orders(self._asks, False)

def get_bids(self):
return _flatten_orders(self._bids, True)

def get_status(self):
return self._status

def get_sequence(self):
return self._sequence

def get_snapshot(self):
return MarketState(
sequence=self.get_sequence(),
asks=self.get_asks(),
bids=self.get_bids(),
status=self.get_status(),
)

def process_update(self, update: dict):
if update is None:
return

seq = update['sequence']
if int(seq) != int(self._sequence)+1:
raise OutOfOrderMessageException()

trades = update.get('trade_updates')
if trades:
self._process_trades(trades)

create = update.get('create_update')
if create:
self._process_create(create)

delete_upd = update.get('delete_update')
if delete_upd:
self._process_delete(delete_upd)

status_upd = update.get('status_update')
if status_upd:
self._process_status(status_upd)

self._sequence = seq

def _process_trades(self, trade_updates: List[dict]):
for t in trade_updates:
maker_id = t['maker_order_id']
volume = Decimal(t['base'])

_decrement_trade(self._asks, maker_id, volume)
_decrement_trade(self._bids, maker_id, volume)

def _process_create(self, create_update: dict):
o = Order(
create_update['order_id'],
Decimal(create_update['price']),
Decimal(create_update['volume']),
)
if create_update['type'] == "ASK":
self._asks[o.order_id] = o
elif create_update['type'] == "BID":
self._bids[o.order_id] = o

def _process_delete(self, delete_update: dict):
order_id = delete_update['order_id']
self._asks.pop(order_id, None)
self._bids.pop(order_id, None)

def _process_status(self, status_update: dict):
self._status = status_update['status']


async def _read_from_websocket(ws, pair: Pair, update_f: StateUpdate):
state = None
is_first = True

async for message in ws:
try:
body = json.loads(message)
except ValueError as e:
raise ValueError(f"Invalid JSON received: {message}") from e

if body == "": # Empty update, used as keepalive
body = None

if body is None:
continue

if is_first:
is_first = False
state = _MarketStreamState(body)
update_f(pair, state.get_snapshot(), None)
continue

#could raise OutOfOrderMessageException
state.process_update(body)

update_f(pair, state.get_snapshot(), body)

if is_first:
raise MarketInitialisationException(
"Stream closed before the initial snapshot was received"
)


async def _write_keep_alive(ws):
while True:
await ws.send('""')
await asyncio.sleep(60)


async def stream_market(
pair: str,
api_key_id: str,
api_key_secret: str,
update_callback: StateUpdate,
base_url: str = DEFAULT_URL,
):
"""Opens a stream to /api/1/stream/...

Stream orderbook information and maintain an orderbook state.

:param pair: str Currency pair code (for example, "XBTZAR").
:param api_key_id: str
:param api_key_secret: str
:param update_callback: a StateUpdate function that will be called with new updates.
"""
p = _parse_pair(pair)
url = '/'.join([base_url, 'api/1/stream', p.base + p.counter])

async with websockets.connect(
url,
origin='http://localhost/',
ping_interval=None,
max_size=2**21,
) as websocket:

auth = json.dumps({
'api_key_id': api_key_id,
'api_key_secret': api_key_secret,
})
await websocket.send(auth)

reader = asyncio.create_task(_read_from_websocket(websocket, p, update_callback))
keepalive = asyncio.create_task(_write_keep_alive(websocket))
tasks = (reader, keepalive)

try:
done, pending = await asyncio.wait(
set(tasks),
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)
if reader in done:
reader.result()
if keepalive in done:
keepalive.result()
finally:
for task in tasks:
if not task.done():
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)