forked from idanya/algo-trader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.py
50 lines (38 loc) · 1.65 KB
/
processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from __future__ import annotations
from abc import abstractmethod
from typing import Optional, Dict
from algotrader.entities.candle import Candle
from algotrader.entities.event import Event
from algotrader.entities.serializable import Deserializable, Serializable
from algotrader.pipeline.shared_context import SharedContext
from algotrader.serialization.store import DeserializationService
class Processor(Serializable, Deserializable):
def __init__(self, next_processor: Optional[Processor] = None) -> None:
self.next_processor = next_processor
@abstractmethod
def process(self, context: SharedContext, candle: Candle):
if self.next_processor:
self.next_processor.process(context, candle)
def reprocess(self, context: SharedContext, candle: Candle):
if self.next_processor:
self.next_processor.reprocess(context, candle)
def event(self, context: SharedContext, event: Event):
if self.next_processor:
self.next_processor.event(context, event)
@classmethod
def deserialize(cls, data: Dict) -> Optional[Processor]:
obj = cls(None)
obj.next_processor = cls._deserialize_next_processor(data)
return obj
@classmethod
def _deserialize_next_processor(cls, data: Dict) -> Optional[Processor]:
if data.get('next_processor'):
return DeserializationService.deserialize(data['next_processor'])
return None
def serialize(self) -> Dict:
obj = super().serialize()
if self.next_processor:
obj.update({
"next_processor": self.next_processor.serialize()
})
return obj