From 4af6a2e44b0276692017c3bf04e905a3d68fee11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E9=A3=8E?= Date: Tue, 6 Feb 2024 21:23:36 +0800 Subject: [PATCH] Implement reusable isolate --- example/lib/storage/custom_storage_io.dart | 7 +- .../isolate_operation_context_impl.dart | 117 +++----- lib/src/reusable_isolate.dart | 278 ++++++++++++++++++ lib/task_manager.dart | 4 +- test/task_manager_test.dart | 34 +++ 5 files changed, 361 insertions(+), 79 deletions(-) create mode 100644 lib/src/reusable_isolate.dart diff --git a/example/lib/storage/custom_storage_io.dart b/example/lib/storage/custom_storage_io.dart index 3aa02eb..f1e5507 100644 --- a/example/lib/storage/custom_storage_io.dart +++ b/example/lib/storage/custom_storage_io.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:io'; import 'package:example/storage/custom_storage.dart'; +import 'package:flutter/material.dart'; import 'package:path/path.dart'; import 'package:task_manager/task_manager.dart'; @@ -18,7 +19,11 @@ class CustomStorageIOImpl extends CustomStorage { final file = File(element.path); final contents = await file.readAsString(); final json = jsonDecode(contents); - yield TaskEntity.fromJson(json); + try { + yield TaskEntity.fromJson(json); + } catch (e) { + debugPrint('Error parsing task: $e'); + } } } diff --git a/lib/src/operation/isolate_operation_context_impl.dart b/lib/src/operation/isolate_operation_context_impl.dart index a782777..f9d2f30 100644 --- a/lib/src/operation/isolate_operation_context_impl.dart +++ b/lib/src/operation/isolate_operation_context_impl.dart @@ -5,32 +5,18 @@ class IsolateOperationContextImpl extends OperationContextImpl { required super.initialData, super.priority, super.status, - }) { - _receivePort.listen((message) { - switch (message.runtimeType) { - case SendPort: - _sendPort.complete(message); - break; - case _EmitTaskAction: - emit((message as _EmitTaskAction).data); - case Result: - _handlerResult(message); - break; - default: - } - }); - } + }); @override void pause() { super.pause(); - _sendPort.future.then((value) => value.send(const _PauseTaskAction())); + _isolateCallback?.emit(const _PauseTaskAction()); } @override void cancel() { super.cancel(); - _sendPort.future.then((value) => value.send(const _CancelTaskAction())); + _isolateCallback?.emit(const _CancelTaskAction()); } @override @@ -39,60 +25,55 @@ class IsolateOperationContextImpl extends OperationContextImpl { _controller.add(this); try { - final context = wrapper(); - final result = await compute( - (message) async { + final callback = ReusableIsolate.run( + (message, receive, emit) { final operation = message[0] as Operation; final context = message[1] as IsolateOperationContextImplWrapper; - context.ensureInitialized(); - final result = await operation.run(context); - return result.tryToTransferableTypedData(); + context.ensureInitialized(receive, emit); + return operation.run(context); }, - [ - operation, - context, - ], + [operation, wrapper()], ); + callback.receive.listen((event) { + emit(_tryFromTransferableTypedData(event)); + }); + + _isolateCallback = callback; + final result = await callback.wait(); + _isolateCallback = null; _handlerResult(result.tryFromTransferableTypedData()); return result.type; } catch (e) { + _isolateCallback = null; _handlerResult(Result.error(e)); return ResultType.error; } } - @override - void _handlerResult(Result result) { - super._handlerResult(result); - switch (result.type) { - case ResultType.canceled: - case ResultType.completed: - case ResultType.error: - _receivePort.close(); - break; - default: - } - } - - final ReceivePort _receivePort = ReceivePort(); - late Completer _sendPort; + IsolateCallback? _isolateCallback; IsolateOperationContextImplWrapper wrapper() { - _sendPort = Completer(); - return IsolateOperationContextImplWrapper( - sendPort: _receivePort.sendPort, - initialData: data, - ); + return IsolateOperationContextImplWrapper(initialData: data); } } class IsolateOperationContextImplWrapper extends OperationContext { IsolateOperationContextImplWrapper({ - required this.sendPort, required D initialData, }) : data = initialData; - final SendPort sendPort; + void Function(dynamic)? _emitter; + + void ensureInitialized(Stream receive, void Function(dynamic) emit) { + _emitter = emit; + receive.listen((message) { + if (message is _CancelTaskAction) { + _flag = TaskFlag.cancel; + } else if (message is _PauseTaskAction) { + _flag = TaskFlag.pause; + } + }); + } @override D data; @@ -108,20 +89,7 @@ class IsolateOperationContextImplWrapper extends OperationContext { @override void emit(D data) { this.data = data; - sendPort.send(_EmitTaskAction(data)); - } - - void ensureInitialized() { - final receivePort = ReceivePort(); - sendPort.send(receivePort.sendPort); - - receivePort.listen((message) { - if (message is _CancelTaskAction) { - _flag = TaskFlag.cancel; - } else if (message is _PauseTaskAction) { - _flag = TaskFlag.pause; - } - }); + _emitter?.call(_tryToTransferableTypedData(data)); } } @@ -133,21 +101,16 @@ final class _PauseTaskAction { const _PauseTaskAction(); } -final class _EmitTaskAction { - _EmitTaskAction(dynamic data) { - if (data is Uint8List) { - _data = TransferableTypedData.fromList([data]); - } else { - _data = data; - } +dynamic _tryFromTransferableTypedData(dynamic data) { + if (data is TransferableTypedData) { + return data.materialize().asUint8List(); } - late final dynamic _data; + return data; +} - dynamic get data { - if (_data is TransferableTypedData) { - return _data.materialize().asUint8List(); - } else { - return _data; - } +dynamic _tryToTransferableTypedData(dynamic data) { + if (data is Uint8List) { + return TransferableTypedData.fromList([data]); } + return data; } diff --git a/lib/src/reusable_isolate.dart b/lib/src/reusable_isolate.dart new file mode 100644 index 0000000..5aeda95 --- /dev/null +++ b/lib/src/reusable_isolate.dart @@ -0,0 +1,278 @@ +import 'dart:async'; +import 'dart:isolate'; + +import 'package:flutter/cupertino.dart'; +import 'package:flutter/foundation.dart'; +import 'package:task_manager/src/utils/generate_incremental_id.dart'; + +typedef IsolateEnterPoint = FutureOr Function( + M message, Stream receive, void Function(dynamic) emit); + +abstract class IsolateCallback { + Stream get receive; + + void emit(dynamic data); + + Future wait(); +} + +final class ReusableIsolate { + static final Set<_IsolateWrapper> _idleIsolates = {}; + + static IsolateCallback run( + IsolateEnterPoint entryPoint, M message) { + if (_idleIsolates.isNotEmpty) { + final isolate = _idleIsolates.last; + _idleIsolates.remove(isolate); + return isolate.run(entryPoint, message); + } else { + final isolate = _IsolateWrapper( + onIdle: (wrapper) => _idleIsolates.add(wrapper), + onRunning: (wrapper) => _idleIsolates.remove(wrapper), + onExit: (wrapper) => _idleIsolates.remove(wrapper), + ); + return isolate.run(entryPoint, message); + } + } +} + +final class _IsolateWrapper { + _IsolateWrapper({ + required this.onIdle, + required this.onRunning, + required this.onExit, + }) { + _receivePort.listen(_listen); + + Isolate.spawn( + _backgroundIsolateEntryPoint, + _receivePort.sendPort, + debugName: _id, + ).then((value) {}).onError((error, stackTrace) { + _completer.completeError(error!); + onExit(this); + }); + } + + final void Function(_IsolateWrapper wrapper) onIdle; + final void Function(_IsolateWrapper wrapper) onRunning; + final void Function(_IsolateWrapper wrapper) onExit; + + final _id = generateIncrementalId('_IsolateWrapper'); + final _receivePort = ReceivePort(); + final _completer = Completer(); + final _tasks = {}; + + IsolateCallback run(IsolateEnterPoint entryPoint, M message) { + final id = generateIncrementalId('$runtimeType-task'); + final task = _IsolateCallback(); + _tasks[id] = task; + + _completer.future.then((sendPort) { + sendPort.send(_Handler(id, entryPoint, message)); + task.emitter = (data) { + sendPort.send(_Emitter(id, data)); + }; + }).onError((error, stackTrace) { + debugPrint('SendPort - error: $error'); + task._onError(error); + _tasks.remove(id); + }); + + return task; + } + + void _listen(message) { + if (message is SendPort) { + _completer.complete(message); + } else if (message is _Result) { + if (message.isIdle) { + onIdle(this); + } + final task = _tasks.remove(message.id); + if (task != null) { + task._onComplete(message.value); + } + } else if (message is _Error) { + if (message.isIdle) { + onIdle(this); + } + final task = _tasks.remove(message.id); + if (task != null) { + task._onError(message.error); + } + } else if (message is _Emitter) { + final task = _tasks[message.id]; + if (task != null) { + task._onReceive(message.value); + } + } else if (message is _RequestExit) { + onExit(this); + _completer.future.then((sendPort) { + sendPort.send(_ApproveExit(message.id)); + }); + } + } + + @override + int get hashCode => _id.hashCode; + + @override + bool operator ==(Object other) { + if (other is _IsolateWrapper) { + return other._id == _id; + } + return false; + } +} + +void _backgroundIsolateEntryPoint(SendPort sendPort) async { + final receivePort = ReceivePort(); + sendPort.send(receivePort.sendPort); + + final broadcastReceivePort = receivePort.asBroadcastStream(); + String? lastTaskId; + await for (var element in broadcastReceivePort) { + if (element is _Handler) { + Future.microtask(() async { + final taskId = element.id; + lastTaskId = taskId; + try { + final receive = broadcastReceivePort + .where((event) => event is _Emitter) + .cast<_Emitter>() + .where((event) => event.id == element.id) + .map((event) => event.value); + final result = await element.run( + receive, + (value) { + sendPort.send(_Emitter(element.id, value)); + }, + ); + sendPort.send(_Result(element.id, result, taskId == lastTaskId)); + } catch (e) { + sendPort.send(_Error(element.id, e, taskId == lastTaskId)); + } + + Future.delayed(const Duration(seconds: 5)).then((value) { + if (taskId == lastTaskId) { + sendPort.send(_RequestExit(lastTaskId)); + } + }); + }); + } else if (element is _ApproveExit) { + if (element.id == lastTaskId) { + receivePort.close(); + return; + } + } + } +} + +class _Handler { + const _Handler(this.id, this.entryPoint, this.message); + final String id; + final IsolateEnterPoint entryPoint; + final M message; + + FutureOr run(Stream receive, void Function(dynamic) emit) { + return entryPoint(message, receive, emit); + } +} + +class _Result { + const _Result(this.id, this.value, this.isIdle); + final String id; + final dynamic value; + final bool isIdle; + + @override + String toString() => '$id - Result: $value'; +} + +class _Error { + const _Error(this.id, this.error, this.isIdle); + final String id; + final dynamic error; + final bool isIdle; + + @override + String toString() => '$id - Error: $error'; +} + +class _Emitter { + const _Emitter(this.id, this.value); + final String id; + final dynamic value; + + @override + String toString() => '$id - Emitted: $value'; +} + +class _RequestExit { + const _RequestExit(this.id); + + final String? id; + + @override + String toString() => 'RequestExit'; +} + +class _ApproveExit { + const _ApproveExit(this.id); + + final String? id; + + @override + String toString() => 'ApproveExit'; +} + +class _IsolateCallback implements IsolateCallback { + _IsolateCallback(); + + final Completer _completer = Completer(); + StreamController? _controller; + final List _todo = []; + void Function(dynamic data)? _emitter; + set emitter(void Function(dynamic data) value) { + _emitter = value; + for (var data in _todo) { + value(data); + } + _todo.clear(); + } + + @override + Stream get receive { + _controller ??= StreamController.broadcast(); + return _controller!.stream; + } + + @override + void emit(dynamic data) { + if (_emitter != null) { + _emitter?.call(data); + } else { + _todo.add(data); + } + } + + @override + Future wait() { + return _completer.future; + } + + void _onReceive(dynamic value) { + _controller?.add(value); + } + + void _onComplete(R value) { + _controller?.close(); + _completer.complete(value); + } + + void _onError(dynamic error) { + _controller?.close(); + _completer.completeError(error); + } +} diff --git a/lib/task_manager.dart b/lib/task_manager.dart index 5a555e9..881958b 100644 --- a/lib/task_manager.dart +++ b/lib/task_manager.dart @@ -1,10 +1,12 @@ library task_manager; +export 'src/reusable_isolate.dart'; + import 'dart:async'; import 'dart:isolate'; -import 'dart:typed_data'; import 'package:flutter/foundation.dart'; +import 'package:task_manager/src/reusable_isolate.dart'; import 'package:task_manager/src/utils/generate_incremental_id.dart'; import 'package:task_manager/src/utils/priority_queue.dart'; diff --git a/test/task_manager_test.dart b/test/task_manager_test.dart index 491cf8b..cd520dc 100644 --- a/test/task_manager_test.dart +++ b/test/task_manager_test.dart @@ -57,6 +57,14 @@ void main() { final task = worker.run(const CountdownComputeOperation(), 10); _listenTask(task); expect(task.status, TaskStatus.running); + + await Future.delayed(const Duration(milliseconds: 400)); + task.pause(); + await Future.delayed(const Duration(milliseconds: 2000)); + expect(task.status, TaskStatus.paused); + + task.resume(); + await task.wait(); expect(task.status, TaskStatus.completed); }); @@ -122,6 +130,32 @@ void main() { expect(worker.length, 0); }); + + test('reusable_isolate', () async { + final task = ReusableIsolate.run( + (message, receive, emit) async { + int value = message; + receive.listen((event) { + emit(event); + }); + while (value > 0) { + await Future.delayed(const Duration(milliseconds: 200)); + value -= 1; + emit(value); + } + return message * 10; + }, + 10, + ); + + Future.delayed(const Duration(milliseconds: 1000)).then((value) { + task.emit(20); + }); + + final list = await task.receive.toSet(); + expect(list, {9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 20}); + expect(await task.wait(), 100); + }); } void _listenTask(Task task) {