Skip to content

Commit

Permalink
Implement DI and basic interfaces
Browse files Browse the repository at this point in the history
Part of #61.

Implement the DI using ZIO and add the interfaces for the basic services
  • Loading branch information
s-vitaliy committed Nov 11, 2024
1 parent b66950e commit c0d5616
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 106 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
package com.sneaksanddata.arcane.framework
package services.streaming
package services.mssql

import models.DataCell
import services.mssql.MsSqlConnection
import services.mssql.MsSqlConnection.VersionedBatch
import services.mssql.query.LazyQueryResult.OutputType
import services.mssql.query.{LazyQueryResult, QueryRunner, ScalarQueryResult}
import services.streaming.base.{HasVersion, StreamLifetimeService}
import services.streaming.base.{HasVersion, VersionedDataProvider}

import zio.stream.ZStream
import zio.{Task, ZIO}
import zio.{Task, ZIO, ZLayer}

import java.time.Duration
import scala.util.{Failure, Try}


given HasVersion[VersionedBatch] with
type VersionType = Option[Long]

private val partial: PartialFunction[VersionedBatch, Option[Long]] =
case (queryResult, version: Long) =>
// If the database response is empty, we can't extract the version from it and return the old version.
Expand All @@ -31,34 +28,31 @@ given HasVersion[VersionedBatch] with
dataVersion.toOption

extension (result: VersionedBatch)
def getLatestVersion: Option[Long] = partial.applyOrElse(result, (_: VersionedBatch) => None)
def getLatestVersion: this.VersionType = partial.applyOrElse(result, (_: VersionedBatch) => None)



class StreamGraphBuilder(msSqlConnection: MsSqlConnection) {
/**
* A data provider that reads the changes from the Microsoft SQL Server.
* @param msSqlConnection The connection to the Microsoft SQL Server.
*/
class MsSqlDataProvider(msSqlConnection: MsSqlConnection) extends VersionedDataProvider[Long, VersionedBatch] {
implicit val dataQueryRunner: QueryRunner[LazyQueryResult.OutputType, LazyQueryResult] = QueryRunner()
implicit val versionQueryRunner: QueryRunner[Option[Long], ScalarQueryResult[Long]] = QueryRunner()

override def requestChanges(previousVersion: Option[Long], lookBackInterval: Duration): Task[VersionedBatch] =
ZIO.fromFuture(_ => msSqlConnection.getChanges(previousVersion, lookBackInterval))
}

/**
* The companion object for the MsSqlDataProvider class.
*/
object MsSqlDataProvider:

/**
* Builds a stream that reads the changes from the database.
*
* @param isCancelled The service that determines if the stream should be cancelled.
* @return The stream that reads the changes from the database.
* The ZLayer that creates the MsSqlDataProvider.
*/
def build(isCancelled: StreamLifetimeService): ZStream[Any, Throwable, OutputType] = ZStream.unfoldZIO(None) { previousVersion =>
isCancelled.cancelled match
case true => ZIO.succeed(None)
case false => continueStream(previousVersion)
}

private def continueStream(previousVersion: Option[Long]): ZIO[Any, Throwable, Some[(OutputType, Option[Long])]] =
requestChanges(previousVersion, Duration.ofDays(1)) map { versionedBatch =>
val latestVersion = versionedBatch.getLatestVersion
val (queryResult, _) = versionedBatch
Some(queryResult.read, latestVersion)
val layer: ZLayer[MsSqlConnection, Nothing, MsSqlDataProvider] =
ZLayer {
for connection <- ZIO.service[MsSqlConnection] yield new MsSqlDataProvider(connection)
}

private def requestChanges(previousVersion: Option[Long], lookBackInterval: Duration): Task[VersionedBatch] =
ZIO.fromFuture(_ => msSqlConnection.getChanges(previousVersion, lookBackInterval))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.sneaksanddata.arcane.framework
package services.streaming

import services.mssql.MsSqlConnection.VersionedBatch
import services.mssql.given_HasVersion_VersionedBatch
import services.mssql.query.LazyQueryResult.OutputType
import services.streaming.base.{StreamGraphBuilder, StreamLifetimeService, VersionedDataProvider}

import zio.stream.ZStream
import zio.{ZIO, ZLayer}

import java.time.Duration


class VersionedDataGraphBuilder(versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
streamLifetimeService: StreamLifetimeService) extends StreamGraphBuilder[OutputType] {
/**
* Builds a stream that reads the changes from the database.
*
* @return The stream that reads the changes from the database.
*/
def create: ZStream[Any, Throwable, OutputType] =
ZStream.unfoldZIO(versionedDataProvider.firstVersion) { previousVersion =>
if streamLifetimeService.cancelled then ZIO.succeed(None) else continueStream(previousVersion)
}

private def continueStream(previousVersion: Option[Long]): ZIO[Any, Throwable, Some[(OutputType, Option[Long])]] =
versionedDataProvider.requestChanges(previousVersion, Duration.ofDays(1)) map { versionedBatch =>
val latestVersion = versionedBatch.getLatestVersion
val (queryResult, _) = versionedBatch
Some(queryResult.read, latestVersion)
}
}

object VersionedDataGraphBuilder:
val layer: ZLayer[VersionedDataProvider[Long, VersionedBatch] & StreamLifetimeService, Nothing, StreamGraphBuilder[OutputType]] =
ZLayer {
for {
dp <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]]
ls <- ZIO.service[StreamLifetimeService]
} yield new VersionedDataGraphBuilder(dp, ls)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package services.streaming.base
*/
trait HasVersion[Data] {

type VersionType

/**
* Gets the latest version of the data.
*
* @return The latest version of the data.
*/
extension (a: Data) def getLatestVersion: Option[Long]
extension (a: Data) def getLatestVersion: VersionType
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.sneaksanddata.arcane.framework
package services.streaming.base

import zio.stream.ZStream

/**
* A trait that represents a stream graph builder.
* @tparam IncomingType The type of the incoming data.
*/
trait StreamGraphBuilder[IncomingType] {

/**
* Creates a ZStream for the stream graph.
*
* @return ZStream (stream source for the stream graph).
*/
def create: ZStream[Any, Throwable, IncomingType]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.sneaksanddata.arcane.framework
package services.streaming.base

import zio.Task

import java.time.Duration

/**
* Provides a way to get the changes marked with version from a data source.
* @tparam DataVersionType The type of the data version.
* @tparam DataBatchType The type of the data batch.
*/
trait VersionedDataProvider[DataVersionType, DataBatchType: HasVersion] {

/**
* Requests the changes from the data source.
*
* @param previousVersion The previous version of the data.
* @param lookBackInterval The interval to look back for changes if the version is empty.
* @return The changes from the data source.
*/
def requestChanges(previousVersion: Option[DataVersionType], lookBackInterval: Duration): Task[DataBatchType]

/**
* The first version of the data.
*/
val firstVersion: Option[DataVersionType] = None

}
Loading

0 comments on commit c0d5616

Please sign in to comment.