diff --git a/setup.py b/setup.py
index 3162688..ba5cfe1 100644
--- a/setup.py
+++ b/setup.py
@@ -12,8 +12,10 @@ readme = readme_file.read()
 
 requirements = [
+    'javaproperties',
     'confluent-kafka',
-    'javaproperties'
+    'requests',
+    'avro-python3'
 ]
 
 test_requirements = [
diff --git a/tests/processor/serde/mock_schema_registry.py b/tests/processor/serde/mock_schema_registry.py
new file mode 100644
index 0000000..0180c7a
--- /dev/null
+++ b/tests/processor/serde/mock_schema_registry.py
@@ -0,0 +1,149 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+
+from confluent_kafka.avro import ClientError
+
+
+class MockSchemaRegistryClient(object):
+    """
+    A client that acts as a schema registry locally.
+
+    Compatibility-related methods are not implemented at this time.
+    """
+
+    def __init__(self, max_schemas_per_subject=1000):
+        self.max_schemas_per_subject = max_schemas_per_subject
+        # subj => { schema => id }
+        self.subject_to_schema_ids = {}
+        # id => avro_schema
+        self.id_to_schema = {}
+        # subj => { schema => version }
+        self.subject_to_schema_versions = {}
+
+        self.subject_to_latest_schema = {}
+
+        # counters
+        self.next_id = 1
+        self.schema_to_id = {}
+
+    def _get_next_id(self, schema):
+        if schema in self.schema_to_id:
+            return self.schema_to_id[schema]
+        result = self.next_id
+        self.next_id += 1
+        self.schema_to_id[schema] = result
+        return result
+
+    def _get_next_version(self, subject):
+        if subject not in self.subject_to_schema_versions:
+            self.subject_to_schema_versions[subject] = {}
+        return len(self.subject_to_schema_versions[subject])
+
+    def _get_all_versions(self, subject):
+        versions = self.subject_to_schema_versions.get(subject, {})
+        return sorted(versions)
+
+    def _add_to_cache(self, cache, subject, schema, value):
+        if subject not in cache:
+            cache[subject] = {}
+        sub_cache = cache[subject]
+        sub_cache[schema] = value
+
+    def _cache_schema(self, schema, schema_id, subject, version):
+        # don't overwrite anything
+        if schema_id in self.id_to_schema:
+            schema = self.id_to_schema[schema_id]
+        else:
+            self.id_to_schema[schema_id] = schema
+
+        self._add_to_cache(self.subject_to_schema_ids,
+                           subject, schema, schema_id)
+
+        self._add_to_cache(self.subject_to_schema_versions,
+                           subject, schema, version)
+
+        if subject in self.subject_to_latest_schema:
+            si, s, v = self.subject_to_latest_schema[subject]
+            if v > version:
+                return
+        self.subject_to_latest_schema[subject] = (schema_id, schema, version)
+
+    def register(self, subject, avro_schema):
+        """
+        Register a schema with the registry under the given subject
+        and receive a schema id.
+
+        avro_schema must be a parsed schema from the python avro library.
+
+        Multiple instances of the same schema will result in inconsistencies.
+        """
+        schemas_to_id = self.subject_to_schema_ids.get(subject, {})
+        schema_id = schemas_to_id.get(avro_schema, -1)
+        if schema_id != -1:
+            return schema_id
+
+        # add it
+        version = self._get_next_version(subject)
+        schema_id = self._get_next_id(avro_schema)
+
+        # cache it
+        self._cache_schema(avro_schema, schema_id, subject, version)
+        return schema_id
+
+    def get_by_id(self, schema_id):
+        """Retrieve a parsed avro schema by id or None if not found"""
+        return self.id_to_schema.get(schema_id, None)
+
+    def get_latest_schema(self, subject):
+        """
+        Return the latest 3-tuple of:
+        (the schema id, the parsed avro schema, the schema version)
+        for a particular subject.
+
+        If the subject is not found, (None, None, None) is returned.
+        """
+        return self.subject_to_latest_schema.get(subject, (None, None, None))
+
+    def get_version(self, subject, avro_schema):
+        """
+        Get the version of a schema for a given subject.
+
+        Returns -1 if not found.
+        """
+        schemas_to_version = self.subject_to_schema_versions.get(subject, {})
+        return schemas_to_version.get(avro_schema, -1)
+
+    def get_id_for_schema(self, subject, avro_schema):
+        """
+        Get the ID of a parsed schema.
+        """
+        schemas_to_id = self.subject_to_schema_ids.get(subject, {})
+        return schemas_to_id.get(avro_schema, -1)
+
+    def test_compatibility(self, subject, avro_schema, version='latest'):
+        raise ClientError("not implemented")
+
+    def update_compatibility(self, level, subject=None):
+        raise ClientError("not implemented")
+
+    def get_compatibility(self, subject=None):
+        raise ClientError("not implemented")
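
The mock keeps all registry state in plain dicts, so it can be exercised on its own. A minimal round-trip sketch (assuming it is run from the repo root so the tests package is importable; the subject name is arbitrary):

    from confluent_kafka.avro import loads as avro_loads
    from tests.processor.serde.mock_schema_registry import MockSchemaRegistryClient

    registry = MockSchemaRegistryClient()
    schema = avro_loads('{"type": "string"}')

    # ids are handed out sequentially from 1; re-registering the same parsed
    # schema object under the same subject returns the existing id
    schema_id = registry.register('example-value', schema)
    assert schema_id == registry.register('example-value', schema)

    # versions start at 0 in this mock; the latest-schema cache returns a
    # (schema id, parsed schema, version) tuple
    assert registry.get_latest_schema('example-value') == (schema_id, schema, 0)
    assert registry.get_by_id(schema_id) is schema
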
diff --git a/tests/processor/serde/test_avro_serde.py b/tests/processor/serde/test_avro_serde.py
new file mode 100644
index 0000000..6487a8b
--- /dev/null
+++ b/tests/processor/serde/test_avro_serde.py
@@ -0,0 +1,45 @@
+import io
+import struct
+from confluent_kafka.avro import loads as avro_loads
+from .mock_schema_registry import MockSchemaRegistryClient
+from winton_kafka_streams.processor.serialization.serdes import AvroSerde
+import winton_kafka_streams.kafka_config as config
+
+string_avro = '{"type": "string"}'
+
+
+def create_serde(registry, schema):
+    serde = AvroSerde()
+    config.AVRO_SCHEMA_REGISTRY = 'nowhere'
+    config.KEY_AVRO_SCHEMA = schema
+
+    serde.configure(config, True)
+    serde.serializer._avro_helper._set_serializer(registry)
+    serde.deserializer._avro_helper._set_serializer(registry)
+
+    serde.test_registry = registry
+    return serde
+
+
+def test_serialize_avro():
+    registry = MockSchemaRegistryClient()
+    serde = create_serde(registry, string_avro)
+
+    message = serde.serializer.serialize('topic', 'data')
+    message_io = io.BytesIO(message)
+    magic, schema_id, length, string = struct.unpack('>bIb4s', message_io.read(10))
+    assert(0 == magic)
+    assert(schema_id in registry.id_to_schema)
+    assert(8 == length)  # Avro zig-zag-encodes the string length 4 as the byte 8
+    assert(b'data' == string)
+    message_io.close()
+
+
+def test_deserialize_avro():
+    registry = MockSchemaRegistryClient()
+    serde = create_serde(registry, string_avro)
+    schema_id = registry.register('topic-value', avro_loads(string_avro))
+
+    serialized = b'\0' + schema_id.to_bytes(4, 'big') + b'\x08data'
+    message = serde.deserializer.deserialize('ignored', serialized)
+    assert('data' == message)
diff --git a/winton_kafka_streams/kafka_config.py b/winton_kafka_streams/kafka_config.py
index 028584b..fb9d7f0 100644
--- a/winton_kafka_streams/kafka_config.py
+++ b/winton_kafka_streams/kafka_config.py
@@ -4,7 +4,7 @@ Configuration may either be set inline in your application
 using:
 
 import kafka_config
-kafka_config.BOOSTRAP_SERVERS = 'localhost:9092'
+kafka_config.BOOTSTRAP_SERVERS = 'localhost:9092'
 
 or as a file in java properties format. The property names are identical
 to those used in the Java implementation for ease of sharing between both.
@@ -273,6 +273,14 @@ KEY_SERIALIZER_INT_SIZE = None
 VALUE_SERIALIZER_INT_SIZE = None
 
 
+# AvroSerde
+AVRO_SCHEMA_REGISTRY = None
+AVRO_SCHEMA = None
+KEY_AVRO_SCHEMA_REGISTRY = None
+KEY_AVRO_SCHEMA = None
+VALUE_AVRO_SCHEMA_REGISTRY = None
+VALUE_AVRO_SCHEMA = None
+
 
 def read_local_config(config_file):
     if not os.path.exists(config_file):
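
The new settings follow the existing KEY_/VALUE_ override pattern of the serializer options above. An inline-configuration sketch; the registry URL and the record schema are placeholders, and the KEY_/VALUE_-prefixed names are assumed to take precedence over the unprefixed AVRO_* fallbacks, as the test's use of KEY_AVRO_SCHEMA with is_key=True suggests:

    import winton_kafka_streams.kafka_config as config

    # one registry shared by the key and value serdes (placeholder URL)
    config.AVRO_SCHEMA_REGISTRY = 'http://localhost:8081'

    # per-side schemas; the KEY_/VALUE_ variants are assumed to override
    # the plain AVRO_SCHEMA fallback for their respective side
    config.KEY_AVRO_SCHEMA = '{"type": "string"}'
    config.VALUE_AVRO_SCHEMA = '''
    {"type": "record", "name": "Tick", "fields": [
        {"name": "price", "type": "double"}
    ]}
    '''
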
diff --git a/winton_kafka_streams/processor/serialization/avro.py b/winton_kafka_streams/processor/serialization/avro.py
new file mode 100644
index 0000000..073ee96
--- /dev/null
+++ b/winton_kafka_streams/processor/serialization/avro.py
@@ -0,0 +1,68 @@
+from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer
+from confluent_kafka.avro import loads as avro_loads
+
+from ._serde import extract_config_property
+from ._deserializer import Deserializer
+from ._serializer import Serializer
+
+
+class AvroHelper:
+    def __init__(self):
+        self._is_key = False
+        self._schema_registry = None
+        self._serializer = None
+        self._schema = None
+
+    def _set_serializer(self, schema_registry):
+        self._schema_registry = schema_registry
+        self._serializer = MessageSerializer(registry_client=self._schema_registry)
+
+    def configure(self, configs, is_key):
+        self._is_key = is_key
+        schema_registry_url = extract_config_property(configs, is_key, 'AVRO_SCHEMA_REGISTRY')
+        schema = extract_config_property(configs, is_key, 'AVRO_SCHEMA')
+
+        if schema_registry_url is None:
+            raise Exception("Missing Avro Schema Registry URL")
+        else:
+            self._set_serializer(CachedSchemaRegistryClient(url=schema_registry_url))
+
+        if schema:
+            self._schema = avro_loads(schema)
+
+    def serialize(self, topic, data):
+        if self._schema is None:
+            raise Exception("Missing Avro Schema")
+
+        return self._serializer.encode_record_with_schema(topic, self._schema, data, is_key=self._is_key)
+
+    def deserialize(self, topic, data):
+        return self._serializer.decode_message(data)
+
+
+class AvroSerializer(Serializer):
+    def __init__(self):
+        self._avro_helper = AvroHelper()
+
+    def serialize(self, topic, data):
+        return self._avro_helper.serialize(topic, data)
+
+    def configure(self, configs, is_key):
+        self._avro_helper.configure(configs, is_key)
+
+    def close(self):
+        pass
+
+
+class AvroDeserializer(Deserializer):
+    def __init__(self):
+        self._avro_helper = AvroHelper()
+
+    def deserialize(self, topic, data):
+        return self._avro_helper.deserialize(topic, data)
+
+    def configure(self, configs, is_key):
+        self._avro_helper.configure(configs, is_key)
+
+    def close(self):
+        pass
diff --git a/winton_kafka_streams/processor/serialization/serdes/__init__.py b/winton_kafka_streams/processor/serialization/serdes/__init__.py
index 00014af..5e9d4d1 100644
--- a/winton_kafka_streams/processor/serialization/serdes/__init__.py
+++ b/winton_kafka_streams/processor/serialization/serdes/__init__.py
@@ -4,6 +4,7 @@ from .integer import IntegerSerde
 from .string import StringSerde
 from .json import JsonSerde
+from .avro import AvroSerde
 from ._serdes import serde_from_string
 from ._serdes import serde_as_string
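
Under the hood, MessageSerializer frames each message in the Confluent wire format: a zero magic byte, a 4-byte big-endian schema id (obtained by registering the schema under the '<topic>-key' or '<topic>-value' subject, as the tests above rely on), then the Avro binary encoding of the datum. A small sketch for splitting such a frame; the helper name is illustrative, not part of this PR:

    import struct

    def split_confluent_frame(message):
        # magic byte (must be 0) and big-endian unsigned schema id
        magic, schema_id = struct.unpack('>bI', message[:5])
        if magic != 0:
            raise ValueError("not a Confluent-framed Avro message")
        return schema_id, message[5:]  # (schema id, Avro-encoded payload)
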
diff --git a/winton_kafka_streams/processor/serialization/serdes/avro.py b/winton_kafka_streams/processor/serialization/serdes/avro.py
new file mode 100644
index 0000000..bafee23
--- /dev/null
+++ b/winton_kafka_streams/processor/serialization/serdes/avro.py
@@ -0,0 +1,18 @@
+"""
+Avro Serde
+
+"""
+from ..avro import AvroSerializer, AvroDeserializer
+from ._wrapper_serde import WrapperSerde
+
+
+class AvroSerde(WrapperSerde):
+    """
+    Avro Serde that uses Avro and a schema registry
+    for serialization and deserialization.
+    """
+
+    def __init__(self):
+        serializer = AvroSerializer()
+        deserializer = AvroDeserializer()
+        super().__init__(serializer, deserializer)
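
Putting the pieces together, the serde can be exercised end to end against the in-memory registry in the same way as the tests above, swapping the HTTP-backed client out through the private _set_serializer hook; 'nowhere' is only a placeholder, since the real registry is never contacted once the mock is in place:

    from tests.processor.serde.mock_schema_registry import MockSchemaRegistryClient
    from winton_kafka_streams.processor.serialization.serdes import AvroSerde
    import winton_kafka_streams.kafka_config as config

    registry = MockSchemaRegistryClient()
    serde = AvroSerde()

    config.AVRO_SCHEMA_REGISTRY = 'nowhere'  # placeholder, never contacted
    config.KEY_AVRO_SCHEMA = '{"type": "string"}'
    serde.configure(config, True)  # True => configure as the key serde

    # replace the CachedSchemaRegistryClient with the in-memory mock
    serde.serializer._avro_helper._set_serializer(registry)
    serde.deserializer._avro_helper._set_serializer(registry)

    payload = serde.serializer.serialize('prices', 'hello')
    assert serde.deserializer.deserialize('prices', payload) == 'hello'
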