Skip to content

Commit

Permalink
updates with API connection and kafka testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael7371 committed Nov 6, 2024
1 parent 0bd0323 commit fc74fad
Show file tree
Hide file tree
Showing 18 changed files with 311 additions and 382 deletions.
Binary file modified jpo-s3-depositor/lib/jpo-ode-common-3.0.0-SNAPSHOT.jar
Binary file not shown.
Binary file modified jpo-s3-depositor/lib/jpo-ode-core-3.0.0-SNAPSHOT.jar
Binary file not shown.
Binary file modified jpo-s3-depositor/lib/jpo-ode-plugins-3.0.0-SNAPSHOT.jar
Binary file not shown.
15 changes: 15 additions & 0 deletions jpo-s3-depositor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>sslcontext-kickstart-for-pem</artifactId>
<version>8.3.7</version>
</dependency>

<dependency>
<groupId>org.springdoc</groupId>
Expand Down Expand Up @@ -124,6 +134,11 @@
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- <dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jsonSchema</artifactId>
<version>2.15.0</version>
</dependency> -->


<!-- JPO-ODE Module Dependencies -->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package us.dot.its.jpo.ode.s3.depositor.gen;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;

import us.dot.its.jpo.ode.model.OdeBsmData;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
import us.dot.its.jpo.ode.model.OdeBsmPayload;
import us.dot.its.jpo.ode.model.OdeData;
import us.dot.its.jpo.ode.model.OdeMsgMetadata;
import us.dot.its.jpo.ode.model.OdeMsgPayload;

import static com.fasterxml.jackson.annotation.JsonTypeInfo.*;

