Skip to content

Commit

Permalink
Merge pull request #986 from AbsaOSS/feature/spline-978-uuid-collisio…
Browse files Browse the repository at this point in the history
…n-handling

Feature/spline 978 UUID collision handling
  • Loading branch information
wajda authored Oct 22, 2021
2 parents 958b4c0 + b925f0a commit 25fd2cb
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 25 deletions.
2 changes: 1 addition & 1 deletion build/parent-pom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<java.version>1.8</java.version>
<scala.version>2.12.13</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<logback.version>1.2.3</logback.version>
<logback.version>1.2.6</logback.version>
<slf4j.version>1.7.25</slf4j.version>
<json4s.version>3.6.7</json4s.version>
<spring.version>5.2.2.RELEASE</spring.version>
Expand Down
2 changes: 1 addition & 1 deletion persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-java-driver</artifactId>
<version>6.12.2</version>
<version>6.14.0</version>
</dependency>
<dependency>
<groupId>com.arangodb</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ object DataSource {
*/
case class ExecutionPlan(
name: Option[ExecutionPlan.Name],
discriminator: Option[ExecutionPlan.Discriminator],
systemInfo: Map[String, Any],
agentInfo: Map[String, Any],
extra: Map[String, Any],
Expand All @@ -117,6 +118,7 @@ case class ExecutionPlan(

object ExecutionPlan {
type Name = String
type Discriminator = String
}

/**
Expand All @@ -127,6 +129,7 @@ object ExecutionPlan {
case class Progress(
timestamp: Long,
durationNs: Option[Progress.JobDurationInNanos],
discriminator: Option[ExecutionPlan.Discriminator],
error: Option[Any],
extra: Map[String, Any],
override val _key: ArangoDocument.Key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package za.co.absa.spline.producer.model.v1_1

import za.co.absa.spline.producer.model.v1_1.ExecutionEvent._

import java.util.UUID
import scala.language.implicitConversions

case class ExecutionEvent(
planId: UUID,
planId: ExecutionPlan.Id,
timestamp: Long,
durationNs: Option[DurationNs],
discriminator: Option[ExecutionPlan.Discriminator] = None,
error: Option[Any] = None,
extra: Map[String, Any] = Map.empty
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import za.co.absa.spline.common.validation.{Constraint, ValidationUtils}
import java.util.UUID

case class ExecutionPlan(
id: UUID = UUID.randomUUID(),
id: ExecutionPlan.Id = UUID.randomUUID(),
name: Option[ExecutionPlan.Name],
discriminator: Option[ExecutionPlan.Discriminator] = None,

operations: Operations,
attributes: Seq[Attribute] = Nil,
Expand All @@ -45,8 +46,10 @@ case class ExecutionPlan(
}

object ExecutionPlan {
type Id = UUID
type Name = String
type DataSourceUri = String
type Discriminator = String
}

case class Operations(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,20 @@ class ExecutionEventsController @Autowired()(
{
// Reference to the execution plan Id that was triggered
planId: <UUID>,
// [Optional] A label that logically distinguish a group of one of multiple execution plans from another group.
// If set, it has to match the discriminator of the associated execution plan.
// The property is used for UUID collision detection.
discriminator: <string>,
// Time (milliseconds since Epoch) when the execution finished
timestamp: <number>,
// [Optional] Duration (in nanoseconds) of the execution
durationNs: <number>,
// [Optional] Additional info about the error (in case there was an error during the execution)
error: {...},
// [Optional] Any other extra information related to the given execution event
extra: {...}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class ExecutionPlansController @Autowired()(
// [Optional] A name of the application (script, job etc) that this execution plan represents.
name: <string>,
// [Optional] A label that logically distinguish a group of one of multiple execution plans from another group.
// If set, it has to match the discriminator of the associated execution events.
// The property is used for UUID collision detection.
discriminator: <string>,
// Operation level lineage info
operations: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.producer.service

import java.util.UUID

class UUIDCollisionDetectedException(
entityName: String,
id: UUID,
discriminator: String
) extends RuntimeException(s"$entityName UUID collision detected: $id, discriminator: $discriminator")
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class ExecutionPlanPersistentModelBuilder private(
def build(): ExecutionPlanPersistentModel = {
val pmExecutionPlan = pm.ExecutionPlan(
name = ep.name,
discriminator = ep.discriminator,
_key = ep.id.toString,
systemInfo = ep.systemInfo.toJsonAs[Map[String, Any]],
agentInfo = ep.agentInfo.map(_.toJsonAs[Map[String, Any]]).orNull,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import za.co.absa.spline.persistence.tx.{ArangoTx, InsertQuery, TxBuilder}
import za.co.absa.spline.persistence.{ArangoImplicits, Persister}
import za.co.absa.spline.producer.model.v1_1.ExecutionEvent._
import za.co.absa.spline.producer.model.{v1_1 => apiModel}
import za.co.absa.spline.producer.service.InconsistentEntityException
import za.co.absa.spline.producer.service.model.{ExecutionEventKeyCreator, ExecutionPlanPersistentModel, ExecutionPlanPersistentModelBuilder}
import za.co.absa.spline.producer.service.{UUIDCollisionDetectedException, InconsistentEntityException}

import java.util.UUID
import scala.compat.java8.FutureConverters._
Expand All @@ -42,15 +42,18 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) exte
import ExecutionProducerRepositoryImpl._

override def insertExecutionPlan(executionPlan: apiModel.ExecutionPlan)(implicit ec: ExecutionContext): Future[Unit] = Persister.execute({
val planAlreadyExistsFuture = db.queryOne[Boolean](
// Here I have to use the type parameter `Any` and cast to `String` later due to ArangoDb Java driver issue.
// See https://github.com/arangodb/arangodb-java-driver/issues/389
val eventualMaybeExistingDiscriminatorOpt: Future[Option[String]] = db.queryOptional[Any](
s"""
|WITH ${NodeDef.ExecutionPlan.name}
|FOR ex IN ${NodeDef.ExecutionPlan.name}
| FILTER ex._key == @key
| COLLECT WITH COUNT INTO cnt
| RETURN TO_BOOL(cnt)
| LIMIT 1
| RETURN ex.discriminator
| """.stripMargin,
Map("key" -> executionPlan.id))
Map("key" -> executionPlan.id)
).map(_.map(Option(_).map(_.toString).orNull))

val eventualPersistedDSKeyByURI: Future[Map[DataSource.Uri, DataSource.Key]] = db.queryAs[DataSource](
s"""
Expand All @@ -64,15 +67,21 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) exte

for {
persistedDSKeyByURI <- eventualPersistedDSKeyByURI
planAlreadyExists <- planAlreadyExistsFuture
_ <-
if (planAlreadyExists) Future.successful(Unit) // nothing more to do
else createInsertTransaction(executionPlan, persistedDSKeyByURI).execute(db)
maybeExistingDiscriminatorOpt <- eventualMaybeExistingDiscriminatorOpt
_ <- maybeExistingDiscriminatorOpt match {
case Some(existingDiscriminatorOrNull) =>
// execution plan with the given ID already exists
ensureNoExecPlanIDCollision(executionPlan.id, executionPlan.discriminator.orNull, existingDiscriminatorOrNull)
Future.successful(Unit)
case None =>
// no execution plan with the given ID found
createInsertTransaction(executionPlan, persistedDSKeyByURI).execute(db)
}
} yield Unit
})

override def insertExecutionEvents(events: Array[apiModel.ExecutionEvent])(implicit ec: ExecutionContext): Future[Unit] = Persister.execute({
val eventualExecPlanDetails = db.queryStream[ExecPlanDetails](
val eventualExecPlanInfos: Future[Seq[ExecPlanInfo]] = db.queryStream[ExecPlanInfo](
s"""
|WITH executionPlan, executes, operation, dataSource
|FOR ep IN executionPlan
Expand All @@ -82,21 +91,33 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) exte
| LET ds = FIRST(FOR v IN 1 OUTBOUND ep affects RETURN v)
|
| RETURN {
| "executionPlanKey" : ep._key,
| "frameworkName" : CONCAT(ep.systemInfo.name, " ", ep.systemInfo.version),
| "applicationName" : ep.name,
| "dataSourceUri" : ds.uri,
| "dataSourceName" : ds.name,
| "dataSourceType" : wo.extra.destinationType,
| "append" : wo.append
| key : ep._key,
| discriminator : ep.discriminator,
| details: {
| "executionPlanKey" : ep._key,
| "frameworkName" : CONCAT(ep.systemInfo.name, " ", ep.systemInfo.version),
| "applicationName" : ep.name,
| "dataSourceUri" : ds.uri,
| "dataSourceName" : ds.name,
| "dataSourceType" : wo.extra.destinationType,
| "append" : wo.append
| }
| }
|""".stripMargin,
Map("keys" -> events.map(_.planId))
)

for {
execPlansDetails <- eventualExecPlanDetails
res <- createInsertTransaction(events, execPlansDetails.toArray).execute(db)
execPlansInfos <- eventualExecPlanInfos
(execPlanDiscrById, execPlansDetails) = execPlansInfos
.foldLeft((Map.empty[apiModel.ExecutionPlan.Id, apiModel.ExecutionPlan.Discriminator], Vector.empty[ExecPlanDetails])) {
case ((descrByIdAcc, detailsAcc), ExecPlanInfo(id, discr, details)) =>
(descrByIdAcc + (UUID.fromString(id) -> discr), detailsAcc :+ details)
}
res <- {
events.foreach(e => ensureNoExecPlanIDCollision(e.planId, e.discriminator.orNull, execPlanDiscrById(e.planId)))
createInsertTransaction(events, execPlansDetails.toArray).execute(db)
}
} yield res
})

Expand All @@ -117,6 +138,13 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) exte

object ExecutionProducerRepositoryImpl {

case class ExecPlanInfo(
key: ArangoDocument.Key,
discriminator: ExecutionPlan.Discriminator,
details: ExecPlanDetails) {
def this() = this(null, null, null)
}

private def createInsertTransaction(
executionPlan: apiModel.ExecutionPlan,
persistedDSKeyByURI: Map[DataSource.Uri, DataSource.Key]
Expand Down Expand Up @@ -175,7 +203,15 @@ object ExecutionProducerRepositoryImpl {
.zip(execPlansDetails)
.map { case (e, pd) =>
val key = new ExecutionEventKeyCreator(e).executionEventKey
Progress(e.timestamp, e.durationNs, e.error, e.extra, key, pd)
Progress(
timestamp = e.timestamp,
durationNs = e.durationNs,
discriminator = e.discriminator,
error = e.error,
extra = e.extra,
_key = key,
execPlanDetails = pd
)
}

val progressEdges = progressNodes
Expand All @@ -188,4 +224,13 @@ object ExecutionProducerRepositoryImpl {
.buildTx
}

private def ensureNoExecPlanIDCollision(
planId: apiModel.ExecutionPlan.Id,
actualDiscriminator: apiModel.ExecutionPlan.Discriminator,
expectedDiscriminator: apiModel.ExecutionPlan.Discriminator
): Unit = {
if (actualDiscriminator != expectedDiscriminator) {
throw new UUIDCollisionDetectedException("ExecutionPlan", planId, actualDiscriminator)
}
}
}

0 comments on commit 25fd2cb

Please sign in to comment.