Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Asn1EncodedDataRouter to use Spring Kafka #138

Draft
wants to merge 136 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 113 commits
Commits
Show all changes
136 commits
Select commit Hold shift + click to select a range
35f386b
style: reformat Asn1EncodedDataRouter and Asn1CommandManager
mcook42 Dec 17, 2024
f7000c9
chore: delete disabled tests from Asn1EncodedDataRouterTest to start …
mcook42 Dec 17, 2024
62b34b3
refactor: extract nested logic to named methods
mcook42 Dec 17, 2024
d04010d
test: add asn1-encoder-output-tim.xml
mcook42 Dec 18, 2024
b51b14a
refactor: encapsulate date setting logic for code clarity
mcook42 Dec 18, 2024
23821e4
chore: rename asn1-encoder-output-tim.xml to asn1-encoder-output-unsi…
mcook42 Dec 18, 2024
1cb9d97
refactor: pull object creation up to allow mocking of dependencies
mcook42 Dec 18, 2024
f4147e4
test: initial processSNMPDeposit test setup. encoder not consuming
mcook42 Dec 18, 2024
eed0a82
test: processEncodedTimUnsecured_depositsToSdxTopic
mcook42 Dec 18, 2024
c43c3e8
test: processEncodedTimUnsecured_depositsToSdxTopic -> processEncoded…
mcook42 Dec 18, 2024
785795a
refactor: pass asn1CommandManager into Asn1EncodedDataRouter to impro…
mcook42 Dec 18, 2024
80295bc
refactor: introduce ISecurityServicesClient and SecurityServicesClien…
mcook42 Dec 18, 2024
1011688
refactor: use ISecurityServicesClient in Asn1EncodedDataRouter
mcook42 Dec 18, 2024
6a85fbf
test: processSNMPDepositOnly
mcook42 Dec 19, 2024
3dce2a4
style: update license header to not be javadoc
mcook42 Dec 19, 2024
3898ad9
test: update tests in Asn1EncodedDataRouterTest to enable running tog…
mcook42 Dec 19, 2024
225960c
test: processEncodedTimUnsecured in Asn1EncodedDataRouterTest
mcook42 Dec 19, 2024
1aba9e0
test: make consumer groups unique between tests to keep them isolated
mcook42 Dec 19, 2024
e7e7bf1
test: tweak securityServices settings to get code to run through proc…
mcook42 Dec 19, 2024
7aa4920
test: disable all Asn1CommandManagerTest tests as they don't work wit…
mcook42 Dec 19, 2024
7e69562
refactor: move asn1CommandManager.depositToSdw to Asn1EncodedDataRouter
mcook42 Dec 19, 2024
ab2a633
style: added missing Javadocs and reorganized Asn1CommandManager
mcook42 Dec 19, 2024
b4473a3
refactor: don't swallow exception on startup of rsuDepositor in Asn1C…
mcook42 Dec 19, 2024
c1f1178
refactor: ResponseEvent to use generic Address type
mcook42 Dec 19, 2024
79e0e0c
style: reformat and add missing Javadocs SnmpSession
mcook42 Dec 19, 2024
24fbba7
chore: delete unused AsnCodecRouterServiceController.java
mcook42 Dec 19, 2024
c99bbe8
refactor: pull Asn1CommandManager functionality into Asn1EncodedDataR…
mcook42 Dec 19, 2024
cdb14a2
refactor: replace SDXDepositorTopics.java reference with Value refere…
mcook42 Dec 19, 2024
da1b379
refactor: introduce Config beans for Asn1EncodedDataRouter dependencies
mcook42 Dec 19, 2024
1c0e260
refactor: convert Asn1EncodedDataRouter to KafkaListener and update t…
mcook42 Dec 19, 2024
5060560
Revert "refactor: convert Asn1EncodedDataRouter to KafkaListener and …
mcook42 Dec 20, 2024
d5e8f1b
Revert "refactor: introduce Config beans for Asn1EncodedDataRouter de…
mcook42 Dec 20, 2024
b57d766
Revert "refactor: replace SDXDepositorTopics.java reference with Valu…
mcook42 Dec 20, 2024
34a9dc1
Revert "refactor: pull Asn1CommandManager functionality into Asn1Enco…
mcook42 Dec 20, 2024
692d3b2
Revert "chore: delete unused AsnCodecRouterServiceController.java"
mcook42 Dec 20, 2024
f08c0fd
Revert "style: reformat and add missing Javadocs SnmpSession"
mcook42 Dec 20, 2024
f05adff
Revert "refactor: ResponseEvent to use generic Address type"
mcook42 Dec 20, 2024
1bc84ab
Revert "refactor: don't swallow exception on startup of rsuDepositor …
mcook42 Dec 20, 2024
bd4b72e
Revert "style: added missing Javadocs and reorganized Asn1CommandMana…
mcook42 Dec 20, 2024
d898850
Revert "refactor: move asn1CommandManager.depositToSdw to Asn1Encoded…
mcook42 Dec 20, 2024
e995cd1
test: adjust tests to expect different streamId when message is signed
mcook42 Dec 23, 2024
9a5ade1
refactor: move asn1CommandManager.depositToSdw to Asn1EncodedDataRouter
mcook42 Dec 19, 2024
5acc7d8
style: added missing Javadocs and reorganized Asn1CommandManager
mcook42 Dec 19, 2024
ca18d0a
refactor: don't swallow exception on startup of rsuDepositor in Asn1C…
mcook42 Dec 19, 2024
fb9d43a
refactor: ResponseEvent to use generic Address type
mcook42 Dec 19, 2024
e5d4b26
style: reformat and add missing Javadocs SnmpSession
mcook42 Dec 19, 2024
edce8c7
chore: delete unused AsnCodecRouterServiceController.java
mcook42 Dec 19, 2024
6ffa17c
refactor: pull Asn1CommandManager functionality into Asn1EncodedDataR…
mcook42 Dec 19, 2024
9a652e2
refactor: replace SDXDepositorTopics.java reference with Value refere…
mcook42 Dec 19, 2024
b82e513
refactor: introduce Config beans for Asn1EncodedDataRouter dependencies
mcook42 Dec 19, 2024
70ebddf
refactor: convert Asn1EncodedDataRouter to KafkaListener and update t…
mcook42 Dec 19, 2024
5722486
chore: delete now unused Asn1CommandManager.java
mcook42 Dec 23, 2024
4f4737d
refactor: isRunning now only checks for RUNNING in OdeTimJsonTopology
mcook42 Dec 23, 2024
2a8b541
test: Asn1EncodedDataRouterTest await running state of odeTimJsonTopo…
mcook42 Dec 23, 2024
975c3f2
chore: use one mock ISecurityServicesClient for the test suite
mcook42 Dec 23, 2024
1bb81ec
chore: properly cleanup OdeTimJsonTopology on shutdown
mcook42 Dec 23, 2024
0fc4bdf
chore: set destroyMethod for odeTimJsonTopology bean
mcook42 Dec 23, 2024
b85bbcb
chore: more gracefully handle/prevent errors when adding topics to Em…
mcook42 Dec 23, 2024
130dd7b
chore: add debug message when skipping depositing to filtered tim topic
mcook42 Dec 23, 2024
f07f493
chore: use a test configuration bean to manage OdeTimJsonTopology str…
mcook42 Dec 23, 2024
aebe535
refactor: move TestKafkaStreamsConfig to own file for reusability
mcook42 Dec 23, 2024
17eb75f
chore: reorganize Asn1EncodedDataRouterTest
mcook42 Dec 23, 2024
b46b790
test: delete OdeTimJsonTopologyTest.java as it is indirectly tested (…
mcook42 Dec 23, 2024
ac8771e
refactor: undo unnecessary retries in OdeTimJsonTopology.query
mcook42 Dec 23, 2024
6d43329
refactor: swap KafkaTemplate for defaultStringMessageProducer in Asn1…
mcook42 Dec 23, 2024
072b74f
style: add Javadocs and remove unused parameters from constructor in …
mcook42 Dec 23, 2024
087a4e8
refactor: remove redundant EventLogger usage
mcook42 Dec 23, 2024
5ec266d
refactor: remove dead code from Asn1EncodedDataRouter.listen
mcook42 Dec 23, 2024
01f7333
style: rename variables for better clarity in Asn1EncodedDataRouter
mcook42 Dec 23, 2024
9c3a3a3
chore: replace OdeKafkaProperties loading in Asn1EncodedDataRouterTes…
mcook42 Dec 23, 2024
6e4ca6b
refactor: untangle message signing from message production
mcook42 Dec 23, 2024
963a3de
Merge branch 'dev' of https://github.com/CDOT-CV/jpo-ode into mcook42…
mcook42 Dec 23, 2024
b002fbb
chore: add explicit imports to KafkaProducerConfig to ensure correct …
mcook42 Dec 23, 2024
b19e328
chore: add explicit load of OdeKafkaProperties in RawEncodedPSMJsonRo…
mcook42 Dec 23, 2024
da4f857
chore: reorder constructor params for easier reading Asn1EncodedDataR…
mcook42 Dec 24, 2024
c7136d1
refactor: take all internal methods private in Asn1EncodedDataRouter
mcook42 Dec 24, 2024
6578b37
refactor: move Asn1EncodedDataRouter to kafka.listeners.asn1 package
mcook42 Dec 24, 2024
60b89b9
chore: remove explicit imports. they cause errors with resolution
mcook42 Dec 24, 2024
84079e5
chore: correct topic in Asn1EncodedDataRouter.listen annotation
mcook42 Dec 24, 2024
586f701
chore: remove unused stop method from OdeTimJsonTopology.
mcook42 Dec 24, 2024
f0f7030
chore: include id in KafkaListener annotation on Asn1EncodedDataRoute…
mcook42 Dec 24, 2024
ee88d53
refactor: extract streamId from message body to use as key when produ…
mcook42 Dec 24, 2024
531ee0c
style: reformat files edited in previous commit
mcook42 Dec 24, 2024
f250b8c
chore: add SerializationConfig to test class dependencies
mcook42 Dec 24, 2024
c128e7e
test: encapsulate duplicate setup logic in Asn1EncodedDataRouterTest
mcook42 Dec 24, 2024
a09fbb1
test: streamline resource loading
mcook42 Dec 24, 2024
9c04c3d
chore: use correct key deserializer in Receiver tests where consuming…
mcook42 Dec 24, 2024
853ce65
style: correct naming of streamId variable
mcook42 Dec 24, 2024
c7b8938
style: add missing Javadocs to RsuDepositorConfig
mcook42 Dec 24, 2024
3e414a1
style: add missing Javadocs and reformat RsuDepositor and test
mcook42 Dec 24, 2024
2c01cb3
style: add missing Javadocs to TestKafkaStreamsConfig
mcook42 Dec 24, 2024
5999898
refactor: use ObjectMapper bean in Asn1EncodedDataRouter where possible
mcook42 Dec 24, 2024
82af676
refactor: renamed and reorganized processUnsignedMessage path in Asn1…
mcook42 Dec 24, 2024
735dfa7
refactor: deduplicate extraction of metadata and payload JSONObjects
mcook42 Dec 24, 2024
1b1c39c
refactor: error handling and use enum for start flag
mcook42 Dec 24, 2024
977a2b7
refactor: add XmlMapper for XML handling
mcook42 Dec 24, 2024
fcc2e8c
revert: undo move of resources to different package
mcook42 Dec 26, 2024
7c8e68c
Revert "refactor: move Asn1EncodedDataRouter to kafka.listeners.asn1 …
mcook42 Dec 26, 2024
0eee335
test: update resource package path after reverting package move
mcook42 Dec 26, 2024
08550c0
refactor: introduce SignatureResultModel
mcook42 Dec 26, 2024
bcb6901
refactor: rename SecurityServicesClientImpl.java to SecurityServicesC…
mcook42 Dec 26, 2024
dbe49ea
refactor: SecurityServicesClient and add unit test coverage
mcook42 Dec 26, 2024
44b73cc
chore: remove SecurityServicesConfig and related dependencies.
mcook42 Dec 26, 2024
2246c2d
chore: correct import paths in Asn1EncodedDataRouter and test
mcook42 Dec 26, 2024
b920745
test: handle null responses and improve error handling in signing.
mcook42 Dec 26, 2024
5c5ba73
refactor: use autowired props to drive URL in SecurityServicesClientTest
mcook42 Dec 26, 2024
411c562
refactor: use MockSecurityServicesClient in Asn1EncodedDataRouterTest
mcook42 Dec 26, 2024
6d60ed2
refactor: service request extraction to use metadata JSON.
mcook42 Dec 26, 2024
0d30254
docs: simplify comments and remove redundant JavaDoc.
mcook42 Dec 26, 2024
69ac11e
docs: remove dead comments
mcook42 Dec 26, 2024
1e7ffbc
chore: simplify RSU message handling logic.
mcook42 Dec 27, 2024
7a58bce
chore: simplify error message for SDX deposit failure
mcook42 Dec 27, 2024
fd715ab
refactor: don't use TimTransmorgrifier to buildEncodings
mcook42 Dec 27, 2024
bcfcfc6
chore: rename buildJsonTimFromPacket to buildTimFromPacket
mcook42 Dec 27, 2024
e757746
refactor: don't use KafkaListenerConfig in Asn1EncodedDataRouterTest …
mcook42 Dec 27, 2024
1018461
fix: incorrect expiration date calculation in TIM processing.
mcook42 Dec 27, 2024
a765be7
style: reorganize methods in Asn1EncodedDataRouterTest
mcook42 Dec 27, 2024
285c880
fix: replace outdated Date handling to remove system time variability
mcook42 Dec 27, 2024
1c75ab6
fix: handle RestClientException during TIM signing process
mcook42 Dec 27, 2024
dc48020
chore: correct TIM header processing in Asn1EncodedDataRouter
mcook42 Dec 30, 2024
70827e7
refactor: TIM signing and certification expiration logic.
mcook42 Jan 2, 2025
52209b4
refactor: TIM signing exception handling
mcook42 Jan 2, 2025
789acc9
test: add better test failure messaging
mcook42 Jan 2, 2025
c63b92d
test: add better test failure messaging
mcook42 Jan 2, 2025
c8055ad
test: add better test failure messaging (actually stringify iterable)
mcook42 Jan 2, 2025
bd9f820
test: simplify testing assertions for better failure reporting
mcook42 Jan 2, 2025
5b86319
Merge branch 'dev' of https://github.com/CDOT-CV/jpo-ode into mcook42…
mcook42 Jan 2, 2025
95f154f
test: update expected-asn1-encoded-router-snmp-deposit.xml with schem…
mcook42 Jan 2, 2025
e47f528
fix: correctly handle data signing flow and log clear exception messa…
mcook42 Jan 2, 2025
4a8f4f9
test: add unit test for handling server error in signMessage
mcook42 Jan 2, 2025
48f703f
Merge branch 'dev' of https://github.com/CDOT-CV/jpo-ode into mcook42…
mcook42 Jan 3, 2025
495c00f
chore: set scope to provided for annotation processors
mcook42 Jan 6, 2025
5072425
chore: add mock sec service to docker-compose.yml and sample.env
mcook42 Jan 6, 2025
8b7fce2
fix: add error handling for failed decoding
mcook42 Jan 6, 2025
b71c2e9
fix: deserialize sec srvcs response from valid json
mcook42 Jan 6, 2025
5a718c5
Merge branch 'dev' of https://github.com/CDOT-CV/jpo-ode into mcook42…
mcook42 Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,23 @@


/**
* The OdeTimJsonTopology class sets up and manages a Kafka Streams topology
* for processing TIM (Traveler Information Message) JSON data from the
* OdeTimJson Kafka topic.
* This class creates a K-Table that houses TMC-generated TIMs which can be
* queried by UUID.
* The OdeTimJsonTopology class sets up and manages a Kafka Streams topology for processing TIM
* (Traveler Information Message) JSON data from the OdeTimJson Kafka topic. This class creates a
* K-Table that houses TMC-generated TIMs which can be queried by UUID.
**/
@Slf4j
public class OdeTimJsonTopology {

private final KafkaStreams streams;

/**
* Constructs an instance of OdeTimJsonTopology to set up and manage a Kafka Streams
* topology for processing TIM JSON data.
* Constructs an instance of OdeTimJsonTopology to set up and manage a Kafka Streams topology for
* processing TIM JSON data.
*
* @param odeKafkaProps the properties containing Kafka configuration, including brokers
* and optional Confluent-specific configuration for authentication.
* @param topic the Kafka topic from which TIM JSON data is consumed to build the topology.
* @param odeKafkaProps the properties containing Kafka configuration, including brokers and
* optional Confluent-specific configuration for authentication.
* @param topic the Kafka topic from which TIM JSON data is consumed to build the
* topology.
*/
public OdeTimJsonTopology(OdeKafkaProperties odeKafkaProps, String topic) {

Expand All @@ -52,20 +51,15 @@ public OdeTimJsonTopology(OdeKafkaProperties odeKafkaProps, String topic) {
streams.start();
}

public void stop() {
log.info("Stopping Ode Tim Json Topology");
streams.close();
}

public boolean isRunning() {
return streams.state().isRunningOrRebalancing();
return streams.state().equals(KafkaStreams.State.RUNNING);
Comment on lines -61 to +55
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this method is only used in tests for now, and in the tests, a state of REBALANCING would cause an error. Addressing this possible bug may be worth the effort in a follow-up PR or separate work item. Still, I don't believe the changes belong here since I didn't change how the OdeTimJsonTopology works in this PR.

}

/**
* Builds a Kafka Streams topology for processing TIM JSON data.
*
* @param topic the Kafka topic from which TIM JSON data is consumed and used
* to build the topology.
* @param topic the Kafka topic from which TIM JSON data is consumed and used to build the
* topology.
* @return the constructed Kafka Streams topology.
*/
public Topology buildTopology(String topic) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package us.dot.its.jpo.ode.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.client.RestTemplate;

/**
* This class provides a configuration for creating and managing
* a {@link RestTemplate} bean, which is used for making HTTP requests
* to external services.
*
* <p><strong>NOTE:</strong> As of 5.0 the {@link RestTemplate} class is in maintenance mode, with
* only minor requests for changes and bugs to be accepted going forward. Please,
* consider using the {@code org.springframework.web.reactive.client.WebClient}
* which has a more modern API and supports sync, async, and streaming scenarios.
* Whenever we the time or resources to update our Spring version,
* we should replace usages of RestTemplate with WebClient.</p>
*/
@Configuration
public class WebClientConfig {

/**
* Creates and configures a {@link RestTemplate} bean with a custom
* {@link MappingJackson2HttpMessageConverter} to use the provided
* {@link ObjectMapper} for JSON serialization and deserialization.
*
* @param mapper the {@link ObjectMapper} to be used for configuring
* JSON message conversion.
* @return a configured {@link RestTemplate} instance that includes
* the custom JSON message converter.
*/
@Bean
public RestTemplate restTemplate(ObjectMapper mapper) {
var template = new RestTemplate();
MappingJackson2HttpMessageConverter customConverter = new MappingJackson2HttpMessageConverter();
customConverter.setObjectMapper(mapper);
template.getMessageConverters().add(customConverter);
return template;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package us.dot.its.jpo.ode.kafka;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import us.dot.its.jpo.ode.OdeTimJsonTopology;

/**
* KafkaStreamsConfig is a Spring configuration class that provides
* beans related to Kafka Streams topology setup.
*/
@Configuration
public class KafkaStreamsConfig {

@Bean
public OdeTimJsonTopology odeTimJsonTopology(
@Value("${ode.kafka.topics.json.tim}") String timTopic,
OdeKafkaProperties odeKafkaProperties) {
return new OdeTimJsonTopology(odeKafkaProperties, timTopic);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package us.dot.its.jpo.ode.kafka.listeners.asn1;

import com.fasterxml.jackson.core.JsonProcessingException;
import joptsimple.internal.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.json.JSONObject;
Expand Down Expand Up @@ -30,8 +32,8 @@
* processing and forwarding it to different topics based on specific criteria.
*
* <p>This listener is specifically designed to handle decoded data produced by the asn1_codec.
* Upon receiving a payload, it transforms the payload and then determines the appropriate
* Kafka topic to forward the processed data.</p>
* Upon receiving a payload, it transforms the payload and then determines the appropriate Kafka
* topic to forward the processed data.</p>
*
* <p>The class utilizes Spring Kafka's annotation-driven listener configuration,
* allowing it to automatically consume messages from a configured Kafka topic.</p>
Expand Down Expand Up @@ -68,7 +70,8 @@ public Asn1DecodedDataRouter(KafkaTemplate<String, String> kafkaTemplate,
id = "Asn1DecodedDataRouter",
topics = "${ode.kafka.topics.asn1.decoder-output}"
)
public void listen(ConsumerRecord<String, String> consumerRecord) throws XmlUtilsException {
public void listen(ConsumerRecord<String, String> consumerRecord)
throws XmlUtilsException, JsonProcessingException {
log.debug("Key: {} payload: {}", consumerRecord.key(), consumerRecord.value());

JSONObject consumed = XmlUtils.toJSONObject(consumerRecord.value())
Expand All @@ -80,16 +83,23 @@ public void listen(ConsumerRecord<String, String> consumerRecord) throws XmlUtil
.getInt("messageId")
);

var metadataJson = XmlUtils.toJSONObject(consumerRecord.value())
.getJSONObject(OdeAsn1Data.class.getSimpleName())
.getJSONObject(AppContext.METADATA_STRING);
OdeLogMetadata.RecordType recordType = OdeLogMetadata.RecordType
.valueOf(XmlUtils.toJSONObject(consumerRecord.value())
.getJSONObject(OdeAsn1Data.class.getSimpleName())
.getJSONObject(AppContext.METADATA_STRING)
.getString("recordType")
);
.valueOf(metadataJson.getString("recordType"));

String streamId;
if (Strings.isNullOrEmpty(consumerRecord.key())
|| "null".equalsIgnoreCase(consumerRecord.key())) {
streamId = metadataJson.getJSONObject("serialId").getString("streamId");
} else {
streamId = consumerRecord.key();
}

switch (messageId) {
case BasicSafetyMessage -> routeBSM(consumerRecord, recordType);
case TravelerInformation -> routeTIM(consumerRecord, recordType);
case TravelerInformation -> routeTIM(consumerRecord, streamId, recordType);
case SPATMessage -> routeSPAT(consumerRecord, recordType);
case MAPMessage -> routeMAP(consumerRecord, recordType);
case SSMMessage -> routeSSM(consumerRecord, recordType);
Expand Down Expand Up @@ -156,17 +166,18 @@ private void routeMAP(ConsumerRecord<String, String> consumerRecord, RecordType
kafkaTemplate.send(jsonTopics.getMap(), odeMapData);
}

private void routeTIM(ConsumerRecord<String, String> consumerRecord, RecordType recordType)
throws XmlUtilsException {
private void routeTIM(ConsumerRecord<String, String> consumerRecord,
String streamId,
RecordType type) throws XmlUtilsException {
String odeTimData =
OdeTimDataCreatorHelper.createOdeTimDataFromDecoded(consumerRecord.value()).toString();
switch (recordType) {
switch (type) {
case dnMsg -> kafkaTemplate.send(jsonTopics.getDnMessage(), consumerRecord.key(), odeTimData);
case rxMsg -> kafkaTemplate.send(jsonTopics.getRxTim(), consumerRecord.key(), odeTimData);
default -> log.trace("Consumed TIM data with record type: {}", recordType);
default -> log.trace("Consumed TIM data with record type: {}", type);
}
// Send all TIMs also to OdeTimJson
kafkaTemplate.send(jsonTopics.getTim(), consumerRecord.key(), odeTimData);
kafkaTemplate.send(jsonTopics.getTim(), streamId, odeTimData);
Comment on lines -169 to +180
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the key is empty when consuming from the asn1 decoder output topic. We need to include the streamId as the key when publishing TIM data so that the downstream consumers can use the streamId to lookup the TIM JSON from the K-Table in OdeTimJsonTopology. If we don't produce with a streamId (which we conditionally set a few lines above this method)) then we will never produce to the TMCFiltered topic in the Asn1EncodedDataRouter

}

private void routeBSM(ConsumerRecord<String, String> consumerRecord, RecordType recordType)
Expand Down

This file was deleted.

Loading
Loading