From af2b9d6fb0c26b58f0c67ddda1b5333b0ec96ac9 Mon Sep 17 00:00:00 2001 From: Kartal Kaan Bozdogan Date: Sun, 24 Dec 2023 10:28:04 +0100 Subject: [PATCH] Added stream utils and tests --- lib/utils/streams.dart | 113 +++++++++++++ test/utils/streams_test.dart | 297 +++++++++++++++++++++++++++++++++++ 2 files changed, 410 insertions(+) create mode 100644 lib/utils/streams.dart create mode 100644 test/utils/streams_test.dart diff --git a/lib/utils/streams.dart b/lib/utils/streams.dart new file mode 100644 index 0000000..4bccacd --- /dev/null +++ b/lib/utils/streams.dart @@ -0,0 +1,113 @@ +import 'dart:async'; + +class ValueStream extends Stream { + late StreamController _controller; + T? _lastValue; + Object? _lastError; + bool? _lastWasError; + final bool _sync; + final void Function()? _userOnListen; + final void Function()? _userOnCancel; + + ValueStream( + {void Function()? onListen, + FutureOr Function()? onCancel, + bool sync = false}) + : _userOnListen = onListen, + _userOnCancel = onCancel, + _sync = sync { + _setController(); + } + + void add(T t) { + _lastWasError = false; + _lastValue = t; + if (_controller.hasListener) { + // Otherwise the controller will buffer + _controller.add(t); + } + } + + void addError(Object o) { + _lastWasError = true; + _lastError = o; + if (_controller.hasListener) { + // Otherwise the controller will buffer + _controller.addError(o); + } + } + + @override + StreamSubscription listen(void Function(T event)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { + return _controller.stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } + + void _setController() { + _controller = + StreamController(sync: _sync, onListen: _onListen, onCancel: _onCancel); + } + + void _onListen() { + if (_lastWasError == false) { + _controller.add(_lastValue as T); + } else if (_lastWasError == true) { + _controller.addError(_lastError!); + } + if (_userOnListen != null) _userOnListen!(); + } + + void _onCancel() { + _setController(); // The old one is no good anymore + if (_userOnCancel != null) _userOnCancel!(); + } +} + +class ResourceStream extends ValueStream { + ResourceStream(this._create, this._dispose, + {void Function()? onListen, + FutureOr Function()? onCancel, + bool sync = false}) + : super(onListen: onListen, onCancel: onCancel, sync: sync); + + final T Function() _create; + final void Function(T) _dispose; + + @override + void _onListen() { + if (_lastWasError != false) { + try { + _lastValue = _create(); + _lastWasError = false; + } catch (e) { + _lastWasError = true; + _lastError = e; + } + } + super._onListen(); + } + + @override + void _onCancel() { + _maybeDispose(); + super._onCancel(); + } + + @override + void add(T t) { + _maybeDispose(); + super.add(t); + } + + @override + void addError(Object o) { + _maybeDispose(); + super.addError(o); + } + + void _maybeDispose() { + if (_lastWasError == false) _dispose(_lastValue as T); + _lastWasError = null; + } +} diff --git a/test/utils/streams_test.dart b/test/utils/streams_test.dart new file mode 100644 index 0000000..05de31a --- /dev/null +++ b/test/utils/streams_test.dart @@ -0,0 +1,297 @@ +import 'package:computed/utils/streams.dart'; +import 'package:test/test.dart'; + +void main() { + group('ValueStream', () { + test('values are not buffered', () async { + final s = ValueStream(sync: true); + s.add(0); + s.add(1); + + var lCnt = 0; + int? lastEvent; + + s.listen((event) { + lCnt++; + lastEvent = event; + }, onError: (e) => fail(e.toString())); + + for (var i = 0; i < 2; i++) { + await Future.value(); // Just in case + } + + expect(lCnt, 1); + expect(lastEvent, 1); + }); + + test('can add values after attaching listeners', () async { + final s = ValueStream(sync: true); + + var lCnt = 0; + int? lastEvent; + + s.listen((event) { + lCnt++; + lastEvent = event; + }, onError: (e) => fail(e.toString())); + + s.add(0); + + expect(lCnt, 1); + expect(lastEvent, 0); + }); + + test('can add errors', () async { + final s = ValueStream(sync: true); + + var lCnt = 0; + int? lastEvent; + + s.listen((e) => fail('Must not produce value'), onError: (event) { + lCnt++; + lastEvent = event; + }); + + s.addError(0); + + expect(lCnt, 1); + expect(lastEvent, 0); + }); + + test('can re-listen after cancel', () async { + var lCnt = 0; + int? lastEvent; + + void listener(event) { + lCnt++; + lastEvent = event; + } + + var errCnt = 0; + int? lastError; + + void errorListener(error) { + errCnt++; + lastError = error; + } + + final s = ValueStream(sync: true); + + var sub = s.listen(listener, onError: (e) => fail(e.toString())); + + s.add(0); + + expect(lCnt, 1); + expect(lastEvent, 0); + + // Also test the case where the last added was an error + + sub.cancel(); + + sub = s.listen(listener, onError: errorListener); + await Future.value(); + expect(lCnt, 2); + expect(lastEvent, 0); + + expect(errCnt, 0); + + s.addError(1); + expect(lCnt, 2); + expect(errCnt, 1); + expect(lastError, 1); + + sub.cancel(); + + s.listen(listener, onError: errorListener); + await Future.value(); + + expect(lCnt, 2); + expect(errCnt, 2); + expect(lastError, 1); + }); + + test('onListen and onCancel works', () { + var olCnt = 0; + var ocCnt = 0; + final s = ValueStream( + onListen: () => olCnt++, onCancel: () => ocCnt++, sync: true); + + expect(olCnt, 0); + expect(ocCnt, 0); + + final sub = s.listen((event) => fail('Must not produce any value'), + onError: (e) => fail(e.toString())); + + expect(olCnt, 1); + expect(ocCnt, 0); + + sub.cancel(); + + expect(olCnt, 1); + expect(ocCnt, 1); + }); + + test('is not broadcast', () { + final s = ValueStream(); + s.listen((event) {}); + try { + s.listen((event) {}); + fail("Must have thrown"); + } catch (e) { + expect(e, isA()); + expect( + (e as StateError).message, 'Stream has already been listened to.'); + } + }); + }); + + group('ResourceStream', () { + test('respects create/dispose', () async { + var cCnt = 0; + int create() { + return cCnt++; + } + + var dCnt = 0; + int? lastDispose; + void dispose(int i) { + dCnt++; + lastDispose = i; + } + + var lCnt = 0; + int? lastEvent; + void listener(e) { + lCnt++; + lastEvent = e; + } + + final s = ResourceStream(create, dispose, sync: true); + expect(cCnt, 0); + var sub = s.listen(listener, onError: (e) => fail(e.toString())); + await Future.value(); + expect(cCnt, 1); + expect(dCnt, 0); + expect(lCnt, 1); + expect(lastEvent, 0); + sub.cancel(); + expect(cCnt, 1); + expect(dCnt, 1); + expect(lastDispose, 0); + expect(lCnt, 1); + sub = s.listen(listener, onError: (e) => fail(e.toString())); + await Future.value(); + expect(cCnt, 2); + expect(dCnt, 1); + expect(lCnt, 2); + expect(lastEvent, 1); + s.add(42); + expect(cCnt, 2); + expect(dCnt, 2); + expect(lCnt, 3); + expect(lastDispose, 1); + sub.cancel(); + expect(cCnt, 2); + expect(dCnt, 3); + expect(lCnt, 3); + expect(lastDispose, 42); + }); + test('propagates exceptions thrown by create', () async { + var cCnt = 0; + int? createReturn; + int createThrow = 42; + int create() { + cCnt++; + if (createReturn != null) { + return createReturn; + } else { + throw createThrow; + } + } + + var dCnt = 0; + int? lastDispose; + void dispose(int i) { + dCnt++; + lastDispose = i; + } + + var lCnt = 0; + int? lastEvent; + void listener(e) { + lCnt++; + lastEvent = e; + } + + var eCnt = 0; + int? lastErr; + void errorListener(e) { + eCnt++; + lastErr = e; + } + + final s = ResourceStream(create, dispose, sync: true); + expect(cCnt, 0); + var sub = s.listen(listener, onError: errorListener); + await Future.value(); + expect(cCnt, 1); + expect(dCnt, 0); + expect(lCnt, 0); + expect(eCnt, 1); + expect(lastErr, 42); + s.add(0); + expect(cCnt, 1); + expect(dCnt, 0); + expect(lCnt, 1); + expect(eCnt, 1); + expect(lastEvent, 0); + sub.cancel(); + await Future.value(); + expect(cCnt, 1); + expect(dCnt, 1); + expect(lCnt, 1); + expect(eCnt, 1); + expect(lastDispose, 0); + + s.addError(1); + createReturn = 2; + sub = s.listen(listener, onError: errorListener); + await Future.value(); + expect(cCnt, 2); + expect(dCnt, 1); + expect(lCnt, 2); + expect(eCnt, 1); + expect(lastEvent, 2); + + s.addError(3); + expect(cCnt, 2); + expect(dCnt, 2); + expect(lCnt, 2); + expect(eCnt, 2); + expect(lastDispose, 2); + expect(lastErr, 3); + + sub.cancel(); + expect(cCnt, 2); + expect(dCnt, 2); + expect(lCnt, 2); + expect(eCnt, 2); + + s.add(3); + sub = s.listen(listener, onError: errorListener); + await Future.value(); + expect(cCnt, 2); + expect(dCnt, 2); + expect(lCnt, 3); + expect(eCnt, 2); + expect(lastEvent, 3); + + sub.cancel(); + expect(cCnt, 2); + expect(dCnt, 3); + expect(lCnt, 3); + expect(eCnt, 2); + expect(lastDispose, 3); + }); + }); +}