Skip to content

Commit

Permalink
Change type of the initial version type
Browse files Browse the repository at this point in the history
Part of #61

This PR Changes the initial version of ChangeTracking stream to Option[Long] from Long
  • Loading branch information
s-vitaliy committed Nov 6, 2024
1 parent aee1e61 commit 84ce5d2
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos

/**
* Gets the changes in the database since the given version.
* @param latestVersion The version to start from.
* @param maybeLatestVersion The version to start from.
* @param lookBackInterval The look back interval for the query.
* @return A future containing the changes in the database since the given version and the latest observed version.
*/
def getChanges(latestVersion: Long, lookBackInterval: Duration)(using queryRunner: QueryRunner): Future[(QueryResult[LazyQueryResult.OutputType], Long)] =
val query = QueryProvider.getChangeTrackingVersionQuery(connectionOptions.databaseName, latestVersion, lookBackInterval)
def getChanges(maybeLatestVersion: Option[Long], lookBackInterval: Duration)(using queryRunner: QueryRunner): Future[(QueryResult[LazyQueryResult.OutputType], Long)] =
val query = QueryProvider.getChangeTrackingVersionQuery(connectionOptions.databaseName, maybeLatestVersion, lookBackInterval)

for versionResult <- queryRunner.executeQuery(query, connection, (st, rs) => ScalarQueryResult.apply(st, rs, readChangeTrackingVersion))
version = versionResult.read.getOrElse(0L)
changesQuery <- QueryProvider.getChangesQuery(this, version)
version = versionResult.read.getOrElse(maybeLatestVersion.getOrElse(0L))
changesQuery <- QueryProvider.getChangesQuery(this, version - 1)
result <- queryRunner.executeQuery(changesQuery, connection, LazyQueryResult.apply)
yield (result, version)

Expand Down Expand Up @@ -248,12 +248,13 @@ object QueryProvider:
val mergeExpression = QueryProvider.getMergeExpression(columnSummaries, "tq")
val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "ct", "tq")
val matchStatement = QueryProvider.getMatchStatement(columnSummaries, "ct", "tq", None)
QueryProvider.getChangesQuery(
val query = QueryProvider.getChangesQuery(
msSqlConnection.connectionOptions,
mergeExpression,
columnExpression,
matchStatement,
fromVersion)
query
})

/**
Expand Down Expand Up @@ -292,17 +293,17 @@ object QueryProvider:
* Gets the query that retrieves the change tracking version for the Microsoft SQL Server database.
*
* @param databaseName The name of the database.
* @param version The version to start from.
* @param maybeVersion The version to start from.
* @param lookBackRange The look back range for the query.
* @return The change tracking version query for the Microsoft SQL Server database.
*/
def getChangeTrackingVersionQuery(databaseName: String, version: Long, lookBackRange: Duration)(using formatter: DateTimeFormatter): MsSqlQuery = {
version match
case 0 =>
def getChangeTrackingVersionQuery(databaseName: String, maybeVersion: Option[Long], lookBackRange: Duration)(using formatter: DateTimeFormatter): MsSqlQuery = {
maybeVersion match
case None =>
val lookBackTime = Instant.now().minusSeconds(lookBackRange.getSeconds)
val formattedTime = formatter.format(LocalDateTime.ofInstant(lookBackTime, ZoneOffset.UTC))
s"SELECT MIN(commit_ts) FROM $databaseName.sys.dm_tran_commit_table WHERE commit_time > '$formattedTime'"
case _ => s"SELECT MIN(commit_ts) FROM sys.dm_tran_commit_table WHERE commit_ts > $version"
case Some(version) => s"SELECT MIN(commit_ts) FROM $databaseName.sys.dm_tran_commit_table WHERE commit_ts > $version"
}

private def getMergeExpression(cs: List[ColumnSummary], tableAlias: String): String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
"MsSqlConnection" should "return correct number of rows on getChanges" in withDatabase { dbInfo =>
val connection = MsSqlConnection(dbInfo.connectionOptions)
for schema <- connection.getSchema
result <- connection.getChanges(0, Duration.ofDays(1))
result <- connection.getChanges(None, Duration.ofDays(1))
(columns, _ ) = result
changedData = columns.read.toList
yield {
Expand All @@ -151,7 +151,7 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
"MsSqlConnection" should "update latest version when changes received" in withDatabase { dbInfo =>
val connection = MsSqlConnection(dbInfo.connectionOptions)
for schema <- connection.getSchema
result <- connection.getChanges(0, Duration.ofDays(1))
result <- connection.getChanges(None, Duration.ofDays(1))
(_, latestVersion) = result
yield {
latestVersion should be > 0L
Expand Down

0 comments on commit 84ce5d2

Please sign in to comment.