From 539641094d3379808deb566b8cdd57ca4947709a Mon Sep 17 00:00:00 2001 From: "Yang, Bo" Date: Sun, 27 Mar 2022 00:49:29 -0700 Subject: [PATCH 1/2] Add postScanLeft --- .../com/thoughtworks/binding/Observable.scala | 97 ++++++++++++++----- 1 file changed, 72 insertions(+), 25 deletions(-) diff --git a/Observable/shared/src/main/scala/com/thoughtworks/binding/Observable.scala b/Observable/shared/src/main/scala/com/thoughtworks/binding/Observable.scala index 4ce77b1d..f71a1e53 100644 --- a/Observable/shared/src/main/scala/com/thoughtworks/binding/Observable.scala +++ b/Observable/shared/src/main/scala/com/thoughtworks/binding/Observable.scala @@ -9,13 +9,57 @@ import scala.annotation.unchecked.uncheckedVariance import scala.collection.View import com.thoughtworks.binding.Observable.Operator import scala.util.control.NonFatal - +import scala.collection.immutable.ArraySeq sealed trait Observable[+A]: + private[binding] final def postScanLeft[B](z: B)( + op: (B, A) => B + )(using ExecutionContext): Observable.Operator[B] = + this match + case Observable.Empty => + Observable.Empty + case nonEmptyA: Observable.NonEmpty[A] => + locally[Observable.Operator.NonEmpty.Lazy[B]] { () => + nonEmptyA.next().map { case (head, tail) => + val builder = head.iterableFactory.newBuilder[B] + val headIterator = head.iterator + var acc = z + while headIterator.hasNext do + acc = op(acc, headIterator.next()) + builder += acc + (builder.result, tail.postScanLeft(acc)(op)) + } + } + end match + end postScanLeft + final def scanLeft[B](z: B)(op: (B, A) => B)(using + ExecutionContext + ): Observable.Operator[B] = + z +: postScanLeft(z)(op) + end scanLeft + final def +:[B >: A](a: B): Observable.Operator.NonEmpty.Eager[B] = + Observable.Operator.NonEmpty.Eager[B]( + Future.successful((View.Single(a), replay)) + ) + end +: + final def map[B](mapper: A => B)(using + ExecutionContext + ): Observable.Operator[B] = + this match + case Observable.Empty => + Observable.Empty + case nonEmptyA: Observable.NonEmpty[A] => + locally[Observable.Operator.NonEmpty.Lazy[B]] { () => + nonEmptyA.next().map { case (head, tail) => + (head.map(mapper), tail.map(mapper)) + } + } + end match + end map final def flatMapLatest[B]( mapper: A => Observable[B], default: Observable[B] = Observable.Operator.Empty )(using ExecutionContext): Observable.Operator[B] = - Observable.this match + this match case Observable.Empty => default.replay case nonEmptyA: Observable.NonEmpty[A] => @@ -29,39 +73,41 @@ sealed trait Observable[+A]: case Some(last) => mapper(last) (View.Empty, tail.flatMapLatest(mapper, newDefault)) + end handleA def handleB( head: Iterable[B], tail: Observable[B] ): (Iterable[B], Observable.Operator[B]) = (head, nonEmptyA.flatMapLatest[B](mapper, tail)) - + end handleB locally[Observable.Operator.NonEmpty.Lazy[B]] { () => default match case Observable.Empty => nonEmptyA.next().map(handleA) case nonEmptyB: Observable.NonEmpty[B] => - val handler = Future.firstCompletedOf( - Seq( - nonEmptyA - .next() - .map { case (head, tail) => - () => handleA(head, tail) - }(using ExecutionContext.parasitic), - nonEmptyB - .next() - .map { case (head, tail) => - () => handleB(head, tail) - }(using ExecutionContext.parasitic) - ) - )(using ExecutionContext.parasitic) - handler.map(_()) + Future + .firstCompletedOf( + Seq( + nonEmptyA + .next() + .map { case (head, tail) => + () => handleA(head, tail) + }(using ExecutionContext.parasitic), + nonEmptyB + .next() + .map { case (head, tail) => + () => handleB(head, tail) + }(using ExecutionContext.parasitic) + ) + )(using ExecutionContext.parasitic) + .map(_()) } - + end match end flatMapLatest final def replay: Observable.Operator[A] = this match - case asyncList: Observable.Operator[A] => - asyncList + case operator: Observable.Operator[A] => + operator case nonEmpty: Observable.NonEmpty[A] => locally[Observable.Operator.NonEmpty.Lazy[A]] { () => nonEmpty @@ -70,8 +116,9 @@ sealed trait Observable[+A]: (head, tail.replay) }(ExecutionContext.parasitic) } + end match end replay - +end Observable object Observable: sealed trait Operator[+A] extends Observable[A] object Operator: @@ -97,8 +144,8 @@ object Observable: } final def next() = nextLazyVal end Lazy - trait Eager[+A] extends NonEmpty[A]: - protected val nextVal: Future[(Iterable[A], Operator[A])] + final case class Eager[+A](nextVal: Future[(Iterable[A], Operator[A])]) + extends NonEmpty[A]: def next() = nextVal end Eager end NonEmpty @@ -199,7 +246,6 @@ object Observable: } end new end apply - given (using ExecutionContext): Monad[BehaviorSubject] with def point[A](a: => A): BehaviorSubject[A] = BehaviorSubject.pure(a) @@ -215,3 +261,4 @@ object Observable: end bind end given end BehaviorSubject +end Observable From e238e9fbd62d7227a5031626f1f72e8ac31f939c Mon Sep 17 00:00:00 2001 From: "Yang, Bo" Date: Sun, 27 Mar 2022 00:49:37 -0700 Subject: [PATCH 2/2] Add toFingerTree --- .../thoughtworks/binding/ObservableSeq.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/ObservableSeq/shared/src/main/scala/com/thoughtworks/binding/ObservableSeq.scala b/ObservableSeq/shared/src/main/scala/com/thoughtworks/binding/ObservableSeq.scala index 194b00e1..0c14553f 100644 --- a/ObservableSeq/shared/src/main/scala/com/thoughtworks/binding/ObservableSeq.scala +++ b/ObservableSeq/shared/src/main/scala/com/thoughtworks/binding/ObservableSeq.scala @@ -126,6 +126,37 @@ object ObservableSeq: end Splice end Patch extension [A](observableSeq: ObservableSeq[A]) + def toFingerTree[M <: Comparable[Int] | Int](using + Reducer[A, M] + )(using ExecutionContext): Observable.Operator[FingerTree[M, A]] = + def greaterThan(left: Comparable[Int] | Int, right: Int) = + left match + case int: Int => + int > right + case comparable: Comparable[Int] => + comparable.compareTo(right) > 0 + end greaterThan + observableSeq + .postScanLeft(FingerTree.empty) { (snapshot, patch) => + patch match + case Patch.Splice( + index, + deleteCount, + newItems + ) => + val (left, notLeft) = snapshot.split(greaterThan(_, index)) + val (deleted, right) = notLeft.split(greaterThan(_, deleteCount)) + newItems.foldLeft(left)(_ :+ _) <++> right + case Patch.ReplaceChildren(newItems) => + newItems.foldLeft(FingerTree.empty)(_ :+ _) + } + end toFingerTree + def measure[M <: Comparable[Int]](using Reducer[A, M])(using + Monoid[M], + ExecutionContext + ): Observable.Operator[M] = + observableSeq.toFingerTree[M].map(_.measureMonoid) + end measure def flatMap[B](f: A => ObservableSeq[B])(using ExecutionContext ): ObservableSeq[B] =