
Migrate Asn1DecodedDataRouter to use Spring Kafka #131

Draft · wants to merge 175 commits into base: dev
Conversation


@mcook42 mcook42 commented Dec 11, 2024

PR Details

Description

  • Sealed MessageSerializer and MessageDeserializer classes in preparation for marking them final after the full Spring Kafka migration is complete
  • Removed now-obsolete filters from KafkaConsumerConfig. All Asn1DecodedData is routed through one listener again, so the filter isn't needed anymore
  • Replaced the Asn1DecodedDataRouter with the Spring Kafka implementation (previously named Asn1DecodedDataListener)
  • Refactored all Kafka-related tests to use a more standard, less bug-prone approach to topic, consumer, and producer creation
  • (Optional) Deleted or deprecated unnecessary generic subclasses of MessageSerializer and MessageDeserializer.
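The sealing described in the first bullet can be illustrated with a minimal, self-contained sketch. The class names here are hypothetical stand-ins for the pattern, not the project's actual types:

```java
// Sketch of sealing a base class so only known subclasses may extend it,
// ahead of marking it final once a migration completes.
// Names are illustrative, not the real jpo-ode classes.
public class SealedDemo {

  // Only StringMessageDeserializer is permitted to extend this base class.
  static sealed class BaseDeserializer<T> permits StringMessageDeserializer {
    T deserialize(byte[] data) {
      return null; // placeholder; the real class delegates to a serialization utility
    }
  }

  // A permitted subclass must itself be final, sealed, or non-sealed.
  static final class StringMessageDeserializer extends BaseDeserializer<String> {
    @Override
    String deserialize(byte[] data) {
      return new String(data);
    }
  }

  public static void main(String[] args) {
    BaseDeserializer<String> d = new StringMessageDeserializer();
    System.out.println(d.deserialize("hello".getBytes())); // prints hello
  }
}
```

Any attempt to declare another subclass of `BaseDeserializer` outside the `permits` list fails at compile time, which is what makes sealing a safe intermediate step before `final`.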

Related Issue

Motivation and Context

Implementing Spring Kafka gives us better lifecycle management of producers and consumers, more reusable producer/consumer code, easier testability, and a more robust production-ready Kafka library. This is part of a more significant effort to replace our hand-rolled Kafka implementation with Spring Kafka. The previous changesets related to this effort are #118, #116, #123, and #129.

How Has This Been Tested?

Unit and integration tests were added before making any functional changes, and they continue to pass after the code changes. Additional unit and integration tests were added after the functionality changed to increase coverage. I have also run all data from the udpsender_[msgType].py scripts found under scripts/tests through a live local system started with make rebuild and confirmed there were no errors in the logs. I also confirmed that all expected messages were produced to and consumed from the correct topics by using the kafka-ui container available at localhost:8001 (on my local machine, of course).

Types of changes

  • Defect fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that causes existing functionality to change)

Checklist:

  • I have added any new packages to the sonar-scanner.properties file
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
    ODE Contributing Guide
  • I have added tests to cover my changes.
  • All new and existing tests passed.

…removing from both, and comparing the expected and actual JSON
…Test_ValidSSM.txt and the resulting SsmReceiverTest_ValidSSM_expected.json
…erTest_ValidSPAT.txt and the resulting SpatReceiverTest_ValidSPAT_expected.json
…Test_ValidPSM.txt and the resulting PsmReceiverTest_ValidPSM_expected.json
…Test_ValidSRM.txt and the resulting SrmReceiverTest_ValidSRM_expected.json
Since this test suite produces to and consumes from the same topics in multiple tests, we needed a better way than `getSingleRecord` to perform test assertions. By using UUIDs as keys, we can reuse the same topics across multiple tests and select the correct records for assertions without risk of tests conflicting with one another.
This change helps simplify managing unique topic names for the disabledTopics set provided by OdeKafkaProperties. It is necessary to prevent data from other tests leaking onto the disabled topics and causing intermittent test failures in this suite.
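The keying idea from the commit message above can be sketched library-free: each test produces its records with a fresh UUID key, then selects only its own records from the shared topic. The record type and helper name here are illustrative, not the suite's actual test utilities:

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

// Sketch: selecting only this test's records from a topic shared across tests.
public class KeyedRecordSelection {

  // Stand-in for a consumed (key, value) Kafka record.
  record ConsumedRecord(String key, String value) {}

  // Keep only the records whose key matches the UUID this test produced with,
  // so records from other tests on the same topic are ignored.
  static List<String> valuesForKey(List<ConsumedRecord> records, String key) {
    return records.stream()
        .filter(r -> key.equals(r.key()))
        .map(ConsumedRecord::value)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    String myKey = UUID.randomUUID().toString();
    List<ConsumedRecord> topic = List.of(
        new ConsumedRecord("other-test-key", "not mine"),
        new ConsumedRecord(myKey, "mine"));
    System.out.println(valuesForKey(topic, myKey)); // prints [mine]
  }
}
```

Unlike `getSingleRecord`, this selection tolerates any number of unrelated records on the topic, which is what lets multiple tests share topics safely.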
@@ -27,4 +27,4 @@ jobs:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: |
ls -la && pwd
mvn -e -X clean org.jacoco:jacoco-maven-plugin:prepare-agent package sonar:sonar -Dsonar.projectKey=usdot.jpo.ode:jpo-ode -Dsonar.projectName=jpo-ode -Dsonar.organization=usdot-jpo-ode -Dsonar.host.url=https://sonarcloud.io -Dsonar.branch.name=$GITHUB_REF_NAME
Author

Note: -X enables debug logging. It generated so much text that GH's runners were struggling to render test failures in the browser. Turning off debug logging doesn't mean we'll lose the information needed to determine the cause of test failures; it just makes finding failures in the logs easier.

Collaborator

Yeah I like this

Comment on lines +3 to +9
on:
pull_request:
push:
branches:
- "develop"
- "master"
- "release/*"
Author

Note: this change stops the CI workflow from running on every push to any branch. It restricts the workflow to pushes to our main branches and to pull request open/update events, which should reduce unnecessary costs associated with running the workflow.

import us.dot.its.jpo.ode.util.SerializationUtils;

public class MessagingDeserializer<T> implements Deserializer<T> {

SerializationUtils<T> deserializer = new SerializationUtils<T>();
Author

Note: default methods are provided for both configure and close, so overriding them is unnecessary.
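To illustrate the note about default methods, here is a library-free stand-in mirroring the shape of Kafka's Deserializer interface (this is not the real `org.apache.kafka.common.serialization.Deserializer`, just a sketch of why only `deserialize` needs an override):

```java
import java.util.Map;

public class DefaultMethodDemo {

  // Stand-in with the same shape as Kafka's Deserializer: configure and close
  // have default no-op bodies, leaving deserialize as the only abstract method.
  interface SimpleDeserializer<T> {
    default void configure(Map<String, ?> configs, boolean isKey) {}

    T deserialize(String topic, byte[] data);

    default void close() {}
  }

  public static void main(String[] args) {
    // Only deserialize is supplied; configure and close use the defaults.
    SimpleDeserializer<String> d = (topic, data) -> new String(data);
    System.out.println(d.deserialize("any-topic", "ok".getBytes())); // prints ok
  }
}
```

Because the interface has a single abstract method, an implementation that overrode `configure` and `close` with empty bodies would add code without changing behavior.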


SerializationUtils<T> serializer = new SerializationUtils<T>();

@Override
Author

Note: default methods are provided for both configure and close, so overriding them is unnecessary.

Author

Note: much of this code comes from the original implementations at Asn1DecodedDataListener and services/Asn1DecodedDataRouter.

@mcook42 mcook42 marked this pull request as ready for review December 12, 2024 22:24
Collaborator

@drewjj drewjj left a comment

Looks great! All the tests look good and pass. Each message type's decoded sample data is accurate for the input/output. I just have one minor comment about a debug log.

However, decoded BSM messages don't seem to show up in the topic.OdeBsmJson or topic.OdeBsmPojo topics. The tests evidently pass for the BSM decode routing, but I wonder what the difference is between the test and a deployed environment.


id = "Asn1DecodedDataRouter",
topics = "${ode.kafka.topics.asn1.decoder-output}"
)
public void listen(ConsumerRecord<String, String> consumerRecord) throws XmlUtilsException {
Collaborator

I really like this now. Very clean!
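The routing the listener performs can be sketched, library-free, as a function from decoded message type to output topic. Only topic.OdeBsmJson appears elsewhere in this conversation; the other type/topic pairs here are assumptions for illustration, and the real router derives its topics from `ode.kafka.topics` configuration rather than a hard-coded map:

```java
import java.util.Map;

// Sketch of the listener's core decision: which output topic a decoded
// message belongs on, based on its message type.
public class DecodedDataRouting {

  // Hypothetical type-to-topic mapping; topic.OdeBsmJson is from the PR
  // discussion, the rest are illustrative placeholders.
  static final Map<String, String> TOPIC_BY_TYPE = Map.of(
      "BSM", "topic.OdeBsmJson",
      "SPAT", "topic.OdeSpatJson",
      "SRM", "topic.OdeSrmJson");

  static String routeFor(String messageType) {
    String topic = TOPIC_BY_TYPE.get(messageType);
    if (topic == null) {
      throw new IllegalArgumentException("Unknown message type: " + messageType);
    }
    return topic;
  }

  public static void main(String[] args) {
    System.out.println(routeFor("BSM")); // prints topic.OdeBsmJson
  }
}
```

Keeping the routing decision a pure function like this is what makes the single-listener design easy to unit test.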

…oggingProducerListener.java

Co-authored-by: Drew Johnston <[email protected]>
Comment on lines 22 to 35
/**
* MessagingDeserializer is a generic base class implementing the Kafka Deserializer interface to
* provide serialization of objects for use in Kafka messages.
*
* <p>This class uses a generic type parameter, allowing it to handle serialization
* of various types. Internal serialization is performed using an instance of the SerializationUtils
* class, which leverages Kryo for efficient object serialization.</p>
*
 * <p>The class is declared as sealed, restricting which other classes can directly extend it. It
 * will soon be marked as final to prevent incorrect usage through unnecessary subtyping</p>
*
* @param <T> the type of data to be serialized
*/
Member

(blocking) These comments indicate that the class is for serialization, but the class is for deserialization.

Author

corrected in d7f2bbe

* data by processing and forwarding it to different topics based on specific criteria.
*
* <p>This listener is specifically designed to handle decoded data produced by the asn1_codec.
* Upon receiving a payload, it uses transforms the payload and then determines the appropriate
Member

(blocking) Looks like a typo here: 'uses transforms' should probably just be 'transforms'

Author

corrected in d7f2bbe

*
* @param kafkaTemplate the KafkaTemplate used for sending messages to Kafka topics.
*/
public Asn1DecodedDataRouter(KafkaTemplate<String, String> kafkaTemplate,
Member

(non-blocking) Is the @Autowired annotation necessary here, or does the class @Component annotation allow Spring to wire everything up without it?

@mcook42 mcook42 marked this pull request as draft December 13, 2024 00:12

mcook42 commented Dec 13, 2024

Marking as draft until I can figure out a nice way to handle the disabled-topic exceptions blocking the rest of the code execution.

…eptingKafkaTemplate

This allows us to prevent sending to disabled topics without throwing exceptions. It allows graceful blocking of sends to disabled topics where the previous exception-driven approach was interrupting normal code paths.
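The approach in the commit message above can be sketched as a send gate that consults the disabled-topic set and silently skips, instead of throwing into the caller's normal code path. The class and method names are illustrative, not the PR's actual template code; topicOdeBsmTxPojo is the disabled topic mentioned later in this conversation:

```java
import java.util.Optional;
import java.util.Set;

// Sketch: block sends to disabled topics without throwing, so callers'
// normal code paths continue uninterrupted.
public class DisabledTopicGate {

  // Topics that sends should silently skip rather than error on.
  private final Set<String> disabledTopics;

  DisabledTopicGate(Set<String> disabledTopics) {
    this.disabledTopics = disabledTopics;
  }

  // Returns the sent payload, or empty if the topic is disabled.
  // No exception is thrown either way.
  Optional<String> send(String topic, String payload) {
    if (disabledTopics.contains(topic)) {
      return Optional.empty(); // gracefully skip the send
    }
    // In the real code this would delegate to the underlying KafkaTemplate.
    return Optional.of(payload);
  }

  public static void main(String[] args) {
    DisabledTopicGate gate = new DisabledTopicGate(Set.of("topicOdeBsmTxPojo"));
    System.out.println(gate.send("topicOdeBsmTxPojo", "bsm").isPresent()); // false
    System.out.println(gate.send("topic.OdeBsmJson", "bsm").isPresent());  // true
  }
}
```

The key contrast with the exception-driven approach is that a disabled topic becomes a no-op return value, so a blocked send to one topic no longer prevents subsequent sends to enabled topics.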

mcook42 commented Dec 13, 2024

[screenshot]

@drewjj I'm able to confirm that the updates in da34d26 allow us to publish to the topic.OdeBsmJson after blocking the attempt to send to topicOdeBsmTxPojo when running locally.
