Skip to content

Commit

Permalink
updates with working TIM deposit to the IMP
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael7371 committed Nov 11, 2024
1 parent fc74fad commit f52f3e8
Show file tree
Hide file tree
Showing 52 changed files with 2,182 additions and 175 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ consumer-example.iml
.env

*.pem
config.json
config.json
*.lck
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"mainClass": "us.dot.its.jpo.ode.s3.depositor.DepositorApplication",
"projectName": "jpo-s3-depositor",
"envFile": "${workspaceFolder}/.env",
// "vmArgs": "-Djavax.net.debug=ssl,handshake,data,trustmanager"
}
]
}
8 changes: 8 additions & 0 deletions jpo-s3-depositor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,19 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>

<!-- Used to ingest `.pem` files into a JKS programmatically -->
<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>sslcontext-kickstart-for-pem</artifactId>
<version>8.3.7</version>
</dependency>
<!-- Used to convert latitude and longitude to a geohash -->
<dependency>
<groupId>ch.hsr</groupId>
<artifactId>geohash</artifactId>
<version>1.4.0</version>
</dependency>

<dependency>
<groupId>org.springdoc</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
@EnableKafka
@EnableConfigurationProperties(DepositorProperties.class)
public class DepositorApplication {
private static final Logger logger = LoggerFactory.getLogger(DepositorApplication.class);

static final int DEFAULT_NO_THREADS = 10;
static final String DEFAULT_SCHEMA = "default";

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,66 +1,68 @@
package us.dot.its.jpo.ode.s3.depositor.imp;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
// import us.dot.its.jpo.ode.depositor.GeoRoutedMsgOrBuilder;
// import us.dot.its.jpo.ode.depositor.GeoRoutedMsg_PB2;
import us.dot.its.jpo.ode.model.OdeBsmData;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
import us.dot.its.jpo.ode.model.OdeBsmPayload;
import us.dot.its.jpo.ode.model.OdeTimData;
import us.dot.its.jpo.ode.model.OdeTimMetadata;
import us.dot.its.jpo.ode.model.OdeTimPayload;
import us.dot.its.jpo.ode.plugin.j2735.OdeTravelerInformationMessage;

import lombok.extern.slf4j.Slf4j;

import us.dot.its.jpo.ode.s3.depositor.DateJsonMapper;
import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;
import us.dot.its.jpo.ode.s3.depositor.models.ode.OdeTimData;

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;

import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy;
import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;

@Service
public class ImpDepositorService implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ImpDepositorService.class);
@Component
// @DependsOn("impRegistration")
@Slf4j
public class ImpDepositorService {
private final ObjectMapper mapper = DateJsonMapper.getInstance();
private final ImpMqttService mqttService;
private final DepositorProperties properties;

public ImpDepositorService(DepositorProperties properties, ImpMqttService mqttService) {
this.mqttService = mqttService;
public ImpDepositorService(DepositorProperties properties) {
this.properties = properties;
}

@Override
public void run() {
// Logic to start the service
logger.info("ImpDepositorService is running");
// You can add any initialization logic here if needed
var registration = new ImpRegistration(properties);
var response = registration.registerClientPartner();

if (response != null) {
log.info("IMP registration successful");
} else {
log.error("IMP registration failed, services will not be started");
}

this.mqttService = new ImpMqttService(properties);
}

// TODO: Use the filtered TMC topic instead of the mixed TIM topic
// @KafkaListener(topics = "topic.OdeTimJsonTMCFiltered", groupId =
// "jpo-s3-depositor", concurrency = "${listen.concurrency:1}")
// public void tmcTimListener(String message) {
// try {
// OdeTimData timMsg = mapper.readValue(message, OdeTimData.class);
// OdeTimMetadata timMetadata = (OdeTimMetadata) timMsg.getMetadata();
// } catch (Exception e) {
// // Handle exception
// }
// @Override
// public void run() {
// // Logic to start the service
// log.info("ImpDepositorService is running");
// // You can add any initialization logic here if needed
// }

@KafkaListener(topics = "topic.OdeBsmJson", groupId = "jpo-s3-depositor", concurrency = "${listen.concurrency:1}")
public void bsmListener(String message) {
@KafkaListener(topics = "topic.OdeTimJsonTMCFiltered", groupId = "jpo-s3-depositor", concurrency = "${listen.concurrency:1}")
public void tmcTimListener(String message) {
try {
OdeBsmData msg = mapper.readValue(message, OdeBsmData.class);
OdeTimData timMsg = mapper.readValue(message, OdeTimData.class);
String asn1String = timMsg.getMetadata().getAsn1();
String odeReceivedAt = timMsg.getMetadata().getOdeReceivedAt();
GeoRoutedMsg geoRoutedMsg = ImpUtil.getGeoRoutedMsg(asn1String, odeReceivedAt);
List<String> topicList = ImpUtil.getRegionalTimTopicList(timMsg, properties);

log.info("Received TIM message: {}", geoRoutedMsg);
log.info("Sending TIM message to MQTT topics: {}", topicList);

for (String topic : topicList) {
mqttService.publish(topic, geoRoutedMsg);
}

} catch (Exception e) {
log.error("Error processing TIM message", e);
// Handle exception
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,102 @@
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509ExtendedTrustManager;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;
import nl.altindag.ssl.SSLFactory;
import nl.altindag.ssl.pem.util.PemUtils;
import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;
import us.dot.its.jpo.ode.s3.depositor.models.imp.ConfigData;
import us.dot.its.jpo.ode.s3.depositor.utils.CommonUtils;

@Service
@Slf4j
public class ImpMqttService {
private static final Logger logger = LoggerFactory.getLogger(ImpMqttService.class);
private MqttClient client;
private final String brokerUri;
private final String clientId;
private final MqttConnectOptions options;

public ImpMqttService(DepositorProperties properties) throws MqttException {
public ImpMqttService(DepositorProperties properties) {
waitForFiles(properties.getImpCertPath());
ConfigData impConfig = CommonUtils.readConfigFile(properties.getImpCertPath() + "/config.json");
this.brokerUri = impConfig.getImpMqttUri().toString().replace("mqtt://", "ssl://");
this.clientId = impConfig.getDeviceID();

String uri = impConfig.getImpMqttUri().toString().replace("mqtt://", "ssl://");
SSLSocketFactory socketFactory = createSocketFactory(impConfig.getCaCertPath(), impConfig.getClientCertPath(),
impConfig.getKeyFilePath());

client = new MqttClient(uri, impConfig.getDeviceID());
MqttConnectOptions options = new MqttConnectOptions();
this.options = new MqttConnectOptions();
options.setCleanSession(true);
options.setSocketFactory(createSocketFactory(impConfig.getCaCertPath(), impConfig.getClientCertPath(),
impConfig.getKeyFilePath()));
options.setSocketFactory(socketFactory);
options.setConnectionTimeout(30);
options.setKeepAliveInterval(60);
options.setAutomaticReconnect(true);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

try {
connect();
} catch (MqttException e) {
log.error("Error while connecting to broker", e);
}
}

private void connect() throws MqttException {
client = new MqttClient(brokerUri, clientId);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.error("Connection lost", cause);
}

@Override
public void messageArrived(String topic, MqttMessage message) {
// Handle incoming messages
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Handle delivery completion
}

});

client.connect(options);
}

private void waitForFiles(String filePath) {
try {
Thread.sleep(5000);
File configFile = new File(filePath + "/config.json");
File caCertFile = new File(filePath + "/imp-ca.pem");
File clientCertFile = new File(filePath + "/imp-cert.pem");
File keyFile = new File(filePath + "/imp-key.pem");
while (!configFile.exists() || !caCertFile.exists() || !clientCertFile.exists() || !keyFile.exists()) {
Thread.sleep(1000); // Wait for 1 second before checking again
}
log.info("All files found");
} catch (InterruptedException e) {
log.error("Error while waiting for config file", e);
}
}

public static SSLSocketFactory createSocketFactory(String caCertPath, String clientCertPath,
String privateKeyPath) {
// Convert to absolute paths
String absoluteCaCertPath = Paths.get(caCertPath).toAbsolutePath().toString();
String absoluteClientCertPath = Paths.get(clientCertPath).toAbsolutePath().toString();
String absolutePrivateKeyPath = Paths.get(privateKeyPath).toAbsolutePath().toString();

logger.info("CA Cert Path: {}", absoluteCaCertPath);
logger.info("Client Cert Path: {}", absoluteClientCertPath);
logger.info("Private Key Path: {}", absolutePrivateKeyPath);
log.info("CA Cert Path: {}", absoluteCaCertPath);
log.info("Client Cert Path: {}", absoluteClientCertPath);
log.info("Private Key Path: {}", absolutePrivateKeyPath);

// Check if files exist
if (!new File(absoluteCaCertPath).exists()) {
Expand All @@ -71,9 +125,9 @@ public static SSLSocketFactory createSocketFactory(String caCertPath, String cli
return sslSocketFactory;
}

public void publish(String topic, String messageContent) throws MqttException {
MqttMessage message = new MqttMessage();
message.setPayload(messageContent.getBytes());
client.publish(topic, message);
public void publish(String topic, GeoRoutedMsg geoRoutedMsg) throws MqttException {
boolean retained = false;
log.debug("Publishing message to topic: {}", topic);
client.publish(topic, geoRoutedMsg.toByteArray(), 0, retained);
}
}
Loading

0 comments on commit f52f3e8

Please sign in to comment.