Implement grouping and look back interval (#83)
* Implement DI and basic interfaces

Part of #61.

Implement dependency injection (DI) using ZIO and add the interfaces for the basic services

* Implement grouping and look back interval

Part of #61.

Implemented:
- Graph processor that reads lazy query results and produces grouped chunks of DataRow for the sink
- Settings models: `GroupingSettings`, `VersionedDataGraphBuilderSettings`
s-vitaliy authored Nov 12, 2024
1 parent a886d81 commit 6fd9724
Showing 10 changed files with 188 additions and 32 deletions.
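
For orientation before the file-by-file diff: the commit's DI pattern expresses each service as a ZIO `ZLayer` and assembles consumers from the environment. A minimal, self-contained sketch of that pattern, using illustrative stand-in services rather than the framework's classes:

```scala
import zio.*

// Stand-in for a "basic service" behind an interface; not framework code.
trait Greeter:
  def greet(name: String): UIO[String]

final class EnglishGreeter extends Greeter:
  def greet(name: String): UIO[String] = ZIO.succeed(s"hello, $name")

object DiSketch extends ZIOAppDefault:
  // Each dependency is provided as a layer, mirroring VersionedDataGraphBuilder.layer below.
  val greeterLayer: ULayer[Greeter] = ZLayer.succeed(new EnglishGreeter)

  // The consumer is assembled from the environment with ZIO.service.
  val program: ZIO[Greeter, java.io.IOException, Unit] =
    for
      greeter <- ZIO.service[Greeter]
      message <- greeter.greet("arcane")
      _       <- Console.printLine(message)
    yield ()

  def run = program.provide(greeterLayer)
```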
models/settings/GroupingSettings.scala (new file)
@@ -0,0 +1,20 @@
package com.sneaksanddata.arcane.framework
package models.settings

import java.time.Duration

/**
* Provides grouping settings for the stream
*/
trait GroupingSettings {

/**
* The interval to group the data.
*/
val groupingInterval: Duration

/**
* The number of rows per group.
*/
val rowsPerGroup: Int
}
models/settings/VersionedDataGraphBuilderSettings.scala (new file)
@@ -0,0 +1,15 @@
package com.sneaksanddata.arcane.framework
package models.settings

import java.time.Duration

/**
* Provides settings for a stream source.
*/
trait VersionedDataGraphBuilderSettings {

/**
* The interval to look back for changes if the version is empty.
*/
val lookBackInterval: Duration
}
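
Both traits are consumed through DI, so a concrete deployment only has to provide values. A minimal sketch of hypothetical implementations (the names and values are illustrative, assuming the two traits above are on the classpath):

```scala
import java.time.Duration
import models.settings.{GroupingSettings, VersionedDataGraphBuilderSettings}

// Hypothetical concrete settings: emit a group every 10 rows or every second,
// and look back one hour when no version has been observed yet.
final class DefaultGroupingSettings extends GroupingSettings:
  override val groupingInterval: Duration = Duration.ofSeconds(1)
  override val rowsPerGroup: Int = 10

final class DefaultGraphBuilderSettings extends VersionedDataGraphBuilderSettings:
  override val lookBackInterval: Duration = Duration.ofHours(1)
```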
services/mssql/MsSqlConnection.scala
@@ -1,7 +1,7 @@
package com.sneaksanddata.arcane.framework
package services.mssql

-import models.{ArcaneSchema, ArcaneType, Field}
+import models.{ArcaneSchema, ArcaneType, DataRow, Field}
import services.base.{CanAdd, SchemaProvider}
import services.mssql.MsSqlConnection.{DATE_PARTITION_KEY, UPSERT_MERGE_KEY, VersionedBatch, toArcaneType}
import services.mssql.base.{CanPeekHead, QueryResult}
@@ -215,10 +215,11 @@ object MsSqlConnection:
case java.sql.Types.VARCHAR => Success(ArcaneType.StringType)
case _ => Failure(new IllegalArgumentException(s"Unsupported SQL type: $sqlType"))

+/**
+ * A lazily-read batch of data whose head can be peeked without consuming it.
+ */
+type DataBatch = QueryResult[LazyQueryResult.OutputType] & CanPeekHead[LazyQueryResult.OutputType]
/**
* Represents a versioned batch of data.
*/
-type VersionedBatch = (QueryResult[LazyQueryResult.OutputType] & CanPeekHead[LazyQueryResult.OutputType], Long)
+type VersionedBatch = (DataBatch, Long)

/**
 * Ensures that the head of the result (if any) is saved and cannot be lost
services/mssql/base/QueryResult.scala
@@ -4,7 +4,7 @@ package services.mssql.base
/**
* Represents the result of a query to a SQL database.
*/
-trait QueryResult[Output]:
+trait QueryResult[Output] extends AutoCloseable:

/**
* The output type of the query result.
@@ -16,7 +16,7 @@ trait QueryResult[Output]:
*
* @return The result of the query.
*/
-def read: OutputType
+def read: Output

/**
* Represents a query result that can peek the head of the result.
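
Making `QueryResult` extend `AutoCloseable` is what lets callers scope the underlying statement with `scala.util.Using`, which is exactly how `LazyListGroupingProcessor.readBatch` consumes a batch later in this commit. A self-contained sketch with a toy result type standing in for the framework's class:

```scala
import scala.util.{Try, Using}

// Toy stand-in mirroring the trait's new shape; not the framework's class.
trait QueryResult[Output] extends AutoCloseable:
  def read: Output

final class ListResult(rows: List[String]) extends QueryResult[List[String]]:
  override def read: List[String] = rows
  override def close(): Unit = println("statement closed")

@main def usingDemo(): Unit =
  // Using.apply closes the resource whether read succeeds or throws.
  val rows: Try[List[String]] = Using(new ListResult(List("a", "b")))(_.read)
  println(rows) // Success(List(a, b)), printed after "statement closed"
```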
services/mssql/query/LazyQueryResult.scala
@@ -17,7 +17,7 @@ import scala.util.{Failure, Success, Try}
* @param resultSet The result set of the query.
*/
class LazyQueryResult(protected val statement: Statement, resultSet: ResultSet, eagerHead: List[DataRow]) extends QueryResult[LazyList[DataRow]]
-with ResultSetOwner with CanPeekHead[LazyList[DataRow]]:
+with CanPeekHead[LazyList[DataRow]] with ResultSetOwner:

/**
* Reads the result of the query.
services/streaming/VersionedDataGraphBuilder.scala
@@ -1,42 +1,65 @@
package com.sneaksanddata.arcane.framework
package services.streaming

-import services.mssql.MsSqlConnection.VersionedBatch
+import models.DataRow
+import models.settings.VersionedDataGraphBuilderSettings
+import services.mssql.MsSqlConnection.{DataBatch, VersionedBatch}
import services.mssql.given_HasVersion_VersionedBatch
import services.mssql.query.LazyQueryResult.OutputType
-import services.streaming.base.{StreamGraphBuilder, StreamLifetimeService, VersionedDataProvider}
+import services.streaming.base.{BatchProcessor, StreamGraphBuilder, StreamLifetimeService, VersionedDataProvider}

import zio.stream.ZStream
-import zio.{ZIO, ZLayer}
+import zio.{Chunk, ZIO, ZLayer}

import java.time.Duration


-class VersionedDataGraphBuilder(versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
-streamLifetimeService: StreamLifetimeService) extends StreamGraphBuilder[OutputType] {
+/**
+ * The stream graph builder that reads the changes from the database.
+ * @param versionedDataGraphBuilderSettings The settings for the stream source.
+ * @param versionedDataProvider The versioned data provider.
+ * @param streamLifetimeService The stream lifetime service.
+ * @param batchProcessor The batch processor.
+ */
+class VersionedDataGraphBuilder(versionedDataGraphBuilderSettings: VersionedDataGraphBuilderSettings,
+versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
+streamLifetimeService: StreamLifetimeService,
+batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]])
+extends StreamGraphBuilder[OutputType, Chunk[DataRow]]:

/**
* Builds a stream that reads the changes from the database.
*
* @return The stream that reads the changes from the database.
*/
-def create: ZStream[Any, Throwable, OutputType] =
-ZStream.unfoldZIO(versionedDataProvider.firstVersion) { previousVersion =>
+override def create: ZStream[Any, Throwable, Chunk[DataRow]] = this.createStream.via(this.batchProcessor.process)
+
+private def createStream = ZStream.unfoldZIO(versionedDataProvider.firstVersion) { previousVersion =>
if streamLifetimeService.cancelled then ZIO.succeed(None) else continueStream(previousVersion)
}

-private def continueStream(previousVersion: Option[Long]): ZIO[Any, Throwable, Some[(OutputType, Option[Long])]] =
-versionedDataProvider.requestChanges(previousVersion, Duration.ofDays(1)) map { versionedBatch =>
+private def continueStream(previousVersion: Option[Long]): ZIO[Any, Throwable, Some[(DataBatch, Option[Long])]] =
+versionedDataProvider.requestChanges(previousVersion, versionedDataGraphBuilderSettings.lookBackInterval) map { versionedBatch =>
val latestVersion = versionedBatch.getLatestVersion
val (queryResult, _) = versionedBatch
-Some(queryResult.read, latestVersion)
+Some(queryResult, latestVersion)
}
-}

+/**
+ * The companion object for the VersionedDataGraphBuilder class.
+ */
object VersionedDataGraphBuilder:
-val layer: ZLayer[VersionedDataProvider[Long, VersionedBatch] & StreamLifetimeService, Nothing, StreamGraphBuilder[OutputType]] =
+private type GraphBuilderLayerTypes = VersionedDataProvider[Long, VersionedBatch]
+& StreamLifetimeService
+& BatchProcessor[DataBatch, Chunk[DataRow]]
+& VersionedDataGraphBuilderSettings

+/**
+ * The ZLayer that creates the VersionedDataGraphBuilder.
+ */
+val layer: ZLayer[GraphBuilderLayerTypes, Nothing, StreamGraphBuilder[OutputType, Chunk[DataRow]]] =
ZLayer {
for {
+sss <- ZIO.service[VersionedDataGraphBuilderSettings]
dp <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]]
ls <- ZIO.service[StreamLifetimeService]
-} yield new VersionedDataGraphBuilder(dp, ls)
+bp <- ZIO.service[BatchProcessor[DataBatch, Chunk[DataRow]]]
+} yield new VersionedDataGraphBuilder(sss, dp, ls, bp)
}
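
The builder's source relies on `ZStream.unfoldZIO`, which threads the last seen version through as state and ends the stream by producing `None`. A self-contained sketch of that mechanic, with an in-memory stand-in for `VersionedDataProvider.requestChanges` (all names and values here are illustrative):

```scala
import zio.*
import zio.stream.*

object UnfoldSketch extends ZIOAppDefault:
  // Stand-in for requestChanges: each call returns a batch plus the next version.
  def requestChanges(previous: Option[Long]): UIO[(String, Option[Long])] =
    val next = previous.getOrElse(0L) + 1
    ZIO.succeed((s"changes since $previous", Some(next)))

  val cancelAfterVersion = 3L // plays the role of StreamLifetimeService

  // Like createStream: unfoldZIO emits batches and carries the version as state.
  val stream: ZStream[Any, Nothing, String] =
    ZStream.unfoldZIO(Option.empty[Long]) { previousVersion =>
      if previousVersion.exists(_ >= cancelAfterVersion) then ZIO.none
      else requestChanges(previousVersion).map(Some(_))
    }

  def run = stream.runForeach(Console.printLine(_))
```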
services/streaming/base/BatchProcessor.scala (new file)
@@ -0,0 +1,18 @@
package com.sneaksanddata.arcane.framework
package services.streaming.base

import zio.stream.ZPipeline

/**
* A trait that represents a batch processor.
 * @tparam IncomingType The type of the incoming data.
 * @tparam OutgoingType The type of the outgoing data.
 */
trait BatchProcessor[IncomingType, OutgoingType] {

/**
* Processes the incoming data.
*
 * @return The ZPipeline that transforms the incoming data for the stream graph.
*/
def process: ZPipeline[Any, Throwable, IncomingType, OutgoingType]
}
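
Any `ZPipeline` can implement this contract. A minimal hypothetical processor (assuming the trait above is on the classpath) that parses incoming lines into integers:

```scala
import zio.stream.ZPipeline
import services.streaming.base.BatchProcessor

// Hypothetical processor: turns each incoming line into an Int.
class IntParsingProcessor extends BatchProcessor[String, Int]:
  override def process: ZPipeline[Any, Throwable, String, Int] =
    ZPipeline.map[String, Int](_.trim.toInt)

// A graph builder composes it the same way VersionedDataGraphBuilder.create
// does above: source.via(processor.process)
```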
services/streaming/base/LazyListGroupingProcessor.scala (new file)
@@ -0,0 +1,51 @@
package com.sneaksanddata.arcane.framework
package services.streaming.base

import models.DataRow
import models.settings.GroupingSettings
import services.mssql.MsSqlConnection.DataBatch

import zio.stream.ZPipeline
import zio.{Chunk, ZIO, ZLayer}

import scala.util.{Try, Using}

/**
* The batch processor implementation that converts a lazy DataBatch to a Chunk of DataRow.
* @param groupingSettings The grouping settings.
*/
class LazyListGroupingProcessor(groupingSettings: GroupingSettings) extends BatchProcessor[DataBatch, Chunk[DataRow]] {

/**
* Processes the incoming data.
*
 * @return The ZPipeline that transforms the incoming data for the stream graph.
*/
def process: ZPipeline[Any, Throwable, DataBatch, Chunk[DataRow]] = ZPipeline
.map(this.readBatch)
.map(tryDataRow => tryDataRow.get)
.map(list => Chunk.fromIterable(list))
.flattenChunks
.groupedWithin(groupingSettings.rowsPerGroup, groupingSettings.groupingInterval)

private def readBatch(dataBatch: DataBatch): Try[List[DataRow]] = Using(dataBatch) { data => data.read.toList }
}

/**
 * The companion object for the LazyListGroupingProcessor class.
*/
object LazyListGroupingProcessor:

/**
 * The ZLayer that creates the LazyListGroupingProcessor.
*/
val layer: ZLayer[GroupingSettings, Nothing, LazyListGroupingProcessor] =
ZLayer {
for settings <- ZIO.service[GroupingSettings] yield LazyListGroupingProcessor(settings)
}

def apply(groupingSettings: GroupingSettings): LazyListGroupingProcessor =
require(groupingSettings.rowsPerGroup > 0, "Rows per group must be greater than 0")
require(!groupingSettings.groupingInterval.isZero, "groupingInterval must be greater than 0")
new LazyListGroupingProcessor(groupingSettings)
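
`groupedWithin` is what implements both knobs at once: a group is emitted when `rowsPerGroup` elements have accumulated or when `groupingInterval` elapses, whichever comes first. A self-contained sketch of that behaviour with illustrative values:

```scala
import zio.*
import zio.stream.ZStream

object GroupedWithinSketch extends ZIOAppDefault:
  def run =
    ZStream.fromIterable(1 to 25)
      .groupedWithin(10, 1.second) // groups of 10, or whatever arrived within 1s
      .runCollect
      .flatMap(groups => Console.printLine(groups.map(_.size).mkString(","))) // prints 10,10,5
```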
services/streaming/base/StreamGraphBuilder.scala
@@ -7,12 +7,12 @@ import zio.stream.ZStream
* A trait that represents a stream graph builder.
 * @tparam IncomingType The type of the incoming data.
 * @tparam OutgoingType The type of the outgoing data.
*/
-trait StreamGraphBuilder[IncomingType] {
+trait StreamGraphBuilder[IncomingType, OutgoingType] {

/**
* Creates a ZStream for the stream graph.
*
* @return ZStream (stream source for the stream graph).
*/
-def create: ZStream[Any, Throwable, IncomingType]
+def create: ZStream[Nothing, Throwable, OutgoingType]
}
services/streaming/StreamGraphBuilderTests.scala (test)
@@ -2,19 +2,23 @@ package com.sneaksanddata.arcane.framework
package services.streaming

import models.ArcaneType.{IntType, StringType}
-import models.DataCell
-import services.mssql.MsSqlConnection.VersionedBatch
+import models.settings.{GroupingSettings, VersionedDataGraphBuilderSettings}
+import models.{DataCell, DataRow}
+import services.mssql.MsSqlConnection.{DataBatch, VersionedBatch}
import services.mssql.query.{LazyQueryResult, QueryRunner, ScalarQueryResult}
import services.mssql.{ConnectionOptions, MsSqlConnection, MsSqlDataProvider}
-import services.streaming.base.{StreamLifetimeService, VersionedDataProvider}
+import services.streaming.base.{BatchProcessor, LazyListGroupingProcessor, StreamLifetimeService, VersionedDataProvider}

import com.microsoft.sqlserver.jdbc.SQLServerDriver
import org.scalatest.*
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers.*
-import zio.{Runtime, ULayer, Unsafe, ZIO, ZLayer}
+import org.scalatest.prop.TableDrivenPropertyChecks.forAll
+import org.scalatest.prop.Tables.Table
+import zio.{Chunk, FiberFailure, Runtime, ULayer, Unsafe, ZIO, ZLayer}

import java.sql.Connection
+import java.time.Duration
import java.util.Properties
import scala.List
import scala.concurrent.Future
@@ -32,8 +36,8 @@ class StreamGraphBuilderTests extends flatspec.AsyncFlatSpec with Matchers:

"StreamGraph" should "not duplicate data on the first iteration" in withFreshTable("StreamGraphBuilderTests") { dbInfo =>
runStream(dbInfo, TestStreamLifetimeService(3)) flatMap { list =>
-list should have size 3 // 3 batches of changes
-list map (_.size) should contain theSameElementsAs List(10, 0, 0) // only first batch has data
+list should have size 1 // the three change batches are grouped into a single chunk
+list map (_.size) should contain theSameElementsAs List(10) // only the first batch has data
list.head.size should be (10) // 10 rows in the first batch
}
}
@@ -80,12 +84,29 @@ class StreamGraphBuilderTests extends flatspec.AsyncFlatSpec with Matchers:
}
}

+private val failingTestCases = Table(
+// First tuple defines column names
+("duration", "rowsPerGroup", "expected"),
+
+(Duration.ofSeconds(1), 0, "requirement failed: Rows per group must be greater than 0")
+)
+
+forAll (failingTestCases) { (duration: Duration, rowsPerGroup: Int, expectedMessage: String) =>
+"StreamGraph" should "fail if the grouping settings are invalid" in withEmptyTable("StreamGraphBuilderTests") { dbInfo =>
+val thrown = the [FiberFailure] thrownBy runStream(dbInfo, TestStreamLifetimeService(5), Some(new TestGroupingSettings(duration, rowsPerGroup)))
+thrown.getCause().getMessage should be (expectedMessage)
+}
+}

/** Creates and runs a stream that reads changes from the database */
-private def runStream(dbInfo: TestConnectionInfo, streamLifetimeService: StreamLifetimeService) =
+private def runStream(dbInfo: TestConnectionInfo, streamLifetimeService: StreamLifetimeService, testGroupingSettings: Option[TestGroupingSettings] = None) =
val container = services.provide(
ZLayer.succeed(MsSqlConnection(dbInfo.connectionOptions)),
MsSqlDataProvider.layer,
-ZLayer.succeed(streamLifetimeService)
+ZLayer.succeed(streamLifetimeService),
+ZLayer.succeed(testGroupingSettings.getOrElse(new TestGroupingSettings(Duration.ofSeconds(1), 10))),
+ZLayer.succeed(new TestVersionedDataGraphBuilderSettings(Duration.ofSeconds(1))),
+LazyListGroupingProcessor.layer,
)
Unsafe.unsafe { implicit unsafe =>
val stream = runtime.unsafe.run(container).getOrThrowFiberFailure()
@@ -96,9 +117,11 @@ class StreamGraphBuilderTests extends flatspec.AsyncFlatSpec with Matchers:
/** Service container builder */
private def services =
for {
+sss <- ZIO.service[VersionedDataGraphBuilderSettings]
dp <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]]
sls <- ZIO.service[StreamLifetimeService]
-} yield new VersionedDataGraphBuilder(dp, sls)
+bp <- ZIO.service[BatchProcessor[DataBatch, Chunk[DataRow]]]
+} yield new VersionedDataGraphBuilder(sss, dp, sls, bp)

/// Helper methods

@@ -176,3 +199,8 @@ object TestStreamLifetimeService:
def apply(maxQueries: Int) = new TestStreamLifetimeService(maxQueries, _ => ())

def apply(maxQueries: Int, callback: Int => Any) = new TestStreamLifetimeService(maxQueries, callback)


+class TestGroupingSettings(val groupingInterval: Duration, val rowsPerGroup: Int) extends GroupingSettings

+class TestVersionedDataGraphBuilderSettings(override val lookBackInterval: Duration) extends VersionedDataGraphBuilderSettings
