#22 examples sbt build - data moved to test/resources, data/output cleanup
dk1844 committed Nov 18, 2021
1 parent 7d332fc commit d75e4e2
Showing 8 changed files with 39 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -62,3 +62,4 @@ examples/data/output

pom.xml.bkp
.bsp
+/data/
10 changes: 9 additions & 1 deletion build.sbt
@@ -64,7 +64,15 @@ lazy val s3sdkExtension = (project in file("atum-s3-sdk-extension"))
  )
  .dependsOn(core)

-// TODO examples building is missing
+lazy val examples = (project in file("examples"))
+  .settings(
+    name := "examples",
+    libraryDependencies ++= (rootDependencies ++ examplesDependencies),
+    assembly / test := (Test / test).value,
+    Test / parallelExecution := false, // Atum Control framework could attempt to double-initialize and fail
+    mergeStrategy
+  ).dependsOn(core)
+

val mergeStrategy: Def.SettingsDefinition = assembly / assemblyMergeStrategy := {
  case PathList("META-INF", _) => MergeStrategy.discard
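The shared merge strategy is truncated in this view just after its first case. For orientation, a minimal sbt-assembly strategy of this shape could look like the sketch below; only the META-INF rule is confirmed by the diff, the other cases are common defaults added for illustration.

// Sketch only: a typical sbt-assembly merge strategy (build.sbt).
val mergeStrategySketch: Def.SettingsDefinition = assembly / assemblyMergeStrategy := {
  case PathList("META-INF", _) => MergeStrategy.discard // drop manifests and signature files
  case "reference.conf"        => MergeStrategy.concat  // concatenate Typesafe Config defaults
  case _                       => MergeStrategy.first   // otherwise keep the first occurrence
}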
SampleMeasurements1.scala
@@ -17,7 +17,7 @@ package za.co.absa.atum.examples

import java.nio.file.{Files, Paths}

-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._
@@ -35,24 +35,27 @@ object SampleMeasurements1 extends Eventually {
    implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

    // Initializing library to hook up to Apache Spark
-   spark.enableControlMeasuresTracking(Some("data/input/wikidata.csv.info"), None)
+   val inputCsvInfo = this.getClass.getResource("/input/wikidata.csv.info").toString // path from test resources
+
+   spark.enableControlMeasuresTracking(Some(inputCsvInfo), None)
      .setControlMeasuresWorkflow("Job 1")

+   val inputCsv = this.getClass.getResource("/input/wikidata.csv").toString // path from test resources
    // A business logic of a spark job ...

    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
-     .csv("data/input/wikidata.csv")
+     .csv(inputCsv)
      .as("source")
      .filter($"total_response_size" > 1000)
      .setCheckpoint("checkpoint1")
      .write.mode(SaveMode.Overwrite)
      .parquet("data/output/stage1_job_results")

    eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
-     if (!Files.exists(Paths.get("data/output/stage1_job_results/_INFO")))
+     if (!fs.exists(new Path("data/output/stage1_job_results/_INFO"))) {
        throw new Exception("_INFO file not found at data/output/stage1_job_results")
+     }
    }

    spark.disableControlMeasuresTracking()
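Swapping java.nio's Files.exists for Hadoop's FileSystem.exists makes the _INFO check go through the configured Hadoop filesystem (local here, HDFS elsewhere) instead of assuming a local path. A self-contained sketch of the same polling pattern follows; the object name and path are illustrative, not from the commit.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

object WaitForInfoFile extends Eventually {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration()) // local FS or HDFS, per Hadoop config
    // Re-run the block until it stops throwing: 10 s timeout, 500 ms poll interval
    eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
      if (!fs.exists(new Path("data/output/stage1_job_results/_INFO")))
        throw new Exception("_INFO file not found yet")
    }
  }
}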
SampleMeasurements2.scala
@@ -15,7 +15,7 @@

package za.co.absa.atum.examples

-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import za.co.absa.atum.AtumImplicits._ // using basic Atum without extensions

@@ -52,5 +52,8 @@ object SampleMeasurements2 {
      .parquet("data/output/stage2_job_results")

    spark.disableControlMeasuresTracking()
+
+   fs.delete(new Path("data/output/stage1_job_results"), true)
+   fs.delete(new Path("data/output/stage2_job_results"), true)
  }
}
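The two fs.delete calls (recursive flag set to true) clear both stages' outputs so repeated example runs start from a clean data/output. A hedged variant that also cleans up when the job throws is sketched below; withCleanup is a hypothetical helper, not part of the commit.

import org.apache.hadoop.fs.{FileSystem, Path}

object OutputCleanup {
  // Hypothetical helper: run the job, then recursively delete the given
  // directories whether the job succeeded or threw.
  def withCleanup(fs: FileSystem, dirs: String*)(job: => Unit): Unit =
    try job
    finally dirs.foreach(dir => fs.delete(new Path(dir), true)) // true = recursive
}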
File renamed without changes.
File renamed without changes.
HdfsInfoIntegrationSuite.scala
@@ -43,7 +43,7 @@ class HdfsInfoIntegrationSuite extends AnyFlatSpec with SparkTestBase with Matchers
    LocalFsTestUtils.safeDeleteTestDir(tempDir)
  }

- private val inputCsv = "data/input/wikidata.csv"
+ private val inputCsv = getClass.getResource("/input/wikidata.csv").toString
  private def readSparkInputCsv(inputCsvPath: String): DataFrame = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
@@ -54,8 +54,9 @@ class HdfsInfoIntegrationSuite extends AnyFlatSpec with SparkTestBase with Matchers
      .parquet(outputPath)

    eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
-     if (!Files.exists(Paths.get(outputPath)))
+     if (!Files.exists(Paths.get(outputPath))) {
        throw new Exception("_INFO file not found at " + outputPath)
+     }
    }
  }

@@ -72,7 +73,8 @@ class HdfsInfoIntegrationSuite extends AnyFlatSpec with SparkTestBase with Matchers
    implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

    // Initializing library to hook up to Apache Spark
-   spark.enableControlMeasuresTracking(sourceInfoFilePath = Some("data/input/wikidata.csv.info"), destinationInfoFilePath = destinationOptInfoFilePath)
+   val inputCsvInfo = getClass.getResource("/input/wikidata.csv.info").toString
+   spark.enableControlMeasuresTracking(sourceInfoFilePath = Some(inputCsvInfo), destinationInfoFilePath = destinationOptInfoFilePath)
      .setControlMeasuresWorkflow("Job 1")

    import spark.implicits._
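Every changed file now resolves wikidata.csv and its .info companion through the classpath rather than a relative data/input directory, which is what moving the data to test/resources enables. A minimal sketch of the pattern follows; the session settings and object name are illustrative.

import org.apache.spark.sql.SparkSession

object ResourceCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("resource-csv").getOrCreate()
    // Files under src/test/resources land on the test classpath;
    // getResource yields a file: URL, which Spark's readers accept as a path.
    val inputCsv = getClass.getResource("/input/wikidata.csv").toString
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(inputCsv)
      .show(5)
    spark.stop()
  }
}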
13 changes: 12 additions & 1 deletion project/Dependencies.scala
@@ -25,6 +25,9 @@ object Dependencies {
    val absaCommons = "0.0.27"
    val typesafeConfig = "1.4.1"
    val mockitoScala = "1.15.0"
+   val scalatest = "3.2.9"
+   val specs2 = "2.5"
+   val aws = "2.13.65"
  }

  // TODO alternate for multiversion build (hint: getScalaDependency(scalaVersion.value) in cobrix ); see Issue #121
@@ -47,7 +50,10 @@ object Dependencies {
  lazy val mockitoScalaScalatest = "org.mockito" %% "mockito-scala-scalatest" % Versions.mockitoScala % Test
  lazy val hadoopMinicluster = "org.apache.hadoop" % "hadoop-minicluster" % Versions.hadoop % Test

- lazy val sdkS3 = "software.amazon.awssdk" % "s3" % "2.13.65"
+ lazy val scalaTestProvided = "org.scalatest" %% "scalatest" % Versions.scalatest % Provided
+ lazy val specs2core = "org.specs2" %% "specs2-core" % Versions.specs2 % Test
+
+ lazy val sdkS3 = "software.amazon.awssdk" % "s3" % Versions.aws

  lazy val rootDependencies: Seq[ModuleID] = Seq(
    sparkCore,
@@ -73,6 +79,11 @@
    hadoopMinicluster,
  )

+ lazy val examplesDependencies: Seq[ModuleID] = Seq(
+   specs2core,
+   scalaTestProvided
+ )
+
  lazy val s3sdkExtensionDependencies: Seq[ModuleID] = Seq(
    absaCommons,
    sdkS3,
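scalatest enters the examples module as Provided, so it is available for compiling and running the examples but is left out of the assembled jar, while specs2-core stays test-only and the previously hardcoded S3 SDK version moves into Versions. A stripped-down sketch of the scoping pattern, with an illustrative object name:

import sbt._

object DependenciesSketch {
  object Versions {
    val scalatest = "3.2.9"
    val aws = "2.13.65"
  }

  // Provided: on the compile classpath, excluded from the packaged jar
  lazy val scalaTestProvided = "org.scalatest" %% "scalatest" % Versions.scalatest % Provided
  // Test: only on the test classpath
  lazy val specs2core = "org.specs2" %% "specs2-core" % "2.5" % Test
  // Compile scope, version kept in one place
  lazy val sdkS3 = "software.amazon.awssdk" % "s3" % Versions.aws
}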
