From 214e360b735aa17f331fb336109043e0eecb6f46 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Thu, 25 Aug 2022 23:59:38 +0000 Subject: [PATCH 1/9] refactor: Cache `create_schema_registry_client` and rename to `get_...` --- edx_event_bus_kafka/config.py | 14 +++++++++++++- edx_event_bus_kafka/consumer/event_consumer.py | 4 ++-- edx_event_bus_kafka/publishing/event_producer.py | 4 ++-- edx_event_bus_kafka/tests/test_config.py | 4 ++-- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py index aa1654d..e73c4ba 100644 --- a/edx_event_bus_kafka/config.py +++ b/edx_event_bus_kafka/config.py @@ -3,9 +3,12 @@ """ import warnings +from functools import lru_cache from typing import Optional from django.conf import settings +from django.dispatch import receiver +from django.test.signals import setting_changed # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst try: @@ -16,10 +19,13 @@ # return type (Optional[SchemaRegistryClient]) removed from signature to avoid error on import -def create_schema_registry_client(): +@lru_cache +def get_schema_registry_client(): """ Create a schema registry client from common settings. + This is cached for convenience. + Returns None if confluent_kafka library is not available or the settings are invalid. SchemaRegistryClient if it is. @@ -69,3 +75,9 @@ def load_common_settings() -> Optional[dict]: }) return base_settings + + +@receiver(setting_changed) +def _reset_state(sender, **kwargs): # pylint: disable=unused-argument + """Reset caches during testing when settings change.""" + get_schema_registry_client.cache_clear() diff --git a/edx_event_bus_kafka/consumer/event_consumer.py b/edx_event_bus_kafka/consumer/event_consumer.py index 0a4c285..695f21d 100644 --- a/edx_event_bus_kafka/consumer/event_consumer.py +++ b/edx_event_bus_kafka/consumer/event_consumer.py @@ -13,7 +13,7 @@ from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED from openedx_events.tooling import OpenEdxPublicSignal -from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings +from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings logger = logging.getLogger(__name__) @@ -68,7 +68,7 @@ def _create_consumer(self): DeserializingConsumer if it is. """ - schema_registry_client = create_schema_registry_client() + schema_registry_client = get_schema_registry_client() # TODO (EventBus): # 1. Reevaluate if all consumers should listen for the earliest unprocessed offset (auto.offset.reset) diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 03785f3..1fed351 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -14,7 +14,7 @@ from openedx_events.event_bus.avro.serializer import AvroSignalSerializer from openedx_events.tooling import OpenEdxPublicSignal -from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings +from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings logger = logging.getLogger(__name__) @@ -161,7 +161,7 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str): logger.warning('Library confluent-kafka not available. 
Cannot create event producer.') return None - schema_registry_client = create_schema_registry_client() + schema_registry_client = get_schema_registry_client() if schema_registry_client is None: return None diff --git a/edx_event_bus_kafka/tests/test_config.py b/edx_event_bus_kafka/tests/test_config.py index 80e684a..c81b804 100644 --- a/edx_event_bus_kafka/tests/test_config.py +++ b/edx_event_bus_kafka/tests/test_config.py @@ -17,11 +17,11 @@ class TestSchemaRegistryClient(TestCase): def test_unconfigured(self): - assert config.create_schema_registry_client() is None + assert config.get_schema_registry_client() is None def test_configured(self): with override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345'): - assert isinstance(config.create_schema_registry_client(), SchemaRegistryClient) + assert isinstance(config.get_schema_registry_client(), SchemaRegistryClient) class TestCommonSettings(TestCase): From a9c6ac72afbf66399da6abb01182ab466123c60b Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Fri, 26 Aug 2022 15:14:50 +0000 Subject: [PATCH 2/9] test: Lift producer test data to be instance variables --- .../publishing/test_event_producer.py | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/edx_event_bus_kafka/publishing/test_event_producer.py b/edx_event_bus_kafka/publishing/test_event_producer.py index 81521bc..757cacb 100644 --- a/edx_event_bus_kafka/publishing/test_event_producer.py +++ b/edx_event_bus_kafka/publishing/test_event_producer.py @@ -24,8 +24,10 @@ class TestEventProducer(TestCase): """Test producer.""" - def test_extract_event_key(self): - event_data = { + def setUp(self): + super().setUp() + self.signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED + self.event_data = { 'user': UserData( id=123, is_active=True, @@ -37,17 +39,17 @@ def test_extract_event_key(self): ) } - assert ep.extract_event_key(event_data, 'user.pii.username') == 'foobob' + def test_extract_event_key(self): + assert ep.extract_event_key(self.event_data, 'user.pii.username') == 'foobob' with pytest.raises(Exception, match="Could not extract key from event; lookup in xxx failed at 'xxx' in dictionary"): - ep.extract_event_key(event_data, 'xxx') + ep.extract_event_key(self.event_data, 'xxx') with pytest.raises(Exception, match="Could not extract key from event; lookup in user.xxx failed at 'xxx' in object"): - ep.extract_event_key(event_data, 'user.xxx') + ep.extract_event_key(self.event_data, 'user.xxx') def test_descend_avro_schema(self): - signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED - schema = AvroSignalSerializer(signal).schema + schema = AvroSignalSerializer(self.signal).schema assert ep.descend_avro_schema(schema, ['user', 'pii', 'username']) == {"name": "username", "type": "string"} @@ -57,8 +59,7 @@ def test_descend_avro_schema(self): assert isinstance(excinfo.value.__cause__, IndexError) def test_extract_key_schema(self): - signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED - schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username') + schema = ep.extract_key_schema(AvroSignalSerializer(self.signal), 'user.pii.username') assert schema == '{"name": "username", "type": "string"}' def test_get_producer_for_signal_unconfigured(self): @@ -100,25 +101,12 @@ def test_on_event_deliver(self, mock_logger): ) def test_send_to_event_bus(self): - signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED - event_data = { - 'user': UserData( - id=123, - is_active=True, - 
pii=UserPersonalData( - username='foobob', - email='bob@foo.example', - name="Bob Foo", - ) - ) - } - mock_producer = MagicMock() with patch('edx_event_bus_kafka.publishing.event_producer.get_producer_for_signal', return_value=mock_producer): - ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data) + ep.send_to_event_bus(self.signal, 'user_stuff', 'user.id', self.event_data) mock_producer.produce.assert_called_once_with( - 'user_stuff', key=123, value=event_data, + 'user_stuff', key=123, value=self.event_data, on_delivery=ep.on_event_deliver, headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'}, ) From a4b9e524800cca5e299c5c665ed8436b61c4bf2e Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Fri, 26 Aug 2022 00:18:49 +0000 Subject: [PATCH 3/9] feat!: Switch to a single Producer, wrapped in an API singleton Purpose: - Revisit https://github.com/openedx/event-bus-kafka/issues/16 since I finally figured out a clean way to have a single producer. - Reduce the burden on future code that will need to adjust how polling is done (https://github.com/openedx/event-bus-kafka/issues/31) and maybe handle shutdown (https://github.com/openedx/event-bus-kafka/issues/11) - Prepare for configurable implementation loading, which will need a singleton and getter: https://github.com/openedx/openedx-events/issues/87 - Get rid of the `sync` argument (which didn't fit the abstraction) and move it to a dedicated method. Relying code should now call `get_producer().send(...)` rather than `send_to_event_bus(...)`. The return value is an object that wraps a `Producer` instance (not a `SerializingProducer`) and that handles the serialization itself. Serialization logic is moved to a cached `get_serializers(...)` that expands upon the previous `get_serializer` function; it now returns a pair of key and value serializers. This also acts as a patch point for mocking. I'd like to test the serializers themselves, but they want to talk to a server. `send_to_event_bus` gets a shorter name (now it's just a `send` method) and loses the `sync` keyword argument; there is instead now a `pre_shutdown` method. --- CHANGELOG.rst | 14 ++ edx_event_bus_kafka/__init__.py | 2 +- edx_event_bus_kafka/config.py | 2 + .../management/commands/produce_event.py | 7 +- .../publishing/event_producer.py | 202 +++++++++--------- .../publishing/test_event_producer.py | 51 +++-- 6 files changed, 158 insertions(+), 120 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5e69ed7..63d6ef1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,20 @@ Unreleased * +[0.5.0] - 2022-08-29 +******************** + +Changed +======= + +* **Breaking changes** in the producer module, refactored to expose a better API: + + * Rather than `send_to_event_bus(...)`, relying code should now call `get_producer().send(...)`. + * The `sync` kwarg is gone; to flush and sync messages before shutdown, call `get_producer().pre_shutdown()` instead. + +* Clarify that config module is for internal use only. +* Implementation changes: Only a single Producer is created, and is used for all signals. + [0.4.4] - 2022-08-26 ******************** diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index fea9846..4f23f3a 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -2,4 +2,4 @@ Kafka implementation for Open edX event bus. 
""" -__version__ = '0.4.4' +__version__ = '0.5.0' diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py index e73c4ba..fc76bc6 100644 --- a/edx_event_bus_kafka/config.py +++ b/edx_event_bus_kafka/config.py @@ -1,5 +1,7 @@ """ Configuration loading and validation. + +This module is for internal use only. """ import warnings diff --git a/edx_event_bus_kafka/management/commands/produce_event.py b/edx_event_bus_kafka/management/commands/produce_event.py index 162cf2c..c1d8e66 100644 --- a/edx_event_bus_kafka/management/commands/produce_event.py +++ b/edx_event_bus_kafka/management/commands/produce_event.py @@ -9,7 +9,7 @@ from django.utils.module_loading import import_string from openedx_events.tooling import OpenEdxPublicSignal -from edx_event_bus_kafka.publishing.event_producer import send_to_event_bus +from edx_event_bus_kafka.publishing.event_producer import get_producer logger = logging.getLogger(__name__) @@ -53,12 +53,13 @@ def add_arguments(self, parser): def handle(self, *args, **options): try: - send_to_event_bus( + producer = get_producer() + producer.send( signal=import_string(options['signal'][0]), topic=options['topic'][0], event_key_field=options['key_field'][0], event_data=json.loads(options['data'][0]), - sync=True, # otherwise command may exit before delivery is complete ) + producer.pre_shutdown() # otherwise command may exit before delivery is complete except Exception: # pylint: disable=broad-except logger.exception("Error producing Kafka event") diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 1fed351..084103b 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -1,13 +1,13 @@ """ Produce Kafka events from signals. -Main function is ``send_to_event_bus``. +Main function is ``get_producer()``. """ import json import logging from functools import lru_cache -from typing import Any, List +from typing import Any, List, Optional from django.dispatch import receiver from django.test.signals import setting_changed @@ -21,8 +21,9 @@ # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst try: import confluent_kafka - from confluent_kafka import SerializingProducer + from confluent_kafka import Producer from confluent_kafka.schema_registry.avro import AvroSerializer + from confluent_kafka.serialization import MessageField, SerializationContext except ImportError: # pragma: no cover confluent_kafka = None @@ -113,63 +114,25 @@ def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field: @lru_cache -def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer: +def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str): """ - Get the serializer for a signal. + Get the key and value serializers for a signal and a key field path. This is cached in order to save work re-transforming classes into Avro schemas. - """ - return AvroSignalSerializer(signal) - - -# Note: This caching is required, since otherwise the Producer will -# fall out of scope and be garbage-collected, destroying the -# outbound-message queue and threads. The use of this cache allows the -# producers to be long-lived. -# -# We are also likely to need to iterate through this cache at server -# shutdown in order to flush each of the producers, which means the -# cache needs to never evict. 
See https://github.com/openedx/event-bus-kafka/issues/11 -# for more details. -# -# (Why not change the code to use a single Producer rather than multiple -# SerializerProducer? Because the code actually turns out to be significantly -# uglier that way due to the number of separate values that need to be passed -# around in bundles. There aren't clear "cut-off" points. Additionally, it -# makes unit testing harder/uglier since now the mocks need to either deal with -# serialized bytes or mock out the serializers. Getting this down to a single -# Producer doesn't really seem worth the trouble.) - -# return type (Optional[SerializingProducer]) removed from signature to avoid error on import - -@lru_cache(maxsize=None) # Never evict an entry -- it's a small set and we need to keep all of them. -def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str): - """ - Create the producer for a signal and a key field path. - - If essential settings are missing or invalid, warn and return None. Arguments: - signal: The OpenEdxPublicSignal to make a producer for - event_key_field: Path to the event data field to use as the event key (period-delimited - string naming the dictionary keys to descend) + signal: The OpenEdxPublicSignal to make a serializer for. + event_key_field: Path to descend in the signal schema to find the subschema for the key + (period-delimited string naming the field names to descend). + Returns: - None if confluent_kafka is not defined or the settings are invalid. - SerializingProducer if it is. + 2-tuple of AvroSignalSerializers, for event key and value """ - if not confluent_kafka: # pragma: no cover - logger.warning('Library confluent-kafka not available. Cannot create event producer.') - return None + client = get_schema_registry_client() + if client is None: + raise Exception('Cannot create Kafka serializers -- missing library or settings') - schema_registry_client = get_schema_registry_client() - if schema_registry_client is None: - return None - - producer_settings = load_common_settings() - if producer_settings is None: - return None - - signal_serializer = get_serializer(signal) + signal_serializer = AvroSignalSerializer(signal) def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument """Tells Avro how to turn objects into dictionaries.""" @@ -178,21 +141,95 @@ def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument # Serializers for key and value components of Kafka event key_serializer = AvroSerializer( schema_str=extract_key_schema(signal_serializer, event_key_field), - schema_registry_client=schema_registry_client, + schema_registry_client=client, to_dict=inner_to_dict, ) value_serializer = AvroSerializer( schema_str=signal_serializer.schema_string(), - schema_registry_client=schema_registry_client, + schema_registry_client=client, to_dict=inner_to_dict, ) - producer_settings.update({ - 'key.serializer': key_serializer, - 'value.serializer': value_serializer, - }) + return key_serializer, value_serializer + + +class EventProducerKafka(): + """ + API singleton for event production to Kafka. + + This is just a wrapper around a confluent_kafka Producer that knows how to + serialize a signal to event wire format. + + Only one instance (of Producer or this wrapper) should be created, + since it is stateful and needs lifecycle management. 
+ """ + + def __init__(self, producer): + self.producer = producer + + def send( + self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, + ) -> None: + """ + Send a signal event to the event bus under the specified topic. + + Arguments: + signal: The original OpenEdxPublicSignal the event was sent to + topic: The event bus topic for the event + event_key_field: Path to the event data field to use as the event key (period-delimited + string naming the dictionary keys to descend) + event_data: The event data (kwargs) sent to the signal + """ + event_key = extract_event_key(event_data, event_key_field) + headers = {EVENT_TYPE_HEADER_KEY: signal.event_type} + + key_serializer, value_serializer = get_serializers(signal, event_key_field) + key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers)) + value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers)) + + self.producer.produce( + topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver, + ) + + # Opportunistically ensure any pending callbacks from recent event-sends are triggered. + # + # This assumes events come regularly, or that we're not concerned about + # high latency between delivery and callback. If those assumptions are + # false, we should switch to calling poll(1.0) or similar in a loop on + # a separate thread. Or do both. + # + # Issue: https://github.com/openedx/event-bus-kafka/issues/31 + self.producer.poll(0) + + def pre_shutdown(self): + """ + Prepare producer for a clean shutdown. - return SerializingProducer(producer_settings) + Flush pending outbound events, wait for acknowledgement, and process callbacks. + """ + self.producer.flush(-1) + + +# Note: This caching is required, since otherwise the Producer will +# fall out of scope and be garbage-collected, destroying the +# outbound-message queue and threads. The use of this cache allows the +# producer to be long-lived. +@lru_cache +def get_producer() -> Optional[EventProducerKafka]: + """ + Create or retrieve Producer API singleton. + + If confluent-kafka library or essential settings are missing, warn and return None. + """ + if not confluent_kafka: # pragma: no cover + logger.warning('Library confluent-kafka not available. Cannot create event producer.') + return None + + producer_settings = load_common_settings() + if producer_settings is None: + return None + + return EventProducerKafka(Producer(producer_settings)) def on_event_deliver(err, evt): @@ -214,51 +251,8 @@ def on_event_deliver(err, evt): f"partition={evt.partition()}") -def send_to_event_bus( - signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, - sync: bool = False, -) -> None: - """ - Send a signal event to the event bus under the specified topic. - - If the Kafka settings are missing or invalid, return with a warning. 
- - Arguments: - signal: The original OpenEdxPublicSignal the event was sent to - topic: The event bus topic for the event - event_key_field: Path to the event data field to use as the event key (period-delimited - string naming the dictionary keys to descend) - event_data: The event data (kwargs) sent to the signal - sync: Whether to wait indefinitely for event to be received by the message bus (probably - only want to use this for testing) - """ - producer = get_producer_for_signal(signal, event_key_field) - if producer is None: # Note: SerializingProducer has False truthiness when len() == 0 - return - - event_key = extract_event_key(event_data, event_key_field) - producer.produce(topic, key=event_key, value=event_data, - on_delivery=on_event_deliver, - headers={EVENT_TYPE_HEADER_KEY: signal.event_type}) - - if sync: - # Wait for all buffered events to send, then wait for all of - # them to be acknowledged, and trigger all callbacks. - producer.flush(-1) - else: - # Opportunistically ensure any pending callbacks from recent events are triggered. - # - # This assumes events come regularly, or that we're not concerned about - # high latency between delivery and callback. If those assumptions are - # false, we should switch to calling poll(1.0) or similar in a loop on - # a separate thread. - # - # Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079 - producer.poll(0) - - @receiver(setting_changed) def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument """Reset caches during testing when settings change.""" - get_serializer.cache_clear() - get_producer_for_signal.cache_clear() + get_serializers.cache_clear() + get_producer.cache_clear() diff --git a/edx_event_bus_kafka/publishing/test_event_producer.py b/edx_event_bus_kafka/publishing/test_event_producer.py index 757cacb..3ebb1e6 100644 --- a/edx_event_bus_kafka/publishing/test_event_producer.py +++ b/edx_event_bus_kafka/publishing/test_event_producer.py @@ -16,7 +16,7 @@ # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst try: - from confluent_kafka import SerializingProducer + from confluent_kafka.schema_registry.avro import AvroSerializer except ImportError: # pragma: no cover pass @@ -62,18 +62,27 @@ def test_extract_key_schema(self): schema = ep.extract_key_schema(AvroSignalSerializer(self.signal), 'user.pii.username') assert schema == '{"name": "username", "type": "string"}' - def test_get_producer_for_signal_unconfigured(self): + def test_serializers_configured(self): + with override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345'): + key_ser, value_ser = ep.get_serializers(self.signal, 'user.id') + # We can't actually call them because they want to talk to the schema server. 
+ assert isinstance(key_ser, AvroSerializer) + assert isinstance(value_ser, AvroSerializer) + + def test_serializers_unconfigured(self): + with pytest.raises(Exception, match="missing library or settings"): + ep.get_serializers(self.signal, 'user.id') + + def test_get_producer_unconfigured(self): """With missing essential settings, just warn and return None.""" - signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED with warnings.catch_warnings(record=True) as caught_warnings: warnings.simplefilter('always') - assert ep.get_producer_for_signal(signal, 'user.id') is None + assert ep.get_producer() is None assert len(caught_warnings) == 1 assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ") - def test_get_producer_for_signal_configured(self): + def test_get_producer_configured(self): """Creation succeeds when all settings are present.""" - signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED with override_settings( EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key', @@ -83,7 +92,7 @@ def test_get_producer_for_signal_configured(self): EVENT_BUS_KAFKA_API_KEY='some_other_key', EVENT_BUS_KAFKA_API_SECRET='some_other_secret', ): - assert isinstance(ep.get_producer_for_signal(signal, 'user.id'), SerializingProducer) + assert isinstance(ep.get_producer(), ep.EventProducerKafka) @patch('edx_event_bus_kafka.publishing.event_producer.logger') def test_on_event_deliver(self, mock_logger): @@ -100,13 +109,31 @@ def test_on_event_deliver(self, mock_logger): 'Event delivered to topic some_topic; key=some_key; partition=some_partition' ) - def test_send_to_event_bus(self): - mock_producer = MagicMock() - with patch('edx_event_bus_kafka.publishing.event_producer.get_producer_for_signal', return_value=mock_producer): - ep.send_to_event_bus(self.signal, 'user_stuff', 'user.id', self.event_data) + # Mock out the serializers for this one so we don't have to deal + # with expected Avro bytes -- and they can't call their schema server. + @patch( + 'edx_event_bus_kafka.publishing.event_producer.get_serializers', autospec=True, + return_value=( + lambda _key, _ctx: b'key-bytes-here', + lambda _value, _ctx: b'value-bytes-here', + ) + ) + def test_send_to_event_bus(self, mock_get_serializers): + with override_settings( + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', + ): + producer_api = ep.get_producer() + with patch.object(producer_api, 'producer', autospec=True) as mock_producer: + producer_api.send( + signal=self.signal, topic='user_stuff', + event_key_field='user.id', event_data=self.event_data + ) + + mock_get_serializers.assert_called_once_with(self.signal, 'user.id') mock_producer.produce.assert_called_once_with( - 'user_stuff', key=123, value=self.event_data, + 'user_stuff', key=b'key-bytes-here', value=b'value-bytes-here', on_delivery=ep.on_event_deliver, headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'}, ) From c5fe89a28e0040ada6bb36d97eb541d208c447d9 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Mon, 29 Aug 2022 23:22:13 +0000 Subject: [PATCH 4/9] fixup! 
Tweaks from PR review - Fix comment re: client caching - Use `cache` instead of `lru_cache` (should only ever have one value anyhow) --- edx_event_bus_kafka/config.py | 8 +++++--- edx_event_bus_kafka/publishing/event_producer.py | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py index fc76bc6..46eb7e7 100644 --- a/edx_event_bus_kafka/config.py +++ b/edx_event_bus_kafka/config.py @@ -5,7 +5,7 @@ """ import warnings -from functools import lru_cache +from functools import cache from typing import Optional from django.conf import settings @@ -21,12 +21,14 @@ # return type (Optional[SchemaRegistryClient]) removed from signature to avoid error on import -@lru_cache +@cache def get_schema_registry_client(): """ Create a schema registry client from common settings. - This is cached for convenience. + This is cached on the assumption of a performance benefit (avoid reloading settings and + reconstructing client) but it may also be that the client keeps around long-lived + connections that we could benefit from. Returns None if confluent_kafka library is not available or the settings are invalid. diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 084103b..d6e7968 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -6,7 +6,7 @@ import json import logging -from functools import lru_cache +from functools import cache, lru_cache from typing import Any, List, Optional from django.dispatch import receiver @@ -214,7 +214,7 @@ def pre_shutdown(self): # fall out of scope and be garbage-collected, destroying the # outbound-message queue and threads. The use of this cache allows the # producer to be long-lived. -@lru_cache +@cache def get_producer() -> Optional[EventProducerKafka]: """ Create or retrieve Producer API singleton. From 1278f997155c86dc58aa24a2f8bd9c122eb1d0dd Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Mon, 29 Aug 2022 23:27:00 +0000 Subject: [PATCH 5/9] fixup! Restore use of lru_cache functools.cache is new in Python 3.9. :-) --- edx_event_bus_kafka/config.py | 4 ++-- edx_event_bus_kafka/publishing/event_producer.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py index 46eb7e7..c74d19b 100644 --- a/edx_event_bus_kafka/config.py +++ b/edx_event_bus_kafka/config.py @@ -5,7 +5,7 @@ """ import warnings -from functools import cache +from functools import lru_cache from typing import Optional from django.conf import settings @@ -21,7 +21,7 @@ # return type (Optional[SchemaRegistryClient]) removed from signature to avoid error on import -@cache +@lru_cache def get_schema_registry_client(): """ Create a schema registry client from common settings. diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index d6e7968..084103b 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -6,7 +6,7 @@ import json import logging -from functools import cache, lru_cache +from functools import lru_cache from typing import Any, List, Optional from django.dispatch import receiver @@ -214,7 +214,7 @@ def pre_shutdown(self): # fall out of scope and be garbage-collected, destroying the # outbound-message queue and threads. The use of this cache allows the # producer to be long-lived. 
-@cache +@lru_cache def get_producer() -> Optional[EventProducerKafka]: """ Create or retrieve Producer API singleton. From 2e09d6d99a4b7627736effc3fda56f7edbdaf486 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Wed, 31 Aug 2022 15:58:52 +0000 Subject: [PATCH 6/9] fixup! Rename `pre_shutdown` -> `prepare_for_shutdown` Also update CHANGELOG date prediction. --- CHANGELOG.rst | 4 ++-- edx_event_bus_kafka/management/commands/produce_event.py | 2 +- edx_event_bus_kafka/publishing/event_producer.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 63d6ef1..fb73dfe 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,7 +16,7 @@ Unreleased * -[0.5.0] - 2022-08-29 +[0.5.0] - 2022-08-31 ******************** Changed @@ -25,7 +25,7 @@ Changed * **Breaking changes** in the producer module, refactored to expose a better API: * Rather than `send_to_event_bus(...)`, relying code should now call `get_producer().send(...)`. - * The `sync` kwarg is gone; to flush and sync messages before shutdown, call `get_producer().pre_shutdown()` instead. + * The `sync` kwarg is gone; to flush and sync messages before shutdown, call `get_producer().prepare_for_shutdown()` instead. * Clarify that config module is for internal use only. * Implementation changes: Only a single Producer is created, and is used for all signals. diff --git a/edx_event_bus_kafka/management/commands/produce_event.py b/edx_event_bus_kafka/management/commands/produce_event.py index c1d8e66..92fa058 100644 --- a/edx_event_bus_kafka/management/commands/produce_event.py +++ b/edx_event_bus_kafka/management/commands/produce_event.py @@ -60,6 +60,6 @@ def handle(self, *args, **options): event_key_field=options['key_field'][0], event_data=json.loads(options['data'][0]), ) - producer.pre_shutdown() # otherwise command may exit before delivery is complete + producer.prepare_for_shutdown() # otherwise command may exit before delivery is complete except Exception: # pylint: disable=broad-except logger.exception("Error producing Kafka event") diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 084103b..ba02c41 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -201,7 +201,7 @@ def send( # Issue: https://github.com/openedx/event-bus-kafka/issues/31 self.producer.poll(0) - def pre_shutdown(self): + def prepare_for_shutdown(self): """ Prepare producer for a clean shutdown. From de012d58458b0f5344d7221cfa58e8e2f70dc2b9 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Wed, 31 Aug 2022 17:15:07 +0000 Subject: [PATCH 7/9] fixup! comment lru_cache size --- edx_event_bus_kafka/config.py | 2 +- edx_event_bus_kafka/publishing/event_producer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py index c74d19b..ef5bae2 100644 --- a/edx_event_bus_kafka/config.py +++ b/edx_event_bus_kafka/config.py @@ -21,7 +21,7 @@ # return type (Optional[SchemaRegistryClient]) removed from signature to avoid error on import -@lru_cache +@lru_cache # will just be one cache entry, in practice def get_schema_registry_client(): """ Create a schema registry client from common settings. 
diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index ba02c41..7f90176 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -214,7 +214,7 @@ def prepare_for_shutdown(self): # fall out of scope and be garbage-collected, destroying the # outbound-message queue and threads. The use of this cache allows the # producer to be long-lived. -@lru_cache +@lru_cache # will just be one cache entry, in practice def get_producer() -> Optional[EventProducerKafka]: """ Create or retrieve Producer API singleton. From 3e63553abdc29ff39b18bdf824a6c4ad888088cc Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Wed, 31 Aug 2022 18:12:39 +0000 Subject: [PATCH 8/9] fixup! Tweak reset-cache docstring --- edx_event_bus_kafka/config.py | 2 +- edx_event_bus_kafka/publishing/event_producer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py index ef5bae2..dc44b3a 100644 --- a/edx_event_bus_kafka/config.py +++ b/edx_event_bus_kafka/config.py @@ -83,5 +83,5 @@ def load_common_settings() -> Optional[dict]: @receiver(setting_changed) def _reset_state(sender, **kwargs): # pylint: disable=unused-argument - """Reset caches during testing when settings change.""" + """Resets caches when settings change during unit tests.""" get_schema_registry_client.cache_clear() diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 7f90176..55926b8 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -253,6 +253,6 @@ def on_event_deliver(err, evt): @receiver(setting_changed) def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument - """Reset caches during testing when settings change.""" + """Resets caches when settings change during unit tests.""" get_serializers.cache_clear() get_producer.cache_clear() From 46f3c91643a5c11d7be5a1d49edb193279e2237f Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Wed, 31 Aug 2022 18:28:04 +0000 Subject: [PATCH 9/9] fixup! Fair enough, linter. Fair enough. --- edx_event_bus_kafka/config.py | 2 +- edx_event_bus_kafka/publishing/event_producer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py index dc44b3a..886231d 100644 --- a/edx_event_bus_kafka/config.py +++ b/edx_event_bus_kafka/config.py @@ -83,5 +83,5 @@ def load_common_settings() -> Optional[dict]: @receiver(setting_changed) def _reset_state(sender, **kwargs): # pylint: disable=unused-argument - """Resets caches when settings change during unit tests.""" + """Reset caches when settings change during unit tests.""" get_schema_registry_client.cache_clear() diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 55926b8..4503d10 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -253,6 +253,6 @@ def on_event_deliver(err, evt): @receiver(setting_changed) def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument - """Resets caches when settings change during unit tests.""" + """Reset caches when settings change during unit tests.""" get_serializers.cache_clear() get_producer.cache_clear()
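
A minimal usage sketch of the API after this series (patches 3 and 6), for orientation only: it assumes confluent-kafka is installed and the `EVENT_BUS_KAFKA_*` settings are configured, and the topic name is hypothetical. The event data mirrors the `SESSION_LOGIN_COMPLETED` example from the tests.

    from openedx_events.learning.data import UserData, UserPersonalData
    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED

    from edx_event_bus_kafka.publishing.event_producer import get_producer

    event_data = {
        'user': UserData(
            id=123,
            is_active=True,
            pii=UserPersonalData(username='foobob', email='bob@foo.example', name='Bob Foo'),
        )
    }

    producer = get_producer()  # None if confluent-kafka or the EVENT_BUS_KAFKA_* settings are missing
    if producer is not None:
        producer.send(
            signal=SESSION_LOGIN_COMPLETED,
            topic='user-logins',        # hypothetical topic name
            event_key_field='user.id',  # period-delimited path into event_data
            event_data=event_data,
        )
        # Takes over the role of the removed sync=True kwarg: flush pending
        # events, wait for acknowledgement, and run delivery callbacks.
        producer.prepare_for_shutdown()

`prepare_for_shutdown()` matters for short-lived processes such as the `produce_event` management command, which would otherwise exit before delivery is complete.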
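
In the same spirit, a sketch of exercising the serializer pair from `get_serializers` directly, mirroring what `EventProducerKafka.send` does internally. Per the patch 3 commit message, these serializers want to talk to a schema registry server, so this only runs against a reachable registry; the topic name is again hypothetical, and `event_data` is reused from the sketch above.

    from confluent_kafka.serialization import MessageField, SerializationContext
    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED

    from edx_event_bus_kafka.publishing.event_producer import get_serializers

    # Raises if the schema registry client could not be created.
    key_serializer, value_serializer = get_serializers(SESSION_LOGIN_COMPLETED, 'user.id')
    headers = {'ce_type': SESSION_LOGIN_COMPLETED.event_type}

    # Each call registers/looks up its Avro schema in the registry, then encodes to bytes.
    key_bytes = key_serializer(123, SerializationContext('user-logins', MessageField.KEY, headers))
    value_bytes = value_serializer(event_data, SerializationContext('user-logins', MessageField.VALUE, headers))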