From bd94a1a0c42d324085925bb7b408116184cbcca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E9=A3=8E?= Date: Sun, 4 Feb 2024 00:43:15 +0800 Subject: [PATCH] Support compute task --- example/lib/countdown_operation.dart | 3 + example/lib/main.dart | 3 +- example/lib/task_manager_view.dart | 21 +- .../isolate_operation_context_impl.dart | 137 +++++------- lib/src/operation/operation_context_impl.dart | 81 ++++--- lib/src/scheduling/scheduler.dart | 182 +++++++-------- lib/src/scheduling/worker.dart | 210 ++++-------------- lib/src/scheduling/worker_isolate.dart | 49 ---- lib/src/storage/storage.dart | 12 +- lib/src/storage/storage_manager.dart | 69 +++--- lib/src/task/hydrated_task_impl.dart | 37 --- lib/src/task/task_impl.dart | 135 ++++------- lib/src/task/task_priority.dart | 7 - lib/src/utils/priority_queue.dart | 12 +- lib/src/utils/reused_object.dart | 21 ++ lib/task_manager.dart | 117 +++++++--- 16 files changed, 446 insertions(+), 650 deletions(-) delete mode 100644 lib/src/scheduling/worker_isolate.dart delete mode 100644 lib/src/task/hydrated_task_impl.dart delete mode 100644 lib/src/task/task_priority.dart diff --git a/example/lib/countdown_operation.dart b/example/lib/countdown_operation.dart index 6041427..d45cc5a 100644 --- a/example/lib/countdown_operation.dart +++ b/example/lib/countdown_operation.dart @@ -8,6 +8,9 @@ class CountdownOperation extends HydratedOperation { @override String get name => 'Countdown'; + @override + bool get compute => true; + @override FutureOr> run(OperationContext context) async { int data = context.data; diff --git a/example/lib/main.dart b/example/lib/main.dart index 4fbd3c9..942c896 100644 --- a/example/lib/main.dart +++ b/example/lib/main.dart @@ -43,12 +43,11 @@ class _MyHomePageState extends State { super.initState(); worker.maxConcurrencies = 2; - worker.loadTasksWithStorage(); } void _addTasks() { for (var i = 0; i < 1; i++) { - worker.addTask(const CountdownOperation(), 60); + worker.run(const CountdownOperation(), 60); } } diff --git a/example/lib/task_manager_view.dart b/example/lib/task_manager_view.dart index dcff8ca..0171a97 100644 --- a/example/lib/task_manager_view.dart +++ b/example/lib/task_manager_view.dart @@ -64,18 +64,15 @@ class TaskManagerView extends StatelessWidget { ), const Divider(height: 1, thickness: 1), Expanded( - child: Container( - // color: color, - child: StreamBuilder( - initialData: tasks(worker), - stream: worker.stream - .map((event) => tasks(event)) - .distinct(listEquals), - builder: (context, snapshot) { - final tasks = snapshot.requireData; - return _TaskListView(tasks: tasks); - }, - ), + child: StreamBuilder( + initialData: tasks(worker), + stream: worker.stream + .map((event) => tasks(event)) + .distinct(listEquals), + builder: (context, snapshot) { + final tasks = snapshot.requireData; + return _TaskListView(tasks: tasks); + }, ), ) ], diff --git a/lib/src/operation/isolate_operation_context_impl.dart b/lib/src/operation/isolate_operation_context_impl.dart index 91673c1..7f55554 100644 --- a/lib/src/operation/isolate_operation_context_impl.dart +++ b/lib/src/operation/isolate_operation_context_impl.dart @@ -3,38 +3,66 @@ part of '../../task_manager.dart'; class IsolateOperationContextImpl extends OperationContextImpl { IsolateOperationContextImpl({ required super.initialData, - super.id, - super.identifier, super.priority, super.status, }) { _receivePort.listen((message) { - debugPrint('receive: $message'); - if (message is SendPort) { - _sendPort.complete(message); - message.send('hello background'); - } else if (message is _Emit) { - emit(message.data); - } else if (message is Result) { - handlerResult(message); + switch (message.runtimeType) { + case SendPort: + _sendPort.complete(message); + break; + case _EmitTaskAction: + emit((message as _EmitTaskAction).data); + case Result: + _handlerResult(message); + break; + default: } }); } @override - void setup( - {D? data, TaskStatus? status, TaskFlag? flag, TaskPriority? priority}) { - super.setup(data: data, status: status, flag: flag, priority: priority); - if (flag == TaskFlag.cancel) { - _sendPort.future.then((value) => value.send(const _Cancel())); - } else if (flag == TaskFlag.pause) { - _sendPort.future.then((value) => value.send(const _Pause())); + void pause() { + super.pause(); + _sendPort.future.then((value) => value.send(const _PauseTaskAction())); + } + + @override + void cancel() { + super.cancel(); + _sendPort.future.then((value) => value.send(const _CancelTaskAction())); + } + + @override + Future run(Operation operation) async { + status = TaskStatus.running; + _controller.add(this); + + try { + final context = wrapper(); + final result = await compute( + (message) async { + final operation = message[0] as Operation; + final context = message[1] as IsolateOperationContextImplWrapper; + context.ensureInitialized(); + return await operation.run(context); + }, + [ + operation, + context, + ], + ); + _handlerResult(result as Result); + return result.type; + } catch (e) { + _handlerResult(Result.error(e)); + return ResultType.error; } } @override - void handlerResult(Result result) { - super.handlerResult(result); + void _handlerResult(Result result) { + super._handlerResult(result); switch (result.type) { case ResultType.canceled: case ResultType.completed: @@ -53,9 +81,6 @@ class IsolateOperationContextImpl extends OperationContextImpl { return IsolateOperationContextImplWrapper( sendPort: _receivePort.sendPort, initialData: data, - id: id, - identifier: identifier, - priority: priority, ); } } @@ -64,94 +89,50 @@ class IsolateOperationContextImplWrapper extends OperationContext { IsolateOperationContextImplWrapper({ required this.sendPort, required D initialData, - required this.id, - required this.identifier, - required this.priority, }) : data = initialData; final SendPort sendPort; - late final ReceivePort receivePort; @override D data; @override - final String id; - - @override - final String? identifier; - - @override - final TaskPriority priority; + bool get shouldCancel => _flag == TaskFlag.cancel; @override - TaskStatus get status => TaskStatus.running; + bool get shouldPause => _flag == TaskFlag.pause; TaskFlag _flag = TaskFlag.none; @override void emit(D data) { this.data = data; - sendPort.send(_Emit(data)); + sendPort.send(_EmitTaskAction(data)); } void ensureInitialized() { - receivePort = ReceivePort(); + final receivePort = ReceivePort(); sendPort.send(receivePort.sendPort); receivePort.listen((message) { - if (message is _Cancel) { + if (message is _CancelTaskAction) { _flag = TaskFlag.cancel; - } else if (message is _Pause) { + } else if (message is _PauseTaskAction) { _flag = TaskFlag.pause; } }); } - - @override - bool get shouldCancel => _flag == TaskFlag.cancel; - - @override - bool get shouldPause => _flag == TaskFlag.pause; } -final class _Cancel { - const _Cancel(); +final class _CancelTaskAction { + const _CancelTaskAction(); } -final class _Pause { - const _Pause(); +final class _PauseTaskAction { + const _PauseTaskAction(); } -final class _Emit { - _Emit(this.data); +final class _EmitTaskAction { + _EmitTaskAction(this.data); final dynamic data; - - @override - String toString() { - return '$data'; - } -} - -class IsolateTaskImpl { - IsolateTaskImpl({required this.operation, required this.context}); - - factory IsolateTaskImpl.from(TaskImpl> task) => - IsolateTaskImpl( - operation: task.operation, - context: (task._context as IsolateOperationContextImpl).wrapper(), - ); - - final Operation operation; - final IsolateOperationContextImplWrapper context; - - Future> run() async { - try { - context.ensureInitialized(); - final result = await operation.run(context); - return result; - } catch (e) { - return Result.error(e); - } - } } diff --git a/lib/src/operation/operation_context_impl.dart b/lib/src/operation/operation_context_impl.dart index 25e26c0..a446475 100644 --- a/lib/src/operation/operation_context_impl.dart +++ b/lib/src/operation/operation_context_impl.dart @@ -3,12 +3,9 @@ part of '../../task_manager.dart'; class OperationContextImpl extends OperationContext { OperationContextImpl({ required D initialData, - String? id, - this.identifier, this.priority = TaskPriority.normal, this.status = TaskStatus.pending, - }) : data = initialData, - id = id ?? generateIncrementalId('task'); + }) : data = initialData; TaskFlag _flag = TaskFlag.none; final _controller = StreamController>.broadcast(); @@ -16,16 +13,8 @@ class OperationContextImpl extends OperationContext { R? _result; dynamic _error; - @override - final String id; - - @override - final String? identifier; - - @override TaskPriority priority; - @override TaskStatus status; @override @@ -62,28 +51,64 @@ class OperationContextImpl extends OperationContext { _controller.add(this); } - void setup({ - D? data, - TaskStatus? status, - TaskFlag? flag, - TaskPriority? priority, - }) { - if (data != null) { - this.data = data; - } - if (status != null) { - this.status = status; + void cancel() { + switch (status) { + case TaskStatus.running: + _flag = TaskFlag.cancel; + _controller.add(this); + break; + case TaskStatus.pending: + case TaskStatus.paused: + _handlerResult(Result.canceled()); + break; + default: } - if (flag != null) { - _flag = flag; + } + + void pause() { + switch (status) { + case TaskStatus.running: + _flag = TaskFlag.pause; + _controller.add(this); + break; + case TaskStatus.pending: + status = TaskStatus.paused; + _controller.add(this); + break; + default: } - if (priority != null) { - this.priority = priority; + } + + void resume() { + switch (status) { + case TaskStatus.paused: + status = TaskStatus.pending; + _controller.add(this); + break; + default: } + } + + void setPriority(TaskPriority priority) { + this.priority = priority; + _controller.add(this); + } + + Future run(Operation operation) async { + status = TaskStatus.running; _controller.add(this); + + try { + final result = await operation.run(this); + _handlerResult(result); + return result.type; + } catch (e) { + _handlerResult(Result.error(e)); + return ResultType.error; + } } - void handlerResult(Result result) { + void _handlerResult(Result result) { switch (result.type) { case ResultType.paused: _flag = TaskFlag.none; diff --git a/lib/src/scheduling/scheduler.dart b/lib/src/scheduling/scheduler.dart index 4263d53..955fa50 100644 --- a/lib/src/scheduling/scheduler.dart +++ b/lib/src/scheduling/scheduler.dart @@ -5,8 +5,26 @@ typedef TaskId = String; typedef SchedulerIdentifier = String; enum TaskIdentifierStrategy { - reuse, - cancel, + reuse, // Reuse tasks with the same identifier + cancel, // Cancel the previous task with the same identifier +} + +mixin Scheduleable { + Scheduler? _scheduler; + + Scheduler? get scheduler => _scheduler; + + void ensureInitialized(Scheduler value) { + if (_scheduler == null) { + _scheduler = value; + } else { + throw StateError('Task is already initialized'); + } + } + + TaskStatus get status; + + FutureOr run(); } abstract class Scheduler { @@ -30,12 +48,11 @@ abstract class Scheduler { TaskImpl? contains(TaskId id, TaskIdentifier? identifier); - void putIfAbsent( - TaskId id, TaskIdentifier? identifier, TaskImpl Function() ifAbsent); + TaskImpl? taskOfIdentifier(TaskIdentifier identifier); - bool add(TaskImpl task); + TaskImpl putIfAbsent(TaskIdentifier identifier, TaskImpl Function() ifAbsent); - // void addAll(Iterable schedulables); + bool add(TaskImpl task); void pause(TaskImpl task); @@ -43,7 +60,7 @@ abstract class Scheduler { void cancel(TaskImpl task); - void setPriority(TaskImpl task, TaskPriority newPriority); + void setPriority(TaskImpl task, {required TaskPriority oldPriority}); Future waitForAllTasksToComplete(); @@ -70,7 +87,7 @@ class SchedulerImpl extends Scheduler { @override final String identifier; - final FutureOr Function(TaskImpl task) executeTask; + final Future Function(TaskImpl task) executeTask; final _controller = StreamController.broadcast(); Completer _completer = Completer(); @@ -112,10 +129,20 @@ class SchedulerImpl extends Scheduler { } @override - void putIfAbsent(TaskId id, TaskIdentifier? identifier, - TaskImpl Function() ifAbsent) { - if (contains(id, identifier) == null) { - add(ifAbsent()); + TaskImpl? taskOfIdentifier(TaskIdentifier identifier) { + return _taskOfIdentifier[identifier]; + } + + @override + TaskImpl putIfAbsent( + TaskIdentifier identifier, TaskImpl Function() ifAbsent) { + final task = _taskOfIdentifier[identifier]; + if (task == null) { + final task = ifAbsent(); + add(task); + return task; + } else { + return task; } } @@ -163,47 +190,33 @@ class SchedulerImpl extends Scheduler { @override void pause(TaskImpl task) { if (task.status == TaskStatus.pending) { - _updateAndNotify(() { - _pendingTasks.remove(task); - _pausedTasks[task.id] = task; - task.onPaused(); - }); + _pendingTasks.remove(task); + _pausedTasks[task.id] = task; + _notify(); } } @override void resume(TaskImpl task) { - if (task.status == TaskStatus.paused) { - _updateAndNotify(() { - _pausedTasks.remove(task.id); - _pendingTasks.add(task); - task.onRunning(); - _resetCompleter(); - executePendingTasks(); - }); - } + _pausedTasks.remove(task.id); + _pendingTasks.add(task); + executePendingTasks(); } @override void cancel(TaskImpl task) { - _updateAndNotify(() { - if (task.status == TaskStatus.pending) { - _pendingTasks.remove(task); - task.onCanceled(); - } else if (task.status == TaskStatus.paused) { - _pausedTasks.remove(task.id); - task.onCanceled(); - } - }); + if (task.status == TaskStatus.pending) { + _pendingTasks.remove(task); + } else if (task.status == TaskStatus.paused) { + _pausedTasks.remove(task.id); + } + _notify(); } @override - void setPriority(TaskImpl task, TaskPriority newPriority) { - if (task.status == TaskStatus.running || task.status == TaskStatus.paused) { - task._change(priority: newPriority); - } else if (task.status == TaskStatus.pending) { - _pendingTasks.remove(task); - task._change(priority: newPriority); + void setPriority(TaskImpl task, {required TaskPriority oldPriority}) { + if (task.status == TaskStatus.pending) { + _pendingTasks.remove(task, priority: oldPriority.index); _pendingTasks.add(task); } } @@ -228,45 +241,38 @@ class SchedulerImpl extends Scheduler { } void executePendingTasks() { - _updateAndNotify(() { - while (!isPendingTasksEmpty && !isMaxConcurrencyReached) { - final task = _pendingTasks.removeFirst(); - _executeTask(task); - } - }); - if (_runningTasks.isEmpty && _pendingTasks.isEmpty && _pausedTasks.isEmpty) { _complete(); + return; } + + while (!isPendingTasksEmpty && !isMaxConcurrencyReached) { + final task = _pendingTasks.removeFirst(); + _executeTask(task); + } + _notify(); } void _executeTask(TaskImpl task) async { - _updateAndNotify(() { - _runningTasks[task.id] = task; - task.onRunning(); - }); - - Future.microtask(() => executeTask(task)).then((value) { - _updateAndNotify(() { - _runningTasks.remove(task.id); - task.onCompleted(value); - - if (value.type == ResultType.paused) { - _pausedTasks[task.id] = task; - } + _runningTasks[task.id] = task; + _notify(); - if (isPendingTasksEmpty) { - if (_pausedTasks.isEmpty && _runningTasks.isEmpty) { - _complete(); - } - } else { - executePendingTasks(); + executeTask(task).then((value) { + if (value == ResultType.paused) { + _pausedTasks[task.id] = task; + } + }).whenComplete(() { + _runningTasks.remove(task.id); + if (isPendingTasksEmpty) { + if (_pausedTasks.isEmpty && _runningTasks.isEmpty) { + _complete(); } - }); - }).onError((error, stackTrace) { - task.onError(error); + _notify(); + } else { + executePendingTasks(); + } }); } @@ -282,42 +288,6 @@ class SchedulerImpl extends Scheduler { } } - String? _changeIdentifier; - - void _updateAndNotify(void Function() callback) { - if (_changeIdentifier != null) { - callback(); - } else { - _changeIdentifier = generateIncrementalId('scheduler-change'); - - final runningTasksCount = _runningTasks.length; - final pendingTasksCount = _pendingTasks.length; - final pausedTasksCount = _pausedTasks.length; - - try { - callback(); - if (runningTasksCount != _runningTasks.length || - pendingTasksCount != _pendingTasks.length || - pausedTasksCount != _pausedTasks.length) { - _controller.add(this); - } - - _changeIdentifier = null; - } catch (e) { - debugPrint('Error: $e'); - - if (runningTasksCount != _runningTasks.length || - pendingTasksCount != _pendingTasks.length || - pausedTasksCount != _pausedTasks.length) { - _controller.add(this); - } - - _changeIdentifier = null; - rethrow; - } - } - } - void _notify() { _controller.add(this); } diff --git a/lib/src/scheduling/worker.dart b/lib/src/scheduling/worker.dart index dba0263..e54db4b 100644 --- a/lib/src/scheduling/worker.dart +++ b/lib/src/scheduling/worker.dart @@ -5,7 +5,7 @@ final Map _workers = {}; class WorkerImpl extends Worker { WorkerImpl._(String identifier) : super._() { _scheduler = SchedulerImpl( - executeTask: executeTask, + executeTask: _executeTask, identifier: identifier, ); } @@ -51,182 +51,58 @@ class WorkerImpl extends Worker { @override void clear() => _scheduler.clear(); - // factory Worker.isolate() => IsolateWorker(); - @override - TaskImpl> addTask( - Operation operation, - D initialData, { - bool isPaused = false, - }) { - final task = _createTask( - operation, - initialData, - isPaused: isPaused, - isHydrated: false, - ); - _scheduler.add(task); - return task; - } - - void registerScheduledTask( - String name, - Duration duration, - Task Function() builder, { - TaskPriority priority = TaskPriority.normal, - }) { - throw UnimplementedError(); - } - - void registerRepeatedTask( - String name, - Duration duration, - Task Function() builder, { - TaskPriority priority = TaskPriority.normal, - }) { - throw UnimplementedError(); - } - - void addTasks( - Operation operation, - List initialDatas, - ) { - for (var element in initialDatas) { - addTask(operation, element); - } - } - - FutureOr executeTask(TaskImpl task) { - return task.run(); - } - - @override - Stream loadTasksWithStorage() { - final controller = StreamController(); - - Future.microtask(() async { - final list = - await StorageManager.loadTasks(_scheduler.identifier).toList(); - for (var element in list) { - final entity = element.$2; - if (_scheduler.contains(entity.id, entity.identifier) != null) { - continue; - } - final operation = element.$1; - try { - final task = _createTaskWithEntity(entity, operation); - if (_scheduler.add(task)) { - controller.add(task); - } - } catch (e) { - continue; - } - } - controller.close(); - }).onError((error, stackTrace) { - controller.addError(error ?? -1, stackTrace); - controller.close(); - }); - - return controller.stream; - } - - TaskImpl> _createTaskWithEntity( - TaskEntity entity, - HydratedOperation operation, - ) { - return _createTask( - operation, - operation.fromJson(entity.data), - id: entity.id, - identifier: entity.identifier, - priority: entity.priority, - isPaused: entity.status == TaskStatus.paused, - isHydrated: true, - ); - } - - TaskImpl> _createTask( - Operation operation, - D initialData, { - String? id, - String? identifier, - TaskPriority priority = TaskPriority.normal, - required bool isPaused, - required bool isHydrated, - }) { - final context = _createContext( - operation, - initialData: initialData, - id: id, + Task run(Operation operation, D initialData, + {bool isPaused = false, + TaskPriority priority = TaskPriority.normal, + TaskIdentifier? identifier, + TaskIdentifierStrategy strategy = TaskIdentifierStrategy.reuse}) { + return _putIfAbsent( identifier: identifier, priority: priority, - isPaused: isPaused, - ); - if (operation is HydratedOperation) { - return HydratedTaskImpl._( - operation: operation, - context: context, - isHydrated: isHydrated, - ); - } else { - return TaskImpl._( + strategy: strategy, + ifAbsent: () => TaskImpl( operation: operation, - context: context, - ); - } - } - - OperationContextImpl _createContext( - Operation operation, { - required D initialData, - required String? id, - required String? identifier, - required TaskPriority priority, - required bool isPaused, - }) { - return operation._createContext( - initialData: initialData, - id: id, - identifier: identifier, - priority: priority, - isPaused: isPaused, + context: operation.compute + ? IsolateOperationContextImpl( + initialData: initialData, + priority: priority, + status: isPaused ? TaskStatus.paused : TaskStatus.pending, + ) + : OperationContextImpl( + initialData: initialData, + priority: priority, + status: isPaused ? TaskStatus.paused : TaskStatus.pending, + ), + identifier: identifier, + ), ); } -} - -abstract class _Operation { - const _Operation(); - OperationContextImpl _createContext({ - required D initialData, - required String? id, - required String? identifier, + Task _putIfAbsent({ + required TaskIdentifier? identifier, required TaskPriority priority, - required bool isPaused, + required TaskIdentifierStrategy strategy, + required TaskImpl Function() ifAbsent, }) { - return OperationContextImpl( - initialData: initialData, - id: id, - identifier: identifier, - priority: priority, - status: isPaused ? TaskStatus.paused : TaskStatus.pending, - ); + if (identifier != null) { + final task = _scheduler.taskOfIdentifier(identifier) as Task; + switch (strategy) { + case TaskIdentifierStrategy.reuse: + if (task.priority != priority) { + task.setPriority(priority); + } + return task; + case TaskIdentifierStrategy.cancel: + task.cancel(); + } + } + final task = ifAbsent(); + _scheduler.add(task); + return task; } - IsolateOperationContextImpl _createIsolateContext({ - required D initialData, - required String? id, - required String? identifier, - required TaskPriority priority, - required bool isPaused, - }) { - return IsolateOperationContextImpl( - initialData: initialData, - id: id, - identifier: identifier, - priority: priority, - status: isPaused ? TaskStatus.paused : TaskStatus.pending, - ); + Future _executeTask(TaskImpl task) { + return task.run(); } } diff --git a/lib/src/scheduling/worker_isolate.dart b/lib/src/scheduling/worker_isolate.dart deleted file mode 100644 index b475815..0000000 --- a/lib/src/scheduling/worker_isolate.dart +++ /dev/null @@ -1,49 +0,0 @@ -part of '../../task_manager.dart'; - -class IsolateWorker extends WorkerImpl { - IsolateWorker() : super._('IsolateWorker'); - - Task addIsolateTask( - Operation operation, - D initialData, { - bool isPaused = false, - Operation? mainIsolateOperation, - }) { - throw UnimplementedError(); - } - - @override - OperationContextImpl _createContext( - Operation operation, { - required D initialData, - required String? id, - required String? identifier, - required TaskPriority priority, - required bool isPaused, - }) { - return operation._createIsolateContext( - initialData: initialData, - id: id, - identifier: identifier, - priority: priority, - isPaused: isPaused, - ); - } - - @override - FutureOr executeTask(TaskImpl task) { - final context = task._context; - if (context is! IsolateOperationContextImpl) { - throw ArgumentError('Invalid task'); - } - return compute( - (message) { - return message.run(); - }, - IsolateTaskImpl( - operation: task.operation, - context: context.wrapper(), - ), - ); - } -} diff --git a/lib/src/storage/storage.dart b/lib/src/storage/storage.dart index fc6c593..dc6fc62 100644 --- a/lib/src/storage/storage.dart +++ b/lib/src/storage/storage.dart @@ -16,7 +16,8 @@ abstract class Storage { final class TaskEntity { TaskEntity({ - required this.type, + required this.operation, + // required this.type, required this.id, required this.identifier, required this.status, @@ -24,7 +25,8 @@ final class TaskEntity { required this.data, }); - final String type; + final String operation; + // final String type; final String id; final String? identifier; final TaskStatus status; @@ -33,7 +35,8 @@ final class TaskEntity { factory TaskEntity.fromJson(Map json) { return TaskEntity( - type: json['type'], + operation: json['operation'], + // type: json['type'], id: json['id'], identifier: json['identifier'], status: TaskStatus.values[json['status']], @@ -44,7 +47,8 @@ final class TaskEntity { Map toJson() { return { - 'type': type, + 'operation': operation, + // 'type': type, 'id': id, 'identifier': identifier, 'status': status.index, diff --git a/lib/src/storage/storage_manager.dart b/lib/src/storage/storage_manager.dart index 140b13b..b0418c8 100644 --- a/lib/src/storage/storage_manager.dart +++ b/lib/src/storage/storage_manager.dart @@ -16,50 +16,51 @@ class StorageManager { _registeredOperations[T.toString()] = create; } - static Future saveTask(HydratedTaskImpl task) async { - if (_storage == null) { - return; - } - if (task.status == TaskStatus.canceled || - task.status == TaskStatus.completed || - task.status == TaskStatus.error) { + static void listenTask(TaskImpl task) { + final operation = task.operation; + if (!_registeredOperations.containsKey(operation.runtimeType.toString())) { return; } - final scheduler = task._scheduler; - if (scheduler == null) { - return; - } - final type = task.operation.runtimeType.toString(); - if (!_registeredOperations.containsKey(type)) { - return; - } - await _storage!.write( - TaskEntity( - type: type, - id: task.id, - identifier: task.identifier, - status: task.status == TaskStatus.paused - ? TaskStatus.paused - : TaskStatus.pending, - priority: task.priority, - data: task.operation.toJson(task.data), - ), - scheduler.identifier, - ); + // task.stream.listen((event) { + // switch (event.status) { + // case TaskStatus.running: + // case TaskStatus.paused: + // case TaskStatus.pending: + // if (task.scheduler != null) { + // saveTask(task, task.scheduler!.identifier); + // } + // break; + // case TaskStatus.canceled: + // case TaskStatus.completed: + // case TaskStatus.error: + // if (task.scheduler != null) { + // deleteTask(task.id, task.scheduler!.identifier); + // } + // break; + // default: + // } + // }); } - static void deleteTask(HydratedTaskImpl task) { + static Future saveTask( + TaskEntity entity, + SchedulerIdentifier identifier, + ) async { if (_storage == null) { return; } - final scheduler = task._scheduler; - if (scheduler == null) { + final type = entity.operation; + if (!_registeredOperations.containsKey(type)) { return; } - _storage!.delete(task.id, scheduler.identifier); + await _storage!.write(entity, identifier); + } + + static void deleteTask(TaskId id, SchedulerIdentifier identifier) { + _storage!.delete(id, identifier); } - static Stream<(HydratedOperation, TaskEntity)> loadTasks( + static Stream<(HydratedOperation, TaskEntity)> loadTaskEntity( SchedulerIdentifier schedulerIdentifier) { if (_storage == null) { return const Stream.empty(); @@ -67,7 +68,7 @@ class StorageManager { return _storage! .readAll(schedulerIdentifier) .map((event) { - final creater = _registeredOperations[event.type]; + final creater = _registeredOperations[event.operation]; if (creater == null) { _storage!.delete(event.id, schedulerIdentifier); return null; diff --git a/lib/src/task/hydrated_task_impl.dart b/lib/src/task/hydrated_task_impl.dart deleted file mode 100644 index dc42519..0000000 --- a/lib/src/task/hydrated_task_impl.dart +++ /dev/null @@ -1,37 +0,0 @@ -part of '../../task_manager.dart'; - -class HydratedTaskImpl> - extends TaskImpl { - HydratedTaskImpl._({ - required O operation, - required OperationContextImpl context, - required this.isHydrated, - }) : super._(operation: operation, context: context) { - stream.listen((event) { - switch (event.status) { - case TaskStatus.running: - case TaskStatus.paused: - case TaskStatus.pending: - if (_scheduler != null) StorageManager.saveTask(this); - break; - case TaskStatus.canceled: - case TaskStatus.completed: - case TaskStatus.error: - if (_scheduler != null) StorageManager.deleteTask(this); - break; - default: - } - }); - } - - final bool isHydrated; - - @override - void ensureInitialized(Scheduler value) { - super.ensureInitialized(value); - - if (!isHydrated) { - StorageManager.saveTask(this); - } - } -} diff --git a/lib/src/task/task_impl.dart b/lib/src/task/task_impl.dart index c3fc40b..73b17ff 100644 --- a/lib/src/task/task_impl.dart +++ b/lib/src/task/task_impl.dart @@ -1,94 +1,70 @@ part of '../../task_manager.dart'; -class TaskImpl> extends Task - with PriorityMixin { - TaskImpl._({ +class TaskImpl extends Task with PriorityMixin { + TaskImpl({ required this.operation, - required OperationContextImpl context, - }) : _context = context; + required this.context, + String? id, + this.identifier, + }) : id = id ?? generateIncrementalId('task'); - @override - String get name => operation.name; + final OperationContextImpl context; @override - String get id => _context.id; + final Operation operation; @override - String? get identifier => _context.identifier; + final String id; @override - TaskPriority get priority => _context.priority; + final String? identifier; @override - TaskStatus get status => _context.status; + TaskPriority get priority => context.priority; @override - D get data => _context.data; + TaskStatus get status => context.status; @override - bool get shouldCancel => _context.shouldCancel; + D get data => context.data; @override - bool get shouldPause => _context.shouldPause; + bool get shouldCancel => context.shouldCancel; @override - Stream> get stream => _context.stream.map((event) => this); + bool get shouldPause => context.shouldPause; - final OperationContextImpl _context; - final O operation; + @override + Stream> get stream => context.stream.map((event) => this); - FutureOr> run() { - return operation.run(_context); - } + Future run() => context.run(operation); @override void cancel() { - if (status == TaskStatus.running) { - _change(flag: TaskFlag.cancel); - } else if (status == TaskStatus.pending || status == TaskStatus.paused) { - if (_scheduler != null) { - _scheduler?.cancel(this); - } else { - _change(status: TaskStatus.canceled, flag: TaskFlag.none); - } - } + _scheduler?.cancel(this); + context.cancel(); } @override void pause() { - if (status == TaskStatus.running) { - _change(flag: TaskFlag.pause); - } else if (status == TaskStatus.pending) { - if (_scheduler != null) { - _scheduler?.pause(this); - } else { - _change(status: TaskStatus.paused, flag: TaskFlag.none); - } - } + _scheduler?.pause(this); + context.pause(); } @override void resume() { - if (status == TaskStatus.paused) { - if (_scheduler != null) { - _scheduler?.resume(this); - } else { - _change(status: TaskStatus.pending, flag: TaskFlag.none); - } - } + _scheduler?.resume(this); + context.resume(); } @override - void changePriority(TaskPriority priority) { - if (_scheduler != null) { - _scheduler?.setPriority(this, priority); - } else { - _change(priority: priority); - } + void setPriority(TaskPriority priority) { + _scheduler?.setPriority(this, oldPriority: context.priority); + context.setPriority(priority); } @override - Future wait() => _context.wait(); + Future wait() => context.wait(); @override int get hashCode => id.hashCode; @@ -103,11 +79,11 @@ class TaskImpl> extends Task @override String toString() { - return '${operation.runtimeType}: ${_context.data}'; + return '${operation.runtimeType}: ${context.status}'; } @override - int get priorityValue => _context.priority.index; + int get priorityValue => context.priority.index; /// Private Scheduler? _scheduler; @@ -118,40 +94,6 @@ class TaskImpl> extends Task } _scheduler = value; } - - void onCompleted(Result result) { - _context.handlerResult(result); - } - - void onError(dynamic error) { - _context.handlerResult(Result.error(error)); - } - - void onPaused() { - _context.handlerResult(Result.paused()); - } - - void onCanceled() { - _context.handlerResult(Result.canceled()); - } - - void onRunning() { - _context.setup(status: TaskStatus.running); - } - - void _change({ - D? data, - TaskStatus? status, - TaskFlag? flag, - TaskPriority? priority, - }) { - _context.setup( - data: data, - status: status, - flag: flag, - priority: priority, - ); - } } enum TaskStatus { @@ -173,9 +115,18 @@ enum TaskFlag { class CanceledException extends Error { CanceledException(); +} - @override - String toString() { - return 'CanceledException'; - } +enum TaskType { + normal, + isolate, +} + +TaskType getTaskType(Task task) { + return TaskType.normal; +} + +OperationContext getTaskForType(TaskType type) { + // return OperationContextImpl(); + throw UnimplementedError(); } diff --git a/lib/src/task/task_priority.dart b/lib/src/task/task_priority.dart deleted file mode 100644 index 58c28f5..0000000 --- a/lib/src/task/task_priority.dart +++ /dev/null @@ -1,7 +0,0 @@ -enum TaskPriority { - veryLow, - low, - normal, - high, - veryHigh, -} diff --git a/lib/src/utils/priority_queue.dart b/lib/src/utils/priority_queue.dart index 9be2b3d..779dffa 100644 --- a/lib/src/utils/priority_queue.dart +++ b/lib/src/utils/priority_queue.dart @@ -46,7 +46,7 @@ abstract class PriorityQueue { /// only one of them is removed. /// /// Uses the [Object.==] of elements in the queue to check - bool remove(E element); + bool remove(E element, {int? priority}); /// Removes all the elements from this queue and returns them. List removeAll(); @@ -133,16 +133,16 @@ class PriorityQueueImpl implements PriorityQueue { } @override - bool remove(E element) { - final priority = element.priorityValue; - final list = _listOfPriority[priority]; + bool remove(E element, {int? priority}) { + final priorityValue = priority ?? element.priorityValue; + final list = _listOfPriority[priorityValue]; if (list == null) { return false; } final removed = list.remove(element); if (list.isEmpty) { - _priorities.remove(priority); - _listOfPriority.remove(priority); + _priorities.remove(priorityValue); + _listOfPriority.remove(priorityValue); } return removed; } diff --git a/lib/src/utils/reused_object.dart b/lib/src/utils/reused_object.dart index 0bbf2ab..3518fb7 100644 --- a/lib/src/utils/reused_object.dart +++ b/lib/src/utils/reused_object.dart @@ -31,4 +31,25 @@ class GlobalReusedObject { } return reusedObject.create(); } + + operator [](String key) { + final reusedObject = _reusedObjects[key]; + if (reusedObject == null) { + throw StateError('Reused object not found: $key'); + } + return reusedObject.create(); + } + + operator []=(String key, ReusedObject value) { + _reusedObjects[key] = value; + } + + dynamic putIfAbsent(String key, dynamic Function() builder) { + if (_reusedObjects.containsKey(key)) { + return _reusedObjects[key]; + } + final value = builder(); + _reusedObjects[key] = value; + return value; + } } diff --git a/lib/task_manager.dart b/lib/task_manager.dart index c93b2f6..9e8302b 100644 --- a/lib/task_manager.dart +++ b/lib/task_manager.dart @@ -4,12 +4,10 @@ import 'dart:async'; import 'dart:isolate'; import 'package:flutter/foundation.dart'; -import 'package:task_manager/src/task/task_priority.dart'; import 'package:task_manager/src/utils/generate_incremental_id.dart'; import 'package:task_manager/src/utils/priority_queue.dart'; part 'src/task/task_impl.dart'; -part 'src/task/hydrated_task_impl.dart'; part 'src/task/result.dart'; part 'src/operation/operation_context_impl.dart'; @@ -17,20 +15,11 @@ part 'src/operation/isolate_operation_context_impl.dart'; part 'src/scheduling/scheduler.dart'; part 'src/scheduling/worker.dart'; -part 'src/scheduling/worker_isolate.dart'; part 'src/storage/storage.dart'; part 'src/storage/storage_manager.dart'; abstract class OperationContext { - String get id; - - String? get identifier; - - TaskPriority get priority; - - TaskStatus get status; - D get data; bool get shouldCancel; @@ -40,26 +29,21 @@ abstract class OperationContext { void emit(D data); } -abstract class Operation extends _Operation { +abstract class Operation { const Operation(); String get name => runtimeType.toString(); - FutureOr> run(OperationContext context); -} - -mixin HydratedOperationMixin { - D fromJson(dynamic json); - dynamic toJson(D data); -} + /// If [compute] is true, the operation will run in isolate + bool get compute => false; -abstract class HydratedOperation extends Operation - with HydratedOperationMixin { - const HydratedOperation(); + FutureOr> run(OperationContext context); } abstract class Task { - String get name; + String get name => operation.name; + + Operation get operation; String get id; @@ -85,7 +69,7 @@ abstract class Task { void resume(); - void changePriority(TaskPriority priority); + void setPriority(TaskPriority priority); } abstract class Worker { @@ -103,12 +87,89 @@ abstract class Worker { List get pendingTasks; List get pausedTasks; - Task addTask(Operation operation, D initialData, - {bool isPaused = false}); - + Task run( + Operation operation, + D initialData, { + bool isPaused = false, + TaskPriority priority = TaskPriority.normal, + TaskIdentifier? identifier, + TaskIdentifierStrategy strategy = TaskIdentifierStrategy.reuse, + }); + + // void registerScheduledTask( + // String name, + // Duration duration, + // Task Function() builder, { + // TaskPriority priority = TaskPriority.normal, + // }) { + // throw UnimplementedError(); + // } + + void registerRepeatedTask( + Operation operation, + D initialData, { + required String name, + required Duration timeInterval, + TaskPriority priority = TaskPriority.normal, + Duration Function( + R result, + int runCount, + int runTime, + Duration previousTimeInterval, + )? nextTimeInterval, + bool Function(R? result, dynamic error, int runCount, int runTime)? + terminate, + }) { + throw UnimplementedError(); + } + + /// Wait for all tasks to complete Future wait(); + /// Clear all tasks void clear(); - Stream loadTasksWithStorage(); + // Future cancelTaskWithIdentifier(TaskIdentifier identifier); } + +abstract class HydratedOperation extends Operation { + const HydratedOperation(); + + D fromJson(dynamic json); + dynamic toJson(D data); +} + +abstract class HydratedWorker extends Worker { + HydratedWorker._() : super._(); + + // Stream loadTasks(); +} + +enum TaskPriority { + veryLow, + low, + normal, + high, + veryHigh, +} + +// mixin ComputeMixin { +// // +// } + +// mixin HydratedMixin { +// D fromJson(dynamic json); +// dynamic toJson(D data); +// } + + +// final class BlockOperation extends Operation { +// const BlockOperation(this.block); + +// final FutureOr> Function(OperationContext context) block; + +// @override +// FutureOr> run(OperationContext context) async { +// return block(context); +// } +// } \ No newline at end of file