Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add postScanLeft #475

Open
wants to merge 2 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,57 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.collection.View
import com.thoughtworks.binding.Observable.Operator
import scala.util.control.NonFatal

import scala.collection.immutable.ArraySeq
sealed trait Observable[+A]:
private[binding] final def postScanLeft[B](z: B)(
op: (B, A) => B
)(using ExecutionContext): Observable.Operator[B] =
this match
case Observable.Empty =>
Observable.Empty
case nonEmptyA: Observable.NonEmpty[A] =>
locally[Observable.Operator.NonEmpty.Lazy[B]] { () =>
nonEmptyA.next().map { case (head, tail) =>
val builder = head.iterableFactory.newBuilder[B]
val headIterator = head.iterator
var acc = z
while headIterator.hasNext do
acc = op(acc, headIterator.next())
builder += acc
(builder.result, tail.postScanLeft(acc)(op))
}
}
end match
end postScanLeft
final def scanLeft[B](z: B)(op: (B, A) => B)(using
ExecutionContext
): Observable.Operator[B] =
z +: postScanLeft(z)(op)
end scanLeft
final def +:[B >: A](a: B): Observable.Operator.NonEmpty.Eager[B] =
Observable.Operator.NonEmpty.Eager[B](
Future.successful((View.Single(a), replay))
)
end +:
final def map[B](mapper: A => B)(using
ExecutionContext
): Observable.Operator[B] =
this match
case Observable.Empty =>
Observable.Empty
case nonEmptyA: Observable.NonEmpty[A] =>
locally[Observable.Operator.NonEmpty.Lazy[B]] { () =>
nonEmptyA.next().map { case (head, tail) =>
(head.map(mapper), tail.map(mapper))
}
}
end match
end map
final def flatMapLatest[B](
mapper: A => Observable[B],
default: Observable[B] = Observable.Operator.Empty
)(using ExecutionContext): Observable.Operator[B] =
Observable.this match
this match
case Observable.Empty =>
default.replay
case nonEmptyA: Observable.NonEmpty[A] =>
Expand All @@ -29,39 +73,41 @@ sealed trait Observable[+A]:
case Some(last) =>
mapper(last)
(View.Empty, tail.flatMapLatest(mapper, newDefault))
end handleA
def handleB(
head: Iterable[B],
tail: Observable[B]
): (Iterable[B], Observable.Operator[B]) =
(head, nonEmptyA.flatMapLatest[B](mapper, tail))

end handleB
locally[Observable.Operator.NonEmpty.Lazy[B]] { () =>
default match
case Observable.Empty =>
nonEmptyA.next().map(handleA)
case nonEmptyB: Observable.NonEmpty[B] =>
val handler = Future.firstCompletedOf(
Seq(
nonEmptyA
.next()
.map { case (head, tail) =>
() => handleA(head, tail)
}(using ExecutionContext.parasitic),
nonEmptyB
.next()
.map { case (head, tail) =>
() => handleB(head, tail)
}(using ExecutionContext.parasitic)
)
)(using ExecutionContext.parasitic)
handler.map(_())
Future
.firstCompletedOf(
Seq(
nonEmptyA
.next()
.map { case (head, tail) =>
() => handleA(head, tail)
}(using ExecutionContext.parasitic),
nonEmptyB
.next()
.map { case (head, tail) =>
() => handleB(head, tail)
}(using ExecutionContext.parasitic)
)
)(using ExecutionContext.parasitic)
.map(_())
}

end match
end flatMapLatest
final def replay: Observable.Operator[A] =
this match
case asyncList: Observable.Operator[A] =>
asyncList
case operator: Observable.Operator[A] =>
operator
case nonEmpty: Observable.NonEmpty[A] =>
locally[Observable.Operator.NonEmpty.Lazy[A]] { () =>
nonEmpty
Expand All @@ -70,8 +116,9 @@ sealed trait Observable[+A]:
(head, tail.replay)
}(ExecutionContext.parasitic)
}
end match
end replay

end Observable
object Observable:
sealed trait Operator[+A] extends Observable[A]
object Operator:
Expand All @@ -97,8 +144,8 @@ object Observable:
}
final def next() = nextLazyVal
end Lazy
trait Eager[+A] extends NonEmpty[A]:
protected val nextVal: Future[(Iterable[A], Operator[A])]
final case class Eager[+A](nextVal: Future[(Iterable[A], Operator[A])])
extends NonEmpty[A]:
def next() = nextVal
end Eager
end NonEmpty
Expand Down Expand Up @@ -199,7 +246,6 @@ object Observable:
}
end new
end apply

given (using ExecutionContext): Monad[BehaviorSubject] with
def point[A](a: => A): BehaviorSubject[A] = BehaviorSubject.pure(a)

Expand All @@ -215,3 +261,4 @@ object Observable:
end bind
end given
end BehaviorSubject
end Observable
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,37 @@ object ObservableSeq:
end Splice
end Patch
extension [A](observableSeq: ObservableSeq[A])
def toFingerTree[M <: Comparable[Int] | Int](using
Reducer[A, M]
)(using ExecutionContext): Observable.Operator[FingerTree[M, A]] =
def greaterThan(left: Comparable[Int] | Int, right: Int) =
left match
case int: Int =>
int > right
case comparable: Comparable[Int] =>
comparable.compareTo(right) > 0
end greaterThan
observableSeq
.postScanLeft(FingerTree.empty) { (snapshot, patch) =>
patch match
case Patch.Splice(
index,
deleteCount,
newItems
) =>
val (left, notLeft) = snapshot.split(greaterThan(_, index))
val (deleted, right) = notLeft.split(greaterThan(_, deleteCount))
newItems.foldLeft(left)(_ :+ _) <++> right
case Patch.ReplaceChildren(newItems) =>
newItems.foldLeft(FingerTree.empty)(_ :+ _)
}
end toFingerTree
def measure[M <: Comparable[Int]](using Reducer[A, M])(using
Monoid[M],
ExecutionContext
): Observable.Operator[M] =
observableSeq.toFingerTree[M].map(_.measureMonoid)
end measure
def flatMap[B](f: A => ObservableSeq[B])(using
ExecutionContext
): ObservableSeq[B] =
Expand Down