Skip to content

Commit

Permalink
Allow computations using other computations via .asStream
Browse files Browse the repository at this point in the history
  Useful for the "computed query" pattern
  • Loading branch information
mstniy committed Dec 20, 2023
1 parent de0e1eb commit 58250dc
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
3 changes: 0 additions & 3 deletions lib/src/computed.dart
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,6 @@ class ComputedImpl<T> {

ComputedSubscription<T> listen(
void Function(T event)? onData, Function? onError) {
if (GlobalCtx._currentComputation != null) {
throw StateError('`listen` is not allowed inside computations.');
}
final sub = _ComputedSubscriptionImpl<T>(this, onData, onError);
if (_novalue) {
try {
Expand Down
69 changes: 67 additions & 2 deletions test/computed_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,72 @@ void main() {
sub.cancel();
});

test('computed query pattern works', () async {
final controller = StreamController<int>.broadcast(
sync: true); // Use a broadcast stream to make debugging easier
final source = controller.stream;

final queryStream = Computed(() => source.use)
.asStream
.map((key) => Future.microtask(() => key));

var cCnt = 0;

final result = $(() {
cCnt++;
return queryStream.use.use;
});

var expectation = 0;
var callCnt = 0;

final sub = result.listen((event) {
callCnt++;
expect(event, expectation);
}, (e) => fail(e.toString()));

await Future.value();
expect(cCnt, 2);
expect(callCnt, 0);

controller.add(0);

expect(cCnt, 2);

await Future.value(); // For the asStream to propagate the result

expect(cCnt, 4);
expect(callCnt, 0);

await Future.value(); // For the "query" to complete

expect(cCnt, 6);
expect(callCnt, 1);

controller.add(0);

await Future.value();
await Future.value();

expect(cCnt, 6); // First computation terminates propagation
expect(callCnt, 1);

expectation = 1;
controller.add(1);

await Future.value(); // For the asStream to propagate the result

expect(cCnt, 8);
expect(callCnt, 1);

await Future.value(); // For the "query" to complete

expect(cCnt, 10);
expect(callCnt, 2);

sub.cancel();
});

group('respects topological order', () {
test('on upstream updates', () {
for (var streamFirst in [false, true]) {
Expand Down Expand Up @@ -894,8 +960,7 @@ void main() {
}, (e) {
expect(flag, false);
flag = true;
expect(e, isA<StateError>());
expect(e.message, '`listen` is not allowed inside computations.');
expect(e, isA<ComputedAsyncError>());
});

await Future.value();
Expand Down

0 comments on commit 58250dc

Please sign in to comment.