From 0cf461f21f6a50f5aa484ba2bdb6dcc9a2c1fbf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20L=C3=B6ffler?= Date: Mon, 6 May 2024 15:26:41 +0300 Subject: [PATCH] Allow weakref linking to market caches Improved flaky stream test --- betfair_parser/cache.py | 43 +++++++++++++++++++++------ betfair_parser/stream.py | 8 ++--- tests/integration/test_integration.py | 4 +-- tests/integration/test_stream.py | 10 +++++-- tests/unit/test_cache.py | 31 ++++++++++++++++--- 5 files changed, 75 insertions(+), 21 deletions(-) diff --git a/betfair_parser/cache.py b/betfair_parser/cache.py index 1c2ab00..218eea2 100644 --- a/betfair_parser/cache.py +++ b/betfair_parser/cache.py @@ -157,8 +157,29 @@ def clear(self) -> None: raise NotImplementedError() -class MarketCache(ChangeCache): +class _DefaultDict(defaultdict): """ + The caches for single markets are supposed to be referenceable from outside the + cache via weakrefs. So if the cache gets updated, it doesn't leave data pending. + As defaultdict itself can't be referenced with a weakref, let's subclass it. + """ + + def __init__(self, **kwargs): + super().__init__(type(self).default_factory, **kwargs) + + default_factory = dict + + +class MarketOrderBook(_DefaultDict): + """Order book for a single market, collecting a bunch of RunnerOrderBooks.""" + + default_factory = RunnerOrderBook # type: ignore[assignment] + + +class MarketSubscriptionCache(ChangeCache): + """ + Orderbook for all markets of a market change stream. + Market subscriptions are always in the underlying exchange currency - GBP. The default roll-up for GBP is £1 for batb / batl and bdatb / bdatl, This means that stakes of less than £1 (or currency equivalent) are rolled up to the next available price on the odds ladder. For atb / atl there is @@ -166,14 +187,12 @@ class MarketCache(ChangeCache): """ def __init__(self): - self.order_book: defaultdict[str, defaultdict[int, RunnerOrderBook]] = defaultdict( - lambda: defaultdict(RunnerOrderBook) - ) - self.market_definitions: dict[str, MarketDefinition] = {} + self.order_book: defaultdict[str, MarketOrderBook] = defaultdict(MarketOrderBook) + self.definitions: dict[str, MarketDefinition] = {} def clear(self) -> None: self.order_book.clear() - self.market_definitions.clear() + self.definitions.clear() def update(self, mcm: MCM) -> None: self.update_meta(mcm) @@ -186,7 +205,7 @@ def update(self, mcm: MCM) -> None: if mc.img: self.order_book.pop(mc.id, None) if mc.market_definition: - self.market_definitions[mc.id] = mc.market_definition + self.definitions[mc.id] = mc.market_definition if not mc.rc: continue for rc in mc.rc: @@ -235,13 +254,19 @@ def update(self, orc: OrderRunnerChange) -> None: ladder_update_mo(self.matched_backs, orc.ml) -class OrderCache(ChangeCache): +class MarketOrders(_DefaultDict): + """All orders for a single market, collecting a bunch of RunnerOrders.""" + + default_factory = RunnerOrders # type: ignore[assignment] + + +class OrderSubscriptionCache(ChangeCache): """ Order subscriptions are provided in the currency of the account that the orders are placed in. """ def __init__(self): - self.orders: defaultdict[str, defaultdict[int, RunnerOrders]] = defaultdict(lambda: defaultdict(RunnerOrders)) + self.orders: defaultdict[str, MarketOrders] = defaultdict(MarketOrders) def clear(self) -> None: self.orders.clear() diff --git a/betfair_parser/stream.py b/betfair_parser/stream.py index 0a8e0b2..9104f51 100644 --- a/betfair_parser/stream.py +++ b/betfair_parser/stream.py @@ -8,7 +8,7 @@ from collections.abc import AsyncGenerator, Callable, Iterable from typing import Any -from betfair_parser.cache import MarketCache, OrderCache +from betfair_parser.cache import MarketSubscriptionCache, OrderSubscriptionCache from betfair_parser.exceptions import StreamError from betfair_parser.spec.common import encode from betfair_parser.spec.streaming import ( @@ -146,7 +146,7 @@ class StreamReader: """Read exchange stream data into a separate cache for each subscription.""" def __init__(self, app_key, token) -> None: - self.caches: dict[StreamRef, MarketCache | OrderCache] = {} + self.caches: dict[StreamRef, MarketSubscriptionCache | OrderSubscriptionCache] = {} self.esm = ExchangeStream(app_key, token) def handle_change_message(self, msg: ChangeMessageType) -> ChangeMessageType: @@ -155,9 +155,9 @@ def handle_change_message(self, msg: ChangeMessageType) -> ChangeMessageType: def subscribe(self, subscription: SubscriptionType) -> bytes: if isinstance(subscription, MarketSubscription): - self.caches[subscription.id] = MarketCache() + self.caches[subscription.id] = MarketSubscriptionCache() elif isinstance(subscription, OrderSubscription): - self.caches[subscription.id] = OrderCache() + self.caches[subscription.id] = OrderSubscriptionCache() else: raise TypeError("Invalid subscription type") return self.esm.subscribe(subscription, self.handle_change_message) diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 46f6431..20f7da1 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -4,7 +4,7 @@ import pytest -from betfair_parser.cache import MarketCache, RunnerOrderBook +from betfair_parser.cache import MarketSubscriptionCache, RunnerOrderBook from betfair_parser.spec import accounts, betting from betfair_parser.spec.common import decode from betfair_parser.spec.streaming import MCM, StreamRequestType, StreamResponseType, stream_decode @@ -83,7 +83,7 @@ def test_responses(path): @pytest.mark.parametrize("path", sorted((RESOURCES_DIR / "data").glob("**/*.bz2")), ids=id_from_path) def test_archive(path): - mc = MarketCache() + mc = MarketSubscriptionCache() i = 0 for i, line in enumerate(bz2.open(path), start=1): msg = stream_decode(line) diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py index 3b99b59..0e0a1e6 100644 --- a/tests/integration/test_stream.py +++ b/tests/integration/test_stream.py @@ -1,3 +1,5 @@ +import datetime + import pytest from requests import Session # alternatively use httpx.Client @@ -143,15 +145,19 @@ def test_stream_reader(session, iterations=15): assert sr.caches[ORDER_STREAM_ID].orders is not None # type: ignore[union-attr] - market_definitions: dict[str, MarketDefinition] = sr.caches[MARKET_STREAM_ID].market_definitions # type: ignore[union-attr] + market_definitions: dict[str, MarketDefinition] = sr.caches[MARKET_STREAM_ID].definitions # type: ignore[union-attr] assert len(market_definitions) > 20 assert all(isinstance(key, str) for key in market_definitions) assert all(isinstance(md, MarketDefinition) for md in market_definitions.values()) + now = datetime.datetime.utcnow().astimezone(datetime.UTC) order_book = sr.caches[MARKET_STREAM_ID].order_book # type: ignore[union-attr] assert len(order_book) == len(market_definitions) assert all(isinstance(key, str) for key in order_book) - for market_order_book in order_book.values(): + for market_id, market_order_book in order_book.items(): + if (market_definitions[market_id].suspend_time - now).seconds > 6 * 60 * 60: # 6h + # data further in the future is likely to be incomplete and leads to errors + continue for runner_order_book in market_order_book.values(): assert isinstance(runner_order_book, RunnerOrderBook) if not runner_order_book.total_volume: diff --git a/tests/unit/test_cache.py b/tests/unit/test_cache.py index 64a0512..d0c0868 100644 --- a/tests/unit/test_cache.py +++ b/tests/unit/test_cache.py @@ -1,4 +1,15 @@ -from betfair_parser.cache import LPV, MarketCache, OrderCache, ladder_update_lpv +import weakref + +import pytest + +from betfair_parser.cache import ( + LPV, + MarketOrderBook, + MarketOrders, + MarketSubscriptionCache, + OrderSubscriptionCache, + ladder_update_lpv, +) from betfair_parser.spec.common import decode, encode from betfair_parser.spec.streaming import MCM, stream_decode from tests.resources import RESOURCES_DIR @@ -7,7 +18,7 @@ def test_runner_order_book_repr(): raw = (RESOURCES_DIR / "responses" / "streaming" / "mcm_sub_image_no_market_def.json").read_bytes() mcm: MCM = stream_decode(raw) # type: ignore[assignment] - cache = MarketCache() + cache = MarketSubscriptionCache() cache.update(mcm) rob = cache.order_book["1.180737193"][25327214] repr_str = repr(rob) @@ -74,8 +85,8 @@ def test_batb_cache(): def test_order_cache_runner_removal(): - """OrderCache example including reduction factor from the documentation.""" - cache = OrderCache() + """OrderSubscriptionCache example including reduction factor from the documentation.""" + cache = OrderSubscriptionCache() # bet gets placed cache.update(stream_decode(OCMS_SAMPLE[0])) # type: ignore[arg-type] @@ -110,3 +121,15 @@ def test_order_cache_runner_removal(): assert order.average_price_matched == 9.47 assert ro.matched_backs[9.47] == 2 assert len(ro.matched_backs) == 1 + + +@pytest.mark.parametrize("dict_type", (MarketOrders, MarketOrderBook)) +def test_market_caches(dict_type): + cache = dict_type() + runner_cache = cache[123] + r = weakref.ref(cache) + assert type(runner_cache).__name__ == type(cache).__name__.replace("Market", "Runner") + assert r() is cache + del runner_cache + del cache + assert r() is None