Skip to content

Commit

Permalink
Allow weakref linking to market caches
Browse files Browse the repository at this point in the history
Improved flaky stream test
  • Loading branch information
ml31415 committed May 6, 2024
1 parent ed47023 commit 0cf461f
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 21 deletions.
43 changes: 34 additions & 9 deletions betfair_parser/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,23 +157,42 @@ def clear(self) -> None:
raise NotImplementedError()


class MarketCache(ChangeCache):
class _DefaultDict(defaultdict):
"""
The caches for single markets are supposed to be referenceable from outside the
cache via weakrefs. So if the cache gets updated, it doesn't leave data pending.
As defaultdict itself can't be referenced with a weakref, let's subclass it.
"""

def __init__(self, **kwargs):
super().__init__(type(self).default_factory, **kwargs)

default_factory = dict


class MarketOrderBook(_DefaultDict):
"""Order book for a single market, collecting a bunch of RunnerOrderBooks."""

default_factory = RunnerOrderBook # type: ignore[assignment]


class MarketSubscriptionCache(ChangeCache):
"""
Orderbook for all markets of a market change stream.
Market subscriptions are always in the underlying exchange currency - GBP. The default roll-up for GBP
is £1 for batb / batl and bdatb / bdatl, This means that stakes of less than £1 (or currency
equivalent) are rolled up to the next available price on the odds ladder. For atb / atl there is
no roll-up. Available volume is displayed at all prices including those with less than £2 available.
"""

def __init__(self):
self.order_book: defaultdict[str, defaultdict[int, RunnerOrderBook]] = defaultdict(
lambda: defaultdict(RunnerOrderBook)
)
self.market_definitions: dict[str, MarketDefinition] = {}
self.order_book: defaultdict[str, MarketOrderBook] = defaultdict(MarketOrderBook)
self.definitions: dict[str, MarketDefinition] = {}

def clear(self) -> None:
self.order_book.clear()
self.market_definitions.clear()
self.definitions.clear()

def update(self, mcm: MCM) -> None:
self.update_meta(mcm)
Expand All @@ -186,7 +205,7 @@ def update(self, mcm: MCM) -> None:
if mc.img:
self.order_book.pop(mc.id, None)
if mc.market_definition:
self.market_definitions[mc.id] = mc.market_definition
self.definitions[mc.id] = mc.market_definition
if not mc.rc:
continue
for rc in mc.rc:
Expand Down Expand Up @@ -235,13 +254,19 @@ def update(self, orc: OrderRunnerChange) -> None:
ladder_update_mo(self.matched_backs, orc.ml)


class OrderCache(ChangeCache):
class MarketOrders(_DefaultDict):
"""All orders for a single market, collecting a bunch of RunnerOrders."""

default_factory = RunnerOrders # type: ignore[assignment]


class OrderSubscriptionCache(ChangeCache):
"""
Order subscriptions are provided in the currency of the account that the orders are placed in.
"""

def __init__(self):
self.orders: defaultdict[str, defaultdict[int, RunnerOrders]] = defaultdict(lambda: defaultdict(RunnerOrders))
self.orders: defaultdict[str, MarketOrders] = defaultdict(MarketOrders)

def clear(self) -> None:
self.orders.clear()
Expand Down
8 changes: 4 additions & 4 deletions betfair_parser/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from collections.abc import AsyncGenerator, Callable, Iterable
from typing import Any

