State Store preparation #55
…Task, Context to support new data structures
def __init__(self, topology, kafka_config):
    self.topology = topology
def __init__(self, topology_builder, kafka_config):
    self.topology_builder = topology_builder
If this is never read after __init__, why store it in self?
I can see no good reason. Maybe it was used previously. I'll remove this.
@property
@property  # type: ignore
What are these ignores about?
It's waiting on python/mypy#1362.
I have put a comment in linking the issue.
@@ -45,7 +45,8 @@ def send(self, topic, key, value, timestamp,
log.exception(nie)
produced = True  # should not enter infinite loop
Why is this static now?
@@ -146,7 +146,7 @@ def run(self):
self.log.debug('Ending stream thread...')
finally:
self.commit_all()
self.commit_all()  # TODO does this mean we update the consumer offset under a failure scenario?
I think this should be fine, as this will only commit offsets that have been stored by successful processing. So a tuple is processed, offset for that is updated in-process, and then occasionally the in-process offsets are committed back to Kafka.
Thanks
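To make the commit behaviour discussed in this thread concrete, here is a minimal sketch rather than the project's actual code. `OffsetTracker`, `record_processed` and the free function `run` are hypothetical names, and the `committed` dict stands in for a real consumer commit. It assumes the usual Kafka convention that the committed offset is the next offset to consume.

```python
class OffsetTracker:
    """Hypothetical in-process offset bookkeeping (not the project's class)."""

    def __init__(self):
        self.pending = {}    # (topic, partition) -> next offset to consume
        self.committed = {}  # stand-in for offsets committed back to Kafka

    def record_processed(self, topic, partition, offset):
        # Called only after a record has been processed successfully.
        self.pending[(topic, partition)] = offset + 1

    def commit_all(self):
        # Stand-in for a consumer commit: only successfully processed
        # offsets ever reach `pending`, so only those are committed.
        self.committed.update(self.pending)
        self.pending.clear()


def run(records, process, tracker):
    """Process records in order; commit in `finally`, as in the diff above."""
    try:
        for topic, partition, offset, value in records:
            process(value)  # may raise
            tracker.record_processed(topic, partition, offset)
    finally:
        tracker.commit_all()
```

If `process` raises on some record, the `finally` block still commits, but only up to the last record that completed, so the failed record would be read again on restart.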
is_key : bool
    whether is for key or value
"""
What does the configuration do? Could we pass this in during __init__ rather than mutating?
Potentially. Worth looking into this, but it's outside the scope of this PR. This is just annotating the existing methods with type hints.
@abstractmethod
def build(self) -> StateStoreSupplier[KT, VT]:
    pass
Maybe raise NotImplementedError?
Not necessary: the ABC metaclass prevents any derived class that doesn't implement it from being instantiated.
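A small sketch of the behaviour described in this reply, using only the standard `abc` module. The class names `StoreFactory`, `IncompleteFactory` and `DictFactory`, and the helper `try_instantiate`, are hypothetical and not taken from the PR.

```python
from abc import ABC, abstractmethod


class StoreFactory(ABC):
    """Hypothetical stand-in for the abstract factory in the diff."""

    @abstractmethod
    def build(self):
        pass  # never reached through a subclass that fails to override it


class IncompleteFactory(StoreFactory):
    """Forgets to override build(), so it remains abstract."""


class DictFactory(StoreFactory):
    """Overrides build(), so it can be instantiated."""

    def build(self):
        return {}


def try_instantiate(cls):
    """Return an instance, or the TypeError message if cls is still abstract."""
    try:
        return cls()
    except TypeError as exc:
        return str(exc)
```

Calling `try_instantiate(IncompleteFactory)` yields the error message instead of an object, because instantiation fails with `TypeError` before `build` could ever be called.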
class ChangeLoggingKeyValueStore(KeyValueStateStore[KT, VT]):
    # TODO : add write buffer
    # TODO : periodically dump full dict state to topic and change the
    # consumer offset so it need not read full history
The latter one should just come out of the box with topic compaction.
Good point - Will update comment
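As a rough illustration of why compaction covers the second TODO: a compacted topic retains at least the latest record per key, so replaying it rebuilds the same state as replaying the full history. The sketch below models a changelog as a plain list of `(key, value)` pairs, with `None` as a delete marker. The functions `compact` and `rebuild` are hypothetical helpers, not Kafka or project APIs, and real compaction is asynchronous and eventually drops delete markers too.

```python
def compact(changelog):
    """Keep only the latest record per key, preserving log order."""
    latest = {}
    for offset, (key, value) in enumerate(changelog):
        latest[key] = (offset, value)
    # Sort surviving records by their original offset.
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_offset, value) in survivors]


def rebuild(changelog):
    """Replay a changelog into a dict; a None value deletes the key."""
    state = {}
    for key, value in changelog:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state
```

Under these assumptions, `rebuild(compact(log))` and `rebuild(log)` produce the same dict, while the compacted log is never longer than the number of distinct keys.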
def serialize_key(self, key: KT) -> bytes:
    return None
    return self._key_serde.serializer.serialize("", key)
Two returns?
Oops. Good catch! Fixed.
This has been corrected in my next branch already and has a test in place.
Looks good to me now!
State store architecture rebuilt, ready for state rebuild and database connection handling. This is a step towards #21.
This stage of the State Store work is large, so I am submitting it for peer review and suggestions while I work on the second part.
This step adds essentially no features. It introduces a StateStoreSupplier that is held in a Topology, Application and StreamThread; these generate StateStores for StreamTasks. A StateStore exists within a StreamTask and is designed to handle database connections and Kafka topic associations. A StateStore creates a KeyValueStateStore for use within a processor; this provides a MutableMapping (the interface of a Python dict) for reading from and writing to a state store.
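The relationship described above can be sketched roughly as follows. This is an illustration under assumptions, not the code in this PR: `InMemoryKeyValueStore`, `InMemorySupplier`, its `get()` method and `count_words` are hypothetical names, and the sketch collapses the StateStore and KeyValueStateStore layers into one dict-backed class.

```python
from collections.abc import MutableMapping


class InMemoryKeyValueStore(MutableMapping):
    """Hypothetical dict-backed store exposing the MutableMapping interface."""

    def __init__(self, name):
        self.name = name
        self._data = {}

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


class InMemorySupplier:
    """Hypothetical supplier: hands each task its own store instance."""

    def __init__(self, name):
        self.name = name

    def get(self):
        return InMemoryKeyValueStore(self.name)


def count_words(store, words):
    # A processor can use the store exactly as it would a dict.
    for word in words:
        store[word] = store.get(word, 0) + 1
    return store
```

Because the store only implements the five abstract methods, helpers such as `get`, `items` and `update` come from the `MutableMapping` mixins, which is what lets processor code treat it as a dict.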
As part of this work, the TopologyBuilder API for adding a state store has changed: it now takes a StateStoreSupplier rather than a Callable. This is a potentially breaking change for users of the initial implementation. Examples have been updated to reflect this.
It also adds a set of factory classes as an option for creating a StateStoreSupplier. This API stands out as Java-esque; it should be replaced with something that fits with the work for #22.
Type hints have been introduced for the StateStore and Serde objects as (de)serialization from known, fixed types will be required on the state changelog.
The next stage will be to add the State rebuild from changelog and tests that check for correctness of the rebuilt state and failure scenarios around