diff --git a/jpo-s3-depositor/lib/jpo-ode-common-3.0.0-SNAPSHOT.jar b/jpo-s3-depositor/lib/jpo-ode-common-3.0.0-SNAPSHOT.jar
index e402245..db4dc6d 100644
Binary files a/jpo-s3-depositor/lib/jpo-ode-common-3.0.0-SNAPSHOT.jar and b/jpo-s3-depositor/lib/jpo-ode-common-3.0.0-SNAPSHOT.jar differ
diff --git a/jpo-s3-depositor/lib/jpo-ode-core-3.0.0-SNAPSHOT.jar b/jpo-s3-depositor/lib/jpo-ode-core-3.0.0-SNAPSHOT.jar
index ead2de0..6516c55 100644
Binary files a/jpo-s3-depositor/lib/jpo-ode-core-3.0.0-SNAPSHOT.jar and b/jpo-s3-depositor/lib/jpo-ode-core-3.0.0-SNAPSHOT.jar differ
diff --git a/jpo-s3-depositor/lib/jpo-ode-plugins-3.0.0-SNAPSHOT.jar b/jpo-s3-depositor/lib/jpo-ode-plugins-3.0.0-SNAPSHOT.jar
index 354a3ab..0b5eef2 100644
Binary files a/jpo-s3-depositor/lib/jpo-ode-plugins-3.0.0-SNAPSHOT.jar and b/jpo-s3-depositor/lib/jpo-ode-plugins-3.0.0-SNAPSHOT.jar differ
diff --git a/jpo-s3-depositor/pom.xml b/jpo-s3-depositor/pom.xml
index b4dfe03..aac2c9f 100644
--- a/jpo-s3-depositor/pom.xml
+++ b/jpo-s3-depositor/pom.xml
@@ -71,6 +71,16 @@
 		<dependency>
 			<groupId>org.springframework.kafka</groupId>
 			<artifactId>spring-kafka</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.eclipse.paho</groupId>
+			<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+			<version>1.2.5</version>
+		</dependency>
+		<dependency>
+			<groupId>io.github.hakky54</groupId>
+			<artifactId>sslcontext-kickstart-for-pem</artifactId>
+			<version>8.3.7</version>
+		</dependency>
 		<dependency>
 			<groupId>org.springdoc</groupId>
@@ -124,6 +134,11 @@
 			<artifactId>protobuf-java-util</artifactId>
 			<version>${protobuf.version}</version>
+
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/gen/FullOdeBsmData.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/gen/FullOdeBsmData.java
new file mode 100644
index 0000000..3d34738
--- /dev/null
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/gen/FullOdeBsmData.java
@@ -0,0 +1,39 @@
+package us.dot.its.jpo.ode.s3.depositor.gen;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+
+import us.dot.its.jpo.ode.model.OdeBsmMetadata;
+import us.dot.its.jpo.ode.model.OdeBsmPayload;
+import us.dot.its.jpo.ode.model.OdeData;
+import us.dot.its.jpo.ode.model.OdeMsgMetadata;
+import us.dot.its.jpo.ode.model.OdeMsgPayload;
+
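+// The @JsonTypeInfo overrides below pin concrete default types so Jackson can
+// deserialize a full BSM document even when no explicit type hint is present.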
+public class FullOdeBsmData extends OdeData {
+
+ private static final long serialVersionUID = 4944935387116447760L;
+
+ public FullOdeBsmData() {
+ super();
+ }
+
+ public FullOdeBsmData(OdeBsmMetadata metadata, OdeBsmPayload payload) {
+ super(metadata, payload);
+ }
+
+ @Override
+ @JsonTypeInfo(use = Id.CLASS, include = As.EXISTING_PROPERTY, defaultImpl = OdeBsmMetadata.class)
+ public void setMetadata(OdeMsgMetadata metadata) {
+ super.setMetadata(metadata);
+ }
+
+ @Override
+ @JsonTypeInfo(use = Id.CLASS, include = As.EXISTING_PROPERTY, defaultImpl = OdeBsmPayload.class)
+ public void setPayload(OdeMsgPayload payload) {
+ super.setPayload(payload);
+ }
+
+}
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/gen/GenOdeSchemas.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/gen/GenOdeSchemas.java
new file mode 100644
index 0000000..c3d9594
--- /dev/null
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/gen/GenOdeSchemas.java
@@ -0,0 +1,32 @@
+package us.dot.its.jpo.ode.s3.depositor.gen;
+
+import org.springframework.stereotype.Controller;
+
+// import com.fasterxml.jackson.databind.JsonMappingException;
+// import com.fasterxml.jackson.databind.ObjectMapper;
+
+// import jakarta.annotation.PostConstruct;
+// import us.dot.its.jpo.ode.model.OdeBsmData;
+
+@Controller
+public class GenOdeSchemas {
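+// One-off schema generator, kept commented out; when re-enabled (along with the
+// imports above) it prints a JSON schema for FullOdeBsmData to stdout at startup.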
+
+ // @PostConstruct
+ // public void generateSchema() {
+ // HyperSchemaFactoryWrapper bsm = new HyperSchemaFactoryWrapper();
+ // ObjectMapper mapper = new ObjectMapper();
+ // try {
+ // mapper.acceptJsonFormatVisitor(FullOdeBsmData.class, bsm);
+ // JsonSchema bsmSchema = bsm.finalSchema();
+ // String schemaJson =
+ // mapper.writerWithDefaultPrettyPrinter().writeValueAsString(bsmSchema);
+ // System.out.println(schemaJson);
+ // } catch (JsonMappingException e) {
+ // e.printStackTrace();
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // }
+ // }
+}
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpDepositorService.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpDepositorService.java
index 148797d..d52223a 100644
--- a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpDepositorService.java
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpDepositorService.java
@@ -6,9 +6,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
-import us.dot.its.jpo.ode.depositor.GeoRoutedMsgOrBuilder;
-import us.dot.its.jpo.ode.depositor.GeoRoutedMsg_PB2;
+// import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
+// import us.dot.its.jpo.ode.depositor.GeoRoutedMsgOrBuilder;
+// import us.dot.its.jpo.ode.depositor.GeoRoutedMsg_PB2;
import us.dot.its.jpo.ode.model.OdeBsmData;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
import us.dot.its.jpo.ode.model.OdeBsmPayload;
@@ -17,34 +17,53 @@
import us.dot.its.jpo.ode.model.OdeTimPayload;
import us.dot.its.jpo.ode.plugin.j2735.OdeTravelerInformationMessage;
import us.dot.its.jpo.ode.s3.depositor.DateJsonMapper;
+import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy;
-// @Service
-public class ImpDepositorService {
+@Service
+public class ImpDepositorService implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ImpDepositorService.class);
-
private final ObjectMapper mapper = DateJsonMapper.getInstance();
+ private final ImpMqttService mqttService;
+ private final DepositorProperties properties;
- // TODO: Use the filtered TMC topic instead of the mixed TIM topic
- @KafkaListener(topics = "topic.OdeTimJson", groupId = "jpo-s3-depositor", concurrency = "${listen.concurrency:3}")
- public void listenTopic1(String message) {
- try {
- OdeTimData timMsg = mapper.readValue(message, OdeTimData.class);
- OdeTimMetadata timMetadata = (OdeTimMetadata) timMsg.getMetadata();
-
- if (timMetadata.getRecordGeneratedBy().equals(GeneratedBy.T)) {
+ public ImpDepositorService(DepositorProperties properties, ImpMqttService mqttService) {
+ this.mqttService = mqttService;
+ this.properties = properties;
+ }
- }
+ @Override
+ public void run() {
+ // Entry point for the service thread; the Kafka listener below is
+ // container-managed, so no additional startup work is needed here.
+ logger.info("ImpDepositorService is running");
+ }
- OdeTimPayload timPayload = (OdeTimPayload) timMsg.getPayload();
- OdeTravelerInformationMessage timData = (OdeTravelerInformationMessage) timPayload.getData();
- timData.get
+ // TODO: Use the filtered TMC topic instead of the mixed TIM topic
+ // @KafkaListener(topics = "topic.OdeTimJsonTMCFiltered", groupId =
+ // "jpo-s3-depositor", concurrency = "${listen.concurrency:1}")
+ // public void tmcTimListener(String message) {
+ // try {
+ // OdeTimData timMsg = mapper.readValue(message, OdeTimData.class);
+ // OdeTimMetadata timMetadata = (OdeTimMetadata) timMsg.getMetadata();
+ // } catch (Exception e) {
+ // // Handle exception
+ // }
+ // }
- timPayload.getTim().getRecord().get(0).getFrameType();
+ @KafkaListener(topics = "topic.OdeBsmJson", groupId = "jpo-s3-depositor", concurrency = "${listen.concurrency:1}")
+ public void bsmListener(String message) {
+ try {
+ OdeBsmData msg = mapper.readValue(message, OdeBsmData.class);
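+ // Sketch of the intended next step (not wired up in this change): forward
+ // the decoded BSM to the IMP broker. The topic name below is a placeholder
+ // assumption, not a configured value.
+ // mqttService.publish("imp/bsm", mapper.writeValueAsString(msg));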
- logger.info("Received message: " + pojo1.getMetadata().getAsn1());
} catch (Exception e) {
// Handle exception
}
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpMqttService.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpMqttService.java
new file mode 100644
index 0000000..4422462
--- /dev/null
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpMqttService.java
@@ -0,0 +1,84 @@
+package us.dot.its.jpo.ode.s3.depositor.imp;
+
+import java.io.File;
+import java.nio.file.Paths;
+
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.X509ExtendedKeyManager;
+import javax.net.ssl.X509ExtendedTrustManager;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import nl.altindag.ssl.SSLFactory;
+import nl.altindag.ssl.pem.util.PemUtils;
+import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;
+import us.dot.its.jpo.ode.s3.depositor.models.imp.ConfigData;
+import us.dot.its.jpo.ode.s3.depositor.utils.CommonUtils;
+
+@Service
+public class ImpMqttService {
+ private static final Logger logger = LoggerFactory.getLogger(ImpMqttService.class);
+ private MqttClient client;
+
+ public ImpMqttService(DepositorProperties properties) throws MqttException {
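+ // readConfigFile returns null if config.json is missing or unreadable, so
+ // IMP registration must have completed before this service is constructed.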
+ ConfigData impConfig = CommonUtils.readConfigFile(properties.getImpCertPath() + "/config.json");
+
+ String uri = impConfig.getImpMqttUri().toString().replace("mqtt://", "ssl://");
+
+ client = new MqttClient(uri, impConfig.getDeviceID());
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
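+ // Automatic reconnect is not configured here; options.setAutomaticReconnect(true)
+ // could be added if broker restarts need to be survived.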
+ options.setSocketFactory(createSocketFactory(impConfig.getCaCertPath(), impConfig.getClientCertPath(),
+ impConfig.getKeyFilePath()));
+ client.connect(options);
+ }
+
+ public static SSLSocketFactory createSocketFactory(String caCertPath, String clientCertPath,
+ String privateKeyPath) {
+ // Convert to absolute paths
+ String absoluteCaCertPath = Paths.get(caCertPath).toAbsolutePath().toString();
+ String absoluteClientCertPath = Paths.get(clientCertPath).toAbsolutePath().toString();
+ String absolutePrivateKeyPath = Paths.get(privateKeyPath).toAbsolutePath().toString();
+
+ logger.info("CA Cert Path: {}", absoluteCaCertPath);
+ logger.info("Client Cert Path: {}", absoluteClientCertPath);
+ logger.info("Private Key Path: {}", absolutePrivateKeyPath);
+
+ // Check if files exist
+ if (!new File(absoluteCaCertPath).exists()) {
+ throw new IllegalArgumentException("CA Certificate file not found at path: " + absoluteCaCertPath);
+ }
+ if (!new File(absoluteClientCertPath).exists()) {
+ throw new IllegalArgumentException("Client Certificate file not found at path: " + absoluteClientCertPath);
+ }
+ if (!new File(absolutePrivateKeyPath).exists()) {
+ throw new IllegalArgumentException("Private Key file not found at path: " + absolutePrivateKeyPath);
+ }
+
+ X509ExtendedKeyManager keyManager = PemUtils.loadIdentityMaterial(Paths.get(absoluteClientCertPath),
+ Paths.get(absolutePrivateKeyPath));
+ X509ExtendedTrustManager trustManager = PemUtils.loadTrustMaterial(Paths.get(absoluteCaCertPath));
+
+ var sslFactory = SSLFactory.builder().withIdentityMaterial(keyManager).withTrustMaterial(trustManager).build();
+
+ var sslSocketFactory = sslFactory.getSslSocketFactory();
+ return sslSocketFactory;
+ }
+
+ public void publish(String topic, String messageContent) throws MqttException {
+ MqttMessage message = new MqttMessage();
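+ // String.getBytes() uses the platform default charset; UTF-8 content is assumed.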
+ message.setPayload(messageContent.getBytes());
+ client.publish(topic, message);
+ }
+}
\ No newline at end of file
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpRegistration.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpRegistration.java
index 282f527..d375ea3 100644
--- a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpRegistration.java
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpRegistration.java
@@ -2,6 +2,7 @@
import us.dot.its.jpo.ode.s3.depositor.DateJsonMapper;
import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;
+import us.dot.its.jpo.ode.s3.depositor.utils.CommonUtils;
import us.dot.its.jpo.ode.s3.depositor.models.imp.AuthToken;
import us.dot.its.jpo.ode.s3.depositor.models.imp.AuthTokenRequest;
import us.dot.its.jpo.ode.s3.depositor.models.imp.ClientCompleteResponse;
@@ -65,9 +66,9 @@ public ConfigData registerClientPartner() {
ClientRegistrationResponse registrationResponse = register(token);
- writeToFile(caCertPath, registrationResponse.getCertificate().getCaPem());
- writeToFile(certPath, registrationResponse.getCertificate().getCertPem());
- writeToFile(keyPath, registrationResponse.getCertificate().getKeyPem());
+ CommonUtils.writeToFile(caCertPath, registrationResponse.getCertificate().getCaPem());
+ CommonUtils.writeToFile(certPath, registrationResponse.getCertificate().getCertPem());
+ CommonUtils.writeToFile(keyPath, registrationResponse.getCertificate().getKeyPem());
deviceID = registrationResponse.getDeviceID();
} else {
@@ -80,14 +81,12 @@ public ConfigData registerClientPartner() {
ClientConnectionResponse connectionResponse = connection(token, deviceID);
URI uri = new URI(connectionResponse.getMqttURL());
- String host = uri.getHost();
- int port = uri.getPort();
configData = new ConfigData(configPath, caCertPath, certPath, keyPath, properties.getImpVendor(),
- properties.getImpNetworkType(), host, port, deviceID);
+ properties.getImpNetworkType(), uri, deviceID);
String configDataJson = objectMapper.writeValueAsString(configData);
- writeToFile(configPath, configDataJson);
+ CommonUtils.writeToFile(configPath, configDataJson);
return configData;
} catch (Exception e) {
@@ -150,17 +149,6 @@ private ClientConnectionResponse connection(String token, String deviceID) {
}
}
- private void writeToFile(String filePath, String content) {
- try {
- Files.createDirectories(Paths.get(filePath).getParent());
- try (FileWriter writer = new FileWriter(filePath)) {
- writer.write(content);
- }
- } catch (IOException e) {
- logger.error("writeToFile error: " + e.getStackTrace());
- }
- }
-
private boolean validRegistration(String configPath) {
boolean valid = false;
File file = new File(configPath);
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpUtil.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpUtil.java
index 4b27c40..d924ced 100644
--- a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpUtil.java
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/imp/ImpUtil.java
@@ -1,14 +1,21 @@
-package us.dot.its.jpo.ode.s3.depositor.imp;
+// package us.dot.its.jpo.ode.s3.depositor.imp;
-import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
+// import com.google.protobuf.ByteString;
-public class ImpUtil {
+// import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
+// import us.dot.its.jpo.ode.depositor.GeoRoutedMsgOrBuilder;
- public GeoRoutedMsg getGeoRoutedMsg(String asn1String, String odeReceivedAt, ) {
-
- byte[] asn1 = pojo1.getMetadata().getAsn1().getBytes();
- ByteString asn1ByteString = ByteString.copyFrom(asn1);
+// public class ImpUtil {
- GeoRoutedMsgOrBuilder geoRoutedMsg = GeoRoutedMsg.newBuilder().setMsgBytes(asn1ByteString).set;
- }
-}
+// public GeoRoutedMsg getGeoRoutedMsg(String asn1String, String odeReceivedAt)
+// {
+
+// // byte[] asn1 = pojo1.getMetadata().getAsn1().getBytes();
+// ByteString asn1ByteString = ByteString.copyFrom(asn1String.getBytes());
+
+// GeoRoutedMsg geoRoutedMsg =
+// GeoRoutedMsg.newBuilder().setMsgBytes(asn1ByteString).build();
+
+// return geoRoutedMsg;
+// }
+// }
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/models/imp/ConfigData.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/models/imp/ConfigData.java
index b89b51b..635c8fe 100644
--- a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/models/imp/ConfigData.java
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/models/imp/ConfigData.java
@@ -1,9 +1,11 @@
package us.dot.its.jpo.ode.s3.depositor.models.imp;
+import java.io.Serializable;
import java.math.BigDecimal;
+import java.net.URI;
import java.util.Objects;
-public class ConfigData {
+public class ConfigData implements Serializable {
private String configFilePath;
private String caCertPath;
@@ -11,23 +13,21 @@ public class ConfigData {
private String keyFilePath;
private String impVendor;
private NetworkType networkType;
- private String impIpHost;
- private int impPort;
+ private URI impMqttUri;
private String deviceID;
public ConfigData() {
}
public ConfigData(String configFilePath, String caCertPath, String clientCertPath, String keyFilePath,
- String impVendor, NetworkType networkType, String impIpHost, int impPort, String deviceID) {
+ String impVendor, NetworkType networkType, URI impMqttUri, String deviceID) {
this.configFilePath = configFilePath;
this.caCertPath = caCertPath;
this.clientCertPath = clientCertPath;
this.keyFilePath = keyFilePath;
this.impVendor = impVendor;
this.networkType = networkType;
- this.impIpHost = impIpHost;
- this.impPort = impPort;
+ this.impMqttUri = impMqttUri;
this.deviceID = deviceID;
}
@@ -79,20 +79,12 @@ public void setNetworkType(NetworkType networkType) {
this.networkType = networkType;
}
- public String getImpIpHost() {
- return this.impIpHost;
+ public URI getImpMqttUri() {
+ return this.impMqttUri;
}
- public void setImpIpHost(String impIpHost) {
- this.impIpHost = impIpHost;
- }
-
- public int getImpPort() {
- return this.impPort;
- }
-
- public void setImpPort(int impPort) {
- this.impPort = impPort;
+ public void setImpMqttUri(URI impMqttUri) {
+ this.impMqttUri = impMqttUri;
}
public String getDeviceID() {
@@ -133,13 +125,8 @@ public ConfigData networkType(NetworkType networkType) {
return this;
}
- public ConfigData impIpHost(String impIpHost) {
- setImpIpHost(impIpHost);
- return this;
- }
-
- public ConfigData impPort(int impPort) {
- setImpPort(impPort);
+ public ConfigData impMqttUri(URI impMqttUri) {
+ setImpMqttUri(impMqttUri);
return this;
}
@@ -162,23 +149,21 @@ public boolean equals(Object o) {
&& Objects.equals(keyFilePath, configData.keyFilePath)
&& Objects.equals(impVendor, configData.impVendor)
&& Objects.equals(networkType, configData.networkType)
- && Objects.equals(impIpHost, configData.impIpHost) && impPort == configData.impPort
- && Objects.equals(deviceID, configData.deviceID);
+ && Objects.equals(impMqttUri, configData.impMqttUri) && Objects.equals(deviceID, configData.deviceID);
}
@Override
public int hashCode() {
- return Objects.hash(configFilePath, caCertPath, clientCertPath, keyFilePath, impVendor, networkType, impIpHost,
- impPort, deviceID);
+ return Objects.hash(configFilePath, caCertPath, clientCertPath, keyFilePath, impVendor, networkType, impMqttUri,
+ deviceID);
}
@Override
public String toString() {
return "{" + " configFilePath='" + getConfigFilePath() + "'" + ", caCertPath='" + getCaCertPath() + "'"
+ ", clientCertPath='" + getClientCertPath() + "'" + ", keyFilePath='" + getKeyFilePath() + "'"
- + ", impVendor='" + getImpVendor() + "'" + ", networkType='" + getNetworkType() + "'" + ", impIpHost='"
- + getImpIpHost() + "'" + ", impPort='" + getImpPort() + "'" + ", deviceID='" + getDeviceID() + "'"
- + "}";
+ + ", impVendor='" + getImpVendor() + "'" + ", networkType='" + getNetworkType() + "'" + ", impMqttUri='"
+ + getImpMqttUri() + "'" + ", deviceID='" + getDeviceID() + "'" + "}";
}
}
\ No newline at end of file
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/threads/ServiceThreadController.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/threads/ServiceThreadController.java
index 213c57f..6a15b09 100644
--- a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/threads/ServiceThreadController.java
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/threads/ServiceThreadController.java
@@ -4,13 +4,15 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-
+import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;
+import us.dot.its.jpo.ode.s3.depositor.imp.ImpDepositorService;
+import us.dot.its.jpo.ode.s3.depositor.imp.ImpMqttService;
import us.dot.its.jpo.ode.s3.depositor.imp.ImpRegistration;
import java.util.HashMap;
@@ -41,12 +43,25 @@ public ServiceThreadController(DepositorProperties depositorProperties) {
public void startImpServices(DepositorProperties depositorProperties) {
logger.info("Starting IMP services");
- // sm.startServices();
-
var impRegistrationService = new ImpRegistration(depositorProperties);
var response = impRegistrationService.registerClientPartner();
- var sm = new ServiceManager(new ServiceThreadFactory("ServiceManager"));
+ if (response != null) {
+ try {
+ var impMqttService = new ImpMqttService(depositorProperties);
+ var impDepositorService = new ImpDepositorService(depositorProperties, impMqttService);
+
+ var sm = new ServiceManager(new ServiceThreadFactory("ImpServiceManager"));
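+ // submit() is expected to execute the depositor's run() on a managed thread.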
+ sm.submit(impDepositorService);
+
+ logger.info("IMP services started successfully");
+ } catch (MqttException e) {
+ logger.error("Failed to start IMP services", e);
+ }
+ } else {
+ logger.error("IMP registration failed, services will not be started");
+ }
}
diff --git a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/utils/CommonUtils.java b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/utils/CommonUtils.java
index 507382b..a6f273f 100644
--- a/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/utils/CommonUtils.java
+++ b/jpo-s3-depositor/src/main/java/us/dot/its/jpo/ode/s3/depositor/utils/CommonUtils.java
@@ -1,6 +1,21 @@
package us.dot.its.jpo.ode.s3.depositor.utils;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import us.dot.its.jpo.ode.s3.depositor.models.imp.ConfigData;
+
public class CommonUtils {
+ private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
public static String getEnvironmentVariable(String variableName) {
String value = System.getenv(variableName);
return value;
@@ -15,4 +30,28 @@ public static String getEnvironmentVariable(String variableName, String defaultV
}
return value;
}
+
+ public static void writeToFile(String filePath, String content) {
+ try {
+ Files.createDirectories(Paths.get(filePath).getParent());
+ try (FileWriter writer = new FileWriter(filePath)) {
+ writer.write(content);
+ }
+ } catch (IOException e) {
+ logger.error("writeToFile IOException: " + e.getStackTrace());
+ }
+ }
+
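+ // Returns null when the file is missing or unparseable; callers must null-check.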
+ public static ConfigData readConfigFile(String filePath) {
+ try {
+ String fileContent = new String(Files.readAllBytes(Paths.get(filePath)));
+ ConfigData configData = objectMapper.readValue(fileContent, ConfigData.class);
+
+ return configData;
+ } catch (IOException e) {
+ logger.error("writeToFile IOException: " + e.getStackTrace());
+ return null;
+ }
+ }
}
diff --git a/jpo-s3-depositor/src/main/resources/application.yaml b/jpo-s3-depositor/src/main/resources/application.yaml
index 73e744b..50b2dcd 100644
--- a/jpo-s3-depositor/src/main/resources/application.yaml
+++ b/jpo-s3-depositor/src/main/resources/application.yaml
@@ -6,8 +6,8 @@ version: ^project.version^
server.port: 8082
# Kafka properties
-spring.kafka.bootstrap-servers: ${KAFKA_BROKER_IP:localhost}:9092
-spring.kafka.consumer.group-id: ^project.artifactId^
+spring.kafka.bootstrap-servers: ${BOOTSTRAP_SERVER:localhost:9092}
+spring.kafka.consumer.group-id: jpo-s3-base-consumer-group
# logging.level.org.apache.kafka: INFO
logging.level:
diff --git a/mongo-connector/Dockerfile b/mongo-connector/Dockerfile
deleted file mode 100644
index 902101a..0000000
--- a/mongo-connector/Dockerfile
+++ /dev/null
@@ -1,11 +0,0 @@
-# Note that this image requires a version of Mongodb of 3.6 or later
-FROM confluentinc/cp-kafka-connect:6.1.9
-
-COPY connect_wait.sh /scripts/connect_wait.sh
-
-# Docs: https://www.mongodb.com/docs/kafka-connector/current/
-RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.11.1
-# Docs: https://docs.confluent.io/platform/current/connect/transforms/overview.html
-RUN confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.3
-
-CMD ["bash", "-c", "/scripts/connect_wait.sh"]
\ No newline at end of file
diff --git a/mongo-connector/connect_start.sh b/mongo-connector/connect_start.sh
deleted file mode 100755
index f5d0c30..0000000
--- a/mongo-connector/connect_start.sh
+++ /dev/null
@@ -1,123 +0,0 @@
-# bin/bash
-echo "------------------------------------------"
-echo "Kafka connector creation started."
-echo "------------------------------------------"
-
-declare -A OdeRawEncodedBSMJson=([name]="topic.OdeRawEncodedBSMJson" [collection]="OdeRawEncodedBSMJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-declare -A OdeBsmJson=([name]="topic.OdeBsmJson" [collection]="OdeBsmJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-
-declare -A OdeRawEncodedMAPJson=([name]="topic.OdeRawEncodedMAPJson" [collection]="OdeRawEncodedMAPJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-declare -A OdeMapJson=([name]="topic.OdeMapJson" [collection]="OdeMapJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-
-declare -A OdeRawEncodedSPATJson=([name]="topic.OdeRawEncodedSPATJson" [collection]="OdeRawEncodedSPATJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-declare -A OdeSpatJson=([name]="topic.OdeSpatJson" [collection]="OdeSpatJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-
-declare -A OdeRawEncodedTIMJson=([name]="topic.OdeRawEncodedTIMJson" [collection]="OdeRawEncodedTIMJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-declare -A OdeTimJson=([name]="topic.OdeTimJson" [collection]="OdeTimJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-
-declare -A OdeRawEncodedPsmJson=([name]="topic.OdeRawEncodedPsmJson" [collection]="OdeRawEncodedPsmJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-declare -A OdePsmJson=([name]="topic.OdePsmJson" [collection]="OdePsmJson"
- [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
-
-function createSink() {
- local -n topic=$1
- local name=${topic[name]}
- local collection=${topic[collection]}
- local timefield=${topic[timefield]}
- local convert_timestamp=${topic[convert_timestamp]}
- local use_key=${topic[use_key]}
- local key=${topic[key]}
- local add_timestamp=${topic[add_timestamp]}
-
- echo "Creating sink connector with parameters:"
- echo "name=$name"
- echo "collection=$collection"
- echo "timefield=$timefield"
- echo "convert_timestamp=$convert_timestamp"
- echo "use_key=$use_key"
- echo "key=$key"
- echo "add_timestamp=$add_timestamp"
-
- local connectConfig=' {
- "group.id":"connector-consumer",
- "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
- "tasks.max":3,
- "topics":"'$name'",
- "connection.uri":"'$MONGO_URI'",
- "database":"'$MONGO_DB_NAME'",
- "collection":"'$collection'",
- "key.converter":"org.apache.kafka.connect.storage.StringConverter",
- "key.converter.schemas.enable":false,
- "value.converter":"org.apache.kafka.connect.json.JsonConverter",
- "value.converter.schemas.enable":false,
- "errors.tolerance": "all",
- "mongo.errors.tolerance": "all",
- "errors.deadletterqueue.topic.name": "",
- "errors.log.enable": false,
- "errors.log.include.messages": false,
- "errors.deadletterqueue.topic.replication.factor": 0'
-
-
- if [ "$convert_timestamp" == true ]
- then
- local connectConfig=''$connectConfig',
- "transforms": "TimestampConverter",
- "transforms.TimestampConverter.field": "'$timefield'",
- "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
- "transforms.TimestampConverter.target.type": "Timestamp"'
- fi
-
- if [ "$add_timestamp" == true ]
- then
- local connectConfig=''$connectConfig',
- "transforms": "AddTimestamp,AddedTimestampConverter",
- "transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
- "transforms.AddTimestamp.timestamp.field": "recordGeneratedAt",
- "transforms.AddedTimestampConverter.field": "recordGeneratedAt",
- "transforms.AddedTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
- "transforms.AddedTimestampConverter.target.type": "Timestamp"'
- fi
-
- if [ "$use_key" == true ]
- then
- local connectConfig=''$connectConfig',
- "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
- "document.id.strategy.partial.value.projection.list": "'$key'",
- "document.id.strategy.partial.value.projection.type": "AllowList",
- "document.id.strategy.overwrite.existing": true'
- fi
-
- local connectConfig=''$connectConfig' }'
-
- echo " Creating connector with Config : $connectConfig"
-
- curl -X PUT http://localhost:8083/connectors/MongoSink.${name}/config -H "Content-Type: application/json" -d "$connectConfig"
-}
-
-createSink OdeRawEncodedBSMJson
-createSink OdeBsmJson
-
-createSink OdeRawEncodedMAPJson
-createSink OdeMapJson
-
-createSink OdeRawEncodedSPATJson
-createSink OdeSpatJson
-
-createSink OdeRawEncodedTIMJson
-createSink OdeTimJson
-
-createSink OdeRawEncodedPsmJson
-createSink OdePsmJson
-
-echo "----------------------------------"
-echo "ODE Kafka connector creation complete!"
-echo "----------------------------------"
\ No newline at end of file
diff --git a/mongo-connector/connect_wait.sh b/mongo-connector/connect_wait.sh
deleted file mode 100755
index 421faef..0000000
--- a/mongo-connector/connect_wait.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-
-/etc/confluent/docker/run &
-echo "Waiting for Kafka Connect to start listening on kafka-connect"
-while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -eq 000 ] ; do
- echo -e $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
- sleep 5
-done
-sleep 10
-echo -e "\n--\n+> Creating Kafka Connect MongoDB sink"
-
-# Check if connect_start.sh exists
-if [ ! -f /scripts/connect_start.sh ]; then
- echo "Error: connect_start.sh does not exist, starting without any connectors."
-else
- echo "Connect_start.sh exists, starting with connectors."
- bash /scripts/connect_start.sh
-fi
-
-sleep infinity
\ No newline at end of file
diff --git a/mongo-connector/create_indexes.js b/mongo-connector/create_indexes.js
deleted file mode 100755
index 0f4e207..0000000
--- a/mongo-connector/create_indexes.js
+++ /dev/null
@@ -1,142 +0,0 @@
-// Create indexes on all collections
-
-/*
-This is the second script responsible for configuring mongoDB automatically on startup.
-This script is responsible for creating collections, adding indexes and TTLs
-*/
-
-console.log("");
-console.log("Running create_indexes.js");
-
-const ttlInDays = process.env.MONGO_COLLECTION_TTL; // TTL in days
-const dbName = process.env.MONGO_DB_NAME; // TTL in days
-
-const expire_seconds = ttlInDays * 24 * 60 * 60;
-const retry_milliseconds = 10000;
-
-// name -> collection name
-// ttlField -> field to perform ttl on
-// timeField -> field to index for time queries
-
-const collections = [
- {name: "OdeBsmJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"},
- {name: "OdeRawEncodedBSMJson", ttlField: "recordGeneratedAt", timeField: "none"},
-
- {name: "OdeMapJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"},
- {name: "OdeRawEncodedMAPJson", ttlField: "recordGeneratedAt", timeField: "none"},
-
- {name: "OdeSpatJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"},
- {name: "OdeRawEncodedSPATJson", ttlField: "recordGeneratedAt", timeField: "none"},
-
- {name: "OdeTimJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"},
- {name: "OdeRawEncodedTIMJson", ttlField: "recordGeneratedAt", timeField: "none"},
-
- {name: "OdePsmJson", ttlField: "recordGeneratedAt", timeField: "metadata.odeReceivedAt"},
- {name: "OdeRawEncodedPsmJson", ttlField: "recordGeneratedAt", timeField: "none"},
-];
-
-try{
- db = db.getSiblingDB(dbName);
- print("Connected to the MongoDB instance.");
-} catch (error) {
- print("Error connecting to the MongoDB instance: " + error);
-}
-
-
-// Wait for the collections to exist in mongo before trying to create indexes on them
-let missing_collection_count;
-do {
- print("");
- try {
- missing_collection_count = 0;
- const collection_names = db.getCollectionNames();
- for (collection of collections) {
- console.log("Creating Indexes for Collection" + collection);
- // Create Collection if It doesn't exist
- let created = false;
- if(!collection_names.includes(collection.name)){
- created = createCollection(collection);
- // created = true;
- }else{
- created = true;
- }
-
- if(created){
- if (collection.hasOwnProperty('ttlField') && collection.ttlField !== 'none') {
- createTTLIndex(collection);
- }
-
-
- }else{
- missing_collection_count++;
- console.log("Collection " + collection.name + " does not exist yet");
- }
- }
- if (missing_collection_count > 0) {
- print("Waiting on " + missing_collection_count + " collections to be created...will try again in " + retry_milliseconds + " ms");
- sleep(retry_milliseconds);
- }
- } catch (err) {
- console.log("Error while setting up TTL indexes in collections");
- console.log(rs.status());
- console.error(err);
- sleep(retry_milliseconds);
- }
-} while (missing_collection_count > 0);
-
-console.log("Finished Creating All TTL indexes");
-
-
-function createCollection(collection){
- try {
- db.createCollection(collection.name);
- return true;
- } catch (err) {
- console.log("Unable to Create Collection: " + collection.name);
- console.log(err);
- return false;
- }
-}
-
-// Create TTL Indexes
-function createTTLIndex(collection) {
- if (ttlIndexExists(collection)) {
- console.log("TTL index already exists for " + collection.name);
- return;
- }
-
- const collection_name = collection.name;
- const timeField = collection.ttlField;
-
- console.log(
- "Creating TTL index for " + collection_name + " to remove documents after " +
- expire_seconds +
- " seconds"
- );
-
- try {
- var index_json = {};
- index_json[timeField] = 1;
- db[collection_name].createIndex(index_json,
- {expireAfterSeconds: expire_seconds}
- );
- console.log("Created TTL index for " + collection_name + " using the field: " + timeField + " as the timestamp");
- } catch (err) {
- var pattern_json = {};
- pattern_json[timeField] = 1;
- db.runCommand({
- "collMod": collection_name,
- "index": {
- keyPattern: pattern_json,
- expireAfterSeconds: expire_seconds
- }
- });
- console.log("Updated TTL index for " + collection_name + " using the field: " + timeField + " as the timestamp");
- }
-
-}
-
-
-function ttlIndexExists(collection) {
- return db[collection.name].getIndexes().find((idx) => idx.hasOwnProperty('expireAfterSeconds')) !== undefined;
-}
\ No newline at end of file