Skip to content

Commit

Permalink
Improve stream reading and caching
Browse files Browse the repository at this point in the history
  • Loading branch information
ml31415 committed May 2, 2024
1 parent 5a326b8 commit 67e4cb1
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 127 deletions.
16 changes: 12 additions & 4 deletions betfair_parser/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
OCM,
PV,
ChangeType,
MarketDefinition,
MatchedOrder,
Order,
OrderRunnerChange,
Expand Down Expand Up @@ -154,7 +155,7 @@ def update_meta(self, msg: Union[MCM, OCM]) -> None:
self.clear()

def clear(self) -> None:
return
raise NotImplementedError()


class MarketCache(ChangeCache):
Expand All @@ -166,8 +167,10 @@ class MarketCache(ChangeCache):
"""

def __init__(self):
self.order_book: defaultdict[str, defaultdict] = defaultdict(lambda: defaultdict(RunnerOrderBook))
self.market_definitions = {}
self.order_book: defaultdict[str, defaultdict[int, RunnerOrderBook]] = defaultdict(
lambda: defaultdict(RunnerOrderBook)
)
self.market_definitions: dict[str, MarketDefinition] = {}

def clear(self) -> None:
self.order_book.clear()
Expand All @@ -177,6 +180,9 @@ def update(self, mcm: MCM) -> None:
self.update_meta(mcm)
if mcm.is_heartbeat:
return
if not mcm.mc:
return

for mc in mcm.mc:
if mc.img:
self.order_book.pop(mc.id, None)
Expand Down Expand Up @@ -236,7 +242,7 @@ class OrderCache(ChangeCache):
"""

def __init__(self):
self.orders: defaultdict[str, defaultdict] = defaultdict(lambda: defaultdict(RunnerOrders))
self.orders: defaultdict[str, defaultdict[int, RunnerOrders]] = defaultdict(lambda: defaultdict(RunnerOrders))

def clear(self) -> None:
self.orders.clear()
Expand All @@ -245,6 +251,8 @@ def update(self, ocm: OCM) -> None:
self.update_meta(ocm)
if ocm.is_heartbeat:
return
if not ocm.oc:
return

for oc in ocm.oc:
if oc.full_image:
Expand Down
17 changes: 10 additions & 7 deletions betfair_parser/spec/streaming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,23 @@
StartingPriceBack,
StartingPriceLay,
StrategyMatchChange,
StreamRef,
Trade,
)


STREAM_REQUEST = Union[Authentication, MarketSubscription, OrderSubscription, Heartbeat]
STREAM_RESPONSE = Union[Connection, Status, MCM, OCM]
CHANGE_MESSAGE = Union[MCM, OCM]
_STREAM_MESSAGES = Union[STREAM_RESPONSE, list[STREAM_RESPONSE], STREAM_REQUEST]
_STREAM_DECODER = Decoder(_STREAM_MESSAGES, strict=False)
StreamRequestType = Union[Authentication, MarketSubscription, OrderSubscription, Heartbeat]
StreamResponseType = Union[Connection, Status, MCM, OCM]
SubscriptionType = Union[MarketSubscription, OrderSubscription]
ChangeMessageType = Union[MCM, OCM]
StreamMessageType = Union[StreamResponseType, list[StreamResponseType], StreamRequestType]

_STREAM_DECODER = Decoder(StreamMessageType, strict=False)

def stream_decode(raw: Union[str, bytes]):

def stream_decode(raw: Union[str, bytes]) -> StreamMessageType:
return _STREAM_DECODER.decode(raw)


def stream_decode_lines(raw: Union[str, bytes]):
def stream_decode_lines(raw: Union[str, bytes]) -> list[StreamMessageType]:
return _STREAM_DECODER.decode_lines(raw)
9 changes: 5 additions & 4 deletions betfair_parser/spec/streaming/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
Definition of the betfair streaming API messages as defined in:
- https://docs.developer.betfair.com/display/1smk3cen4v3lu3yomq5qye0ni/Exchange+Stream+API
- https://github.com/betfair/stream-api-sample-code/blob/master/ESASwaggerSchema.json
"""
"""

from typing import Literal, Optional, Union
from typing import Literal, Optional

from betfair_parser.spec.common import BaseMessage, first_lower
from betfair_parser.spec.streaming.enums import ChangeType, SegmentType, StatusErrorCode
Expand All @@ -14,19 +14,20 @@
MarketFilter,
OrderFilter,
OrderMarketChange,
StreamRef,
)


class _StreamRequest(BaseMessage, tag_field="op", tag=first_lower, frozen=True):
"""Common parent class for any stream request."""

id: Optional[Union[int, str]] = None # Client generated unique id to link request with response (like json rpc)
id: Optional[StreamRef] = None # Client generated unique id to link request with response (like json rpc)


class _StreamResponse(BaseMessage, tag_field="op", tag=str.lower, frozen=True):
"""Common parent class for any stream response."""

id: Optional[Union[int, str]] = None # Client generated unique id to link request with response (like json rpc)
id: Optional[StreamRef] = None # Client generated unique id to link request with response (like json rpc)


class Authentication(_StreamRequest, kw_only=True, frozen=True):
Expand Down
2 changes: 2 additions & 0 deletions betfair_parser/spec/streaming/type_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from betfair_parser.spec.streaming.enums import LapseStatusReasonCode, MarketDataFilterFields, PriceLadderDefinitionType


StreamRef = Union[int, str]

# Request objects


Expand Down
Loading

0 comments on commit 67e4cb1

Please sign in to comment.