Skip to content

Commit

Permalink
Add some missing docs
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jul 26, 2019
1 parent 44874f1 commit b2ffc00
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
29 changes: 29 additions & 0 deletions src/main/kotlin/hu/akarnokd/kotlin/flow/BehaviorSubject.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,35 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {

private var error: Throwable? = null

/**
* Constructs an empty BehaviorSubject.
*/
@Suppress("UNCHECKED_CAST")
constructor() {
current = Node(NONE as T)
}

/**
* Constructs a BehaviorSubject with an initial value to emit
* to collectors.
*/
constructor(initialValue: T) {
current = Node(initialValue)
}

/**
* Returns true if this subject has collectors waiting for data.
*/
override fun hasCollectors(): Boolean = collectors.get().isNotEmpty()

/**
* Returns the number of collectors waiting for data.
*/
override fun collectorCount(): Int = collectors.get().size

/**
* Emit a value to all current collectors when they are ready.
*/
override suspend fun emit(value: T) {
if (current != DONE) {
val next = Node<T>(value)
Expand All @@ -53,6 +69,10 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {
}
}

/**
* Signal an exception to all current and future collectors when
* they are ready.
*/
@Suppress("UNCHECKED_CAST")
override suspend fun emitError(ex: Throwable) {
if (current != DONE) {
Expand All @@ -68,6 +88,10 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {
}
}

/**
* Signal current and future collectors that no further
* values will be coming.
*/
@Suppress("UNCHECKED_CAST")
override suspend fun complete() {
if (current != DONE) {
Expand All @@ -82,6 +106,11 @@ class BehaviorSubject<T> : AbstractFlow<T>, SubjectAPI<T> {
}
}

/**
* Accepts a [collector] and emits the latest (if available) value
* and any subsequent value received by this BehaviorSubject until
* the BehaviorSubject gets terminated.
*/
override suspend fun collectSafely(collector: FlowCollector<T>) {
val inner = InnerCollector<T>()
if (add(inner)) {
Expand Down
31 changes: 31 additions & 0 deletions src/main/kotlin/hu/akarnokd/kotlin/flow/ReplaySubject.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,62 @@ class ReplaySubject<T> : AbstractFlow<T>, SubjectAPI<T> {

private var done: Boolean = false

/**
* Creates a ReplaySubject with an unbounded internal buffer
* caching all values received via [emit].
*/
constructor() {
buffer = UnboundedReplayBuffer()
}

/**
* Creates a ReplaySubject that caches at most [maxSize]
* values to be replayed to late collectors.
*/
constructor(maxSize: Int) {
buffer = SizeBoundReplayBuffer(maxSize)
}

/**
* Creates a ReplaySubject that caches values at most for
* the given [maxTime] real-time duration and replays
* those upfront to late collectors.
*/
constructor(maxTime: Long, unit: TimeUnit) : this(Int.MAX_VALUE, maxTime, unit)

/**
* Creates a ReplaySubject that caches at most [maxSize] values
* at most for the given [maxTime] real-time duration and
* replays those upfront to late collectors.
*/
constructor(maxSize: Int, maxTime: Long, unit: TimeUnit) : this(maxSize, maxTime, unit, {
t -> t.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
})

/**
* Creates a ReplaySubject that caches at most [maxSize] values
* at most for the given [maxTime] duration (measured via the custom
* [timeSource]) and replays those upfront to late collectors.
*/
constructor(maxSize: Int, maxTime: Long, unit: TimeUnit, timeSource: (TimeUnit) -> Long) {
buffer = TimeAndSizeBoundReplayBuffer(maxSize, maxTime, unit, timeSource)
}

/**
* Accepts a [collector] and emits the cached values upfront
* and any subsequent value received by this ReplaySubject until
* the ReplaySubject gets terminated.
*/
@FlowPreview
override suspend fun collectSafely(collector: FlowCollector<T>) {
val inner = InnerCollector<T>(collector, this)
add(inner)
buffer.replay(inner)
}

/**
* Emit a value to all current collectors when they are ready.
*/
override suspend fun emit(value: T) {
if (!done) {
buffer.emit(value)
Expand Down

0 comments on commit b2ffc00

Please sign in to comment.