From 4b03d2da954e46cdfbe9ef273f74e55a5e9e34ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E9=A3=8E?= Date: Mon, 5 Feb 2024 15:40:41 +0800 Subject: [PATCH] Use TransferableTypedData to avoid data copying --- README.md | 1 - .../isolate_operation_context_impl.dart | 9 +++-- lib/src/operation/operation_context_impl.dart | 2 +- lib/src/scheduling/hydrated_worker.dart | 2 +- lib/src/scheduling/worker.dart | 28 +++++++++++-- lib/src/task/result.dart | 40 +++++++++++++++++++ lib/task_manager.dart | 12 ++---- 7 files changed, 75 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 5815137..6a6abd5 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,6 @@ void example() async { task.resume(); expect(task.status, TaskStatus.running); await Future.delayed(const Duration(milliseconds: 400)); - expect(task.status, TaskStatus.running); task.pause(); await Future.delayed(const Duration(milliseconds: 400)); diff --git a/lib/src/operation/isolate_operation_context_impl.dart b/lib/src/operation/isolate_operation_context_impl.dart index 7f55554..5e37d16 100644 --- a/lib/src/operation/isolate_operation_context_impl.dart +++ b/lib/src/operation/isolate_operation_context_impl.dart @@ -45,23 +45,24 @@ class IsolateOperationContextImpl extends OperationContextImpl { final operation = message[0] as Operation; final context = message[1] as IsolateOperationContextImplWrapper; context.ensureInitialized(); - return await operation.run(context); + final result = await operation.run(context); + return result.tryToTransferableTypedData(); }, [ operation, context, ], ); - _handlerResult(result as Result); + _handlerResult(result.tryFromTransferableTypedData()); return result.type; } catch (e) { - _handlerResult(Result.error(e)); + _handlerResult(Result.error(e)); return ResultType.error; } } @override - void _handlerResult(Result result) { + void _handlerResult(Result result) { super._handlerResult(result); switch (result.type) { case ResultType.canceled: diff --git a/lib/src/operation/operation_context_impl.dart b/lib/src/operation/operation_context_impl.dart index a446475..2a6a56e 100644 --- a/lib/src/operation/operation_context_impl.dart +++ b/lib/src/operation/operation_context_impl.dart @@ -108,7 +108,7 @@ class OperationContextImpl extends OperationContext { } } - void _handlerResult(Result result) { + void _handlerResult(Result result) { switch (result.type) { case ResultType.paused: _flag = TaskFlag.none; diff --git a/lib/src/scheduling/hydrated_worker.dart b/lib/src/scheduling/hydrated_worker.dart index d6fe3c3..ad5391c 100644 --- a/lib/src/scheduling/hydrated_worker.dart +++ b/lib/src/scheduling/hydrated_worker.dart @@ -4,7 +4,7 @@ class HydratedWorkerImpl extends WorkerImpl implements HydratedWorker { HydratedWorkerImpl({ required this.storage, required this.identifier, - }) : super._(); + }) : super(); final Storage storage; final String identifier; diff --git a/lib/src/scheduling/worker.dart b/lib/src/scheduling/worker.dart index ad43ff7..b0fdd71 100644 --- a/lib/src/scheduling/worker.dart +++ b/lib/src/scheduling/worker.dart @@ -1,14 +1,12 @@ part of '../../task_manager.dart'; -class WorkerImpl extends Worker { - WorkerImpl._() : super._() { +class WorkerImpl implements Worker { + WorkerImpl() { _scheduler = SchedulerImpl( executeTask: _executeTask, ); } - factory WorkerImpl() => WorkerImpl._(); - late final Scheduler _scheduler; @override @@ -66,6 +64,28 @@ class WorkerImpl extends Worker { ); } + @override + void registerRepeatedTask(Operation operation, D initialData, + {required String name, + required Duration timeInterval, + TaskPriority priority = TaskPriority.normal, + Duration Function(R result, int runCount, int runTime, + Duration previousTimeInterval)? + nextTimeInterval, + bool Function(R? result, dynamic error, int runCount, int runTime)? + terminate}) { + // TODO: implement registerRepeatedTask + throw UnimplementedError(); + } + + @override + void registerScheduledTask( + String name, Duration duration, Task Function() builder, + {TaskPriority priority = TaskPriority.normal}) { + // TODO: implement registerScheduledTask + throw UnimplementedError(); + } + Task _putIfAbsent({ required TaskIdentifier? identifier, required TaskPriority priority, diff --git a/lib/src/task/result.dart b/lib/src/task/result.dart index 967433e..1bdf547 100644 --- a/lib/src/task/result.dart +++ b/lib/src/task/result.dart @@ -47,6 +47,46 @@ class Result { return 'Error$suffix'; } } + + Result tryToTransferableTypedData() { + switch (type) { + case ResultType.completed: + if (result is Uint8List) { + return Result.completed( + TransferableTypedData.fromList([result as Uint8List]), + ); + } + case ResultType.paused: + if (data is Uint8List) { + return Result.paused( + TransferableTypedData.fromList([result as Uint8List]), + ); + } + break; + default: + } + return this; + } + + Result tryFromTransferableTypedData() { + switch (type) { + case ResultType.completed: + if (result is TransferableTypedData) { + return Result.completed( + (result as TransferableTypedData).materialize().asUint8List(), + ); + } + case ResultType.paused: + if (data is TransferableTypedData) { + return Result.paused( + (data as TransferableTypedData).materialize().asUint8List(), + ); + } + break; + default: + } + return this; + } } enum ResultType { diff --git a/lib/task_manager.dart b/lib/task_manager.dart index 94d26b4..5a555e9 100644 --- a/lib/task_manager.dart +++ b/lib/task_manager.dart @@ -2,6 +2,7 @@ library task_manager; import 'dart:async'; import 'dart:isolate'; +import 'dart:typed_data'; import 'package:flutter/foundation.dart'; import 'package:task_manager/src/utils/generate_incremental_id.dart'; @@ -73,8 +74,7 @@ abstract class Task { } abstract class Worker { - Worker._(); - factory Worker() => WorkerImpl(); + factory Worker() = WorkerImpl; int get maxConcurrencies; set maxConcurrencies(int value); @@ -110,18 +110,14 @@ abstract class Worker { )? nextTimeInterval, bool Function(R? result, dynamic error, int runCount, int runTime)? terminate, - }) { - throw UnimplementedError(); - } + }); void registerScheduledTask( String name, Duration duration, Task Function() builder, { TaskPriority priority = TaskPriority.normal, - }) { - throw UnimplementedError(); - } + }); Future wait();