-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fixup! Improve coverage and error messages
- Provide better error message when get_serializers cannot get a client - Add basic test coverage for serializer construction I can't figure out how to test the serializers, though -- they want to talk to a server. Also: - Lift test data to be instance variables
- Loading branch information
Showing
2 changed files
with
34 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,12 +14,20 @@ | |
|
||
import edx_event_bus_kafka.publishing.event_producer as ep | ||
|
||
# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst | ||
try: | ||
from confluent_kafka.schema_registry.avro import AvroSerializer | ||
except ImportError: # pragma: no cover | ||
pass | ||
|
||
|
||
class TestEventProducer(TestCase): | ||
"""Test producer.""" | ||
|
||
def test_extract_event_key(self): | ||
event_data = { | ||
def setUp(self): | ||
super().setUp() | ||
self.signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED | ||
self.event_data = { | ||
'user': UserData( | ||
id=123, | ||
is_active=True, | ||
|
@@ -31,17 +39,17 @@ def test_extract_event_key(self): | |
) | ||
} | ||
|
||
assert ep.extract_event_key(event_data, 'user.pii.username') == 'foobob' | ||
def test_extract_event_key(self): | ||
assert ep.extract_event_key(self.event_data, 'user.pii.username') == 'foobob' | ||
with pytest.raises(Exception, | ||
match="Could not extract key from event; lookup in xxx failed at 'xxx' in dictionary"): | ||
ep.extract_event_key(event_data, 'xxx') | ||
ep.extract_event_key(self.event_data, 'xxx') | ||
with pytest.raises(Exception, | ||
match="Could not extract key from event; lookup in user.xxx failed at 'xxx' in object"): | ||
ep.extract_event_key(event_data, 'user.xxx') | ||
ep.extract_event_key(self.event_data, 'user.xxx') | ||
|
||
def test_descend_avro_schema(self): | ||
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED | ||
schema = AvroSignalSerializer(signal).schema | ||
schema = AvroSignalSerializer(self.signal).schema | ||
|
||
assert ep.descend_avro_schema(schema, ['user', 'pii', 'username']) == {"name": "username", "type": "string"} | ||
|
||
|
@@ -51,10 +59,19 @@ def test_descend_avro_schema(self): | |
assert isinstance(excinfo.value.__cause__, IndexError) | ||
|
||
def test_extract_key_schema(self): | ||
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED | ||
schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username') | ||
schema = ep.extract_key_schema(AvroSignalSerializer(self.signal), 'user.pii.username') | ||
assert schema == '{"name": "username", "type": "string"}' | ||
|
||
def test_serializers_configured(self): | ||
with override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345'): | ||
key_ser, value_ser = ep.get_serializers(self.signal, 'user.id') | ||
assert isinstance(key_ser, AvroSerializer) | ||
assert isinstance(value_ser, AvroSerializer) | ||
|
||
def test_serializers_unconfigured(self): | ||
with pytest.raises(Exception, match="missing library or settings"): | ||
ep.get_serializers(self.signal, 'user.id') | ||
|
||
def test_get_producer_unconfigured(self): | ||
"""With missing essential settings, just warn and return None.""" | ||
with warnings.catch_warnings(record=True) as caught_warnings: | ||
|
@@ -100,28 +117,18 @@ def test_on_event_deliver(self, mock_logger): | |
) | ||
) | ||
def test_send_to_event_bus(self, mock_get_serializers): | ||
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED | ||
event_data = { | ||
'user': UserData( | ||
id=123, | ||
is_active=True, | ||
pii=UserPersonalData( | ||
username='foobob', | ||
email='[email protected]', | ||
name="Bob Foo", | ||
) | ||
) | ||
} | ||
|
||
with override_settings( | ||
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', | ||
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', | ||
): | ||
producer_api = ep.get_producer() | ||
with patch.object(producer_api, 'producer', autospec=True) as mock_producer: | ||
producer_api.send(signal=signal, topic='user_stuff', event_key_field='user.id', event_data=event_data) | ||
producer_api.send( | ||
signal=self.signal, topic='user_stuff', | ||
event_key_field='user.id', event_data=self.event_data | ||
) | ||
|
||
mock_get_serializers.assert_called_once_with(signal, 'user.id') | ||
mock_get_serializers.assert_called_once_with(self.signal, 'user.id') | ||
|
||
mock_producer.produce.assert_called_once_with( | ||
'user_stuff', key=b'key-bytes-here', value=b'value-bytes-here', | ||
|