From 84ce5d201489f151e455853daa1b0670e4ae42d3 Mon Sep 17 00:00:00 2001 From: Vitalii Savitskii Date: Wed, 6 Nov 2024 13:05:01 +0100 Subject: [PATCH] Change type of the initial version type Part of #61 This PR Changes the initial version of ChangeTracking stream to Option[Long] from Long --- .../services/mssql/MsSqlConnection.scala | 23 ++++++++++--------- .../mssql/MsSqlConnectorsTests.scala | 4 ++-- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala b/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala index 86a1eb8..47c91a1 100644 --- a/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala +++ b/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala @@ -96,16 +96,16 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos /** * Gets the changes in the database since the given version. - * @param latestVersion The version to start from. + * @param maybeLatestVersion The version to start from. * @param lookBackInterval The look back interval for the query. * @return A future containing the changes in the database since the given version and the latest observed version. */ - def getChanges(latestVersion: Long, lookBackInterval: Duration)(using queryRunner: QueryRunner): Future[(QueryResult[LazyQueryResult.OutputType], Long)] = - val query = QueryProvider.getChangeTrackingVersionQuery(connectionOptions.databaseName, latestVersion, lookBackInterval) + def getChanges(maybeLatestVersion: Option[Long], lookBackInterval: Duration)(using queryRunner: QueryRunner): Future[(QueryResult[LazyQueryResult.OutputType], Long)] = + val query = QueryProvider.getChangeTrackingVersionQuery(connectionOptions.databaseName, maybeLatestVersion, lookBackInterval) for versionResult <- queryRunner.executeQuery(query, connection, (st, rs) => ScalarQueryResult.apply(st, rs, readChangeTrackingVersion)) - version = versionResult.read.getOrElse(0L) - changesQuery <- QueryProvider.getChangesQuery(this, version) + version = versionResult.read.getOrElse(maybeLatestVersion.getOrElse(0L)) + changesQuery <- QueryProvider.getChangesQuery(this, version - 1) result <- queryRunner.executeQuery(changesQuery, connection, LazyQueryResult.apply) yield (result, version) @@ -248,12 +248,13 @@ object QueryProvider: val mergeExpression = QueryProvider.getMergeExpression(columnSummaries, "tq") val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "ct", "tq") val matchStatement = QueryProvider.getMatchStatement(columnSummaries, "ct", "tq", None) - QueryProvider.getChangesQuery( + val query = QueryProvider.getChangesQuery( msSqlConnection.connectionOptions, mergeExpression, columnExpression, matchStatement, fromVersion) + query }) /** @@ -292,17 +293,17 @@ object QueryProvider: * Gets the query that retrieves the change tracking version for the Microsoft SQL Server database. * * @param databaseName The name of the database. - * @param version The version to start from. + * @param maybeVersion The version to start from. * @param lookBackRange The look back range for the query. * @return The change tracking version query for the Microsoft SQL Server database. */ - def getChangeTrackingVersionQuery(databaseName: String, version: Long, lookBackRange: Duration)(using formatter: DateTimeFormatter): MsSqlQuery = { - version match - case 0 => + def getChangeTrackingVersionQuery(databaseName: String, maybeVersion: Option[Long], lookBackRange: Duration)(using formatter: DateTimeFormatter): MsSqlQuery = { + maybeVersion match + case None => val lookBackTime = Instant.now().minusSeconds(lookBackRange.getSeconds) val formattedTime = formatter.format(LocalDateTime.ofInstant(lookBackTime, ZoneOffset.UTC)) s"SELECT MIN(commit_ts) FROM $databaseName.sys.dm_tran_commit_table WHERE commit_time > '$formattedTime'" - case _ => s"SELECT MIN(commit_ts) FROM sys.dm_tran_commit_table WHERE commit_ts > $version" + case Some(version) => s"SELECT MIN(commit_ts) FROM $databaseName.sys.dm_tran_commit_table WHERE commit_ts > $version" } private def getMergeExpression(cs: List[ColumnSummary], tableAlias: String): String = diff --git a/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala b/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala index 399744d..da93ef6 100644 --- a/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala +++ b/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala @@ -140,7 +140,7 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers: "MsSqlConnection" should "return correct number of rows on getChanges" in withDatabase { dbInfo => val connection = MsSqlConnection(dbInfo.connectionOptions) for schema <- connection.getSchema - result <- connection.getChanges(0, Duration.ofDays(1)) + result <- connection.getChanges(None, Duration.ofDays(1)) (columns, _ ) = result changedData = columns.read.toList yield { @@ -151,7 +151,7 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers: "MsSqlConnection" should "update latest version when changes received" in withDatabase { dbInfo => val connection = MsSqlConnection(dbInfo.connectionOptions) for schema <- connection.getSchema - result <- connection.getChanges(0, Duration.ofDays(1)) + result <- connection.getChanges(None, Duration.ofDays(1)) (_, latestVersion) = result yield { latestVersion should be > 0L