feat!: Switch to a single Producer, wrapped in an API singleton #32

Merged
merged 9 commits on Aug 31, 2022
14 changes: 14 additions & 0 deletions CHANGELOG.rst
@@ -16,6 +16,20 @@ Unreleased

*

[0.5.0] - 2022-08-29
********************

Changed
=======

* **Breaking changes** in the producer module, refactored to expose a better API:

* Rather than `send_to_event_bus(...)`, relying code should now call `get_producer().send(...)`.
* The `sync` kwarg is gone; to flush and sync messages before shutdown, call `get_producer().pre_shutdown()` instead.

* Clarify that config module is for internal use only.
* Implementation changes: Only a single Producer is created, and is used for all signals.

[0.4.4] - 2022-08-26
********************

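The breaking change in the 0.5.0 entry above amounts to swapping the module-level function for the API singleton. A minimal before/after sketch, assuming a hypothetical ``MY_SIGNAL`` and topic name:

# Before (0.4.x):
send_to_event_bus(
    signal=MY_SIGNAL,
    topic='my-topic',
    event_key_field='user.id',
    event_data=event_data,
    sync=True,
)

# After (0.5.0):
producer = get_producer()
if producer is not None:  # None if confluent-kafka or settings are missing
    producer.send(
        signal=MY_SIGNAL,
        topic='my-topic',
        event_key_field='user.id',
        event_data=event_data,
    )
    producer.pre_shutdown()  # replaces sync=True: flush and wait before exiting
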
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
@@ -2,4 +2,4 @@
Kafka implementation for Open edX event bus.
"""

__version__ = '0.4.4'
__version__ = '0.5.0'
16 changes: 15 additions & 1 deletion edx_event_bus_kafka/config.py
@@ -1,11 +1,16 @@
"""
Configuration loading and validation.

This module is for internal use only.
"""

import warnings
from functools import lru_cache
from typing import Optional

from django.conf import settings
from django.dispatch import receiver
from django.test.signals import setting_changed

# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
@@ -16,10 +16,13 @@


# return type (Optional[SchemaRegistryClient]) removed from signature to avoid error on import
def create_schema_registry_client():
@lru_cache
def get_schema_registry_client():
"""
Create a schema registry client from common settings.

This is cached for convenience.

Returns
None if confluent_kafka library is not available or the settings are invalid.
SchemaRegistryClient if it is.
@@ -69,3 +77,9 @@ def load_common_settings() -> Optional[dict]:
})

return base_settings


@receiver(setting_changed)
def _reset_state(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches during testing when settings change."""
get_schema_registry_client.cache_clear()
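The ``setting_changed`` receiver above exists so that cached state cannot leak across tests that override Django settings. A minimal sketch of the interaction (the setting name is an assumption for illustration; ``override_settings`` fires ``setting_changed`` on both entry and exit):

from django.test import TestCase, override_settings

from edx_event_bus_kafka.config import get_schema_registry_client


class SchemaRegistryCacheTest(TestCase):
    def test_cache_cleared_on_settings_change(self):
        get_schema_registry_client()  # first call: built and cached by @lru_cache
        with override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://example.invalid'):
            # setting_changed fired -> _reset_state() cleared the cache, so this
            # call rebuilds the client from the overridden settings.
            get_schema_registry_client()
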
4 changes: 2 additions & 2 deletions edx_event_bus_kafka/consumer/event_consumer.py
@@ -13,7 +13,7 @@
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
from openedx_events.tooling import OpenEdxPublicSignal

from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings

logger = logging.getLogger(__name__)

@@ -68,7 +68,7 @@ def _create_consumer(self):
DeserializingConsumer if it is.
"""

schema_registry_client = create_schema_registry_client()
schema_registry_client = get_schema_registry_client()

# TODO (EventBus):
# 1. Reevaluate if all consumers should listen for the earliest unprocessed offset (auto.offset.reset)
7 changes: 4 additions & 3 deletions edx_event_bus_kafka/management/commands/produce_event.py
@@ -9,7 +9,7 @@
from django.utils.module_loading import import_string
from openedx_events.tooling import OpenEdxPublicSignal

from edx_event_bus_kafka.publishing.event_producer import send_to_event_bus
from edx_event_bus_kafka.publishing.event_producer import get_producer

logger = logging.getLogger(__name__)

@@ -53,12 +53,13 @@ def add_arguments(self, parser):

def handle(self, *args, **options):
try:
send_to_event_bus(
producer = get_producer()
producer.send(
signal=import_string(options['signal'][0]),
topic=options['topic'][0],
event_key_field=options['key_field'][0],
event_data=json.loads(options['data'][0]),
sync=True, # otherwise command may exit before delivery is complete
)
producer.pre_shutdown() # otherwise command may exit before delivery is complete
except Exception: # pylint: disable=broad-except
logger.exception("Error producing Kafka event")
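With this change the command still blocks until delivery completes, but by flushing in ``pre_shutdown()`` rather than passing ``sync=True``. A hypothetical invocation via ``call_command`` (``add_arguments`` is collapsed in this diff, so the option names and payload below are assumptions; the ``[0]`` indexing in ``handle()`` suggests each argument is declared with nargs=1):

from django.core.management import call_command

call_command(
    'produce_event',
    signal=['openedx_events.learning.signals.SESSION_LOGIN_COMPLETED'],
    topic=['user-logins'],
    key_field=['user.pii.username'],
    data=['{"user": {"id": 42}}'],  # illustrative payload, not the real signal schema
)
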
204 changes: 99 additions & 105 deletions edx_event_bus_kafka/publishing/event_producer.py
@@ -1,28 +1,29 @@
"""
Produce Kafka events from signals.

Main function is ``send_to_event_bus``.
Main function is ``get_producer()``.
"""

import json
import logging
from functools import lru_cache
from typing import Any, List
from typing import Any, List, Optional

from django.dispatch import receiver
from django.test.signals import setting_changed
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings

logger = logging.getLogger(__name__)

# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
import confluent_kafka
from confluent_kafka import SerializingProducer
from confluent_kafka import Producer
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
except ImportError: # pragma: no cover
confluent_kafka = None

@@ -113,63 +114,25 @@ def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field:


@lru_cache
def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str):
"""
Get the serializer for a signal.
Get the key and value serializers for a signal and a key field path.

This is cached in order to save work re-transforming classes into Avro schemas.
"""
return AvroSignalSerializer(signal)


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
# producers to be long-lived.
#
# We are also likely to need to iterate through this cache at server
# shutdown in order to flush each of the producers, which means the
# cache needs to never evict. See https://github.com/openedx/event-bus-kafka/issues/11
# for more details.
#
# (Why not change the code to use a single Producer rather than multiple
# SerializerProducer? Because the code actually turns out to be significantly
# uglier that way due to the number of separate values that need to be passed
# around in bundles. There aren't clear "cut-off" points. Additionally, it
# makes unit testing harder/uglier since now the mocks need to either deal with
# serialized bytes or mock out the serializers. Getting this down to a single
# Producer doesn't really seem worth the trouble.)

# return type (Optional[SerializingProducer]) removed from signature to avoid error on import

@lru_cache(maxsize=None) # Never evict an entry -- it's a small set and we need to keep all of them.
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str):
"""
Create the producer for a signal and a key field path.

If essential settings are missing or invalid, warn and return None.

Arguments:
signal: The OpenEdxPublicSignal to make a producer for
event_key_field: Path to the event data field to use as the event key (period-delimited
string naming the dictionary keys to descend)
signal: The OpenEdxPublicSignal to make a serializer for.
event_key_field: Path to descend in the signal schema to find the subschema for the key
(period-delimited string naming the field names to descend).

Returns:
None if confluent_kafka is not defined or the settings are invalid.
SerializingProducer if it is.
2-tuple of AvroSignalSerializers, for event key and value
"""
if not confluent_kafka: # pragma: no cover
logger.warning('Library confluent-kafka not available. Cannot create event producer.')
return None
client = get_schema_registry_client()
if client is None:
raise Exception('Cannot create Kafka serializers -- missing library or settings')

schema_registry_client = create_schema_registry_client()
if schema_registry_client is None:
return None

producer_settings = load_common_settings()
if producer_settings is None:
return None

signal_serializer = get_serializer(signal)
signal_serializer = AvroSignalSerializer(signal)

def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
"""Tells Avro how to turn objects into dictionaries."""
@@ -178,21 +141,95 @@ def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
# Serializers for key and value components of Kafka event
key_serializer = AvroSerializer(
schema_str=extract_key_schema(signal_serializer, event_key_field),
schema_registry_client=schema_registry_client,
schema_registry_client=client,
to_dict=inner_to_dict,
)
value_serializer = AvroSerializer(
schema_str=signal_serializer.schema_string(),
schema_registry_client=schema_registry_client,
schema_registry_client=client,
to_dict=inner_to_dict,
)

producer_settings.update({
'key.serializer': key_serializer,
'value.serializer': value_serializer,
})
return key_serializer, value_serializer


class EventProducerKafka():
"""
API singleton for event production to Kafka.

This is just a wrapper around a confluent_kafka Producer that knows how to
serialize a signal to event wire format.

Only one instance (of Producer or this wrapper) should be created,
since it is stateful and needs lifecycle management.
"""

def __init__(self, producer):
self.producer = producer

def send(
self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
) -> None:
"""
Send a signal event to the event bus under the specified topic.

Arguments:
signal: The original OpenEdxPublicSignal the event was sent to
topic: The event bus topic for the event
event_key_field: Path to the event data field to use as the event key (period-delimited
string naming the dictionary keys to descend)
event_data: The event data (kwargs) sent to the signal
"""
event_key = extract_event_key(event_data, event_key_field)
headers = {EVENT_TYPE_HEADER_KEY: signal.event_type}

key_serializer, value_serializer = get_serializers(signal, event_key_field)
key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers))
value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers))

self.producer.produce(
topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver,
)

# Opportunistically ensure any pending callbacks from recent event-sends are triggered.
#
# This assumes events come regularly, or that we're not concerned about
# high latency between delivery and callback. If those assumptions are
# false, we should switch to calling poll(1.0) or similar in a loop on
# a separate thread. Or do both.
#
# Issue: https://github.com/openedx/event-bus-kafka/issues/31
self.producer.poll(0)

def pre_shutdown(self):
"""
Prepare producer for a clean shutdown.

return SerializingProducer(producer_settings)
Flush pending outbound events, wait for acknowledgement, and process callbacks.
"""
self.producer.flush(-1)
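
The comment in ``send()`` above sketches a fallback if the opportunistic ``poll(0)`` proves insufficient (issue #31): poll on an interval from a dedicated thread. A rough sketch of that alternative, not part of this PR:

import threading

def _poll_indefinitely(api_singleton):
    # poll(1.0) waits up to a second and triggers any pending delivery
    # callbacks, so callbacks fire promptly even when events are infrequent.
    while True:
        api_singleton.producer.poll(1.0)

# Hypothetical wiring: start once, when the producer singleton is created.
threading.Thread(target=_poll_indefinitely, args=(get_producer(),), daemon=True).start()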


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
# producer to be long-lived.
@lru_cache
def get_producer() -> Optional[EventProducerKafka]:
"""
Create or retrieve Producer API singleton.

If confluent-kafka library or essential settings are missing, warn and return None.
"""
if not confluent_kafka: # pragma: no cover
logger.warning('Library confluent-kafka not available. Cannot create event producer.')
return None

producer_settings = load_common_settings()
if producer_settings is None:
return None

return EventProducerKafka(Producer(producer_settings))
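
Because ``get_producer()`` takes no arguments, ``@lru_cache`` makes it a memoized singleton: the first call constructs the wrapper, and every later call returns the same instance, keeping the Producer's queue and threads alive for the life of the process. For example:

producer = get_producer()       # first call: Producer and wrapper constructed
same_producer = get_producer()  # cache hit: the identical wrapper instance
assert producer is same_producer  # also holds if both are a cached None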


def on_event_deliver(err, evt):
@@ -214,51 +251,8 @@ def on_event_deliver(err, evt):
f"partition={evt.partition()}")


def send_to_event_bus(
signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
sync: bool = False,
) -> None:
"""
Send a signal event to the event bus under the specified topic.

If the Kafka settings are missing or invalid, return with a warning.

Arguments:
signal: The original OpenEdxPublicSignal the event was sent to
topic: The event bus topic for the event
event_key_field: Path to the event data field to use as the event key (period-delimited
string naming the dictionary keys to descend)
event_data: The event data (kwargs) sent to the signal
sync: Whether to wait indefinitely for event to be received by the message bus (probably
only want to use this for testing)
"""
producer = get_producer_for_signal(signal, event_key_field)
if producer is None: # Note: SerializingProducer has False truthiness when len() == 0
return

event_key = extract_event_key(event_data, event_key_field)
producer.produce(topic, key=event_key, value=event_data,
on_delivery=on_event_deliver,
headers={EVENT_TYPE_HEADER_KEY: signal.event_type})

if sync:
# Wait for all buffered events to send, then wait for all of
# them to be acknowledged, and trigger all callbacks.
producer.flush(-1)
else:
# Opportunistically ensure any pending callbacks from recent events are triggered.
#
# This assumes events come regularly, or that we're not concerned about
# high latency between delivery and callback. If those assumptions are
# false, we should switch to calling poll(1.0) or similar in a loop on
# a separate thread.
#
# Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079
producer.poll(0)


@receiver(setting_changed)
def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches during testing when settings change."""
get_serializer.cache_clear()
get_producer_for_signal.cache_clear()
get_serializers.cache_clear()
get_producer.cache_clear()