Skip to content

Commit

Permalink
Use TransferableTypedData to avoid data copying
Browse files Browse the repository at this point in the history
  • Loading branch information
cezres committed Feb 5, 2024
1 parent cf92fb6 commit 4b03d2d
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 19 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ void example() async {
task.resume();
expect(task.status, TaskStatus.running);
await Future.delayed(const Duration(milliseconds: 400));
expect(task.status, TaskStatus.running);
task.pause();
await Future.delayed(const Duration(milliseconds: 400));
Expand Down
9 changes: 5 additions & 4 deletions lib/src/operation/isolate_operation_context_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,24 @@ class IsolateOperationContextImpl<D, R> extends OperationContextImpl<D, R> {
final operation = message[0] as Operation;
final context = message[1] as IsolateOperationContextImplWrapper;
context.ensureInitialized();
return await operation.run(context);
final result = await operation.run(context);
return result.tryToTransferableTypedData();
},
[
operation,
context,
],
);
_handlerResult(result as Result<D, R>);
_handlerResult(result.tryFromTransferableTypedData());
return result.type;
} catch (e) {
_handlerResult(Result<D, R>.error(e));
_handlerResult(Result.error(e));
return ResultType.error;
}
}

@override
void _handlerResult(Result<D, R> result) {
void _handlerResult(Result result) {
super._handlerResult(result);
switch (result.type) {
case ResultType.canceled:
Expand Down
2 changes: 1 addition & 1 deletion lib/src/operation/operation_context_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class OperationContextImpl<D, R> extends OperationContext<D, R> {
}
}

void _handlerResult(Result<D, R> result) {
void _handlerResult(Result result) {
switch (result.type) {
case ResultType.paused:
_flag = TaskFlag.none;
Expand Down
2 changes: 1 addition & 1 deletion lib/src/scheduling/hydrated_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class HydratedWorkerImpl extends WorkerImpl implements HydratedWorker {
HydratedWorkerImpl({
required this.storage,
required this.identifier,
}) : super._();
}) : super();

final Storage storage;
final String identifier;
Expand Down
28 changes: 24 additions & 4 deletions lib/src/scheduling/worker.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
part of '../../task_manager.dart';

class WorkerImpl extends Worker {
WorkerImpl._() : super._() {
class WorkerImpl implements Worker {
WorkerImpl() {
_scheduler = SchedulerImpl(
executeTask: _executeTask,
);
}

factory WorkerImpl() => WorkerImpl._();

late final Scheduler _scheduler;

@override
Expand Down Expand Up @@ -66,6 +64,28 @@ class WorkerImpl extends Worker {
);
}

@override
void registerRepeatedTask<D, R>(Operation<D, R> operation, D initialData,
{required String name,
required Duration timeInterval,
TaskPriority priority = TaskPriority.normal,
Duration Function(R result, int runCount, int runTime,
Duration previousTimeInterval)?
nextTimeInterval,
bool Function(R? result, dynamic error, int runCount, int runTime)?
terminate}) {
// TODO: implement registerRepeatedTask
throw UnimplementedError();
}

@override
void registerScheduledTask<D, R>(
String name, Duration duration, Task<D, R> Function() builder,
{TaskPriority priority = TaskPriority.normal}) {
// TODO: implement registerScheduledTask
throw UnimplementedError();
}

Task<D, R> _putIfAbsent<D, R>({
required TaskIdentifier? identifier,
required TaskPriority priority,
Expand Down
40 changes: 40 additions & 0 deletions lib/src/task/result.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,46 @@ class Result<D, R> {
return 'Error$suffix';
}
}

Result tryToTransferableTypedData() {
switch (type) {
case ResultType.completed:
if (result is Uint8List) {
return Result.completed(
TransferableTypedData.fromList([result as Uint8List]),
);
}
case ResultType.paused:
if (data is Uint8List) {
return Result.paused(
TransferableTypedData.fromList([result as Uint8List]),
);
}
break;
default:
}
return this;
}

Result tryFromTransferableTypedData() {
switch (type) {
case ResultType.completed:
if (result is TransferableTypedData) {
return Result.completed(
(result as TransferableTypedData).materialize().asUint8List(),
);
}
case ResultType.paused:
if (data is TransferableTypedData) {
return Result.paused(
(data as TransferableTypedData).materialize().asUint8List(),
);
}
break;
default:
}
return this;
}
}

enum ResultType {
Expand Down
12 changes: 4 additions & 8 deletions lib/task_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ library task_manager;

import 'dart:async';
import 'dart:isolate';
import 'dart:typed_data';

import 'package:flutter/foundation.dart';
import 'package:task_manager/src/utils/generate_incremental_id.dart';
Expand Down Expand Up @@ -73,8 +74,7 @@ abstract class Task<D, R> {
}

abstract class Worker {
Worker._();
factory Worker() => WorkerImpl();
factory Worker() = WorkerImpl;

int get maxConcurrencies;
set maxConcurrencies(int value);
Expand Down Expand Up @@ -110,18 +110,14 @@ abstract class Worker {
)? nextTimeInterval,
bool Function(R? result, dynamic error, int runCount, int runTime)?
terminate,
}) {
throw UnimplementedError();
}
});

void registerScheduledTask<D, R>(
String name,
Duration duration,
Task<D, R> Function() builder, {
TaskPriority priority = TaskPriority.normal,
}) {
throw UnimplementedError();
}
});

Future<void> wait();

Expand Down

0 comments on commit 4b03d2d

Please sign in to comment.