From 1a8e6fc8197b676a667540134e57d8deac196ed8 Mon Sep 17 00:00:00 2001 From: Peter Banda Date: Thu, 20 Jun 2019 22:09:18 +0200 Subject: [PATCH] Link data sets fix - null link --- build.sbt | 2 +- .../org/ada/server/models/ScheduledTime.scala | 16 ++++++++++ .../models/dataimport/CsvDataSetImport.scala | 17 +++++++++- .../models/dataimport/DataSetImport.scala | 30 ++++++----------- .../dataimport/EGaitDataSetImport.scala | 17 +++++++++- .../models/dataimport/JsonDataSetImport.scala | 17 +++++++++- .../dataimport/RedCapDataSetImport.scala | 17 +++++++++- .../dataimport/SynapseDataSetImport.scala | 17 +++++++++- .../dataimport/TranSmartDataSetImport.scala | 17 +++++++++- .../ChangeFieldEnumsTransformation.scala | 15 +++++++++ .../datatrans/CopyDataSetTransformation.scala | 15 +++++++++ .../datatrans/DataSetMetaTransformation.scala | 32 ++++++------------- .../datatrans/DropFieldsTransformation.scala | 15 +++++++++ .../LinkMultiDataSetsTransformation.scala | 15 +++++++++ .../LinkTwoDataSetsTransformation.scala | 15 +++++++++ ...hGroupsWithConfoundersTransformation.scala | 15 +++++++++ .../RenameFieldsTransformation.scala | 15 +++++++++ .../DataSetCentralImporterImpl.scala | 17 +++++----- .../DataSetCentralTransformerImpl.scala | 17 +++++----- .../LinkMultiDataSetsTransformer.scala | 22 +++++++++---- .../LinkTwoDataSetsTransformer.scala | 4 +-- 21 files changed, 270 insertions(+), 77 deletions(-) diff --git a/build.sbt b/build.sbt index 2351009..d9d5828 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ organization := "org.adada" name := "ada-server" -version := "0.7.3.RC.10.SNAPSHOT.2" +version := "0.7.3.RC.10.SNAPSHOT.7" description := "Server side of Ada Discovery Analytics containing a persistence layer, stats and data import/transformation services, and util classes." diff --git a/src/main/scala/org/ada/server/models/ScheduledTime.scala b/src/main/scala/org/ada/server/models/ScheduledTime.scala index 2535f52..789d874 100644 --- a/src/main/scala/org/ada/server/models/ScheduledTime.scala +++ b/src/main/scala/org/ada/server/models/ScheduledTime.scala @@ -14,6 +14,22 @@ case class ScheduledTime( second: Option[Int] ) +object ScheduledTime { + + def fillZeroes(scheduledTime: ScheduledTime): ScheduledTime = { + def value(int: Option[Int]) = Some(int.getOrElse(0)) + + if (scheduledTime.weekDay.isDefined) { + scheduledTime.copy(hour = value(scheduledTime.hour), minute = value(scheduledTime.minute), second = value(scheduledTime.second)) + } else if (scheduledTime.hour.isDefined) { + scheduledTime.copy(minute = value(scheduledTime.minute), second = value(scheduledTime.second)) + } else if (scheduledTime.minute.isDefined) { + scheduledTime.copy(second = value(scheduledTime.second)) + } else + scheduledTime + } +} + object WeekDay extends Enumeration { case class Val(day: Int) extends super.Val diff --git a/src/main/scala/org/ada/server/models/dataimport/CsvDataSetImport.scala b/src/main/scala/org/ada/server/models/dataimport/CsvDataSetImport.scala index 07a8818..c2e8b65 100644 --- a/src/main/scala/org/ada/server/models/dataimport/CsvDataSetImport.scala +++ b/src/main/scala/org/ada/server/models/dataimport/CsvDataSetImport.scala @@ -27,4 +27,19 @@ case class CsvDataSetImport( dataView: Option[DataView] = None, timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None -) extends DataSetImport +) extends DataSetImport { + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) +} diff --git a/src/main/scala/org/ada/server/models/dataimport/DataSetImport.scala b/src/main/scala/org/ada/server/models/dataimport/DataSetImport.scala index a28ec8a..51ee823 100644 --- a/src/main/scala/org/ada/server/models/dataimport/DataSetImport.scala +++ b/src/main/scala/org/ada/server/models/dataimport/DataSetImport.scala @@ -20,6 +20,14 @@ trait DataSetImport extends Schedulable { val setting: Option[DataSetSetting] val dataView: Option[DataView] + + def copyCore( + _id: Option[BSONObjectID], + timeCreated: Date, + timeLastExecuted: Option[Date], + scheduled: Boolean, + scheduledTime: Option[ScheduledTime] + ): DataSetImport } object DataSetImport { @@ -43,26 +51,6 @@ object DataSetImport { def of(entity: DataSetImport): Option[BSONObjectID] = entity._id protected def set(entity: DataSetImport, id: Option[BSONObjectID]) = - entity match { - case x: CsvDataSetImport => x.copy(_id = id) - case x: JsonDataSetImport => x.copy(_id = id) - case x: SynapseDataSetImport => x.copy(_id = id) - case x: TranSmartDataSetImport => x.copy(_id = id) - case x: RedCapDataSetImport => x.copy(_id = id) - case x: EGaitDataSetImport => x.copy(_id = id) - } - } - - implicit class DataSetImportExt(val dataSetImport: DataSetImport) extends AnyVal { - - def copyWithTimestamps(timeCreated: Date, timeLastExecuted: Option[Date]): DataSetImport = - dataSetImport match { - case x: CsvDataSetImport => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: JsonDataSetImport => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: SynapseDataSetImport => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: TranSmartDataSetImport => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: RedCapDataSetImport => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: EGaitDataSetImport => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - } + entity.copyCore(id, entity.timeCreated, entity.timeLastExecuted, entity.scheduled, entity.scheduledTime) } } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/models/dataimport/EGaitDataSetImport.scala b/src/main/scala/org/ada/server/models/dataimport/EGaitDataSetImport.scala index 17b4443..8d25182 100644 --- a/src/main/scala/org/ada/server/models/dataimport/EGaitDataSetImport.scala +++ b/src/main/scala/org/ada/server/models/dataimport/EGaitDataSetImport.scala @@ -17,4 +17,19 @@ case class EGaitDataSetImport( dataView: Option[DataView] = None, timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None -) extends DataSetImport \ No newline at end of file +) extends DataSetImport { + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) +} \ No newline at end of file diff --git a/src/main/scala/org/ada/server/models/dataimport/JsonDataSetImport.scala b/src/main/scala/org/ada/server/models/dataimport/JsonDataSetImport.scala index 2f29100..1e426cd 100644 --- a/src/main/scala/org/ada/server/models/dataimport/JsonDataSetImport.scala +++ b/src/main/scala/org/ada/server/models/dataimport/JsonDataSetImport.scala @@ -23,4 +23,19 @@ case class JsonDataSetImport( dataView: Option[DataView] = None, timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None -) extends DataSetImport +) extends DataSetImport { + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) +} diff --git a/src/main/scala/org/ada/server/models/dataimport/RedCapDataSetImport.scala b/src/main/scala/org/ada/server/models/dataimport/RedCapDataSetImport.scala index d3d7af5..6f65d42 100644 --- a/src/main/scala/org/ada/server/models/dataimport/RedCapDataSetImport.scala +++ b/src/main/scala/org/ada/server/models/dataimport/RedCapDataSetImport.scala @@ -22,4 +22,19 @@ case class RedCapDataSetImport( dataView: Option[DataView] = None, timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None -) extends DataSetImport +) extends DataSetImport { + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) +} diff --git a/src/main/scala/org/ada/server/models/dataimport/SynapseDataSetImport.scala b/src/main/scala/org/ada/server/models/dataimport/SynapseDataSetImport.scala index 26e6fe7..26488c2 100644 --- a/src/main/scala/org/ada/server/models/dataimport/SynapseDataSetImport.scala +++ b/src/main/scala/org/ada/server/models/dataimport/SynapseDataSetImport.scala @@ -20,4 +20,19 @@ case class SynapseDataSetImport( dataView: Option[DataView] = None, timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None -) extends DataSetImport +) extends DataSetImport { + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) +} diff --git a/src/main/scala/org/ada/server/models/dataimport/TranSmartDataSetImport.scala b/src/main/scala/org/ada/server/models/dataimport/TranSmartDataSetImport.scala index b27a981..e6c66f9 100644 --- a/src/main/scala/org/ada/server/models/dataimport/TranSmartDataSetImport.scala +++ b/src/main/scala/org/ada/server/models/dataimport/TranSmartDataSetImport.scala @@ -24,4 +24,19 @@ case class TranSmartDataSetImport( dataView: Option[DataView] = None, timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None -) extends DataSetImport +) extends DataSetImport { + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) +} diff --git a/src/main/scala/org/ada/server/models/datatrans/ChangeFieldEnumsTransformation.scala b/src/main/scala/org/ada/server/models/datatrans/ChangeFieldEnumsTransformation.scala index aa270b4..a7fe775 100644 --- a/src/main/scala/org/ada/server/models/datatrans/ChangeFieldEnumsTransformation.scala +++ b/src/main/scala/org/ada/server/models/datatrans/ChangeFieldEnumsTransformation.scala @@ -17,5 +17,20 @@ case class ChangeFieldEnumsTransformation( timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None ) extends DataSetMetaTransformation { + override val sourceDataSetIds = Seq(sourceDataSetId) + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/models/datatrans/CopyDataSetTransformation.scala b/src/main/scala/org/ada/server/models/datatrans/CopyDataSetTransformation.scala index 748b1b5..bd05471 100644 --- a/src/main/scala/org/ada/server/models/datatrans/CopyDataSetTransformation.scala +++ b/src/main/scala/org/ada/server/models/datatrans/CopyDataSetTransformation.scala @@ -18,5 +18,20 @@ case class CopyDataSetTransformation( timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None ) extends DataSetTransformation { + override val sourceDataSetIds = Seq(sourceDataSetId) + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/models/datatrans/DataSetMetaTransformation.scala b/src/main/scala/org/ada/server/models/datatrans/DataSetMetaTransformation.scala index 3b50bbf..0172a3c 100644 --- a/src/main/scala/org/ada/server/models/datatrans/DataSetMetaTransformation.scala +++ b/src/main/scala/org/ada/server/models/datatrans/DataSetMetaTransformation.scala @@ -15,6 +15,14 @@ trait DataSetMetaTransformation extends Schedulable { val timeLastExecuted: Option[Date] val sourceDataSetIds: Seq[String] + + def copyCore( + _id: Option[BSONObjectID], + timeCreated: Date, + timeLastExecuted: Option[Date], + scheduled: Boolean, + scheduledTime: Option[ScheduledTime] + ): DataSetMetaTransformation } trait DataSetTransformation extends DataSetMetaTransformation { @@ -60,28 +68,6 @@ object DataSetTransformation { def of(entity: DataSetMetaTransformation): Option[BSONObjectID] = entity._id protected def set(entity: DataSetMetaTransformation, id: Option[BSONObjectID]) = - entity match { - case x: CopyDataSetTransformation => x.copy(_id = id) - case x: DropFieldsTransformation => x.copy(_id = id) - case x: RenameFieldsTransformation => x.copy(_id = id) - case x: ChangeFieldEnumsTransformation => x.copy(_id = id) - case x: MatchGroupsWithConfoundersTransformation => x.copy(_id = id) - case x: LinkTwoDataSetsTransformation => x.copy(_id = id) - case x: LinkMultiDataSetsTransformation => x.copy(_id = id) - } - } - - implicit class DataSetMetaTransformationExt(val dataSetImport: DataSetMetaTransformation) extends AnyVal { - - def copyWithTimestamps(timeCreated: Date, timeLastExecuted: Option[Date]) = - dataSetImport match { - case x: CopyDataSetTransformation => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: DropFieldsTransformation => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: RenameFieldsTransformation => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: ChangeFieldEnumsTransformation => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: MatchGroupsWithConfoundersTransformation => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: LinkTwoDataSetsTransformation => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - case x: LinkMultiDataSetsTransformation => x.copy(timeCreated = timeCreated, timeLastExecuted = timeLastExecuted) - } + entity.copyCore(id, entity.timeCreated, entity.timeLastExecuted, entity.scheduled, entity.scheduledTime) } } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/models/datatrans/DropFieldsTransformation.scala b/src/main/scala/org/ada/server/models/datatrans/DropFieldsTransformation.scala index 9c63429..509cf3c 100644 --- a/src/main/scala/org/ada/server/models/datatrans/DropFieldsTransformation.scala +++ b/src/main/scala/org/ada/server/models/datatrans/DropFieldsTransformation.scala @@ -20,5 +20,20 @@ case class DropFieldsTransformation( timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None ) extends DataSetTransformation { + override val sourceDataSetIds = Seq(sourceDataSetId) + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/models/datatrans/LinkMultiDataSetsTransformation.scala b/src/main/scala/org/ada/server/models/datatrans/LinkMultiDataSetsTransformation.scala index 17cf203..2b56a02 100644 --- a/src/main/scala/org/ada/server/models/datatrans/LinkMultiDataSetsTransformation.scala +++ b/src/main/scala/org/ada/server/models/datatrans/LinkMultiDataSetsTransformation.scala @@ -19,7 +19,22 @@ case class LinkMultiDataSetsTransformation( timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None ) extends DataSetTransformation { + override val sourceDataSetIds = linkedDataSetSpecs.map(_.dataSetId) + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) } case class LinkedDataSetSpec( diff --git a/src/main/scala/org/ada/server/models/datatrans/LinkTwoDataSetsTransformation.scala b/src/main/scala/org/ada/server/models/datatrans/LinkTwoDataSetsTransformation.scala index 940f7bb..1b291ae 100644 --- a/src/main/scala/org/ada/server/models/datatrans/LinkTwoDataSetsTransformation.scala +++ b/src/main/scala/org/ada/server/models/datatrans/LinkTwoDataSetsTransformation.scala @@ -23,5 +23,20 @@ case class LinkTwoDataSetsTransformation( timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None ) extends DataSetTransformation { + override val sourceDataSetIds = Seq(leftSourceDataSetId, rightSourceDataSetId) + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/models/datatrans/MatchGroupsWithConfoundersTransformation.scala b/src/main/scala/org/ada/server/models/datatrans/MatchGroupsWithConfoundersTransformation.scala index 13bcc5b..fcbd444 100644 --- a/src/main/scala/org/ada/server/models/datatrans/MatchGroupsWithConfoundersTransformation.scala +++ b/src/main/scala/org/ada/server/models/datatrans/MatchGroupsWithConfoundersTransformation.scala @@ -23,5 +23,20 @@ case class MatchGroupsWithConfoundersTransformation( timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None ) extends DataSetTransformation { + override val sourceDataSetIds = Seq(sourceDataSetId) + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/models/datatrans/RenameFieldsTransformation.scala b/src/main/scala/org/ada/server/models/datatrans/RenameFieldsTransformation.scala index 73d826b..5623053 100644 --- a/src/main/scala/org/ada/server/models/datatrans/RenameFieldsTransformation.scala +++ b/src/main/scala/org/ada/server/models/datatrans/RenameFieldsTransformation.scala @@ -19,5 +19,20 @@ case class RenameFieldsTransformation( timeCreated: Date = new Date(), timeLastExecuted: Option[Date] = None ) extends DataSetTransformation { + override val sourceDataSetIds = Seq(sourceDataSetId) + + override def copyCore( + __id: Option[BSONObjectID], + _timeCreated: Date, + _timeLastExecuted: Option[Date], + _scheduled: Boolean, + _scheduledTime: Option[ScheduledTime] + ) = copy( + _id = __id, + timeCreated = _timeCreated, + timeLastExecuted = _timeLastExecuted, + scheduled = _scheduled, + scheduledTime = _scheduledTime + ) } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/services/importers/DataSetCentralImporterImpl.scala b/src/main/scala/org/ada/server/services/importers/DataSetCentralImporterImpl.scala index 7da32e3..e8da8a1 100644 --- a/src/main/scala/org/ada/server/services/importers/DataSetCentralImporterImpl.scala +++ b/src/main/scala/org/ada/server/services/importers/DataSetCentralImporterImpl.scala @@ -28,16 +28,15 @@ protected[services] class DataSetCentralImporterImpl @Inject()( override protected def postExec( input: DataSetImport, exec: DataSetImporter[DataSetImport] - ): Future[Unit] = { - val futureUpdate = - if (input._id.isDefined) { + ) = + for { + _ <- if (input._id.isDefined) { // update if id exists, i.e., it's a persisted import - val updateInput = input.copyWithTimestamps(timeCreated = input.timeCreated, timeLastExecuted = Some(new Date())) - repo.update(updateInput) - } else Future(()) + val updatedInput = input.copyCore(input._id, input.timeCreated, Some(new Date()), input.scheduled, input.scheduledTime) + repo.update(updatedInput).map(_ => ()) + } else + Future(()) - futureUpdate.map(_ => + } yield messageLogger.info(s"Import of data set '${input.dataSetName}' successfully finished.") - ) - } } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/services/transformers/DataSetCentralTransformerImpl.scala b/src/main/scala/org/ada/server/services/transformers/DataSetCentralTransformerImpl.scala index 24581a7..ef46ea5 100644 --- a/src/main/scala/org/ada/server/services/transformers/DataSetCentralTransformerImpl.scala +++ b/src/main/scala/org/ada/server/services/transformers/DataSetCentralTransformerImpl.scala @@ -29,16 +29,15 @@ protected[services] class DataSetCentralTransformerImpl @Inject()( override protected def postExec( input: DataSetMetaTransformation, exec: DataSetMetaTransformer[DataSetMetaTransformation] - ): Future[Unit] = { - val futureUpdate = - if (input._id.isDefined) { + ) = + for { + _ <- if (input._id.isDefined) { // update if id exists, i.e., it's a persisted transformation - val updatedInput = input.copyWithTimestamps(timeCreated = input.timeCreated, timeLastExecuted = Some(new Date())) - repo.update(updatedInput) - } else Future(()) + val updatedInput = input.copyCore(input._id, input.timeCreated, Some(new Date()), input.scheduled, input.scheduledTime) + repo.update(updatedInput).map(_ => ()) + } else + Future(()) - futureUpdate.map(_ => + } yield messageLogger.info(s"Transformation of data set(s) '${input.sourceDataSetIds.mkString(", ")}' successfully finished.") - ) - } } \ No newline at end of file diff --git a/src/main/scala/org/ada/server/services/transformers/LinkMultiDataSetsTransformer.scala b/src/main/scala/org/ada/server/services/transformers/LinkMultiDataSetsTransformer.scala index f6e159f..aec93f1 100644 --- a/src/main/scala/org/ada/server/services/transformers/LinkMultiDataSetsTransformer.scala +++ b/src/main/scala/org/ada/server/services/transformers/LinkMultiDataSetsTransformer.scala @@ -4,11 +4,11 @@ import akka.stream.scaladsl.StreamConverters import org.ada.server.AdaException import org.ada.server.dataaccess.dataset.DataSetAccessor import org.ada.server.field.FieldType -import org.ada.server.field.FieldUtil.{JsonFieldOps, FieldOps, fieldTypeOrdering, NamedFieldType} +import org.ada.server.field.FieldUtil.{FieldOps, JsonFieldOps, NamedFieldType, fieldTypeOrdering} import org.ada.server.models.DataSetFormattersAndIds.{FieldIdentity, JsObjectIdentity} import org.ada.server.models.Field import org.ada.server.models.datatrans.{LinkMultiDataSetsTransformation, LinkedDataSetSpec} -import org.incal.core.dataaccess.AscSort +import org.incal.core.dataaccess.{AscSort, NotEqualsNullCriterion} import org.incal.core.util.crossProduct import org.incal.core.dataaccess.Criterion._ import play.api.libs.json.{JsObject, Json} @@ -28,7 +28,6 @@ private class LinkMultiDataSetsTransformer extends AbstractDataSetTransformer[Li if (spec.linkedDataSetSpecs.size < 2) throw new AdaException(s"LinkMultiDataSetsTransformer expects at least two data sets but got ${spec.linkedDataSetSpecs.size}.") - for { // prepare data set infos with initialized accessors and load fields dataSetInfos <- Future.sequence(spec.linkedDataSetSpecs.map(createDataSetInfo)) @@ -122,10 +121,18 @@ private class LinkMultiDataSetsTransformer extends AbstractDataSetTransformer[Li linkRightJsonsMaps: Seq[Map[Seq[String], Traversable[JsObject]]])( json: JsObject ): List[JsObject] = { - val link = json.toDisplayStrings(leftDataSetInfo.linkFieldTypes) val jsonId = (json \ JsObjectIdentity.name).asOpt[BSONObjectID] - val rightJsonsCrossed = crossProduct(linkRightJsonsMaps.flatMap(_.get(link))) + // check if the link is defined (i.e. all values are defined) + val isLinkDefined = json.toValues(leftDataSetInfo.linkFieldTypes).forall(_.isDefined) + + // perform a cross-product of the right jsons (if the link is defined) + val rightJsonsCrossed = + if (isLinkDefined) { + val link = json.toDisplayStrings(leftDataSetInfo.linkFieldTypes) + crossProduct(linkRightJsonsMaps.flatMap(_.get(link))) + } else + Nil if (rightJsonsCrossed.isEmpty) { List(json) @@ -145,7 +152,10 @@ private class LinkMultiDataSetsTransformer extends AbstractDataSetTransformer[Li addDataSetIdToRightFieldNames: Boolean ): Future[Map[Seq[String], Traversable[JsObject]]] = for { - jsons <- dataSetInfo.dsa.dataSetRepo.find(projection = dataSetInfo.fieldNames) + jsons <- dataSetInfo.dsa.dataSetRepo.find( + criteria = dataSetInfo.linkFieldNames.map(NotEqualsNullCriterion), // all of the link fields must be defined (not null) + projection = dataSetInfo.fieldNames + ) } yield { val linkFieldNameSet = dataSetInfo.linkFieldNames.toSet jsons.map { json => diff --git a/src/main/scala/org/ada/server/services/transformers/LinkTwoDataSetsTransformer.scala b/src/main/scala/org/ada/server/services/transformers/LinkTwoDataSetsTransformer.scala index 0328289..89ca71b 100644 --- a/src/main/scala/org/ada/server/services/transformers/LinkTwoDataSetsTransformer.scala +++ b/src/main/scala/org/ada/server/services/transformers/LinkTwoDataSetsTransformer.scala @@ -4,7 +4,7 @@ import javax.inject.Inject import org.ada.server.models.datatrans.{LinkMultiDataSetsTransformation, LinkTwoDataSetsTransformation, LinkedDataSetSpec} import scala.reflect.runtime.universe.TypeTag -private class LinkTwoDataSetsTransformer @Inject()(multiTransformer: LinkMultiDataSetsTransformer) extends DataSetTransformer[LinkTwoDataSetsTransformation] { +private class LinkTwoDataSetsTransformer @Inject()(multiTransformer: LinkMultiDataSetsTransformer) extends AbstractDataSetTransformer[LinkTwoDataSetsTransformation] { // just delegates to LinkMultiDataSetsTransformer override def runAsFuture( @@ -23,5 +23,5 @@ private class LinkTwoDataSetsTransformer @Inject()(multiTransformer: LinkMultiDa ) } - override protected implicit val typeTag = implicitly[TypeTag[LinkTwoDataSetsTransformation]] + protected def execInternal(spec: LinkTwoDataSetsTransformation) = ??? // not called } \ No newline at end of file