diff --git a/betfair_parser/cache.py b/betfair_parser/cache.py index 6417421..b275d97 100644 --- a/betfair_parser/cache.py +++ b/betfair_parser/cache.py @@ -18,6 +18,7 @@ OCM, PV, ChangeType, + MarketDefinition, MatchedOrder, Order, OrderRunnerChange, @@ -154,7 +155,7 @@ def update_meta(self, msg: Union[MCM, OCM]) -> None: self.clear() def clear(self) -> None: - return + raise NotImplementedError() class MarketCache(ChangeCache): @@ -166,8 +167,10 @@ class MarketCache(ChangeCache): """ def __init__(self): - self.order_book: defaultdict[str, defaultdict] = defaultdict(lambda: defaultdict(RunnerOrderBook)) - self.market_definitions = {} + self.order_book: defaultdict[str, defaultdict[int, RunnerOrderBook]] = defaultdict( + lambda: defaultdict(RunnerOrderBook) + ) + self.market_definitions: dict[str, MarketDefinition] = {} def clear(self) -> None: self.order_book.clear() @@ -177,6 +180,9 @@ def update(self, mcm: MCM) -> None: self.update_meta(mcm) if mcm.is_heartbeat: return + if not mcm.mc: + return + for mc in mcm.mc: if mc.img: self.order_book.pop(mc.id, None) @@ -236,7 +242,7 @@ class OrderCache(ChangeCache): """ def __init__(self): - self.orders: defaultdict[str, defaultdict] = defaultdict(lambda: defaultdict(RunnerOrders)) + self.orders: defaultdict[str, defaultdict[int, RunnerOrders]] = defaultdict(lambda: defaultdict(RunnerOrders)) def clear(self) -> None: self.orders.clear() @@ -245,6 +251,8 @@ def update(self, ocm: OCM) -> None: self.update_meta(ocm) if ocm.is_heartbeat: return + if not ocm.oc: + return for oc in ocm.oc: if oc.full_image: diff --git a/betfair_parser/spec/streaming/__init__.py b/betfair_parser/spec/streaming/__init__.py index 55baf2f..4692a18 100644 --- a/betfair_parser/spec/streaming/__init__.py +++ b/betfair_parser/spec/streaming/__init__.py @@ -48,20 +48,23 @@ StartingPriceBack, StartingPriceLay, StrategyMatchChange, + StreamRef, Trade, ) -STREAM_REQUEST = Union[Authentication, MarketSubscription, OrderSubscription, Heartbeat] -STREAM_RESPONSE = Union[Connection, Status, MCM, OCM] -CHANGE_MESSAGE = Union[MCM, OCM] -_STREAM_MESSAGES = Union[STREAM_RESPONSE, list[STREAM_RESPONSE], STREAM_REQUEST] -_STREAM_DECODER = Decoder(_STREAM_MESSAGES, strict=False) +StreamRequestType = Union[Authentication, MarketSubscription, OrderSubscription, Heartbeat] +StreamResponseType = Union[Connection, Status, MCM, OCM] +SubscriptionType = Union[MarketSubscription, OrderSubscription] +ChangeMessageType = Union[MCM, OCM] +StreamMessageType = Union[StreamResponseType, list[StreamResponseType], StreamRequestType] +_STREAM_DECODER = Decoder(StreamMessageType, strict=False) -def stream_decode(raw: Union[str, bytes]): + +def stream_decode(raw: Union[str, bytes]) -> StreamMessageType: return _STREAM_DECODER.decode(raw) -def stream_decode_lines(raw: Union[str, bytes]): +def stream_decode_lines(raw: Union[str, bytes]) -> list[StreamMessageType]: return _STREAM_DECODER.decode_lines(raw) diff --git a/betfair_parser/spec/streaming/messages.py b/betfair_parser/spec/streaming/messages.py index d425f8d..bb90d16 100644 --- a/betfair_parser/spec/streaming/messages.py +++ b/betfair_parser/spec/streaming/messages.py @@ -2,9 +2,9 @@ Definition of the betfair streaming API messages as defined in: - https://docs.developer.betfair.com/display/1smk3cen4v3lu3yomq5qye0ni/Exchange+Stream+API - https://github.com/betfair/stream-api-sample-code/blob/master/ESASwaggerSchema.json - """ +""" -from typing import Literal, Optional, Union +from typing import Literal, Optional from betfair_parser.spec.common import BaseMessage, first_lower from betfair_parser.spec.streaming.enums import ChangeType, SegmentType, StatusErrorCode @@ -14,19 +14,20 @@ MarketFilter, OrderFilter, OrderMarketChange, + StreamRef, ) class _StreamRequest(BaseMessage, tag_field="op", tag=first_lower, frozen=True): """Common parent class for any stream request.""" - id: Optional[Union[int, str]] = None # Client generated unique id to link request with response (like json rpc) + id: Optional[StreamRef] = None # Client generated unique id to link request with response (like json rpc) class _StreamResponse(BaseMessage, tag_field="op", tag=str.lower, frozen=True): """Common parent class for any stream response.""" - id: Optional[Union[int, str]] = None # Client generated unique id to link request with response (like json rpc) + id: Optional[StreamRef] = None # Client generated unique id to link request with response (like json rpc) class Authentication(_StreamRequest, kw_only=True, frozen=True): diff --git a/betfair_parser/spec/streaming/type_definitions.py b/betfair_parser/spec/streaming/type_definitions.py index 398f907..24f5d73 100644 --- a/betfair_parser/spec/streaming/type_definitions.py +++ b/betfair_parser/spec/streaming/type_definitions.py @@ -21,6 +21,8 @@ from betfair_parser.spec.streaming.enums import LapseStatusReasonCode, MarketDataFilterFields, PriceLadderDefinitionType +StreamRef = Union[int, str] + # Request objects diff --git a/betfair_parser/stream.py b/betfair_parser/stream.py index 19ad3b8..2f2e146 100644 --- a/betfair_parser/stream.py +++ b/betfair_parser/stream.py @@ -1,24 +1,120 @@ import asyncio +import io import itertools import socket import ssl import urllib.parse -from typing import Optional +from collections.abc import AsyncGenerator, Iterable +from typing import Any, Callable, Optional, Union -from betfair_parser.exceptions import StreamAuthenticationError, StreamError +from betfair_parser.cache import MarketCache, OrderCache +from betfair_parser.exceptions import StreamError from betfair_parser.spec.common import encode from betfair_parser.spec.streaming import ( - STREAM_REQUEST, - STREAM_RESPONSE, + MCM, + OCM, Authentication, + ChangeMessageType, Connection, Heartbeat, + MarketSubscription, + OrderSubscription, Status, + StreamRef, + StreamResponseType, + SubscriptionType, stream_decode, ) -def create_ssl_socket(hostname, timeout: int = 15) -> ssl.SSLSocket: +LINE_SEPARATOR = b"\r\n" + + +def _default_handler(msg: StreamResponseType) -> StreamResponseType: + return msg + + +class ExchangeStream: + """Handle the byte stream with betfair.""" + + def __init__(self, app_key: str, token: str, id_generator: Optional[Callable] = None) -> None: + self.app_key = app_key + self.token = token + self.subscriptions: dict[StreamRef, SubscriptionType] = {} + self.handlers: dict[StreamRef, Callable] = {} + self._id_generator = id_generator if id_generator is not None else itertools.count(1000) + self._connection_id: Optional[str] = None + + @property + def connection_id(self) -> Optional[str]: + return self._connection_id + + @property + def is_connected(self) -> bool: + return bool(self.connection_id) + + def unique_id(self) -> int: + return next(self._id_generator) # type: ignore + + def handle_connection(self, msg: Connection) -> Connection: + self._connection_id = msg.connection_id + return msg + + def handle_heartbeat(self, msg: Heartbeat) -> Heartbeat: # noqa + return msg + + def handle_status(self, msg: Status) -> Status: + if msg.is_error or msg.connection_closed: + raise StreamError( + f"Connection {self.connection_id} to stream {msg.id} failed: {msg.error_code}: {msg.error_message}" + ) + return msg + + def handle_msg(self, msg: StreamResponseType) -> Any: + # TODO: use match syntax for py3.10+ + if isinstance(msg, Heartbeat): + return self.handle_heartbeat(msg) + if isinstance(msg, Status): + return self.handle_status(msg) + if isinstance(msg, Connection): + return self.handle_connection(msg) + try: + return self.handlers[msg.id](msg) + except KeyError: + raise StreamError(f"Unexpected stream message: {msg}") + except Exception as e: + raise StreamError(f"Handling stream message failed: {msg}") from e + + def authenticate(self) -> bytes: + return encode(Authentication(id=self.unique_id(), app_key=self.app_key, session=self.token)) + LINE_SEPARATOR + + def subscribe(self, subscription: SubscriptionType, handler: Callable = _default_handler) -> Optional[bytes]: + self.subscriptions[subscription.id] = subscription + self.handlers[subscription.id] = handler + if self.connection_id: + # only write something, if the connection is already established + return encode(subscription) + LINE_SEPARATOR + return None + + def connect(self) -> bytes: + auth = self.authenticate() + if not self.subscriptions: + return auth + + # send out subscriptions, that were registered before connecting + subscriptions = LINE_SEPARATOR.join(encode(subscription) for subscription in self.subscriptions.values()) + return auth + subscriptions + LINE_SEPARATOR + + def receive_bytes(self, data: bytes) -> Any: + if not data: + return None + return self.handle_msg(stream_decode(data)) # type: ignore + + def receive(self, stream: io.RawIOBase) -> Any: + return self.receive_bytes(stream.readline()) + + +def create_ssl_socket(hostname, timeout: float | None = None) -> ssl.SSLSocket: """Create ssl socket and set timeout.""" context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.load_default_certs() @@ -28,81 +124,74 @@ def create_ssl_socket(hostname, timeout: int = 15) -> ssl.SSLSocket: return secure_sock -class _BaseStream: - _connection_id: Optional[str] = None +def create_stream_io(endpoint, timeout: float = 15): + """Open an IO stream through a TLS connection to the given endpoint.""" + url = urllib.parse.urlparse(endpoint) + sock = create_ssl_socket(url.hostname, timeout=timeout) + sock.connect((url.hostname, url.port)) + return socket.SocketIO(sock, "rwb") - def __init__(self, endpoint) -> None: - self._endpoint = endpoint - self._id_generator = itertools.count() - @property - def connection_id(self) -> str: - return self._connection_id +def _message_changes(msg: StreamResponseType) -> Optional[list[str]]: + """Return the market IDs of the markets affected by the given message.""" + if isinstance(msg, (Status, Connection)): + return None + if isinstance(msg, MCM) and msg.market_changes: + return [m.id for m in msg.market_changes] + if isinstance(msg, OCM) and msg.order_market_changes: + return [m.id for m in msg.order_market_changes] + return None - def unique_id(self) -> int: - return next(self._id_generator) +class StreamReader: + """Read exchange stream data into a separate cache for each subscription.""" -class Stream(_BaseStream): - _sock: Optional[ssl.SSLSocket] = None - _io: Optional[socket.SocketIO] = None + def __init__(self, app_key, token) -> None: + self.caches: dict[StreamRef, Union[MarketCache, OrderCache]] = {} + self.esm = ExchangeStream(app_key, token) - def connect(self) -> None: - url = urllib.parse.urlparse(self._endpoint) - self._sock = create_ssl_socket(url.hostname) - self._sock.connect((url.hostname, url.port)) - self._io = socket.SocketIO(self._sock, "rwb") - msg: Connection = self.receive() # type: ignore - self._connection_id = msg.connection_id + def handle_change_message(self, msg: ChangeMessageType) -> ChangeMessageType: + self.caches[msg.id].update(msg) # type: ignore + return msg - def send(self, request: STREAM_REQUEST) -> None: - if not self._io: - raise StreamError("Stream is not connected") - msg = encode(request) + b"\r\n" - written_bytes = self._io.write(msg) - if not len(msg) == written_bytes: - raise StreamError(f"Incomplete request transfer: {written_bytes} of {len(msg)} bytes sent") + def subscribe(self, subscription: SubscriptionType) -> bytes: + if isinstance(subscription, MarketSubscription): + self.caches[subscription.id] = MarketCache() + elif isinstance(subscription, OrderSubscription): + self.caches[subscription.id] = OrderCache() + else: + raise TypeError("Invalid subscription type") + return self.esm.subscribe(subscription, self.handle_change_message) - def receive(self) -> STREAM_RESPONSE: - if not self._io: - raise StreamError("Stream is not connected") - return stream_decode(self._io.readline()) - - def close(self): - if self._io is not None: - self._io.close() - self._io = None - if self._sock is not None: - try: - self._sock.shutdown(socket.SHUT_RDWR) - except OSError: - pass - self._sock.close() - self._sock = None - - def authenticate(self, app_key: str, token: str) -> None: - self.send(Authentication(id=self.unique_id(), app_key=app_key, session=token)) - msg: Status = self.receive() # type: ignore - if msg.is_error: - raise StreamAuthenticationError(f"{msg.error_code.name}: {msg.error_message}") - if msg.connection_closed: - raise StreamAuthenticationError("Connection was closed by the server unexpectedly") - - def heartbeat(self) -> None: - self.send(Heartbeat(id=self.unique_id())) - - def __enter__(self): - self.connect() - return self + def receive(self, stream: io.RawIOBase) -> Any: + return self.esm.receive(stream) + + def connect(self, stream: io.RawIOBase) -> None: + self.esm.receive(stream) # read connection + stream.write(self.esm.connect()) # send auth + self.esm.receive(stream) + + def iter_changes(self, stream: io.RawIOBase) -> Iterable[list[str]]: + """Iterate over the stream, yielding lists of IDs of the updated markets.""" + if not self.esm.is_connected: + self.connect(stream) - def __exit__(self, exc_type, exc_val, exc_tb) -> None: - self.close() + while True: + changes = _message_changes(self.esm.receive(stream)) + if changes: + yield changes -class AsyncStream(_BaseStream): +class AsyncStream: + """Async version of io.RawIOBase over a SSL connection.""" + _reader: Optional[asyncio.StreamReader] = None _writer: Optional[asyncio.StreamWriter] = None + def __init__(self, endpoint, timeout: float = 15) -> None: + self._endpoint = endpoint + self._timeout = timeout + async def connect(self) -> None: url = urllib.parse.urlparse(self._endpoint) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) @@ -114,23 +203,19 @@ async def connect(self) -> None: server_hostname=url.hostname, limit=1_000_000, ) - msg: Connection = await self.receive() # type: ignore - self._connection_id = msg.connection_id - async def send(self, request: STREAM_REQUEST) -> None: + async def write(self, data: bytes) -> None: if not self._writer: raise StreamError("Stream is not connected") - msg = encode(request) + b"\r\n" - self._writer.write(msg) + self._writer.write(data) await self._writer.drain() - async def receive(self) -> STREAM_RESPONSE: + async def readline(self) -> bytes: if not self._reader: raise StreamError("Stream is not connected") - data = await self._reader.readline() - return stream_decode(data) + return await self._reader.readline() - async def close(self): + async def close(self) -> None: self._reader = None # does not need to be closed explicitly if self._writer: await self._writer.drain() @@ -145,20 +230,28 @@ async def close(self): await self._writer.wait_closed() self._writer = None - async def authenticate(self, app_key: str, token: str) -> None: - await self.send(Authentication(id=self.unique_id(), app_key=app_key, session=token)) - msg: Status = await self.receive() # type: ignore - if msg.is_error: - raise StreamAuthenticationError(f"{msg.error_code.name}: {msg.error_message}") - if msg.connection_closed: - raise StreamAuthenticationError("Connection was closed by the server unexpectedly") - - async def heartbeat(self) -> None: - await self.send(Heartbeat(id=self.unique_id())) - async def __aenter__(self): await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: await self.close() + + +class AsyncStreamReader(StreamReader): + async def receive_async(self, stream: AsyncStream) -> Any: + return self.esm.receive_bytes(await stream.readline()) + + async def connect_async(self, stream: AsyncStream) -> None: + self.esm.receive_bytes(await stream.readline()) # read connection + await stream.write(self.esm.connect()) # send auth + self.esm.receive_bytes(await stream.readline()) + + async def iter_changes_async(self, stream: AsyncStream) -> AsyncGenerator[list[str], None]: + if not self.esm.is_connected: + await self.connect_async(stream) + + while True: + changes = _message_changes(self.esm.receive_bytes(await stream.readline())) + if changes: + yield changes diff --git a/tests/integration/benchmark.py b/tests/integration/benchmark.py index 94da18a..450d785 100644 --- a/tests/integration/benchmark.py +++ b/tests/integration/benchmark.py @@ -15,7 +15,7 @@ def test_performance(benchmark): result = benchmark.pedantic(stream_decode_lines, args=(data,)) assert len(result) == 50854 for msg in result: - # TODO: use isinstance(msg, STREAM_RESPONSE) for py3.10+ + # TODO: use isinstance(msg, StreamResponseType) for py3.10+ assert isinstance(msg, (MCM, OCM)) diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 8c870ec..4ad8d47 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -43,7 +43,7 @@ def test_requests(path): raw = path.read_bytes() if "streaming" in str(path): data = stream_decode(raw) - # TODO: use isinstance(msg, STREAM_REQUEST) for py3.10+ + # TODO: use isinstance(msg, StreamRequestType) for py3.10+ assert isinstance(data, (Authentication, MarketSubscription, OrderSubscription)) return @@ -63,10 +63,10 @@ def test_responses(path): data = stream_decode(raw) if isinstance(data, list): for msg in data: - # TODO: use isinstance(msg, STREAM_RESPONSE) for py3.10+ + # TODO: use isinstance(msg, StreamResponseType) for py3.10+ assert isinstance(msg, (MCM, OCM, Status, Connection)) else: - # TODO: use isinstance(msg, STREAM_RESPONSE) for py3.10+ + # TODO: use isinstance(msg, StreamResponseType) for py3.10+ assert isinstance(data, (MCM, OCM, Status, Connection)) return diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py index 91a5737..21e2188 100644 --- a/tests/integration/test_stream.py +++ b/tests/integration/test_stream.py @@ -2,6 +2,7 @@ from requests import Session # alternatively use httpx.Client from betfair_parser import client +from betfair_parser.cache import MarketDefinition, RunnerOrderBook from betfair_parser.endpoints import STREAM_INTEGRATION from betfair_parser.exceptions import BetfairError from betfair_parser.spec.common import EventTypeIdCode @@ -18,7 +19,7 @@ OrderSubscription, Status, ) -from betfair_parser.stream import AsyncStream, Stream +from betfair_parser.stream import AsyncStream, ExchangeStream, StreamReader, create_stream_io from tests.integration.test_live import appconfig # noqa: F401 @@ -32,9 +33,11 @@ def session(appconfig) -> Session: # noqa return s +MARKET_STREAM_ID = 1 +ORDER_STREAM_ID = 2 SUBSCRIPTIONS = [ MarketSubscription( - id=1, + id=MARKET_STREAM_ID, heartbeat_ms=500, market_filter=MarketFilter( betting_types=[MarketBettingType.ODDS], @@ -51,7 +54,7 @@ def session(appconfig) -> Session: # noqa ], ), ), - OrderSubscription(id=2, heartbeat_ms=500, order_filter=OrderFilter()), + OrderSubscription(id=ORDER_STREAM_ID, heartbeat_ms=500, order_filter=OrderFilter()), ] @@ -61,13 +64,17 @@ def session(appconfig) -> Session: # noqa ids=lambda x: type(x).__name__, ) def test_stream(session, subscription, iterations=3): - token = session.headers.get("X-Authentication") app_key = session.headers.get("X-Application") + token = session.headers.get("X-Authentication") + esm = ExchangeStream(app_key, token) - with Stream(STREAM_INTEGRATION) as strm: - strm.authenticate(app_key, token) - strm.send(subscription) - msg: Status = strm.receive() + with create_stream_io(STREAM_INTEGRATION) as stream: + print(esm.receive(stream)) # read connection + stream.write(esm.connect()) # send auth + print(esm.receive(stream)) + + stream.write(esm.subscribe(subscription)) + msg: Status = esm.receive(stream) assert isinstance(msg, Status) assert not msg.is_error, f"{msg.error_code.name}: {msg.error_message}" assert not msg.connection_closed @@ -78,7 +85,7 @@ def test_stream(session, subscription, iterations=3): req_type = MCM if isinstance(subscription, MarketSubscription) else OCM for _ in range(iterations): - msg = strm.receive() + msg = esm.receive(stream) assert isinstance(msg, req_type) print(msg) @@ -92,11 +99,15 @@ def test_stream(session, subscription, iterations=3): async def test_async_stream(session, subscription, iterations=3): token = session.headers.get("X-Authentication") app_key = session.headers.get("X-Application") + esm = ExchangeStream(app_key, token) + + async with AsyncStream(STREAM_INTEGRATION) as stream: + esm.receive_bytes(await stream.readline()) + await stream.write(esm.connect()) + esm.receive_bytes(await stream.readline()) + await stream.write(esm.subscribe(subscription)) - async with AsyncStream(STREAM_INTEGRATION) as strm: - await strm.authenticate(app_key, token) - await strm.send(subscription) - msg: Status = await strm.receive() + msg: Status = esm.receive_bytes(await stream.readline()) assert isinstance(msg, Status) assert not msg.is_error, f"{msg.error_code.name}: {msg.error_message}" assert not msg.connection_closed @@ -107,6 +118,48 @@ async def test_async_stream(session, subscription, iterations=3): req_type = MCM if isinstance(subscription, MarketSubscription) else OCM for _ in range(iterations): - msg = await strm.receive() + msg = esm.receive_bytes(await stream.readline()) assert isinstance(msg, req_type) print(msg) + + +def test_stream_reader(session, iterations=15): + app_key = session.headers.get("X-Application") + token = session.headers.get("X-Authentication") + + sr = StreamReader(app_key, token) + for subscription in SUBSCRIPTIONS: + sr.subscribe(subscription) # type: ignore + + with create_stream_io(STREAM_INTEGRATION) as stream: + for i, changed_ids in enumerate(sr.iter_changes(stream)): + assert len(changed_ids) + assert all(change_id.startswith("1.") for change_id in changed_ids) + assert all(change_id in sr.caches[MARKET_STREAM_ID].order_book for change_id in changed_ids) # type: ignore + if i >= iterations: + break + + order_book = sr.caches[MARKET_STREAM_ID].order_book # type: ignore + market_definitions: dict[str, MarketDefinition] = sr.caches[MARKET_STREAM_ID].market_definitions # type: ignore + assert len(order_book) == len(market_definitions) > 20 + assert all(isinstance(key, str) for key in market_definitions) + assert all(isinstance(md, MarketDefinition) for md in market_definitions.values()) + + assert all(isinstance(key, str) for key in order_book) + for market_order_book in order_book.values(): + for runner_order_book in market_order_book.values(): + assert isinstance(runner_order_book, RunnerOrderBook) + if not runner_order_book.total_volume: + # skip empty order books + continue + assert runner_order_book.available_to_back + assert runner_order_book.available_to_lay + assert runner_order_book.last_traded_price + + # fields must be deleted when nulled + for volume in runner_order_book.available_to_back.values(): + assert volume + for volume in runner_order_book.available_to_lay.values(): + assert volume + + assert sr.caches[ORDER_STREAM_ID].orders is not None # type: ignore diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index 84076ab..d39e327 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -9,7 +9,7 @@ def test_ocm(): b'{"op":"ocm","id":2,"clk":"AAAAAAAAAAAAAA==","pt":1669350204489,"oc":[{"id":"1.206818134","fullImage":true,' b'"orc":[{"id":49914337,"fullImage":true,"uo":[],"mb":[],"ml":[[2, 100]]}]}]}' ) - ocm: OCM = stream_decode(raw) + ocm: OCM = stream_decode(raw) # type: ignore assert isinstance(ocm, OCM) assert ocm.oc[0].orc[0].ml[0] == MatchedOrder(price=2.0, size=100) @@ -28,7 +28,7 @@ def test_mcm(): b'"discountAllowed":null,"timezone":"GMT","openDate":"2021-10-13T23:40:00.000Z","version":4099822530,' b'"priceLadderDefinition":"CLASSIC"}}]}' ) - mcm: MCM = stream_decode(raw) + mcm: MCM = stream_decode(raw) # type: ignore assert isinstance(mcm, MCM) runner = mcm.mc[0].market_definition.runners[0] assert runner.hc == 0.0 @@ -131,7 +131,7 @@ def test_mcm_no_missing_fields(): } ], } - mcm: MCM = stream_decode(msgspec.json.encode(raw)) + mcm: MCM = stream_decode(msgspec.json.encode(raw)) # type: ignore data = msgspec.json.decode(msgspec.json.encode(mcm)) result = set(data["mc"][0]["marketDefinition"].keys()) expected = set(raw["mc"][0]["marketDefinition"].keys()) # type: ignore @@ -140,20 +140,20 @@ def test_mcm_no_missing_fields(): def test_mcm_no_clk(): raw = b'{"op": "mcm", "clk": null, "pt": 1576840503572, "mc": []}' # noqa - mcm: MCM = stream_decode(raw) + mcm: MCM = stream_decode(raw) # type: ignore assert mcm.clk is None def test_mcm_market_definition_each_way(): raw = (RESOURCES_DIR / "responses" / "streaming" / "mcm_market_definition_each_way.json").read_bytes() - mcm: MCM = stream_decode(raw) + mcm: MCM = stream_decode(raw) # type: ignore assert mcm.mc[0].market_definition.market_type == "EACH_WAY" assert mcm.mc[0].market_definition.each_way_divisor == 4.0 def test_bsp_data(): raw = (RESOURCES_DIR / "responses" / "streaming" / "mcm_bsp_data.json").read_bytes() - mcm: MCM = stream_decode(raw)[0] + mcm: MCM = stream_decode(raw)[0] # type: ignore rc = mcm.mc[0].rc[0] assert rc.spl == [StartingPriceLay(price=1.01, volume=2.8)] assert rc.spn == 4.5 @@ -161,7 +161,7 @@ def test_bsp_data(): def test_bsp_result(): raw = (RESOURCES_DIR / "responses" / "streaming" / "mcm_market_definition_bsp.json").read_bytes() - mcm: MCM = stream_decode(raw) + mcm: MCM = stream_decode(raw) # type: ignore runners = mcm.mc[0].market_definition.runners assert runners[0].bsp == 2.0008034621107256 assert runners[0].status == RunnerStatus.WINNER @@ -169,5 +169,5 @@ def test_bsp_result(): def test_status_error_alt(): raw = (RESOURCES_DIR / "responses" / "streaming" / "status_error_alt.json").read_bytes() - status: Status = stream_decode(raw) + status: Status = stream_decode(raw) # type: ignore assert status