diff --git a/README.md b/README.md index 5dc88af..af87d0c 100644 --- a/README.md +++ b/README.md @@ -33,9 +33,7 @@ task_manager: ## Usage -### Create a task - -The following example demonstrates how to create a simple task: +### Run a task ```dart class ExampleOperation extends Operation { @@ -53,15 +51,13 @@ void example() async { final worker = Worker(); worker.maxConcurrencies = 2; // Add a task - final task = worker.addTask(const ExampleOperation(), 1); + final task = worker.run(const ExampleOperation(), 1); // Wait for the task to complete - await task.wait(); // Result.completed('Hello World - 1') + await task.wait(); // 'Hello World - 1' } ``` -### Pause or cancel a task - -For tasks in progress, you need to check if the operation should be paused or canceled, as shown below: +### Pause task ```dart class CountdownOperation extends Operation { @@ -71,7 +67,7 @@ class CountdownOperation extends Operation { FutureOr> run(OperationContext context) async { int data = context.data; while (data > 0) { - await Future.delayed(const Duration(milliseconds: 1000)); + await Future.delayed(const Duration(milliseconds: 200)); data -= 1; /// Check if the operation should be paused or canceled @@ -87,50 +83,104 @@ class CountdownOperation extends Operation { } } -void example() { - task.cancel(); - task.pause(); - task.resume(); +void example() async { + final worker = Worker(); + final task = worker.run(const CountdownOperation(), 10, isPaused: true); + expect(task.status, TaskStatus.paused); + + task.resume(); + expect(task.status, TaskStatus.running); + await Future.delayed(const Duration(milliseconds: 400)); + expect(task.status, TaskStatus.running); + + task.pause(); + await Future.delayed(const Duration(milliseconds: 400)); + expect(task.status, TaskStatus.paused); + + task.resume(); + expect(task.status, TaskStatus.running); + + await task.wait(); + expect(task.status, TaskStatus.completed); } ``` -### Create hydrated task +### Cancel task + +```dart +void example() async { + final worker = Worker(); + final task = worker.run(const CountdownOperation(), 10); + expect(task.status, TaskStatus.running); + + await Future.delayed(const Duration(milliseconds: 400)); + task.cancel(); + + await task.wait().onError((error, stackTrace) { + debugPrint('Error: $error'); + }).whenComplete(() { + expect(task.status, TaskStatus.canceled); + }); +} +``` -To create a hydrated task, refer to the following code: +### Run a task in isolate ```dart -class ExampleHydratedOperation extends HydratedOperation { - const ExampleHydratedOperation(); +class CountdownComputeOperation extends CountdownOperation { + const CountdownComputeOperation(); @override - FutureOr> run(OperationContext context) async { - await Future.delayed(const Duration(seconds: 1)); - return Result.completed('Hello World - ${context.data}'); - } + bool get compute => true; +} - @override - toJson(int data) { - return data; - } +void example() async { + final worker = Worker(); + final task = worker.run(const CountdownComputeOperation(), 10); + _listenTask(task); + expect(task.status, TaskStatus.running); + await task.wait(); + expect(task.status, TaskStatus.completed); +} +``` + +### Run a hydrated task + + +```dart +class CountdownHydratedOperation extends CountdownOperation + implements HydratedOperation { + const CountdownHydratedOperation(); @override int fromJson(json) { return json; } + + @override + toJson(int data) { + return data; + } } void example() { - StorageManager.registerStorage(CustomStorage()); - StorageManager.registerOperation(() => const ExampleHydratedOperation()); -} + final storage = CustomStorage(); + final worker = HydratedWorker(storage: storage, identifier: 'test'); + worker.register(() => const CountdownHydratedOperation()); + + final task = worker.run(const CountdownHydratedOperation(), 10); + expect(task.status, TaskStatus.running); + await _ensureDataStored(); + + var list = await storage.readAll('test').toList(); + expect(list.length, 1); + + await task.wait(); + expect(task.status, TaskStatus.completed); + await _ensureDataStored(); + + list = await storage.readAll('test').toList(); + expect(list.length, 0);} ``` -
- More... - test - #### test - ```dart - final test = 'test'; - ``` -
diff --git a/example/lib/countdown_operation.dart b/example/lib/countdown_operation.dart index d45cc5a..6041427 100644 --- a/example/lib/countdown_operation.dart +++ b/example/lib/countdown_operation.dart @@ -8,9 +8,6 @@ class CountdownOperation extends HydratedOperation { @override String get name => 'Countdown'; - @override - bool get compute => true; - @override FutureOr> run(OperationContext context) async { int data = context.data; diff --git a/example/lib/main.dart b/example/lib/main.dart index 942c896..1f13c94 100644 --- a/example/lib/main.dart +++ b/example/lib/main.dart @@ -5,9 +5,6 @@ import 'package:flutter/material.dart'; import 'package:task_manager/task_manager.dart'; void main() async { - StorageManager.registerStorage(CustomStorage.adapter()); - StorageManager.registerOperation(() => const CountdownOperation()); - runApp(const MyApp()); } @@ -17,7 +14,7 @@ class MyApp extends StatelessWidget { @override Widget build(BuildContext context) { return MaterialApp( - title: 'Flutter Task Example', + title: 'Task Manager Example', debugShowCheckedModeBanner: false, theme: ThemeData( colorScheme: ColorScheme.fromSeed(seedColor: Colors.deepPurple), @@ -36,13 +33,19 @@ class MyHomePage extends StatefulWidget { } class _MyHomePageState extends State { - final Worker worker = Worker(); + late final HydratedWorker worker; @override void initState() { super.initState(); + worker = HydratedWorker( + storage: CustomStorage.adapter(), + identifier: 'default', + ); worker.maxConcurrencies = 2; + worker.register(() => const CountdownOperation()); + worker.loadTasks(); } void _addTasks() { diff --git a/example/lib/storage/custom_storage_io.dart b/example/lib/storage/custom_storage_io.dart index 12d55bf..3aa02eb 100644 --- a/example/lib/storage/custom_storage_io.dart +++ b/example/lib/storage/custom_storage_io.dart @@ -8,8 +8,8 @@ import 'package:task_manager/task_manager.dart'; class CustomStorageIOImpl extends CustomStorage { @override - Stream readAll(String identifier) async* { - final directory = await getDirectory(identifier); + Stream readAll(String worker) async* { + final directory = await getDirectory(worker); final list = await directory.list().toList(); for (var element in list) { if (!element.path.endsWith('json')) { @@ -23,8 +23,8 @@ class CustomStorageIOImpl extends CustomStorage { } @override - FutureOr write(TaskEntity task, String identifier) async { - return getFile(task.id, identifier).then( + FutureOr write(TaskEntity task, String worker) async { + return getFile(task.id, worker).then( (value) => value.writeAsString( json.encode(task.toJson()), ), @@ -32,8 +32,8 @@ class CustomStorageIOImpl extends CustomStorage { } @override - FutureOr delete(String taskId, String identifier) async { - return getFile(taskId, identifier).then((value) async { + FutureOr delete(String taskId, String worker) async { + return getFile(taskId, worker).then((value) async { if (await value.exists()) { value.delete(); } @@ -41,23 +41,23 @@ class CustomStorageIOImpl extends CustomStorage { } @override - FutureOr clear(String identifier) { - return getDirectory(identifier).then((value) => value.delete()); + FutureOr clear(String worker) { + return getDirectory(worker).then((value) => value.delete()); } @override FutureOr close() {} - Future getDirectory(String managerIdentifier) async { - final directory = Directory('task_storage/$managerIdentifier'); + Future getDirectory(String worker) async { + final directory = Directory('task_storage/$worker'); if (!(await directory.exists())) { await directory.create(recursive: true); } return directory; } - Future getFile(String taskId, String identifier) async { - final directory = await getDirectory(identifier); + Future getFile(String taskId, String worker) async { + final directory = await getDirectory(worker); final path = join(directory.path, '$taskId.json'); return File(path); } diff --git a/example/lib/storage/custom_storage_web.dart b/example/lib/storage/custom_storage_web.dart index 3e9671d..a919a78 100644 --- a/example/lib/storage/custom_storage_web.dart +++ b/example/lib/storage/custom_storage_web.dart @@ -21,12 +21,12 @@ class CustomStorageWebImpl extends CustomStorage { } @override - FutureOr clear(String identifier) async { + FutureOr clear(String worker) async { await _ready.future; _database.transaction((transaction) { return _store.delete( transaction, - finder: Finder(filter: Filter.equals('identifier', identifier)), + finder: Finder(filter: Filter.equals('worker', worker)), ); }); } @@ -37,7 +37,7 @@ class CustomStorageWebImpl extends CustomStorage { } @override - FutureOr delete(String taskId, String identifier) async { + FutureOr delete(String taskId, String worker) async { await _ready.future; await _database.transaction((transaction) { final record = _store.record(taskId); @@ -46,13 +46,13 @@ class CustomStorageWebImpl extends CustomStorage { } @override - Stream readAll(String identifier) { + Stream readAll(String worker) { final controller = StreamController(); _ready.future.then((value) { return _database.transaction((transaction) async { final finder = Finder( - filter: Filter.equals('identifier', identifier), + filter: Filter.equals('worker', worker), ); final records = await _store.find(transaction, finder: finder); for (var element in records) { @@ -71,14 +71,14 @@ class CustomStorageWebImpl extends CustomStorage { } @override - FutureOr write(TaskEntity task, String identifier) async { + FutureOr write(TaskEntity task, String worker) async { await _ready.future; await _database.transaction((transaction) { final record = _store.record(task.id); record.put( transaction, { - 'identifier': identifier, + 'worker': worker, 'entity': task.toJson(), }, ); diff --git a/example/lib/task_manager_view.dart b/example/lib/task_manager_view.dart index 0171a97..f850e25 100644 --- a/example/lib/task_manager_view.dart +++ b/example/lib/task_manager_view.dart @@ -215,8 +215,4 @@ class _TaskListItem extends StatelessWidget { return const SizedBox.shrink(); } } - - // Widget _buildProgressView(BuildContext context, {required Task task}) { - // return - // } } diff --git a/lib/src/scheduling/hydrated_worker.dart b/lib/src/scheduling/hydrated_worker.dart new file mode 100644 index 0000000..d6fe3c3 --- /dev/null +++ b/lib/src/scheduling/hydrated_worker.dart @@ -0,0 +1,149 @@ +part of '../../task_manager.dart'; + +class HydratedWorkerImpl extends WorkerImpl implements HydratedWorker { + HydratedWorkerImpl({ + required this.storage, + required this.identifier, + }) : super._(); + + final Storage storage; + final String identifier; + + final Map _builders = {}; + // final Map> _entities = {}; + + @override + Task run( + covariant HydratedOperation operation, D initialData, + {bool isPaused = false, + TaskPriority priority = TaskPriority.normal, + TaskIdentifier? identifier, + TaskIdentifierStrategy strategy = TaskIdentifierStrategy.reuse}) { + if (!_builders.containsKey(operation.runtimeType.toString())) { + throw ArgumentError('Operation not registered'); + } + final task = super.run( + operation, + initialData, + isPaused: isPaused, + priority: priority, + identifier: identifier, + strategy: strategy, + ); + + /// Write the task to the storage + _writeTask(task); + return task; + } + + @override + bool add(TaskImpl task) { + if (super.add(task)) { + /// Listen to the task stream and write the task to the storage + task.stream.listen((event) { + switch (event.status) { + case TaskStatus.running: + case TaskStatus.pending: + case TaskStatus.paused: + _writeTask(event); + break; + case TaskStatus.canceled: + case TaskStatus.completed: + case TaskStatus.error: + _deleteTask(event.id); + break; + default: + } + }); + return true; + } else { + return false; + } + } + + @override + void register(HydratedOperation Function() create) { + final operation = create(); + final builder = HydratedTaskBuilder(create); + _builders[operation.runtimeType.toString()] = builder; + } + + @override + Stream loadTasks() { + final controller = StreamController(); + + Future.microtask(() async { + await for (var entity in storage.readAll(identifier)) { + final builder = _builders[entity.operation]; + if (builder == null) { + continue; + } + try { + final task = builder.build(entity); + if (add(task)) { + controller.add(task); + } + } catch (e) { + _deleteTask(entity.id); + } + } + }).whenComplete(() { + controller.close(); + }); + + return controller.stream; + } + + void _writeTask(Task task) { + final operation = task.operation; + if (operation is! HydratedOperation) { + return; + } + storage.write( + TaskEntity( + operation: operation.runtimeType.toString(), + id: task.id, + identifier: task.identifier, + isPaused: task.status == TaskStatus.paused, + priority: task.priority, + data: operation.toJson(task.data), + ), + identifier, + ); + } + + void _deleteTask(TaskId id) { + storage.delete(id, identifier); + } +} + +typedef HydratedOperationCreator = T Function(); + +class HydratedTaskBuilder { + const HydratedTaskBuilder(this.create); + + final HydratedOperationCreator> create; + + TaskImpl build(TaskEntity entity) { + final operation = create(); + final initialData = operation.fromJson(entity.data); + final status = entity.isPaused ? TaskStatus.paused : TaskStatus.pending; + final context = operation.compute + ? IsolateOperationContextImpl( + initialData: initialData, + priority: entity.priority, + status: status, + ) + : OperationContextImpl( + initialData: initialData, + priority: entity.priority, + status: status, + ); + return TaskImpl( + operation: operation, + context: context, + id: entity.id, + identifier: entity.identifier, + ); + } +} diff --git a/lib/src/scheduling/scheduler.dart b/lib/src/scheduling/scheduler.dart index 955fa50..85180b1 100644 --- a/lib/src/scheduling/scheduler.dart +++ b/lib/src/scheduling/scheduler.dart @@ -2,34 +2,13 @@ part of '../../task_manager.dart'; typedef TaskIdentifier = String; typedef TaskId = String; -typedef SchedulerIdentifier = String; enum TaskIdentifierStrategy { reuse, // Reuse tasks with the same identifier cancel, // Cancel the previous task with the same identifier } -mixin Scheduleable { - Scheduler? _scheduler; - - Scheduler? get scheduler => _scheduler; - - void ensureInitialized(Scheduler value) { - if (_scheduler == null) { - _scheduler = value; - } else { - throw StateError('Task is already initialized'); - } - } - - TaskStatus get status; - - FutureOr run(); -} - abstract class Scheduler { - SchedulerIdentifier get identifier; - Stream get stream; int maxConcurrencies = 4; @@ -66,8 +45,6 @@ abstract class Scheduler { void clear(); - // void cancelWithIdentifier(String identifier); - operator [](TaskId id); } @@ -79,13 +56,7 @@ class SchedulerImpl extends Scheduler { final Map _taskOfId = {}; final Map _taskOfIdentifier = {}; - SchedulerImpl({ - required this.executeTask, - required this.identifier, - }); - - @override - final String identifier; + SchedulerImpl({required this.executeTask}); final Future Function(TaskImpl task) executeTask; @@ -295,3 +266,21 @@ class SchedulerImpl extends Scheduler { @override operator [](TaskId id) => _taskOfId[id]; } + +mixin Scheduleable { + Scheduler? _scheduler; + + Scheduler? get scheduler => _scheduler; + + void ensureInitialized(Scheduler value) { + if (_scheduler == null) { + _scheduler = value; + } else { + throw StateError('Task is already initialized'); + } + } + + TaskStatus get status; + + FutureOr run(); +} diff --git a/lib/src/scheduling/worker.dart b/lib/src/scheduling/worker.dart index e54db4b..ad43ff7 100644 --- a/lib/src/scheduling/worker.dart +++ b/lib/src/scheduling/worker.dart @@ -1,29 +1,16 @@ part of '../../task_manager.dart'; -final Map _workers = {}; - class WorkerImpl extends Worker { - WorkerImpl._(String identifier) : super._() { + WorkerImpl._() : super._() { _scheduler = SchedulerImpl( executeTask: _executeTask, - identifier: identifier, ); } - factory WorkerImpl([String identifier = 'default']) { - var worker = _workers[identifier]; - if (worker != null) { - return worker; - } - worker = WorkerImpl._(identifier); - _workers[identifier] = worker; - return worker; - } + factory WorkerImpl() => WorkerImpl._(); late final Scheduler _scheduler; - String get identifier => _scheduler.identifier; - @override int get maxConcurrencies => _scheduler.maxConcurrencies; @@ -51,6 +38,11 @@ class WorkerImpl extends Worker { @override void clear() => _scheduler.clear(); + @override + void cancelTask(String identifier) { + _scheduler.taskOfIdentifier(identifier)?.cancel(); + } + @override Task run(Operation operation, D initialData, {bool isPaused = false, @@ -63,17 +55,12 @@ class WorkerImpl extends Worker { strategy: strategy, ifAbsent: () => TaskImpl( operation: operation, - context: operation.compute - ? IsolateOperationContextImpl( - initialData: initialData, - priority: priority, - status: isPaused ? TaskStatus.paused : TaskStatus.pending, - ) - : OperationContextImpl( - initialData: initialData, - priority: priority, - status: isPaused ? TaskStatus.paused : TaskStatus.pending, - ), + context: createContext( + operation: operation, + initialData: initialData, + priority: priority, + isPaused: isPaused, + ), identifier: identifier, ), ); @@ -98,10 +85,36 @@ class WorkerImpl extends Worker { } } final task = ifAbsent(); - _scheduler.add(task); + add(task); return task; } + bool add(TaskImpl task) { + return _scheduler.add(task); + } + + OperationContextImpl createContext({ + required Operation operation, + required D initialData, + required TaskPriority priority, + required bool isPaused, + }) { + final status = isPaused ? TaskStatus.paused : TaskStatus.pending; + if (operation.compute) { + return IsolateOperationContextImpl( + initialData: initialData, + priority: priority, + status: status, + ); + } else { + return OperationContextImpl( + initialData: initialData, + priority: priority, + status: status, + ); + } + } + Future _executeTask(TaskImpl task) { return task.run(); } diff --git a/lib/src/storage/storage.dart b/lib/src/storage/storage.dart index dc6fc62..6b5836f 100644 --- a/lib/src/storage/storage.dart +++ b/lib/src/storage/storage.dart @@ -3,13 +3,13 @@ part of '../../task_manager.dart'; abstract class Storage { const Storage(); - Stream readAll(String identifier); + Stream readAll(String worker); - FutureOr write(TaskEntity task, String identifier); + FutureOr write(TaskEntity task, String worker); - FutureOr delete(String taskId, String identifier); + FutureOr delete(String taskId, String worker); - FutureOr clear(String identifier); + FutureOr clear(String worker); FutureOr close(); } @@ -17,29 +17,26 @@ abstract class Storage { final class TaskEntity { TaskEntity({ required this.operation, - // required this.type, required this.id, required this.identifier, - required this.status, + required this.isPaused, required this.priority, required this.data, }); final String operation; - // final String type; final String id; final String? identifier; - final TaskStatus status; + final bool isPaused; final TaskPriority priority; final dynamic data; factory TaskEntity.fromJson(Map json) { return TaskEntity( operation: json['operation'], - // type: json['type'], id: json['id'], identifier: json['identifier'], - status: TaskStatus.values[json['status']], + isPaused: json['isPaused'], priority: TaskPriority.values[json['priority']], data: json['data'], ); @@ -48,10 +45,9 @@ final class TaskEntity { Map toJson() { return { 'operation': operation, - // 'type': type, 'id': id, 'identifier': identifier, - 'status': status.index, + 'isPaused': isPaused, 'priority': priority.index, 'data': data, }; diff --git a/lib/src/storage/storage_manager.dart b/lib/src/storage/storage_manager.dart deleted file mode 100644 index b0418c8..0000000 --- a/lib/src/storage/storage_manager.dart +++ /dev/null @@ -1,93 +0,0 @@ -part of '../../task_manager.dart'; - -typedef OperationCreater = T Function(); - -class StorageManager { - static Storage? _storage; - - static void registerStorage(Storage storage) { - _storage = storage; - } - - static final Map _registeredOperations = {}; - - static void registerOperation( - OperationCreater create) { - _registeredOperations[T.toString()] = create; - } - - static void listenTask(TaskImpl task) { - final operation = task.operation; - if (!_registeredOperations.containsKey(operation.runtimeType.toString())) { - return; - } - // task.stream.listen((event) { - // switch (event.status) { - // case TaskStatus.running: - // case TaskStatus.paused: - // case TaskStatus.pending: - // if (task.scheduler != null) { - // saveTask(task, task.scheduler!.identifier); - // } - // break; - // case TaskStatus.canceled: - // case TaskStatus.completed: - // case TaskStatus.error: - // if (task.scheduler != null) { - // deleteTask(task.id, task.scheduler!.identifier); - // } - // break; - // default: - // } - // }); - } - - static Future saveTask( - TaskEntity entity, - SchedulerIdentifier identifier, - ) async { - if (_storage == null) { - return; - } - final type = entity.operation; - if (!_registeredOperations.containsKey(type)) { - return; - } - await _storage!.write(entity, identifier); - } - - static void deleteTask(TaskId id, SchedulerIdentifier identifier) { - _storage!.delete(id, identifier); - } - - static Stream<(HydratedOperation, TaskEntity)> loadTaskEntity( - SchedulerIdentifier schedulerIdentifier) { - if (_storage == null) { - return const Stream.empty(); - } - return _storage! - .readAll(schedulerIdentifier) - .map((event) { - final creater = _registeredOperations[event.operation]; - if (creater == null) { - _storage!.delete(event.id, schedulerIdentifier); - return null; - } - try { - return (creater(), event); - } catch (e) { - debugPrint('Error when loading task: ${event.id} - $e'); - _storage!.delete(event.id, schedulerIdentifier); - } - }) - .skipWhile((element) => element == null) - .cast<(HydratedOperation, TaskEntity)>(); - } - - static FutureOr clear(SchedulerIdentifier schedulerIdentifier) { - if (_storage == null) { - return Future.value(); - } - return _storage!.clear(schedulerIdentifier); - } -} diff --git a/lib/src/utils/priority_queue.dart b/lib/src/utils/priority_queue.dart index 779dffa..d144fcd 100644 --- a/lib/src/utils/priority_queue.dart +++ b/lib/src/utils/priority_queue.dart @@ -122,11 +122,11 @@ class PriorityQueueImpl implements PriorityQueue { if (_priorities.isEmpty) { throw StateError('PriorityQueue is empty'); } - final priority = _priorities.first; + final priority = _priorities.last; // highest priority final list = _listOfPriority[priority]!; final element = list.removeAt(0); if (list.isEmpty) { - _priorities.removeAt(0); + _priorities.removeLast(); _listOfPriority.remove(priority); } return element; diff --git a/lib/task_manager.dart b/lib/task_manager.dart index 9e8302b..94d26b4 100644 --- a/lib/task_manager.dart +++ b/lib/task_manager.dart @@ -15,9 +15,9 @@ part 'src/operation/isolate_operation_context_impl.dart'; part 'src/scheduling/scheduler.dart'; part 'src/scheduling/worker.dart'; +part 'src/scheduling/hydrated_worker.dart'; part 'src/storage/storage.dart'; -part 'src/storage/storage_manager.dart'; abstract class OperationContext { D get data; @@ -96,15 +96,6 @@ abstract class Worker { TaskIdentifierStrategy strategy = TaskIdentifierStrategy.reuse, }); - // void registerScheduledTask( - // String name, - // Duration duration, - // Task Function() builder, { - // TaskPriority priority = TaskPriority.normal, - // }) { - // throw UnimplementedError(); - // } - void registerRepeatedTask( Operation operation, D initialData, { @@ -123,13 +114,20 @@ abstract class Worker { throw UnimplementedError(); } - /// Wait for all tasks to complete + void registerScheduledTask( + String name, + Duration duration, + Task Function() builder, { + TaskPriority priority = TaskPriority.normal, + }) { + throw UnimplementedError(); + } + Future wait(); - /// Clear all tasks void clear(); - // Future cancelTaskWithIdentifier(TaskIdentifier identifier); + void cancelTask(String identifier); } abstract class HydratedOperation extends Operation { @@ -139,10 +137,25 @@ abstract class HydratedOperation extends Operation { dynamic toJson(D data); } -abstract class HydratedWorker extends Worker { - HydratedWorker._() : super._(); +abstract class HydratedWorker implements Worker { + factory HydratedWorker({ + required Storage storage, + required String identifier, + }) = HydratedWorkerImpl; - // Stream loadTasks(); + @override + Task run( + covariant HydratedOperation operation, + D initialData, { + bool isPaused = false, + TaskPriority priority = TaskPriority.normal, + TaskIdentifier? identifier, + TaskIdentifierStrategy strategy = TaskIdentifierStrategy.reuse, + }); + + void register(HydratedOperation Function() create); + + Stream loadTasks(); } enum TaskPriority { @@ -152,24 +165,3 @@ enum TaskPriority { high, veryHigh, } - -// mixin ComputeMixin { -// // -// } - -// mixin HydratedMixin { -// D fromJson(dynamic json); -// dynamic toJson(D data); -// } - - -// final class BlockOperation extends Operation { -// const BlockOperation(this.block); - -// final FutureOr> Function(OperationContext context) block; - -// @override -// FutureOr> run(OperationContext context) async { -// return block(context); -// } -// } \ No newline at end of file diff --git a/test/memory_storage.dart b/test/memory_storage.dart index f1466be..c89af04 100644 --- a/test/memory_storage.dart +++ b/test/memory_storage.dart @@ -20,7 +20,8 @@ class MemoryStorage extends Storage { @override Stream readAll(String identifier) { - return Stream.fromIterable(_caches[identifier]?.values ?? []); + final list = _caches[identifier]?.values.toList() ?? []; + return Stream.fromIterable(List.from(list)); } @override diff --git a/test/task_manager_test.dart b/test/task_manager_test.dart index af0fe88..491cf8b 100644 --- a/test/task_manager_test.dart +++ b/test/task_manager_test.dart @@ -7,150 +7,139 @@ import 'package:task_manager/task_manager.dart'; import 'memory_storage.dart'; void main() { - // setUp(() { - // WidgetsFlutterBinding.ensureInitialized(); - // debugPrint('setUp'); - // }); + test('Run a task', () async { + final worker = Worker(); + final task = worker.run(const CountdownOperation(), 10); + expect(task.status, TaskStatus.running); + await task.wait(); + expect(task.status, TaskStatus.completed); + expect(worker.length, 0); + }); - Future ensureDataStored() { - return Future.delayed(const Duration(milliseconds: 100)); - } + test('Pause task', () async { + final worker = Worker(); + final task = worker.run(const CountdownOperation(), 10, isPaused: true); + expect(task.status, TaskStatus.paused); - test('xxx', () async { - final Completer completer = Completer(); + task.resume(); + expect(task.status, TaskStatus.running); + await Future.delayed(const Duration(milliseconds: 400)); + expect(task.status, TaskStatus.running); - Future.delayed(const Duration(seconds: 1), () { - // completer.complete(1); - // completer.complete(); - completer.completeError(CanceledException()); - }); + task.pause(); + await Future.delayed(const Duration(milliseconds: 400)); + expect(task.status, TaskStatus.paused); - try { - debugPrint('${await completer.future}'); - } catch (e) { - debugPrint('catch: $e'); - } + task.resume(); + expect(task.status, TaskStatus.running); + + await task.wait(); + expect(task.status, TaskStatus.completed); }); - test('run a task', () async { - final worker = WorkerImpl(); - final task = worker.addCountdownTask(10); + test('Cancel task', () async { + final worker = Worker(); + final task = worker.run(const CountdownOperation(), 10); + expect(task.status, TaskStatus.running); - task.stream.listen((event) { - debugPrint('CountdownTask: ${event.data} - ${event.status}'); - if (event.data == 0) { - debugPrint('Finish'); - } + await Future.delayed(const Duration(milliseconds: 400)); + task.cancel(); + + await task.wait().onError((error, stackTrace) { + debugPrint('Error: $error'); + }).whenComplete(() { + expect(task.status, TaskStatus.canceled); }); + }); + test('Run a task in isolate', () async { + final worker = Worker(); + final task = worker.run(const CountdownComputeOperation(), 10); + _listenTask(task); expect(task.status, TaskStatus.running); await task.wait(); expect(task.status, TaskStatus.completed); - expect(task.data, 0); - expect(worker.length, 0); }); - test('storage', () async { - /// Initialize - final worker = WorkerImpl(); - StorageManager.registerStorage(MemoryStorage()); - StorageManager.registerOperation(() => const CountdownOperation()); - StorageManager.clear(worker.identifier); - - /// Add a paused task - final task = worker.addCountdownTask(6, isPaused: true); - expect(task.status, TaskStatus.paused); - expect(worker.length, 1); - await ensureDataStored(); + test('Run a hydrated task', () async { + final storage = MemoryStorage(); + final worker = HydratedWorker(storage: storage, identifier: 'test'); + worker.register(() => const CountdownHydratedOperation()); + final task = worker.run(const CountdownHydratedOperation(), 10); + expect(task.status, TaskStatus.running); + await _ensureDataStored(); - worker.clear(); - expect(worker.length, 0); - var list = await worker.loadTasksWithStorage().toList(); + var list = await storage.readAll('test').toList(); expect(list.length, 1); - expect(list[0].status, TaskStatus.paused); - /// Resume the task to completed - list[0].resume(); - await list[0].wait(); - await ensureDataStored(); - expect(worker.length, 0); + await task.wait(); + expect(task.status, TaskStatus.completed); + await _ensureDataStored(); - /// Load tasks from storage - list = await worker.loadTasksWithStorage().toList(); + list = await storage.readAll('test').toList(); expect(list.length, 0); }); - test('worker', () async { + test('Task priority', () async { final worker = WorkerImpl(); - worker.maxConcurrencies = 2; - worker.stream.listen((event) { - String runningTasks = ""; - for (var element in worker.runningTasks) { - runningTasks += '${element.id}, '; - } - debugPrint('------'); - debugPrint('runningTasks: $runningTasks'); + worker.maxConcurrencies = 1; - String pendingTasks = ""; - for (var element in worker.pendingTasks) { - pendingTasks += '${element.id}, '; - } - debugPrint('pendingTasks: $pendingTasks'); - }); + /// Will be executed in the order of task1 task3 task2 - for (var i = 0; i < 4; i++) { - worker.addCountdownTask(6); - } - expect(worker.length, 4); + final task1 = worker.run( + const CountdownOperation(), + 10, + priority: TaskPriority.normal, + ); - await worker.wait(); - debugPrint('All tasks completed'); - expect(worker.length, 0); - }); + final task2 = worker.run( + const CountdownOperation(), + 10, + priority: TaskPriority.low, + ); - test('run isolate task', () async { - final worker = IsolateWorker(); - final task = worker.addCountdownTask(10); + final task3 = worker.run( + const CountdownOperation(), + 10, + priority: TaskPriority.high, + ); - task.stream.listen((event) { - debugPrint('CountdownTask: ${event.data} - ${event.status}'); - if (event.data == 0) { - debugPrint('Finish'); - } - }); + _listenTask(task1); + _listenTask(task2); + _listenTask(task3); - await Future.delayed(const Duration(milliseconds: 400)); - debugPrint('Pause'); - task.pause(); + await task1.wait(); + expect(task1.status, TaskStatus.completed); + expect(task2.status, TaskStatus.pending); + expect(task3.status, TaskStatus.running); - await Future.delayed(const Duration(milliseconds: 400)); - expect(task.status, TaskStatus.paused); + await task3.wait(); + expect(task3.status, TaskStatus.completed); + expect(task2.status, TaskStatus.running); - await Future.delayed(const Duration(seconds: 4)); + await task2.wait(); + expect(task2.status, TaskStatus.completed); - debugPrint('Resume'); - task.resume(); - expect(task.status, TaskStatus.running); - - await Future.delayed(const Duration(milliseconds: 400)); - debugPrint('Cancel'); - task.cancel(); + expect(worker.length, 0); + }); +} - try { - await task.wait(); - } catch (e) { - debugPrint('catch: $e'); +void _listenTask(Task task) { + task.stream.listen((event) { + if (event.status == TaskStatus.running || + event.status == TaskStatus.paused) { + debugPrint('${event.name}: ${event.data} - ${event.status}'); + } else { + debugPrint('${event.name}: ${event.status}'); } - - await worker.wait(); - debugPrint('done'); - - expect(task.status, TaskStatus.canceled); - expect(worker.length, 0); }); } -class CountdownOperation extends HydratedOperation { +Future _ensureDataStored() { + return Future.delayed(const Duration(milliseconds: 100)); +} + +class CountdownOperation extends Operation { const CountdownOperation(); @override @@ -170,6 +159,11 @@ class CountdownOperation extends HydratedOperation { } } } +} + +class CountdownHydratedOperation extends CountdownOperation + implements HydratedOperation { + const CountdownHydratedOperation(); @override int fromJson(json) { @@ -182,13 +176,9 @@ class CountdownOperation extends HydratedOperation { } } -extension on WorkerImpl { - TaskImpl> addCountdownTask(int initialData, - {bool isPaused = false}) { - return addTask( - const CountdownOperation(), - initialData, - isPaused: isPaused, - ); - } +class CountdownComputeOperation extends CountdownOperation { + const CountdownComputeOperation(); + + @override + bool get compute => true; }