from betfair_parser.cache import MarketCache, OrderCache
from betfair_parser.cache import MarketSubscriptionCache, OrderSubscriptionCache
from betfair_parser.exceptions import StreamError
from betfair_parser.spec.common import encode
from betfair_parser.spec.streaming import (
Expand Down Expand Up @@ -146,7 +146,7 @@ class StreamReader:
"""Read exchange stream data into a separate cache for each subscription."""

def __init__(self, app_key, token) -> None:
self.caches: dict[StreamRef, MarketCache | OrderCache] = {}
self.caches: dict[StreamRef, MarketSubscriptionCache | OrderSubscriptionCache] = {}
self.esm = ExchangeStream(app_key, token)

def handle_change_message(self, msg: ChangeMessageType) -> ChangeMessageType:
Expand All @@ -155,9 +155,9 @@ def handle_change_message(self, msg: ChangeMessageType) -> ChangeMessageType:

def subscribe(self, subscription: SubscriptionType) -> bytes:
if isinstance(subscription, MarketSubscription):
self.caches[subscription.id] = MarketCache()
self.caches[subscription.id] = MarketSubscriptionCache()
elif isinstance(subscription, OrderSubscription):
self.caches[subscription.id] = OrderCache()
self.caches[subscription.id] = OrderSubscriptionCache()
else:
raise TypeError("Invalid subscription type")
return self.esm.subscribe(subscription, self.handle_change_message)
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest

from betfair_parser.cache import MarketCache, RunnerOrderBook
from betfair_parser.cache import MarketSubscriptionCache, RunnerOrderBook
from betfair_parser.spec import accounts, betting
from betfair_parser.spec.common import decode
from betfair_parser.spec.streaming import MCM, StreamRequestType, StreamResponseType, stream_decode
Expand Down Expand Up @@ -83,7 +83,7 @@ def test_responses(path):

@pytest.mark.parametrize("path", sorted((RESOURCES_DIR / "data").glob("**/*.bz2")), ids=id_from_path)
def test_archive(path):
mc = MarketCache()
mc = MarketSubscriptionCache()
i = 0
for i, line in enumerate(bz2.open(path), start=1):
msg = stream_decode(line)
Expand Down
10 changes: 8 additions & 2 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime

import pytest
from requests import Session # alternatively use httpx.Client

Expand Down Expand Up @@ -143,15 +145,19 @@ def test_stream_reader(session, iterations=15):

assert sr.caches[ORDER_STREAM_ID].orders is not None # type: ignore[union-attr]

market_definitions: dict[str, MarketDefinition] = sr.caches[MARKET_STREAM_ID].market_definitions # type: ignore[union-attr]
market_definitions: dict[str, MarketDefinition] = sr.caches[MARKET_STREAM_ID].definitions # type: ignore[union-attr]
assert len(market_definitions) > 20
assert all(isinstance(key, str) for key in market_definitions)
assert all(isinstance(md, MarketDefinition) for md in market_definitions.values())

now = datetime.datetime.utcnow().astimezone(datetime.UTC)
order_book = sr.caches[MARKET_STREAM_ID].order_book # type: ignore[union-attr]
assert len(order_book) == len(market_definitions)
assert all(isinstance(key, str) for key in order_book)
for market_order_book in order_book.values():
for market_id, market_order_book in order_book.items():
if (market_definitions[market_id].suspend_time - now).seconds > 6 * 60 * 60: # 6h
# data further in the future is likely to be incomplete and leads to errors
continue
for runner_order_book in market_order_book.values():
assert isinstance(runner_order_book, RunnerOrderBook)
if not runner_order_book.total_volume:
Expand Down
31 changes: 27 additions & 4 deletions tests/unit/test_cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
from betfair_parser.cache import LPV, MarketCache, OrderCache, ladder_update_lpv
import weakref

import pytest

from betfair_parser.cache import (
LPV,
MarketOrderBook,
MarketOrders,
MarketSubscriptionCache,
OrderSubscriptionCache,
ladder_update_lpv,
)
from betfair_parser.spec.common import decode, encode
from betfair_parser.spec.streaming import MCM, stream_decode
from tests.resources import RESOURCES_DIR
Expand All @@ -7,7 +18,7 @@
def test_runner_order_book_repr():
raw = (RESOURCES_DIR / "responses" / "streaming" / "mcm_sub_image_no_market_def.json").read_bytes()
mcm: MCM = stream_decode(raw) # type: ignore[assignment]
cache = MarketCache()
cache = MarketSubscriptionCache()
cache.update(mcm)
rob = cache.order_book["1.180737193"][25327214]
repr_str = repr(rob)
Expand Down Expand Up @@ -74,8 +85,8 @@ def test_batb_cache():


def test_order_cache_runner_removal():
"""OrderCache example including reduction factor from the documentation."""
cache = OrderCache()
"""OrderSubscriptionCache example including reduction factor from the documentation."""
cache = OrderSubscriptionCache()

# bet gets placed
cache.update(stream_decode(OCMS_SAMPLE[0])) # type: ignore[arg-type]
Expand Down Expand Up @@ -110,3 +121,15 @@ def test_order_cache_runner_removal():
assert order.average_price_matched == 9.47
assert ro.matched_backs[9.47] == 2
assert len(ro.matched_backs) == 1


@pytest.mark.parametrize("dict_type", (MarketOrders, MarketOrderBook))
def test_market_caches(dict_type):
cache = dict_type()
runner_cache = cache[123]
r = weakref.ref(cache)
assert type(runner_cache).__name__ == type(cache).__name__.replace("Market", "Runner")
assert r() is cache
del runner_cache
del cache
assert r() is None

0 comments on commit 0cf461f

Please sign in to comment.