diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 9d4da68..ec3652b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,14 @@ Unreleased * +[0.3.3] - 2023-10-13 +************************************************ + +Added +===== +* Use utility from openedx_events to reset application state before processing + event + [0.3.2] - 2023-09-01 ************************************************ diff --git a/edx_event_bus_redis/__init__.py b/edx_event_bus_redis/__init__.py index dc14858..7d8f35a 100644 --- a/edx_event_bus_redis/__init__.py +++ b/edx_event_bus_redis/__init__.py @@ -5,6 +5,6 @@ from edx_event_bus_redis.internal.consumer import RedisEventConsumer from edx_event_bus_redis.internal.producer import create_producer -__version__ = '0.3.2' +__version__ = '0.3.3' default_app_config = 'edx_event_bus_redis.apps.EdxEventBusRedisConfig' # pylint: disable=invalid-name diff --git a/edx_event_bus_redis/internal/consumer.py b/edx_event_bus_redis/internal/consumer.py index b968695..7b8bfe2 100644 --- a/edx_event_bus_redis/internal/consumer.py +++ b/edx_event_bus_redis/internal/consumer.py @@ -11,7 +11,7 @@ from edx_toggles.toggles import SettingToggle from openedx_events.event_bus import EventBusConsumer from openedx_events.event_bus.avro.deserializer import deserialize_bytes_to_event_data -from openedx_events.tooling import OpenEdxPublicSignal +from openedx_events.tooling import OpenEdxPublicSignal, prepare_for_new_work_cycle from redis.exceptions import ConnectionError as RedisConnectionError from redis.exceptions import ResponseError from walrus import Database @@ -70,21 +70,6 @@ class EventConsumptionException(Exception): """ -def _reconnect_to_db_if_needed(): - """ - Reconnects the db connection if needed. - - This is important because Django only does connection validity/age checks as part of - its request/response cycle, which isn't in effect for the consume-loop. If we don't - force these checks, a broken connection will remain broken indefinitely. For most - consumers, this will cause event processing to fail. - """ - has_connection = bool(connection.connection) - requires_reconnect = has_connection and not connection.is_usable() - if requires_reconnect: - connection.connect() - - class RedisEventConsumer(EventBusConsumer): """ Construct consumer for the given topic and group. The consumer can then @@ -240,8 +225,9 @@ def _consume_indefinitely(self): if isinstance(redis_raw_msg, list): redis_raw_msg = redis_raw_msg[0] msg = RedisMessage.parse(redis_raw_msg, self.full_topic) - # Before processing, make sure our db connection is still active - _reconnect_to_db_if_needed() + # Before processing, try to make sure our application state is cleaned + # up as would happen at the start of a Django request/response cycle. + prepare_for_new_work_cycle() self.emit_signals_from_message(msg) consecutive_errors = 0 diff --git a/requirements/base.in b/requirements/base.in index 021528e..ef8bf2c 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -2,8 +2,8 @@ -c constraints.txt Django # Web application framework -# openedx-events 8.0.0 removes an argument from consumer initialization -openedx-events>=8.0.0 # Events API +# openedx-events 9.0.1 adds utility to reset application state +openedx-events>=9.0.1 # Events API edx_django_utils edx_toggles