Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Unit Tests #26

Merged
merged 6 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"java.test.config": {
"env": {
"TEST_VARIABLE": "testValue",
"TEST_VARIABLE_EMPTY": "",
"AWS_ACCESS_KEY_ID": "testAccessKey",
"AWS_SECRET_ACCESS_KEY": "testSecretKey",
"AWS_EXPIRATION": "2020-01-01 00:00:00",
"AWS_SESSION_TOKEN": "testSessionToken",
"API_ENDPOINT": "testApiEndpoint",
"CONFLUENT_KEY": "testConfluentKey",
"CONFLUENT_SECRET": "testConfluentSecret",
}
},
}
57 changes: 56 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,41 @@
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<jmockit.version>1.49</jmockit.version>
<!-- Allow override of github organization when publishing artifacts to github -->
<github_organization>usdot-jpo-ode</github_organization>
<argLine>
-javaagent:${user.home}/.m2/repository/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar
</argLine>
<jacoco.version>0.8.11</jacoco.version>
</properties>
<dependencies>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>${jmockit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.3.3</version>
<scope>test</scope>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down Expand Up @@ -101,6 +126,36 @@
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
<configuration>
<argLine>-javaagent:${user.home}/.m2/repository/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar -Xshare:off</argLine>
<systemPropertyVariables>
<loader.path>${loader.path}</loader.path>
<buildDirectory>${project.build.directory}</buildDirectory>
</systemPropertyVariables>
<environmentVariables>
<TEST_VARIABLE>testValue</TEST_VARIABLE>
<TEST_VARIABLE_EMPTY></TEST_VARIABLE_EMPTY>
<AWS_ACCESS_KEY_ID>testAccessKey</AWS_ACCESS_KEY_ID>
<AWS_SECRET_ACCESS_KEY>testSecretKey</AWS_SECRET_ACCESS_KEY>
<AWS_SESSION_TOKEN>testSessionToken</AWS_SESSION_TOKEN>
<AWS_EXPIRATION>2020-01-01 00:00:00</AWS_EXPIRATION>
<API_ENDPOINT>testApiEndpoint</API_ENDPOINT>
<CONFLUENT_KEY>testConfluentKey</CONFLUENT_KEY>
<CONFLUENT_SECRET>testConfluentSecret</CONFLUENT_SECRET>
</environmentVariables>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
<distributionManagement>
Expand Down
161 changes: 95 additions & 66 deletions src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,24 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
Expand All @@ -53,19 +67,6 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AwsDepositor {
private final Logger logger = LoggerFactory.getLogger(AwsDepositor.class);
private final long CONSUMER_POLL_TIMEOUT_MS = 60000;
Expand All @@ -79,6 +80,8 @@ public class AwsDepositor {
private String keyName;
private boolean waitOpt;

private boolean runDepositor = true;

private String K_AWS_ACCESS_KEY_ID;
private String K_AWS_SECRET_ACCESS_KEY;
private String K_AWS_SESSION_TOKEN;
Expand All @@ -98,45 +101,12 @@ public class AwsDepositor {

public static void main(String[] args) throws Exception {
AwsDepositor awsDepositor = new AwsDepositor();
awsDepositor.run(args);
awsDepositor.run();
}

public void run(String[] args) throws Exception {
endpoint = getEnvironmentVariable("BOOTSTRAP_SERVER", "");
topic = getEnvironmentVariable("DEPOSIT_TOPIC", "");
group = getEnvironmentVariable("DEPOSIT_GROUP", "");
destination = getEnvironmentVariable("DESTINATION", "firehose");
if (System.getenv("WAIT") != null && System.getenv("WAIT") != "")
{ waitOpt = true; }
else
{ waitOpt = false; }

// S3 properties
bucketName = getEnvironmentVariable("DEPOSIT_BUCKET_NAME", "");
awsRegion = getEnvironmentVariable("REGION", "us-east-1");
keyName = getEnvironmentVariable("DEPOSIT_KEY_NAME", "");

K_AWS_ACCESS_KEY_ID = getEnvironmentVariable("AWS_ACCESS_KEY_ID", "AccessKeyId");
K_AWS_SECRET_ACCESS_KEY = getEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "SecretAccessKey");
K_AWS_SESSION_TOKEN = getEnvironmentVariable("AWS_SESSION_TOKEN", "SessionToken");
K_AWS_EXPIRATION = getEnvironmentVariable("AWS_EXPIRATION", "Expiration");
API_ENDPOINT = getEnvironmentVariable("API_ENDPOINT", "");
HEADER_Accept = getEnvironmentVariable("HEADER_ACCEPT", "application/json");
HEADER_X_API_KEY = getEnvironmentVariable("HEADER_X_API_KEY", "");

logger.debug("Bucket name: {}", bucketName);
logger.debug("AWS Region: {}", awsRegion);
logger.debug("Key name: {}", keyName);
logger.debug("Kafka topic: {}", topic);
logger.debug("Destination: {}", destination);
logger.debug("Wait: {}", waitOpt);
logger.debug("AWS_ACCESS_KEY_ID: {}", K_AWS_ACCESS_KEY_ID);
logger.debug("AWS_SECRET_ACCESS_KEY: {}", K_AWS_SECRET_ACCESS_KEY);
logger.debug("AWS_SESSION_TOKEN: {}", K_AWS_SESSION_TOKEN);
logger.debug("AWS_EXPIRATION: {}", K_AWS_EXPIRATION);
logger.debug("API_ENDPOINT: {}", API_ENDPOINT);
logger.debug("HEADER_Accept: {}", HEADER_Accept);
logger.debug("HEADER_X_API_KEY: {}", HEADER_X_API_KEY);
public void run() throws Exception {
// Pull in environment variables
depositorSetup();

if (API_ENDPOINT.length() > 0) {
JSONObject profile = generateAWSProfile();
Expand Down Expand Up @@ -187,16 +157,16 @@ public void run(String[] args) throws Exception {
}


while (true) {
KafkaConsumer<String, String> stringConsumer = new KafkaConsumer<String, String>(props);
while (getRunDepositor()) {
KafkaConsumer<String, String> stringConsumer = getKafkaConsumer(props);

logger.debug("Subscribing to topic " + topic);
stringConsumer.subscribe(Arrays.asList(topic));

try {
boolean gotMessages = false;

while (true) {
while (getRunDepositor()) {
ConsumerRecords<String, String> records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
if (records != null && !records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
Expand Down Expand Up @@ -234,7 +204,7 @@ public void run(String[] args) throws Exception {
}
}

private static void addConfluentProperties(Properties props) {
static void addConfluentProperties(Properties props) {
props.put("ssl.endpoint.identification.algorithm", "https");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
Expand All @@ -250,7 +220,7 @@ private static void addConfluentProperties(Properties props) {
}
}

private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerRecord<String, String> record)
void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerRecord<String, String> record)
throws InterruptedException, ExecutionException, IOException {
try {
// IMPORTANT!!!
Expand All @@ -261,9 +231,8 @@ private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerReco
ByteBuffer data = convertStringToByteBuffer(msg, Charset.defaultCharset());

// Check the expiration time for the profile credentials
LocalDateTime current_datetime = LocalDateTime.now();
LocalDateTime expiration_datetime = LocalDateTime.parse(AWS_EXPIRATION,
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
LocalDateTime current_datetime = getLocalDateTime();
LocalDateTime expiration_datetime = getExpirationDateTime();
System.out.println();
if (expiration_datetime.isBefore(current_datetime) && API_ENDPOINT.length() > 0) {
// If credential is expired, generate aws credentials
Expand Down Expand Up @@ -307,7 +276,7 @@ private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerReco
}
}

private void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) throws IOException {
void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) throws IOException {
try {
long time = System.currentTimeMillis();
String timeStamp = Long.toString(time);
Expand Down Expand Up @@ -346,7 +315,7 @@ private void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) thr
}
}

private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord<String, String> record) {
void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord<String, String> record) {
String recordValue = record.value();
Bucket bucket = gcsStorage.get(depositBucket);
byte[] bytes = recordValue.getBytes(Charset.defaultCharset());
Expand All @@ -362,7 +331,7 @@ private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerReco
}
}

private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
// Default is to deposit to Kinesis/Firehose, override via .env
// variables if S3 deposit desired
logger.debug("=============================");
Expand All @@ -372,7 +341,7 @@ private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
return AmazonKinesisFirehoseAsyncClientBuilder.standard().withRegion(awsRegion).build();
}

private AmazonS3 createS3Client(String awsRegion) {
AmazonS3 createS3Client(String awsRegion) {
logger.debug("============== ========");
logger.debug("Connecting to Amazon S3");
logger.debug("=======================");
Expand All @@ -397,7 +366,7 @@ public ByteBuffer convertStringToByteBuffer(String msg, Charset charset) {
return ByteBuffer.wrap(msg.getBytes(charset));
}

private File createSampleFile(String json) throws IOException {
File createSampleFile(String json) throws IOException {
File file = File.createTempFile("aws-java-sdk-", ".json");
file.deleteOnExit();

Expand All @@ -408,8 +377,8 @@ private File createSampleFile(String json) throws IOException {
return file;
}

private JSONObject generateAWSProfile() throws IOException {
CloseableHttpClient client = HttpClients.createDefault();
JSONObject generateAWSProfile() throws IOException {
CloseableHttpClient client = getHttpClient();
HttpPost httpPost = new HttpPost(API_ENDPOINT);
JSONObject jsonResult = new JSONObject();
String json = "{}";
Expand All @@ -435,7 +404,9 @@ private JSONObject generateAWSProfile() throws IOException {
return jsonResult;
}

private static String getEnvironmentVariable(String variableName, String defaultValue) {
static String getEnvironmentVariable(String variableName, String defaultValue) {
// get all environment variables
Map<String, String> env = System.getenv();
String value = System.getenv(variableName);
if (value == null || value.equals("")) {
System.out.println("Something went wrong retrieving the environment variable " + variableName);
Expand All @@ -445,4 +416,62 @@ private static String getEnvironmentVariable(String variableName, String default
return value;
}

CloseableHttpClient getHttpClient() {
return HttpClients.createDefault();
}

LocalDateTime getLocalDateTime() {
return LocalDateTime.now();
}

LocalDateTime getExpirationDateTime() {
return LocalDateTime.parse(K_AWS_EXPIRATION,
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}

void depositorSetup() {
endpoint = getEnvironmentVariable("BOOTSTRAP_SERVER", "");
topic = getEnvironmentVariable("DEPOSIT_TOPIC", "");
group = getEnvironmentVariable("DEPOSIT_GROUP", "");
destination = getEnvironmentVariable("DESTINATION", "firehose");
if (System.getenv("WAIT") != null && System.getenv("WAIT") != "")
{ waitOpt = true; }
else
{ waitOpt = false; }

// S3 properties
bucketName = getEnvironmentVariable("DEPOSIT_BUCKET_NAME", "");
awsRegion = getEnvironmentVariable("REGION", "us-east-1");
keyName = getEnvironmentVariable("DEPOSIT_KEY_NAME", "");

K_AWS_ACCESS_KEY_ID = getEnvironmentVariable("AWS_ACCESS_KEY_ID", "AccessKeyId");
K_AWS_SECRET_ACCESS_KEY = getEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "SecretAccessKey");
K_AWS_SESSION_TOKEN = getEnvironmentVariable("AWS_SESSION_TOKEN", "SessionToken");
K_AWS_EXPIRATION = getEnvironmentVariable("AWS_EXPIRATION", "Expiration");
API_ENDPOINT = getEnvironmentVariable("API_ENDPOINT", "");
HEADER_Accept = getEnvironmentVariable("HEADER_ACCEPT", "application/json");
HEADER_X_API_KEY = getEnvironmentVariable("HEADER_X_API_KEY", "");

logger.debug("Bucket name: {}", bucketName);
logger.debug("AWS Region: {}", awsRegion);
logger.debug("Key name: {}", keyName);
logger.debug("Kafka topic: {}", topic);
logger.debug("Destination: {}", destination);
logger.debug("Wait: {}", waitOpt);
logger.debug("AWS_ACCESS_KEY_ID: {}", K_AWS_ACCESS_KEY_ID);
logger.debug("AWS_SECRET_ACCESS_KEY: {}", K_AWS_SECRET_ACCESS_KEY);
logger.debug("AWS_SESSION_TOKEN: {}", K_AWS_SESSION_TOKEN);
logger.debug("AWS_EXPIRATION: {}", K_AWS_EXPIRATION);
logger.debug("API_ENDPOINT: {}", API_ENDPOINT);
logger.debug("HEADER_Accept: {}", HEADER_Accept);
logger.debug("HEADER_X_API_KEY: {}", HEADER_X_API_KEY);
}

boolean getRunDepositor() {
return runDepositor;
}

KafkaConsumer<String, String> getKafkaConsumer(Properties props) {
return new KafkaConsumer<>(props);
}
}
Loading
Loading