public class FullOdeBsmData extends OdeData {

private static final long serialVersionUID = 4944935387116447760L;

public FullOdeBsmData() {
super();
}

public FullOdeBsmData(OdeBsmMetadata metadata, OdeBsmPayload payload) {
super(metadata, payload);
}

@Override
@JsonTypeInfo(use = Id.CLASS, include = As.EXISTING_PROPERTY, defaultImpl = OdeBsmMetadata.class)
public void setMetadata(OdeMsgMetadata metadata) {
super.setMetadata(metadata);
}

@Override
@JsonTypeInfo(use = Id.CLASS, include = As.EXISTING_PROPERTY, defaultImpl = OdeBsmPayload.class)
public void setPayload(OdeMsgPayload payload) {
super.setPayload(payload);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package us.dot.its.jpo.ode.s3.depositor.gen;

import org.springframework.stereotype.Controller;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import jakarta.annotation.PostConstruct;
import us.dot.its.jpo.ode.model.OdeBsmData;

@Controller
public class GenOdeSchemas {

// @PostConstruct
// public void generateSchema() {
// HyperSchemaFactoryWrapper bsm = new HyperSchemaFactoryWrapper();
// ObjectMapper mapper = new ObjectMapper();
// try {
// mapper.acceptJsonFormatVisitor(FullOdeBsmData.class, bsm);
// JsonSchema bsmSchema = bsm.finalSchema();
// String schemaJson =
// mapper.writerWithDefaultPrettyPrinter().writeValueAsString(bsmSchema);
// System.out.println(schemaJson);
// } catch (JsonMappingException e) {
// e.printStackTrace();
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
import us.dot.its.jpo.ode.depositor.GeoRoutedMsgOrBuilder;
import us.dot.its.jpo.ode.depositor.GeoRoutedMsg_PB2;
// import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
// import us.dot.its.jpo.ode.depositor.GeoRoutedMsgOrBuilder;
// import us.dot.its.jpo.ode.depositor.GeoRoutedMsg_PB2;
import us.dot.its.jpo.ode.model.OdeBsmData;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
import us.dot.its.jpo.ode.model.OdeBsmPayload;
Expand All @@ -17,34 +17,49 @@
import us.dot.its.jpo.ode.model.OdeTimPayload;
import us.dot.its.jpo.ode.plugin.j2735.OdeTravelerInformationMessage;
import us.dot.its.jpo.ode.s3.depositor.DateJsonMapper;
import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy;

// @Service
public class ImpDepositorService {
@Service
public class ImpDepositorService implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ImpDepositorService.class);

private final ObjectMapper mapper = DateJsonMapper.getInstance();
private final ImpMqttService mqttService;
private final DepositorProperties properties;

// TODO: Use the filtered TMC topic instead of the mixed TIM topic
@KafkaListener(topics = "topic.OdeTimJson", groupId = "jpo-s3-depositor", concurrency = "${listen.concurrency:3}")
public void listenTopic1(String message) {
try {
OdeTimData timMsg = mapper.readValue(message, OdeTimData.class);
OdeTimMetadata timMetadata = (OdeTimMetadata) timMsg.getMetadata();

if (timMetadata.getRecordGeneratedBy().equals(GeneratedBy.T)) {
public ImpDepositorService(DepositorProperties properties, ImpMqttService mqttService) {
this.mqttService = mqttService;
this.properties = properties;
}

}
@Override
public void run() {
// Logic to start the service
logger.info("ImpDepositorService is running");
// You can add any initialization logic here if needed
}

OdeTimPayload timPayload = (OdeTimPayload) timMsg.getPayload();
OdeTravelerInformationMessage timData = (OdeTravelerInformationMessage) timPayload.getData();
timData.get
// TODO: Use the filtered TMC topic instead of the mixed TIM topic
// @KafkaListener(topics = "topic.OdeTimJsonTMCFiltered", groupId =
// "jpo-s3-depositor", concurrency = "${listen.concurrency:1}")
// public void tmcTimListener(String message) {
// try {
// OdeTimData timMsg = mapper.readValue(message, OdeTimData.class);
// OdeTimMetadata timMetadata = (OdeTimMetadata) timMsg.getMetadata();
// } catch (Exception e) {
// // Handle exception
// }
// }

timPayload.getTim().getRecord().get(0).getFrameType();
@KafkaListener(topics = "topic.OdeBsmJson", groupId = "jpo-s3-depositor", concurrency = "${listen.concurrency:1}")
public void bsmListener(String message) {
try {
OdeBsmData msg = mapper.readValue(message, OdeBsmData.class);

logger.info("Received message: " + pojo1.getMetadata().getAsn1());
} catch (Exception e) {
// Handle exception
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package us.dot.its.jpo.ode.s3.depositor.imp;

import java.io.File;
import java.nio.file.Paths;

import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509ExtendedTrustManager;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import nl.altindag.ssl.SSLFactory;
import nl.altindag.ssl.pem.util.PemUtils;
import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;
import us.dot.its.jpo.ode.s3.depositor.models.imp.ConfigData;
import us.dot.its.jpo.ode.s3.depositor.utils.CommonUtils;

@Service
public class ImpMqttService {
private static final Logger logger = LoggerFactory.getLogger(ImpMqttService.class);
private MqttClient client;

public ImpMqttService(DepositorProperties properties) throws MqttException {
ConfigData impConfig = CommonUtils.readConfigFile(properties.getImpCertPath() + "/config.json");

String uri = impConfig.getImpMqttUri().toString().replace("mqtt://", "ssl://");

client = new MqttClient(uri, impConfig.getDeviceID());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setSocketFactory(createSocketFactory(impConfig.getCaCertPath(), impConfig.getClientCertPath(),
impConfig.getKeyFilePath()));
client.connect(options);
}

public static SSLSocketFactory createSocketFactory(String caCertPath, String clientCertPath,
String privateKeyPath) {
// Convert to absolute paths
String absoluteCaCertPath = Paths.get(caCertPath).toAbsolutePath().toString();
String absoluteClientCertPath = Paths.get(clientCertPath).toAbsolutePath().toString();
String absolutePrivateKeyPath = Paths.get(privateKeyPath).toAbsolutePath().toString();

logger.info("CA Cert Path: {}", absoluteCaCertPath);
logger.info("Client Cert Path: {}", absoluteClientCertPath);
logger.info("Private Key Path: {}", absolutePrivateKeyPath);

// Check if files exist
if (!new File(absoluteCaCertPath).exists()) {
throw new IllegalArgumentException("CA Certificate file not found at path: " + absoluteCaCertPath);
}
if (!new File(absoluteClientCertPath).exists()) {
throw new IllegalArgumentException("Client Certificate file not found at path: " + absoluteClientCertPath);
}
if (!new File(absolutePrivateKeyPath).exists()) {
throw new IllegalArgumentException("Private Key file not found at path: " + absolutePrivateKeyPath);
}

X509ExtendedKeyManager keyManager = PemUtils.loadIdentityMaterial(Paths.get(absoluteClientCertPath),
Paths.get(absolutePrivateKeyPath));
X509ExtendedTrustManager trustManager = PemUtils.loadTrustMaterial(Paths.get(absoluteCaCertPath));

var sslFactory = SSLFactory.builder().withIdentityMaterial(keyManager).withTrustMaterial(trustManager).build();

var sslSocketFactory = sslFactory.getSslSocketFactory();
return sslSocketFactory;
}

public void publish(String topic, String messageContent) throws MqttException {
MqttMessage message = new MqttMessage();
message.setPayload(messageContent.getBytes());
client.publish(topic, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import us.dot.its.jpo.ode.s3.depositor.DateJsonMapper;
import us.dot.its.jpo.ode.s3.depositor.DepositorProperties;
import us.dot.its.jpo.ode.s3.depositor.utils.CommonUtils;
import us.dot.its.jpo.ode.s3.depositor.models.imp.AuthToken;
import us.dot.its.jpo.ode.s3.depositor.models.imp.AuthTokenRequest;
import us.dot.its.jpo.ode.s3.depositor.models.imp.ClientCompleteResponse;
Expand Down Expand Up @@ -65,9 +66,9 @@ public ConfigData registerClientPartner() {

ClientRegistrationResponse registrationResponse = register(token);

writeToFile(caCertPath, registrationResponse.getCertificate().getCaPem());
writeToFile(certPath, registrationResponse.getCertificate().getCertPem());
writeToFile(keyPath, registrationResponse.getCertificate().getKeyPem());
CommonUtils.writeToFile(caCertPath, registrationResponse.getCertificate().getCaPem());
CommonUtils.writeToFile(certPath, registrationResponse.getCertificate().getCertPem());
CommonUtils.writeToFile(keyPath, registrationResponse.getCertificate().getKeyPem());

deviceID = registrationResponse.getDeviceID();
} else {
Expand All @@ -80,14 +81,12 @@ public ConfigData registerClientPartner() {
ClientConnectionResponse connectionResponse = connection(token, deviceID);

URI uri = new URI(connectionResponse.getMqttURL());
String host = uri.getHost();
int port = uri.getPort();
configData = new ConfigData(configPath, caCertPath, certPath, keyPath, properties.getImpVendor(),
properties.getImpNetworkType(), host, port, deviceID);
properties.getImpNetworkType(), uri, deviceID);

String configDataJson = objectMapper.writeValueAsString(configData);

writeToFile(configPath, configDataJson);
CommonUtils.writeToFile(configPath, configDataJson);

return configData;
} catch (Exception e) {
Expand Down Expand Up @@ -150,17 +149,6 @@ private ClientConnectionResponse connection(String token, String deviceID) {
}
}

private void writeToFile(String filePath, String content) {
try {
Files.createDirectories(Paths.get(filePath).getParent());
try (FileWriter writer = new FileWriter(filePath)) {
writer.write(content);
}
} catch (IOException e) {
logger.error("writeToFile error: " + e.getStackTrace());
}
}

private boolean validRegistration(String configPath) {
boolean valid = false;
File file = new File(configPath);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package us.dot.its.jpo.ode.s3.depositor.imp;
// package us.dot.its.jpo.ode.s3.depositor.imp;

import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
// import com.google.protobuf.ByteString;

public class ImpUtil {
// import us.dot.its.jpo.ode.depositor.GeoRoutedMsg;
// import us.dot.its.jpo.ode.depositor.GeoRoutedMsgOrBuilder;

public GeoRoutedMsg getGeoRoutedMsg(String asn1String, String odeReceivedAt, ) {

byte[] asn1 = pojo1.getMetadata().getAsn1().getBytes();
ByteString asn1ByteString = ByteString.copyFrom(asn1);
// public class ImpUtil {

GeoRoutedMsgOrBuilder geoRoutedMsg = GeoRoutedMsg.newBuilder().setMsgBytes(asn1ByteString).set;
}
}
// public GeoRoutedMsg getGeoRoutedMsg(String asn1String, String odeReceivedAt)
// {

// // byte[] asn1 = pojo1.getMetadata().getAsn1().getBytes();
// ByteString asn1ByteString = ByteString.copyFrom(asn1String.getBytes());

// GeoRoutedMsg geoRoutedMsg =
// GeoRoutedMsg.newBuilder().setMsgBytes(asn1ByteString).build();

// return geoRoutedMsg;
// }
// }
Loading

0 comments on commit fc74fad

Please sign in to comment.