State Store preparation #55

Merged (12 commits, May 1, 2018)
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
 # Byte-compiled / optimized / DLL files
 __pycache__/
+.pytest_cache/
 *.py[cod]
 *$py.class
 
2 changes: 1 addition & 1 deletion Vagrantfile
@@ -14,7 +14,7 @@ Vagrant.configure("2") do |config|
 
   config.vm.provision "shell", inline: <<-SHELL
     export SCALA_VER=2.11
-    export KAFKA_VER=0.11.0.1
+    export KAFKA_VER=1.0.0
     export KAFKA_PACKAGE=kafka_${SCALA_VER}-${KAFKA_VER}
 
     apt-get update
1 change: 0 additions & 1 deletion examples/debug/config.properties
@@ -2,4 +2,3 @@ bootstrap.servers = localhost:9092
 auto.offset.reset = earliest
 enable.auto.commit = false
 value.serde = winton_kafka_streams.processor.serialization.serdes.IntegerSerde
-key.serde = winton_kafka_streams.processor.serialization.serdes.IntegerSerde
33 changes: 25 additions & 8 deletions examples/debug/example.py
@@ -9,36 +9,53 @@
 import time
 
 from winton_kafka_streams.processor import BaseProcessor, TopologyBuilder
-from winton_kafka_streams.state.simple import SimpleStore
 import winton_kafka_streams.kafka_config as kafka_config
 import winton_kafka_streams.kafka_streams as kafka_streams
+import winton_kafka_streams.state as state_stores
 
 log = logging.getLogger(__name__)
 
 
 class DoubleProcessor(BaseProcessor):
     """
     Example processor that will double the value passed in
 
     """
 
-    def process(self, key, value):
-        log.debug(f'DoubleProcessor::process({key}, {str(value)})')
+    def initialise(self, name, context):
+        super().initialise(name, context)
+        self.state = context.get_store('double_store')
+
+    def process(self, _, value):
+        log.debug(f'DoubleProcessor::process({str(value)})')
         doubled = value*2
-        log.debug(f'Forwarding to sink ({key}, {str(doubled)})')
-        self.context.forward(key, doubled)
+        items_in_state = len(self.state)
+        self.state[items_in_state] = doubled
+        if items_in_state >= 4:
+            self.punctuate()
 
-    # TODO -- finish off the spec from the README, need to keep state
+    def punctuate(self):
+        for _, value in self.state.items():
+            log.debug(f'Forwarding to sink ({str(value)})')
+            self.context.forward(None, value)
+        self.state.clear()
 
 
 def _debug_run(config_file):
     kafka_config.read_local_config(config_file)
 
+    double_store = state_stores.create('double_store'). \
+        with_integer_keys(). \
+        with_integer_values(). \
+        in_memory(). \
+        build()
+
     with TopologyBuilder() as topology_builder:
         topology_builder. \
             source('input-value', ['wks-debug-example-topic-two']). \
             processor('double', DoubleProcessor, 'input-value'). \
-            sink('output-double', 'wks-debug-example-output', 'double'). \
-            state_store('double-store', SimpleStore, 'double')
+            state_store(double_store, 'double'). \
+            sink('output-double', 'wks-debug-example-output', 'double')
 
     wks = kafka_streams.KafkaStreams(topology_builder, kafka_config)
     wks.start()
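Note on the reworked example's control flow: each incoming value is doubled, buffered in the store under its insertion index, and the fifth buffered record triggers punctuate, which forwards the batch and clears the store. A minimal stand-alone sketch of that behaviour, with a plain dict standing in for the built 'double_store' and print standing in for self.context.forward:

    # Sketch only: a dict stands in for the state store, print for forward().
    class DoubleProcessorSketch:
        def __init__(self):
            self.state = {}  # insertion index -> doubled value

        def process(self, _, value):
            doubled = value * 2
            items_in_state = len(self.state)
            self.state[items_in_state] = doubled
            if items_in_state >= 4:  # fifth buffered record: flush
                self.punctuate()

        def punctuate(self):
            for _, value in self.state.items():
                print(f'forward(None, {value})')
            self.state.clear()

    p = DoubleProcessorSketch()
    for v in range(1, 6):
        p.process(None, v)  # fifth call prints forward(None, 2) ... forward(None, 10)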
4 changes: 2 additions & 2 deletions examples/wordcount/custom_serde.py
@@ -1,6 +1,6 @@
-from winton_kafka_streams.processor.serialization.integer import IntegerSerializer
+from winton_kafka_streams.processor.serialization import IntegerSerializer
 from winton_kafka_streams.processor.serialization.serdes.wrapper_serde import WrapperSerde
-from winton_kafka_streams.processor.serialization.string import StringDeserializer
+from winton_kafka_streams.processor.serialization import StringDeserializer
 
 
 class StringIntSerde(WrapperSerde):
12 changes: 9 additions & 3 deletions examples/wordcount/example.py
@@ -12,10 +12,11 @@
 import winton_kafka_streams.kafka_config as kafka_config
 import winton_kafka_streams.kafka_streams as kafka_streams
 from winton_kafka_streams.processor import BaseProcessor, TopologyBuilder
-from winton_kafka_streams.state import InMemoryKeyValueStore, ChangeLoggingKeyValueStore
+import winton_kafka_streams.state as state_stores
 
 log = logging.getLogger(__name__)
 
+
 # An example implementation of word count,
 # showing where punctuate can be useful
 class WordCount(BaseProcessor):
@@ -49,12 +50,17 @@ def run(config_file, binary_output):
     if binary_output:
         kafka_config.VALUE_SERDE = 'examples.wordcount.custom_serde.StringIntSerde'
 
-    count_store = lambda name: ChangeLoggingKeyValueStore(name, InMemoryKeyValueStore)
+    count_store = state_stores.create('counts'). \
+        with_string_keys(). \
+        with_integer_values(). \
+        in_memory(). \
+        build()
+
     with TopologyBuilder() as topology_builder:
         topology_builder. \
             source('input-value', ['wks-wordcount-example-topic']). \
             processor('count', WordCount, 'input-value'). \
-            state_store('counts', count_store, 'count'). \
+            state_store(count_store, 'count'). \
             sink('output-count', 'wks-wordcount-example-count', 'count')
 
     wks = kafka_streams.KafkaStreams(topology_builder, kafka_config)
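The WordCount processor body is collapsed in this diff, so the following is a hypothetical sketch rather than the PR's code: a processor consistent with the declared string-key/integer-value store would accumulate counts in the store and forward them from punctuate. Stand-ins are used for the store and for context.forward:

    # Hypothetical sketch only: the real WordCount body is hidden above.
    class WordCountSketch:
        def __init__(self, store, forward):
            self.state = store      # stands in for context.get_store('counts')
            self.forward = forward  # stands in for self.context.forward

        def process(self, _, text):
            for word in text.split():
                self.state[word] = self.state.get(word, 0) + 1

        def punctuate(self):
            # Emit the current counts downstream, then reset.
            for word, count in self.state.items():
                self.forward(word, count)
            self.state.clear()

    wc = WordCountSketch({}, lambda k, v: print(f'forward({k!r}, {v})'))
    wc.process(None, 'to be or not to be')
    wc.punctuate()  # forward('to', 2), forward('be', 2), forward('or', 1), forward('not', 1)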
4 changes: 2 additions & 2 deletions tests/processor/serde/test_instantiation.py
@@ -4,13 +4,13 @@
 def test_serde_instance_to_string():
     serde = serdes.BytesSerde()
     serde_str = serdes.serde_as_string(serde)
-    assert 'winton_kafka_streams.processor.serialization.serdes.bytes.BytesSerde' == serde_str
+    assert 'winton_kafka_streams.processor.serialization.serdes.bytes_serde.BytesSerde' == serde_str
 
 
 def test_serde_class_to_string():
     serde = serdes.BytesSerde
     serde_str = serdes.serde_as_string(serde)
-    assert 'winton_kafka_streams.processor.serialization.serdes.bytes.BytesSerde' == serde_str
+    assert 'winton_kafka_streams.processor.serialization.serdes.bytes_serde.BytesSerde' == serde_str
 
 
 def test_string_to_serde():
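These assertions change because the serde module was evidently renamed from bytes to bytes_serde, and serde_as_string appears to derive the string from the class's module path and name, whether given an instance or a class. A mimic under that assumption (serde_as_string_mimic is hypothetical, not the library function):

    # Assumed behaviour, inferred from the asserts above: the serde string is
    # '<module path>.<class name>' for an instance or a class alike.
    def serde_as_string_mimic(serde):
        cls = serde if isinstance(serde, type) else type(serde)
        return f'{cls.__module__}.{cls.__qualname__}'

    class FakeSerde:
        pass

    assert serde_as_string_mimic(FakeSerde) == serde_as_string_mimic(FakeSerde())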
6 changes: 6 additions & 0 deletions tests/processor/serde/test_serialisation.py
@@ -15,6 +15,12 @@ def test_string_serde():
 
 def test_integer_serde():
     int_serde = IntegerSerde()
     assert int_serde.serializer.serialize('topic', -2132) == b'\xac\xf7\xff\xff'
     assert int_serde.deserializer.deserialize('topic', b'\xac\xf7\xff\xff') == -2132
+
+
+def test_long_serde():
+    int_serde = LongSerde()
+    assert int_serde.serializer.serialize('topic', -2132) == b'\xac\xf7\xff\xff\xff\xff\xff\xff'
+    assert int_serde.deserializer.deserialize('topic', b'\xac\xf7\xff\xff\xff\xff\xff\xff') == -2132
 
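The expected byte strings follow from signed, little-endian two's-complement encoding (the SERIALIZER_BYTEORDER and SERIALIZER_SIGNED defaults in kafka_config below); plain int.to_bytes reproduces the test vectors:

    value = -2132

    # IntegerSerde emits 4 bytes, LongSerde 8, both little-endian and signed.
    assert value.to_bytes(4, byteorder='little', signed=True) == b'\xac\xf7\xff\xff'
    assert value.to_bytes(8, byteorder='little', signed=True) == b'\xac\xf7\xff\xff\xff\xff\xff\xff'

    # Decoding inverts the encoding.
    assert int.from_bytes(b'\xac\xf7\xff\xff', byteorder='little', signed=True) == -2132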
2 changes: 1 addition & 1 deletion tests/processor/test_base_processor.py
@@ -18,7 +18,7 @@ def test_initialiseBaseProcessor():
     mock_task = mock.Mock()
     mock_task.application_id = 'test_id'
     mock_task_id = TaskId('test_group', 0)
-    mock_context = ProcessorContext(mock_task_id, mock_task, None, {})
+    mock_context = ProcessorContext(mock_task_id, mock_task, None, None, {})
     bp = wks_processor.BaseProcessor()
     bp.initialise('my-name', mock_context)
 
6 changes: 3 additions & 3 deletions tests/processor/test_sink_processor.py
@@ -26,13 +26,13 @@ def test_sinkProcessorProcess():
     mock_task = mock.Mock()
     mock_task.application_id = 'test_id'
     mock_task_id = TaskId('test_group', 0)
-    processor_context = wks_processor.ProcessorContext(mock_task_id, mock_task, None, {})
-    processor_context.recordCollector = mock.MagicMock()
+    processor_context = wks_processor.ProcessorContext(mock_task_id, mock_task, None, None, {})
+    processor_context.record_collector = mock.MagicMock()
 
     sink = wks_processor.SinkProcessor('topic1')
     sink.initialise('test-sink', processor_context)
     assert sink.name == 'test-sink'
 
     test_key, test_value = 'test-key', 'test-value'
     sink.process(test_key, test_value)
-    assert processor_context.recordCollector.called_with(test_key, test_value, _expected_timestamp)
+    assert processor_context.record_collector.called_with(test_key, test_value, _expected_timestamp)
21 changes: 12 additions & 9 deletions tests/state/test_in_memory_key_value_store.py
@@ -1,17 +1,20 @@
 import pytest
-from winton_kafka_streams.state.in_memory_key_value_store import InMemoryKeyValueStore
+
+from winton_kafka_streams.processor.serialization.serdes import BytesSerde
+from winton_kafka_streams.state.in_memory.in_memory_state_store import InMemoryStateStore
 
 
 def test_inMemoryKeyValueStore():
-    store = InMemoryKeyValueStore('teststore')
+    store = InMemoryStateStore('teststore', BytesSerde(), BytesSerde(), False)
+    kv_store = store.get_key_value_store()
 
-    store['a'] = 1
-    assert store['a'] == 1
+    kv_store['a'] = 1
+    assert kv_store['a'] == 1
 
-    store['a'] = 2
-    assert store['a'] == 2
+    kv_store['a'] = 2
+    assert kv_store['a'] == 2
 
-    del store['a']
-    assert store.get('a') is None
+    del kv_store['a']
+    assert kv_store.get('a') is None
     with pytest.raises(KeyError):
-        store['a']
+        _ = kv_store['a']
12 changes: 4 additions & 8 deletions winton_kafka_streams/kafka_config.py
@@ -22,6 +22,7 @@
 import sys
 
 import javaproperties
+from typing import List
 
 from .processor.serialization.serdes import BytesSerde, serde_as_string
 from .errors.kafka_streams_error import KafkaStreamsError
@@ -125,7 +126,7 @@
     Default: []
     Importance: Low
 """
-METRIC_REPORTERS = []
+METRIC_REPORTERS: List[str] = []
@@ -252,27 +253,22 @@
 VALUE_SERIALIZER_ERROR = None
 VALUE_DESERIALIZER_ERROR = None
 
-# IntegerSerde - byte order
+# IntegerSerde/LongSerde - byte order
 SERIALIZER_BYTEORDER = 'little'
 DESERIALIZER_BYTEORDER = 'little'
 KEY_SERIALIZER_BYTEORDER = None
 KEY_DESERIALIZER_BYTEORDER = None
 VALUE_SERIALIZER_BYTEORDER = None
 VALUE_DESERIALIZER_BYTEORDER = None
 
-# IntegerSerde - signed integer
+# IntegerSerde/LongSerde - signed integer
 SERIALIZER_SIGNED = 'true'
 DESERIALIZER_SIGNED = 'true'
 KEY_SERIALIZER_SIGNED = None
 KEY_DESERIALIZER_SIGNED = None
 VALUE_SERIALIZER_SIGNED = None
 VALUE_DESERIALIZER_SIGNED = None
 
-# IntegerSerde - int size in bytes
-SERIALIZER_INT_SIZE = 4
-KEY_SERIALIZER_INT_SIZE = None
-VALUE_SERIALIZER_INT_SIZE = None
-
 # AvroSerde
 AVRO_SCHEMA_REGISTRY = None
 AVRO_SCHEMA = None
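For reference, the config file shown earlier is parsed with the javaproperties package; a small sketch of the presumed key-to-constant mapping follows. The exact behaviour of read_local_config is not shown in this diff, so the transform below is an assumption inferred from examples/debug/config.properties and the module's constant names:

    import io
    import javaproperties

    # Assumed mapping: dotted property keys become upper-case module
    # attributes, e.g. 'bootstrap.servers' -> BOOTSTRAP_SERVERS.
    props = javaproperties.load(io.StringIO(
        'bootstrap.servers = localhost:9092\n'
        'enable.auto.commit = false\n'
    ))
    for key, value in props.items():
        print(key.replace('.', '_').upper(), '=', repr(value))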
7 changes: 3 additions & 4 deletions winton_kafka_streams/kafka_streams.py
@@ -81,8 +81,7 @@ def is_created_or_running(self):
     def __str__(self):
         return self.name
 
-    def __init__(self, topology, kafka_config):
-        self.topology = topology
+    def __init__(self, topology_builder, kafka_config):
         self.kafka_config = kafka_config
 
         self.state = self.State.CREATED
@@ -91,8 +90,8 @@ def __init__(self, topology, kafka_config):
 
         self.consumer = None
 
-        self.stream_threads = [StreamThread(topology, kafka_config, KafkaClientSupplier(self.kafka_config))
-                               for i in range(int(self.kafka_config.NUM_STREAM_THREADS))]
+        self.stream_threads = [StreamThread(topology_builder, self.kafka_config, KafkaClientSupplier(self.kafka_config))
+                               for _ in range(int(self.kafka_config.NUM_STREAM_THREADS))]
         for stream_thread in self.stream_threads:
            stream_thread.set_state_listener(self.on_thread_state_change)
            self.thread_states[stream_thread.thread_id()] = stream_thread.state
47 changes: 25 additions & 22 deletions winton_kafka_streams/processor/_context.py
@@ -5,16 +5,18 @@
 
 import functools
 import logging
+from typing import Any, Callable
 
+from winton_kafka_streams.state.key_value_state_store import KeyValueStateStore
 from ..errors.kafka_streams_error import KafkaStreamsError
 
 log = logging.getLogger(__name__)
 
 
-def _raiseIfNullRecord(fn):
+def _raise_if_null_record(fn: Callable[..., Any]) -> Callable[..., Any]:
     @functools.wraps(fn)
     def _inner(*args, **kwargs):
-        if args[0].currentRecord is None:
+        if args[0].current_record is None:
             raise KafkaStreamsError(f"Record cannot be unset when retrieving {fn.__name__}")
         return fn(*args, **kwargs)
     return _inner
@@ -26,9 +28,10 @@ class Context:
 
     """
 
-    def __init__(self, _state_stores):
-        self.currentNode = None
-        self.currentRecord = None
+    def __init__(self, _state_record_collector, _state_stores):
+        self.current_node = None
+        self.current_record = None
+        self.state_record_collector = _state_record_collector
         self._state_stores = _state_stores
 
     def send(self, topic, key, obj):
@@ -47,36 +50,36 @@ def schedule(self, timestamp):
 
         pass
 
-    @property
-    @_raiseIfNullRecord
+    @property  # type: ignore # waiting on https://github.com/python/mypy/issues/1362
+    @_raise_if_null_record
     def offset(self):
-        return self.currentRecord.offset()
+        return self.current_record.offset()
 
-    @property
-    @_raiseIfNullRecord
+    @property  # type: ignore
+    @_raise_if_null_record
     def partition(self):
-        return self.currentRecord.partition()
+        return self.current_record.partition()
 
-    @property
-    @_raiseIfNullRecord
+    @property  # type: ignore
+    @_raise_if_null_record
     def timestamp(self):
-        return self.currentRecord.timestamp()
+        return self.current_record.timestamp()
 
-    @property
-    @_raiseIfNullRecord
+    @property  # type: ignore
+    @_raise_if_null_record
     def topic(self):
-        return self.currentRecord.topic()
+        return self.current_record.topic()
 
-    def get_store(self, name):
-        if not self.currentNode:
+    def get_store(self, name) -> KeyValueStateStore:
+        if not self.current_node:
             raise KafkaStreamsError("Access of state from unknown node")
 
         # TODO: Need to check for a global state here
         # This is the reason that processors access store through context
 
-        if name not in self.currentNode.state_stores:
-            raise KafkaStreamsError(f"Processor {self.currentNode.name} does not have access to store {name}")
+        if name not in self.current_node.state_stores:
+            raise KafkaStreamsError(f"Processor {self.current_node.name} does not have access to store {name}")
         if name not in self._state_stores:
            raise KafkaStreamsError(f"Store {name} is not found")
 
-        return self._state_stores[name]
+        return self._state_stores[name].get_key_value_store()
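The renamed guard composes with @property: listed below @property, it wraps the raw getter first, so every attribute access checks current_record before delegating. The # type: ignore comments work around mypy's handling of decorated properties (the mypy issue linked above). A self-contained illustration of the pattern, with RuntimeError standing in for KafkaStreamsError:

    import functools

    def _raise_if_null_record(fn):
        # Reject access while no record is being processed.
        @functools.wraps(fn)
        def _inner(*args, **kwargs):
            if args[0].current_record is None:
                raise RuntimeError(f"Record cannot be unset when retrieving {fn.__name__}")
            return fn(*args, **kwargs)
        return _inner

    class ContextSketch:
        def __init__(self):
            self.current_record = None

        @property  # applied last, so it wraps the guarded getter
        @_raise_if_null_record
        def offset(self):
            return self.current_record['offset']

    ctx = ContextSketch()
    try:
        ctx.offset
    except RuntimeError as exc:
        print(exc)  # Record cannot be unset when retrieving offset

    ctx.current_record = {'offset': 42}
    print(ctx.offset)  # 42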