From a4ba9f705ee244b401fd8c4124b0a322dfeb2fc2 Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Mon, 2 Aug 2021 12:59:03 +0200 Subject: [PATCH] #99 eventually is back for compile scope - to demonstrate sample measurements _INFO output --- examples/atum-examples/pom.xml | 2 +- .../atum/examples/SampleMeasurements1.scala | 14 ++++---- .../atum/examples/SampleMeasurements2.scala | 14 +++++--- .../atum/examples/SampleMeasurements3.scala | 32 ++++++++++--------- 4 files changed, 35 insertions(+), 27 deletions(-) diff --git a/examples/atum-examples/pom.xml b/examples/atum-examples/pom.xml index d36ca019..d51bc4bf 100644 --- a/examples/atum-examples/pom.xml +++ b/examples/atum-examples/pom.xml @@ -101,7 +101,7 @@ org.scalatest scalatest_${scala.binary.version} ${scalatest.version} - test + compile diff --git a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala index 2616e858..5dea05fd 100644 --- a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala +++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala @@ -20,9 +20,12 @@ import java.nio.file.{Files, Paths} import org.apache.hadoop.fs.FileSystem import org.apache.log4j.LogManager import org.apache.spark.sql.{SaveMode, SparkSession} +import org.scalatest.concurrent.Eventually import za.co.absa.atum.AtumImplicits._ -object SampleMeasurements1 { +import scala.concurrent.duration.DurationInt + +object SampleMeasurements1 extends Eventually { private val log = LogManager.getLogger(this.getClass) @@ -55,11 +58,10 @@ object SampleMeasurements1 { spark.disableControlMeasuresTracking() spark.close() - if (!Files.exists(Paths.get("data/output/stage1_job_results/_INFO"))) { - throw new Exception("_INFO file not found at data/output/stage1_job_results") - } else { - log.info("File data/output/stage1_job_results/_INFO found.") + eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) { + if (!Files.exists(Paths.get("data/output/stage1_job_results/_INFO"))) { + throw new Exception("_INFO file not found at data/output/stage1_job_results") + } } - } } diff --git a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala index 99fce0db..623185e5 100644 --- a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala +++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala @@ -20,9 +20,13 @@ import java.nio.file.{Files, Paths} import org.apache.hadoop.fs.FileSystem import org.apache.log4j.LogManager import org.apache.spark.sql.{SaveMode, SparkSession} +import org.scalatest.concurrent.Eventually import za.co.absa.atum.AtumImplicits._ +import za.co.absa.atum.examples.SampleMeasurements1.{eventually, interval, scaled, timeout} -object SampleMeasurements2 { +import scala.concurrent.duration.DurationInt + +object SampleMeasurements2 extends Eventually { private val log = LogManager.getLogger(this.getClass) @@ -63,10 +67,10 @@ object SampleMeasurements2 { spark.disableControlMeasuresTracking() spark.close() - if (!Files.exists(Paths.get("data/output/stage2_job_results/_INFO"))) { - throw new Exception("_INFO file not found at data/output/stage2_job_results") - } else { - log.info("File data/output/stage2_job_results/_INFO found.") + eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) { + if (!Files.exists(Paths.get("data/output/stage2_job_results/_INFO"))) { + throw new Exception("_INFO file not found at data/output/stage2_job_results") + } } } } diff --git a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala index e0b7a623..33fcd8bb 100644 --- a/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala +++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala @@ -20,12 +20,15 @@ import java.nio.file.{Files, Paths} import org.apache.hadoop.fs.FileSystem import org.apache.log4j.LogManager import org.apache.spark.sql.{SaveMode, SparkSession} +import org.scalatest.concurrent.Eventually import za.co.absa.atum.AtumImplicits._ +import za.co.absa.atum.examples.SampleMeasurements2.{eventually, interval, scaled, timeout} import za.co.absa.atum.model.ControlMeasure import za.co.absa.atum.utils.{BuildProperties, FileUtils, SerializationUtils} +import scala.concurrent.duration.DurationInt -object SampleMeasurements3 { +object SampleMeasurements3 extends Eventually { case class MyBuildProperties(projectName: String, buildVersion: String) extends BuildProperties private val log = LogManager.getLogger(this.getClass) @@ -55,25 +58,24 @@ object SampleMeasurements3 { .write.mode(SaveMode.Overwrite) .parquet("data/output/stage3_job_results") - spark.disableControlMeasuresTracking() spark.close() - if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO"))) { - throw new Exception("_INFO file not found at data/output/stage3_job_results") - } else { - log.info("File data/output/stage3_job_results/_INFO found. Checking its content...") - } + eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) { + if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO"))) { + throw new Exception("_INFO file not found at data/output/stage3_job_results") + } - val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO") - val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile) - val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head + val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO") + val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile) + val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head - if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) { - throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}") - } else { - log.info("_INFO file correctly contained custom SW Name and version.") - } + if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) { + throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}") + } else { + log.info("_INFO file correctly contained custom SW Name and version.") + } + } } }