diff --git a/examples/atum-examples/pom.xml b/examples/atum-examples/pom.xml
index d36ca019..d51bc4bf 100644
--- a/examples/atum-examples/pom.xml
+++ b/examples/atum-examples/pom.xml
@@ -101,7 +101,7 @@
org.scalatest
scalatest_${scala.binary.version}
${scalatest.version}
- test
+ compile
diff --git a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
index 2616e858..5dea05fd 100644
--- a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
@@ -20,9 +20,12 @@ import java.nio.file.{Files, Paths}
import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._
-object SampleMeasurements1 {
+import scala.concurrent.duration.DurationInt
+
+object SampleMeasurements1 extends Eventually {
private val log = LogManager.getLogger(this.getClass)
@@ -55,11 +58,10 @@ object SampleMeasurements1 {
spark.disableControlMeasuresTracking()
spark.close()
- if (!Files.exists(Paths.get("data/output/stage1_job_results/_INFO"))) {
- throw new Exception("_INFO file not found at data/output/stage1_job_results")
- } else {
- log.info("File data/output/stage1_job_results/_INFO found.")
+ eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
+ if (!Files.exists(Paths.get("data/output/stage1_job_results/_INFO"))) {
+ throw new Exception("_INFO file not found at data/output/stage1_job_results")
+ }
}
-
}
}
diff --git a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
index 99fce0db..623185e5 100644
--- a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
@@ -20,9 +20,13 @@ import java.nio.file.{Files, Paths}
import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.examples.SampleMeasurements1.{eventually, interval, scaled, timeout}
-object SampleMeasurements2 {
+import scala.concurrent.duration.DurationInt
+
+object SampleMeasurements2 extends Eventually {
private val log = LogManager.getLogger(this.getClass)
@@ -63,10 +67,10 @@ object SampleMeasurements2 {
spark.disableControlMeasuresTracking()
spark.close()
- if (!Files.exists(Paths.get("data/output/stage2_job_results/_INFO"))) {
- throw new Exception("_INFO file not found at data/output/stage2_job_results")
- } else {
- log.info("File data/output/stage2_job_results/_INFO found.")
+ eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
+ if (!Files.exists(Paths.get("data/output/stage2_job_results/_INFO"))) {
+ throw new Exception("_INFO file not found at data/output/stage2_job_results")
+ }
}
}
}
diff --git a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala
index e0b7a623..33fcd8bb 100644
--- a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala
@@ -20,12 +20,15 @@ import java.nio.file.{Files, Paths}
import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.examples.SampleMeasurements2.{eventually, interval, scaled, timeout}
import za.co.absa.atum.model.ControlMeasure
import za.co.absa.atum.utils.{BuildProperties, FileUtils, SerializationUtils}
+import scala.concurrent.duration.DurationInt
-object SampleMeasurements3 {
+object SampleMeasurements3 extends Eventually {
case class MyBuildProperties(projectName: String, buildVersion: String) extends BuildProperties
private val log = LogManager.getLogger(this.getClass)
@@ -55,25 +58,24 @@ object SampleMeasurements3 {
.write.mode(SaveMode.Overwrite)
.parquet("data/output/stage3_job_results")
-
spark.disableControlMeasuresTracking()
spark.close()
- if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO"))) {
- throw new Exception("_INFO file not found at data/output/stage3_job_results")
- } else {
- log.info("File data/output/stage3_job_results/_INFO found. Checking its content...")
- }
+ eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
+ if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO"))) {
+ throw new Exception("_INFO file not found at data/output/stage3_job_results")
+ }
- val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO")
- val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile)
- val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head
+ val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO")
+ val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile)
+ val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head
- if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) {
- throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}")
- } else {
- log.info("_INFO file correctly contained custom SW Name and version.")
- }
+ if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) {
+ throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}")
+ } else {
+ log.info("_INFO file correctly contained custom SW Name and version.")
+ }
+ }
}
}