diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..0b4f2bb
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,15 @@
+{
+ "java.test.config": {
+ "env": {
+ "TEST_VARIABLE": "testValue",
+ "TEST_VARIABLE_EMPTY": "",
+ "AWS_ACCESS_KEY_ID": "testAccessKey",
+ "AWS_SECRET_ACCESS_KEY": "testSecretKey",
+ "AWS_EXPIRATION": "2020-01-01 00:00:00",
+ "AWS_SESSION_TOKEN": "testSessionToken",
+ "API_ENDPOINT": "testApiEndpoint",
+ "CONFLUENT_KEY": "testConfluentKey",
+ "CONFLUENT_SECRET": "testConfluentSecret",
+ }
+ },
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index c29687b..6b52fa1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,16 +10,41 @@
 		<maven.compiler.source>21</maven.compiler.source>
 		<maven.compiler.target>21</maven.compiler.target>
+		<jmockit.version>1.49</jmockit.version>
 		<jar.finalName>usdot-jpo-ode</jar.finalName>
+		<argLine>
+			-javaagent:${user.home}/.m2/repository/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar
+		</argLine>
+		<jacoco.version>0.8.11</jacoco.version>
 	</properties>
 	<dependencies>
+		<dependency>
+			<groupId>org.jmockit</groupId>
+			<artifactId>jmockit</artifactId>
+			<version>${jmockit.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.junit.jupiter</groupId>
+			<artifactId>junit-jupiter-api</artifactId>
+			<version>5.9.3</version>
+			<scope>test</scope>
+		</dependency>
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
-			<version>4.13.1</version>
+			<version>4.13.2</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>3.3.3</version>
 			<scope>test</scope>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.kafka</groupId>
@@ -101,6 +126,36 @@
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>3.2.5</version>
+				<configuration>
+					<argLine>-javaagent:${user.home}/.m2/repository/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar -Xshare:off</argLine>
+					<systemPropertyVariables>
+						<loader.path>${loader.path}</loader.path>
+						<project.build.directory>${project.build.directory}</project.build.directory>
+					</systemPropertyVariables>
+					<environmentVariables>
+						<TEST_VARIABLE>testValue</TEST_VARIABLE>
+						<TEST_VARIABLE_EMPTY></TEST_VARIABLE_EMPTY>
+						<AWS_ACCESS_KEY_ID>testAccessKey</AWS_ACCESS_KEY_ID>
+						<AWS_SECRET_ACCESS_KEY>testSecretKey</AWS_SECRET_ACCESS_KEY>
+						<AWS_SESSION_TOKEN>testSessionToken</AWS_SESSION_TOKEN>
+						<AWS_EXPIRATION>2020-01-01 00:00:00</AWS_EXPIRATION>
+						<API_ENDPOINT>testApiEndpoint</API_ENDPOINT>
+						<CONFLUENT_KEY>testConfluentKey</CONFLUENT_KEY>
+						<CONFLUENT_SECRET>testConfluentSecret</CONFLUENT_SECRET>
+					</environmentVariables>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>3.2.5</version>
+			</plugin>
diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
index 1f62e90..0e98b02 100644
--- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
+++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
@@ -26,10 +26,24 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
@@ -53,19 +67,6 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class AwsDepositor {
private final Logger logger = LoggerFactory.getLogger(AwsDepositor.class);
private final long CONSUMER_POLL_TIMEOUT_MS = 60000;
@@ -79,6 +80,8 @@ public class AwsDepositor {
private String keyName;
private boolean waitOpt;
+ private boolean runDepositor = true;
+
private String K_AWS_ACCESS_KEY_ID;
private String K_AWS_SECRET_ACCESS_KEY;
private String K_AWS_SESSION_TOKEN;
@@ -98,45 +101,12 @@ public class AwsDepositor {
public static void main(String[] args) throws Exception {
AwsDepositor awsDepositor = new AwsDepositor();
- awsDepositor.run(args);
+ awsDepositor.run();
}
- public void run(String[] args) throws Exception {
- endpoint = getEnvironmentVariable("BOOTSTRAP_SERVER", "");
- topic = getEnvironmentVariable("DEPOSIT_TOPIC", "");
- group = getEnvironmentVariable("DEPOSIT_GROUP", "");
- destination = getEnvironmentVariable("DESTINATION", "firehose");
- if (System.getenv("WAIT") != null && System.getenv("WAIT") != "")
- { waitOpt = true; }
- else
- { waitOpt = false; }
-
- // S3 properties
- bucketName = getEnvironmentVariable("DEPOSIT_BUCKET_NAME", "");
- awsRegion = getEnvironmentVariable("REGION", "us-east-1");
- keyName = getEnvironmentVariable("DEPOSIT_KEY_NAME", "");
-
- K_AWS_ACCESS_KEY_ID = getEnvironmentVariable("AWS_ACCESS_KEY_ID", "AccessKeyId");
- K_AWS_SECRET_ACCESS_KEY = getEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "SecretAccessKey");
- K_AWS_SESSION_TOKEN = getEnvironmentVariable("AWS_SESSION_TOKEN", "SessionToken");
- K_AWS_EXPIRATION = getEnvironmentVariable("AWS_EXPIRATION", "Expiration");
- API_ENDPOINT = getEnvironmentVariable("API_ENDPOINT", "");
- HEADER_Accept = getEnvironmentVariable("HEADER_ACCEPT", "application/json");
- HEADER_X_API_KEY = getEnvironmentVariable("HEADER_X_API_KEY", "");
-
- logger.debug("Bucket name: {}", bucketName);
- logger.debug("AWS Region: {}", awsRegion);
- logger.debug("Key name: {}", keyName);
- logger.debug("Kafka topic: {}", topic);
- logger.debug("Destination: {}", destination);
- logger.debug("Wait: {}", waitOpt);
- logger.debug("AWS_ACCESS_KEY_ID: {}", K_AWS_ACCESS_KEY_ID);
- logger.debug("AWS_SECRET_ACCESS_KEY: {}", K_AWS_SECRET_ACCESS_KEY);
- logger.debug("AWS_SESSION_TOKEN: {}", K_AWS_SESSION_TOKEN);
- logger.debug("AWS_EXPIRATION: {}", K_AWS_EXPIRATION);
- logger.debug("API_ENDPOINT: {}", API_ENDPOINT);
- logger.debug("HEADER_Accept: {}", HEADER_Accept);
- logger.debug("HEADER_X_API_KEY: {}", HEADER_X_API_KEY);
+ public void run() throws Exception {
+ // Pull in environment variables
+ depositorSetup();
if (API_ENDPOINT.length() > 0) {
JSONObject profile = generateAWSProfile();
@@ -187,8 +157,8 @@ public void run(String[] args) throws Exception {
}
- while (true) {
- KafkaConsumer<String, String> stringConsumer = new KafkaConsumer<String, String>(props);
+ while (getRunDepositor()) {
+ KafkaConsumer<String, String> stringConsumer = getKafkaConsumer(props);
logger.debug("Subscribing to topic " + topic);
stringConsumer.subscribe(Arrays.asList(topic));
@@ -196,7 +166,7 @@ public void run(String[] args) throws Exception {
try {
boolean gotMessages = false;
- while (true) {
+ while (getRunDepositor()) {
ConsumerRecords<String, String> records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
if (records != null && !records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
@@ -234,7 +204,7 @@ public void run(String[] args) throws Exception {
}
}
- private static void addConfluentProperties(Properties props) {
+ static void addConfluentProperties(Properties props) {
props.put("ssl.endpoint.identification.algorithm", "https");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
@@ -250,7 +220,7 @@ private static void addConfluentProperties(Properties props) {
}
}
- private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerRecord<String, String> record)
+ void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerRecord<String, String> record)
throws InterruptedException, ExecutionException, IOException {
try {
// IMPORTANT!!!
@@ -261,9 +231,8 @@ private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerReco
ByteBuffer data = convertStringToByteBuffer(msg, Charset.defaultCharset());
// Check the expiration time for the profile credentials
- LocalDateTime current_datetime = LocalDateTime.now();
- LocalDateTime expiration_datetime = LocalDateTime.parse(AWS_EXPIRATION,
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ LocalDateTime current_datetime = getLocalDateTime();
+ LocalDateTime expiration_datetime = getExpirationDateTime();
System.out.println();
if (expiration_datetime.isBefore(current_datetime) && API_ENDPOINT.length() > 0) {
// If credential is expired, generate aws credentials
@@ -307,7 +276,7 @@ private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerReco
}
}
- private void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) throws IOException {
+ void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) throws IOException {
try {
long time = System.currentTimeMillis();
String timeStamp = Long.toString(time);
@@ -346,7 +315,7 @@ private void depositToS3(AmazonS3 s3, ConsumerRecord record) thr
}
}
- private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord<String, String> record) {
+ void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord<String, String> record) {
String recordValue = record.value();
Bucket bucket = gcsStorage.get(depositBucket);
byte[] bytes = recordValue.getBytes(Charset.defaultCharset());
@@ -362,7 +331,7 @@ private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerReco
}
}
- private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
+ AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
// Default is to deposit to Kinesis/Firehose, override via .env
// variables if S3 deposit desired
logger.debug("=============================");
@@ -372,7 +341,7 @@ private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
return AmazonKinesisFirehoseAsyncClientBuilder.standard().withRegion(awsRegion).build();
}
- private AmazonS3 createS3Client(String awsRegion) {
+ AmazonS3 createS3Client(String awsRegion) {
logger.debug("============== ========");
logger.debug("Connecting to Amazon S3");
logger.debug("=======================");
@@ -397,7 +366,7 @@ public ByteBuffer convertStringToByteBuffer(String msg, Charset charset) {
return ByteBuffer.wrap(msg.getBytes(charset));
}
- private File createSampleFile(String json) throws IOException {
+ File createSampleFile(String json) throws IOException {
File file = File.createTempFile("aws-java-sdk-", ".json");
file.deleteOnExit();
@@ -408,8 +377,8 @@ private File createSampleFile(String json) throws IOException {
return file;
}
- private JSONObject generateAWSProfile() throws IOException {
- CloseableHttpClient client = HttpClients.createDefault();
+ JSONObject generateAWSProfile() throws IOException {
+ CloseableHttpClient client = getHttpClient();
HttpPost httpPost = new HttpPost(API_ENDPOINT);
JSONObject jsonResult = new JSONObject();
String json = "{}";
@@ -435,7 +404,9 @@ private JSONObject generateAWSProfile() throws IOException {
return jsonResult;
}
- private static String getEnvironmentVariable(String variableName, String defaultValue) {
+ static String getEnvironmentVariable(String variableName, String defaultValue) {
+ // get all environment variables
+ Map<String, String> env = System.getenv();
String value = System.getenv(variableName);
if (value == null || value.equals("")) {
System.out.println("Something went wrong retrieving the environment variable " + variableName);
@@ -445,4 +416,62 @@ private static String getEnvironmentVariable(String variableName, String default
return value;
}
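+
+ // The package-private helpers below act as test seams: they wrap collaborators that are
+ // otherwise hard to control in unit tests (HTTP client, clock, expiration parsing, Kafka
+ // consumer, loop condition) so Mockito spies can stub them.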
+ CloseableHttpClient getHttpClient() {
+ return HttpClients.createDefault();
+ }
+
+ LocalDateTime getLocalDateTime() {
+ return LocalDateTime.now();
+ }
+
+ LocalDateTime getExpirationDateTime() {
+ return LocalDateTime.parse(K_AWS_EXPIRATION,
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ }
+
+ void depositorSetup() {
+ endpoint = getEnvironmentVariable("BOOTSTRAP_SERVER", "");
+ topic = getEnvironmentVariable("DEPOSIT_TOPIC", "");
+ group = getEnvironmentVariable("DEPOSIT_GROUP", "");
+ destination = getEnvironmentVariable("DESTINATION", "firehose");
+ if (System.getenv("WAIT") != null && System.getenv("WAIT") != "")
+ { waitOpt = true; }
+ else
+ { waitOpt = false; }
+
+ // S3 properties
+ bucketName = getEnvironmentVariable("DEPOSIT_BUCKET_NAME", "");
+ awsRegion = getEnvironmentVariable("REGION", "us-east-1");
+ keyName = getEnvironmentVariable("DEPOSIT_KEY_NAME", "");
+
+ K_AWS_ACCESS_KEY_ID = getEnvironmentVariable("AWS_ACCESS_KEY_ID", "AccessKeyId");
+ K_AWS_SECRET_ACCESS_KEY = getEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "SecretAccessKey");
+ K_AWS_SESSION_TOKEN = getEnvironmentVariable("AWS_SESSION_TOKEN", "SessionToken");
+ K_AWS_EXPIRATION = getEnvironmentVariable("AWS_EXPIRATION", "Expiration");
+ API_ENDPOINT = getEnvironmentVariable("API_ENDPOINT", "");
+ HEADER_Accept = getEnvironmentVariable("HEADER_ACCEPT", "application/json");
+ HEADER_X_API_KEY = getEnvironmentVariable("HEADER_X_API_KEY", "");
+
+ logger.debug("Bucket name: {}", bucketName);
+ logger.debug("AWS Region: {}", awsRegion);
+ logger.debug("Key name: {}", keyName);
+ logger.debug("Kafka topic: {}", topic);
+ logger.debug("Destination: {}", destination);
+ logger.debug("Wait: {}", waitOpt);
+ logger.debug("AWS_ACCESS_KEY_ID: {}", K_AWS_ACCESS_KEY_ID);
+ logger.debug("AWS_SECRET_ACCESS_KEY: {}", K_AWS_SECRET_ACCESS_KEY);
+ logger.debug("AWS_SESSION_TOKEN: {}", K_AWS_SESSION_TOKEN);
+ logger.debug("AWS_EXPIRATION: {}", K_AWS_EXPIRATION);
+ logger.debug("API_ENDPOINT: {}", API_ENDPOINT);
+ logger.debug("HEADER_Accept: {}", HEADER_Accept);
+ logger.debug("HEADER_X_API_KEY: {}", HEADER_X_API_KEY);
+ }
+
+ boolean getRunDepositor() {
+ return runDepositor;
+ }
+
+ KafkaConsumer<String, String> getKafkaConsumer(Properties props) {
+ return new KafkaConsumer<>(props);
+ }
}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/AddConfluentPropertiesTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/AddConfluentPropertiesTest.java
new file mode 100644
index 0000000..c3862b1
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/AddConfluentPropertiesTest.java
@@ -0,0 +1,19 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.jupiter.api.Test;
+
+public class AddConfluentPropertiesTest {
+ @Test
+ public void testAddConfluentProperties() {
+ Properties props = new Properties();
+ AwsDepositor.addConfluentProperties(props);
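+
+ // The expected JAAS config below is built from the CONFLUENT_KEY / CONFLUENT_SECRET
+ // environment variables supplied by the test configuration (surefire / .vscode settings).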
+
+ assertEquals("https", props.getProperty("ssl.endpoint.identification.algorithm"));
+ assertEquals("SASL_SSL", props.getProperty("security.protocol"));
+ assertEquals("PLAIN", props.getProperty("sasl.mechanism"));
+ assertEquals("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"testConfluentKey\" password=\"testConfluentSecret\";" , props.getProperty("sasl.jaas.config"));
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/BuildFirehoseClientTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/BuildFirehoseClientTest.java
new file mode 100644
index 0000000..1683c59
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/BuildFirehoseClientTest.java
@@ -0,0 +1,18 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import org.junit.jupiter.api.Test;
+
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync;
+
+public class BuildFirehoseClientTest {
+ @Test
+ public void testBuildFirehoseClient() {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ String awsRegion = "us-east-1";
+
+ AmazonKinesisFirehoseAsync firehose = awsDepositor.buildFirehoseClient(awsRegion);
+
+ assertNotNull(firehose);
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/ConvertStringToByteBufferTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/ConvertStringToByteBufferTest.java
new file mode 100644
index 0000000..f3ea403
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/ConvertStringToByteBufferTest.java
@@ -0,0 +1,20 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.jupiter.api.Test;
+
+public class ConvertStringToByteBufferTest {
+ @Test
+ public void testConvertStringToByteBuffer() {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ String input = "Test";
+ ByteBuffer expected = ByteBuffer.wrap(input.getBytes(StandardCharsets.UTF_8));
+
+ ByteBuffer result = awsDepositor.convertStringToByteBuffer(input, StandardCharsets.UTF_8);
+
+ assertEquals(expected, result);
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateS3ClientTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateS3ClientTest.java
new file mode 100644
index 0000000..79bb555
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateS3ClientTest.java
@@ -0,0 +1,25 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.junit.jupiter.api.Test;
+
+import com.amazonaws.services.s3.AmazonS3;
+
+public class CreateS3ClientTest {
+
+ @Test
+ public void testCreateS3Client() {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ AmazonS3 s3Client = awsDepositor.createS3Client("us-east-1");
+ assertNotNull(s3Client);
+ }
+
+ @Test
+ public void testCreateS3Client_InvalidCredentials() {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ assertThrows(IllegalArgumentException.class, () -> {
+ awsDepositor.createS3Client("invalid-region");
+ });
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateSampleFileTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateSampleFileTest.java
new file mode 100644
index 0000000..6570fd4
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateSampleFileTest.java
@@ -0,0 +1,23 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.Test;
+
+public class CreateSampleFileTest {
+ @Test
+ public void testCreateSampleFile() throws IOException {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ String json = "{\"key\": \"value\"}";
+ File file = awsDepositor.createSampleFile(json);
+ assertNotNull(file);
+ assertTrue(file.exists());
+ assertTrue(file.isFile());
+ assertEquals(".json", file.getName().substring(file.getName().lastIndexOf(".")));
+ file.delete();
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToFirehoseTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToFirehoseTest.java
new file mode 100644
index 0000000..5fd06e8
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToFirehoseTest.java
@@ -0,0 +1,75 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.time.LocalDateTime;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.json.JSONObject;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
+
+public class DepositToFirehoseTest {
+
+ @Test
+ public void testDepositToFirehose() throws InterruptedException, ExecutionException, IOException {
+
+ // Create a mock AmazonKinesisFirehoseAsync instance
+ AmazonKinesisFirehoseAsync firehose = mock(AmazonKinesisFirehoseAsync.class);
+
+ // Create a mock ConsumerRecord
+ ConsumerRecord mockRecord = mock(ConsumerRecord.class);
+ when(mockRecord.value()).thenReturn("Test Record");
+
+ AwsDepositor depositor = spy(new AwsDepositor());
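+ // Stub the clock so the expiration (10:00) is before the mocked current time (12:00),
+ // driving depositToFirehose down the credential-refresh path via generateAWSProfile().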
+ doReturn(LocalDateTime.of(2024, 6, 26, 12, 0, 0)).when(depositor).getLocalDateTime();
+ doReturn(LocalDateTime.of(2024, 6, 26, 10, 0, 0)).when(depositor).getExpirationDateTime();
+
+ JSONObject generateAwsReturnVal = new JSONObject();
+ generateAwsReturnVal.put("testAccessKey", "test-access-key-id");
+ generateAwsReturnVal.put("testSecretKey", "test-secret-key");
+ generateAwsReturnVal.put("testSessionToken", "test-token");
+ generateAwsReturnVal.put("2020-01-01 00:00:00", "test-expiration");
+
+ doReturn(generateAwsReturnVal).when(depositor).generateAWSProfile();
+
+ // pull in necessary environment variables
+ depositor.depositorSetup();
+
+ // Call the depositToFirehose method
+ depositor.depositToFirehose(firehose, mockRecord);
+
+ // Verify that the putRecordAsync method was called on the mock AmazonKinesisFirehoseAsync instance
+ ArgumentCaptor<PutRecordRequest> putRecordRequestCaptor = ArgumentCaptor.forClass(PutRecordRequest.class);
+ verify(firehose).putRecordAsync(putRecordRequestCaptor.capture());
+
+ // Assert PutRecordRequest value is as expected
+ PutRecordRequest putRecordRequestResult = putRecordRequestCaptor.getValue();
+ assertEquals("Test Record\n", convertByteBufferToString(putRecordRequestResult.getRecord().getData()));
+ }
+
+ @Test
+ public void testGetExpirationDateTime() {
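+ // depositorSetup() reads AWS_EXPIRATION ("2020-01-01 00:00:00") from the test environment,
+ // which getExpirationDateTime() is expected to parse into the asserted LocalDateTime.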
+ AwsDepositor depositor = new AwsDepositor();
+ depositor.depositorSetup();
+ LocalDateTime result = depositor.getExpirationDateTime();
+ assertEquals(LocalDateTime.of(2020, 01, 01, 0, 0, 0), result);
+ }
+
+ private String convertByteBufferToString(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return new String(bytes, Charset.defaultCharset());
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToGCSTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToGCSTest.java
new file mode 100644
index 0000000..b0cbdef
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToGCSTest.java
@@ -0,0 +1,35 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.Storage;
+
+public class DepositToGCSTest {
+ @Test
+ public void testDepositToGCS() {
+ Storage gcsStorage = mock(Storage.class);
+ Bucket bucket = mock(Bucket.class);
+ Blob blob = mock(Blob.class);
+
+ ConsumerRecord record = mock(ConsumerRecord.class);
+ when(record.value()).thenReturn("test");
+
+ when(gcsStorage.get(anyString())).thenReturn(bucket);
+ when(bucket.create(anyString(), any(byte[].class))).thenReturn(blob);
+
+ AwsDepositor awsDepositor = new AwsDepositor();
+
+ awsDepositor.depositToGCS(gcsStorage, "depositBucket", record);
+
+ verify(gcsStorage).get("depositBucket");
+ verify(bucket).create(anyString(), any(byte[].class));
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToS3Test.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToS3Test.java
new file mode 100644
index 0000000..6da5f0a
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToS3Test.java
@@ -0,0 +1,41 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.io.IOException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import static org.junit.Assert.assertNotNull;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+
+
+public class DepositToS3Test {
+ @Test
+ public void testDepositToS3() throws IOException {
+ // Mock necessary classes
+ AmazonS3 s3 = mock(AmazonS3.class);
+ ConsumerRecord mockRecord = mock(ConsumerRecord.class);
+
+ PutObjectResult result = new PutObjectResult();
+ when(mockRecord.value()).thenReturn("Test Record");
+ when(s3.putObject(any())).thenReturn(result);
+
+ AwsDepositor awsDepositor = new AwsDepositor();
+ awsDepositor.depositToS3(s3, mockRecord);
+
+ // Verify that the putObject method was called on the mock AmazonS3 instance
+ ArgumentCaptor<PutObjectRequest> putObjectRequestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
+ verify(s3).putObject(putObjectRequestCaptor.capture());
+
+ // Assert that the putObjectRequest was created correctly
+ PutObjectRequest putObjectRequestResult = putObjectRequestCaptor.getValue();
+ assertNotNull(putObjectRequestResult);
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java
new file mode 100644
index 0000000..21225a3
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java
@@ -0,0 +1,71 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.io.IOException;
+
+import org.apache.http.HttpVersion;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicStatusLine;
+import org.json.JSONObject;
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import org.junit.jupiter.api.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GenerateAWSProfileTest {
+ @Test
+ void testGenerateAWSProfileSuccess() throws Exception {
+
+ AwsDepositor depositor = spy(new AwsDepositor());
+
+ // Mock the CloseableHttpResponse
+ CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+ when(mockResponse.getStatusLine()).thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
+ when(mockResponse.getEntity()).thenReturn(new StringEntity("{\"key\":\"value\"}"));
+
+ // Mock the CloseableHttpClient
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
+ when(mockClient.execute(any())).thenReturn(mockResponse);
+
+ doReturn(mockClient).when(depositor).getHttpClient();
+
+ depositor.depositorSetup();
+ JSONObject result = depositor.generateAWSProfile();
+
+ assertNotNull(result);
+ assertEquals("value", result.getString("key"));
+
+ verify(mockClient, times(1)).execute((HttpPost) any());
+ verify(mockResponse, times(1)).close();
+ verify(mockClient, times(1)).close();
+ }
+
+ @Test
+ void testGenerateAWSProfileException() throws IOException {
+ AwsDepositor depositor = spy(new AwsDepositor());
+
+ // Mock the CloseableHttpResponse
+ CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+ when(mockResponse.getStatusLine()).thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
+ when(mockResponse.getEntity()).thenReturn(null);
+
+ // Mock the CloseableHttpClient
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
+ when(mockClient.execute(any())).thenReturn(mockResponse);
+
+ doReturn(mockClient).when(depositor).getHttpClient();
+ Exception exception = assertThrows(Exception.class, depositor::generateAWSProfile);
+
+ // Verify the exception
+ assertNotNull(exception);
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java
new file mode 100644
index 0000000..ddc1c83
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java
@@ -0,0 +1,31 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.jupiter.api.Test;
+
+public class GetEnvironmentVariableTest {
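+ // TEST_VARIABLE and TEST_VARIABLE_EMPTY are supplied by the test environment configuration
+ // (surefire / .vscode settings); TEST_VARIABLE_NO_ENV is intentionally left unset.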
+ private final String TEST_VARIABLE = "TEST_VARIABLE";
+ private final String TEST_VARIABLE_NO_ENV = "TEST_VARIABLE_NO_ENV";
+ private final String TEST_VARIABLE_EMPTY = "TEST_VARIABLE_EMPTY";
+ private final String DEFAULT_VALUE = "default";
+
+ @Test
+ void testGetEnvironmentVariableExists() throws Exception {
+ String expectedValue = "testValue";
+
+ // Test when the environment variable is set
+ String result = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE, "");
+ assertEquals(expectedValue, result);
+ }
+
+ @Test
+ void testGetEnvironmentVariableNotSetOrEmpty() {
+ // Test when the environment variable is not set
+ String notSetResult = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE_NO_ENV, DEFAULT_VALUE);
+ assertEquals(DEFAULT_VALUE, notSetResult);
+
+ // Test when the environment variable is empty
+ String emptyResult = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE_EMPTY, DEFAULT_VALUE);
+ assertEquals(DEFAULT_VALUE, emptyResult);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
new file mode 100644
index 0000000..6bbec89
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
@@ -0,0 +1,86 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.json.JSONObject;
+import org.junit.jupiter.api.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RunTest {
+ @Test
+ public void testRunNoRecords() throws Exception {
+ AwsDepositor depositor = spy(new AwsDepositor());
+
+ KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(any())).thenReturn(null);
+
+ doReturn(mockConsumer).when(depositor).getKafkaConsumer(any());
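+ // getRunDepositor() is stubbed to return true twice and then false, so the outer and
+ // inner loops in run() each execute once before the depositor exits.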
+ doReturn(true, true, false).when(depositor).getRunDepositor();
+
+ JSONObject generateAwsReturnVal = new JSONObject();
+ generateAwsReturnVal.put("testAccessKey", "test-access-key-id");
+ generateAwsReturnVal.put("testSecretKey", "test-secret-key");
+ generateAwsReturnVal.put("testSessionToken", "test-token");
+ generateAwsReturnVal.put("2020-01-01 00:00:00", "test-expiration");
+
+ doReturn(generateAwsReturnVal).when(depositor).generateAWSProfile();
+
+ depositor.run();
+
+ verify(depositor, times(1)).getKafkaConsumer(any());
+ verify(depositor, times(4)).getRunDepositor();
+ }
+
+ @Test
+ public void testRunRecords() throws Exception {
+ AwsDepositor depositor = spy(new AwsDepositor());
+
+ KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(any())).thenReturn(null);
+
+ doReturn(mockConsumer).when(depositor).getKafkaConsumer(any());
+ doReturn(true, true, false).when(depositor).getRunDepositor();
+
+ JSONObject generateAwsReturnVal = new JSONObject();
+ generateAwsReturnVal.put("testAccessKey", "test-access-key-id");
+ generateAwsReturnVal.put("testSecretKey", "test-secret-key");
+ generateAwsReturnVal.put("testSessionToken", "test-token");
+ generateAwsReturnVal.put("2020-01-01 00:00:00", "test-expiration");
+
+ doReturn(generateAwsReturnVal).when(depositor).generateAWSProfile();
+
+ doNothing().when(depositor).depositToFirehose(any(), any());
+
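+ // Provide a ConsumerRecords payload with a single record so the poll loop takes the
+ // deposit path and depositToFirehose is invoked exactly once.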
+ List<ConsumerRecord<String, String>> records = new ArrayList<>();
+ records.add(new ConsumerRecord<>("topic", 0, 0, "test", "test-value"));
+
+ TopicPartition topicPartition = new TopicPartition("topic", 0);
+
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+ recordsMap.put(topicPartition, records);
+
+ ConsumerRecords<String, String> mockRecords = new ConsumerRecords<>(recordsMap);
+
+ when(mockConsumer.poll(any())).thenReturn(mockRecords);
+
+ depositor.run();
+
+ verify(depositor, times(1)).getKafkaConsumer(any());
+ verify(depositor, times(4)).getRunDepositor();
+ verify(depositor, times(1)).depositToFirehose(any(), any());
+ }
+}