Skip to content

Commit

Permalink
Removed undefined attributes
Browse files Browse the repository at this point in the history
Fixes limx0#39

docstrings and @properties added in order to make streamed data objects more convenient to use

AsyncStream closing improved

Introduced msgspec 0.18 decode_lines
  • Loading branch information
ml31415 committed Sep 26, 2023
1 parent 1def3c8 commit ba51fbf
Show file tree
Hide file tree
Showing 9 changed files with 401 additions and 201 deletions.
8 changes: 8 additions & 0 deletions betfair_parser/spec/streaming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from betfair_parser.spec.betting.enums import MarketStatus # noqa: F401
from betfair_parser.spec.betting.enums import MarketTypeCode # noqa: F401
from betfair_parser.spec.betting.enums import RunnerStatus # noqa: F401
from betfair_parser.spec.streaming.enums import ChangeType # noqa: F401
from betfair_parser.spec.streaming.enums import SegmentType # noqa: F401
from betfair_parser.spec.streaming.messages import (
MCM,
OCM,
Expand All @@ -18,8 +20,10 @@
)
from betfair_parser.spec.streaming.type_definitions import MarketDataFilter # noqa: F401
from betfair_parser.spec.streaming.type_definitions import MarketDataFilterFields # noqa: F401
from betfair_parser.spec.streaming.type_definitions import MarketDefinition # noqa: F401
from betfair_parser.spec.streaming.type_definitions import MarketFilter # noqa: F401
from betfair_parser.spec.streaming.type_definitions import OrderFilter # noqa: F401
from betfair_parser.spec.streaming.type_definitions import RunnerChange # noqa: F401


STREAM_REQUEST = Union[Authentication, MarketSubscription, OrderSubscription, Heartbeat]
Expand All @@ -30,3 +34,7 @@

def stream_decode(raw: bytes):
return _STREAM_DECODER.decode(raw)


def stream_decode_lines(raw: bytes):
return _STREAM_DECODER.decode_lines(raw)
75 changes: 58 additions & 17 deletions betfair_parser/spec/streaming/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from betfair_parser.spec.streaming.type_definitions import (
MarketChange,
MarketDataFilter,
MarketDefinition,
MarketFilter,
OrderFilter,
OrderMarketChange,
Expand All @@ -32,38 +31,58 @@ class StreamResponse(BaseMessage, tag_field="op", tag=str.lower, omit_defaults=T


class Authentication(StreamRequest, kw_only=True, frozen=True):
app_key: str
session: str
"""
This message is the first message that the client must send on connecting to the server. You must
be authenticated before any other request is processed.
"""

app_key: str # Application Key
session: str # Session Token generated by the API login


class _Subscription(StreamRequest, kw_only=True, frozen=True):
"""Common parent class for any Subscription request."""

clk: Optional[str] = None # Token value delta (received in MarketChangeMessage) for resuming a subscription
conflate_ms: Optional[int] = None # the conflation rate (looped back on initial image: bounds are 0 to 120000)
heartbeat_ms: Optional[int] = None # the heartbeat rate (looped back on initial image: bounds are 500 to 5000)
conflate_ms: Optional[int] = None # The conflation rate (looped back on initial image: bounds are 0 to 120000)
heartbeat_ms: Optional[int] = None # The heartbeat rate (looped back on initial image: bounds are 500 to 5000)
initial_clk: Optional[str] = None # Token value that should be passed to resume a subscription
segmentation_enabled: bool = True # allow server to send large sets of data in segments, instead of a single block


class MarketSubscription(_Subscription, kw_only=True, frozen=True):
"""
This subscription type is used to receive price changes for one or more markets; your subscription
criteria determine what you see. Limiting the amount of data that you consume will make your initial
image much smaller (and faster) & suppress changes that are uninteresting to you.
"""

market_filter: MarketFilter
market_data_filter: MarketDataFilter


class OrderSubscription(_Subscription, kw_only=True, frozen=True):
"""This subscription type is used to receive order changes."""

order_filter: OrderFilter # Optional filter applied to order subscription


class Heartbeat(StreamRequest, frozen=True):
pass
"""
This is an explicit heartbeat request (in addition to the server heartbeat interval which is automatic).
This functionality should not normally be necessary unless you need to keep a firewall open.
"""


class Connection(StreamResponse, kw_only=True, frozen=True):
connection_id: str
"""This is received by the client when it successfully opens a connection to the server."""

connection_id: str # Unique identifier for support queries


class Status(StreamResponse, kw_only=True, frozen=True):
"""Every request receives a status response with a matching id."""

connection_closed: bool
connection_id: Optional[str] = None
connections_available: Optional[int] = None # The number of connections available for this account at this moment
Expand All @@ -79,15 +98,14 @@ def is_error(self):
class _ChangeMessage(StreamResponse, kw_only=True, frozen=True):
"""Common parent class for any ChangeMessage."""

clk: Optional[str] = None # Token value (non-null) should be stored for resuming in case of a disconnect
con: Optional[bool] = None # TODO: Undocumented in swagger, mb misplaced in the tests?
conflate_ms: Optional[int] = None # the conflation rate (may differ from that requested if subscription is delayed)
clk: Optional[str] = None # Token value (non-None) should be stored for resuming in case of a disconnect
conflate_ms: Optional[int] = None # The conflation rate (may differ from that requested if subscription is delayed)
ct: Optional[ChangeType] = None
heartbeat_ms: Optional[int] = None # heartbeat rate (may differ from requested: bounds are 500 to 30000)
initial_clk: Optional[str] = None # Token value (non-null) should be stored for resuming in case of a disconnect
pt: int # Publish Time (in millis since epoch) that the changes were generated
heartbeat_ms: Optional[int] = None # Heartbeat rate (may differ from requested: bounds are 500 to 30000)
initial_clk: Optional[str] = None # Token value (non-None) should be stored for resuming in case of a disconnect
pt: int # Publish Time
segment_type: Optional[SegmentType] = None # denotes the beginning and end of a segmentation
status: Optional[int] = None # null if the stream is up to date and 503 if the services are experiencing latencies
status: Optional[int] = None # None if the stream is up-to-date and 503 if the services are experiencing latencies

@property
def is_heartbeat(self):
Expand All @@ -97,11 +115,34 @@ def is_heartbeat(self):
def stream_unreliable(self):
return self.status == 503

@property
def change_type(self):
"""Set to indicate the type of change (if None this is a delta)"""
return self.ct

@property
def publish_time(self):
"""Time (in milliseconds since epoch) that the changes were generated"""
return self.pt


class MCM(_ChangeMessage, kw_only=True, frozen=True):
market_definition: Optional[MarketDefinition] = None # Undocumented in swagger
mc: Optional[List[MarketChange]] = None # empty for heartbeats
"""This is the ChangeMessage stream of data Betfair sends back after subscribing to the market stream."""

mc: Optional[List[MarketChange]] = None

@property
def market_changes(self):
"""The modifications to markets (will be None on a heartbeat)"""
return self.mc


class OCM(_ChangeMessage, kw_only=True, frozen=True):
oc: Optional[List[OrderMarketChange]] = None # empty for heartbeats
"""This is the ChangeMessage stream of data Betfair sends back after subscribing to the order stream."""

oc: Optional[List[OrderMarketChange]] = None

@property
def order_market_changes(self):
"""The modifications to account's orders (will be None on a heartbeat)"""
return self.oc
Loading

0 comments on commit ba51fbf

Please sign in to comment.