Skip to content

Commit

Permalink
Support compute task
Browse files Browse the repository at this point in the history
  • Loading branch information
cezres committed Feb 3, 2024
1 parent 82e2cba commit bd94a1a
Show file tree
Hide file tree
Showing 16 changed files with 446 additions and 650 deletions.
3 changes: 3 additions & 0 deletions example/lib/countdown_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ class CountdownOperation extends HydratedOperation<int, void> {
@override
String get name => 'Countdown';

@override
bool get compute => true;

@override
FutureOr<Result<int, void>> run(OperationContext<int, void> context) async {
int data = context.data;
Expand Down
3 changes: 1 addition & 2 deletions example/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ class _MyHomePageState extends State<MyHomePage> {
super.initState();

worker.maxConcurrencies = 2;
worker.loadTasksWithStorage();
}

void _addTasks() {
for (var i = 0; i < 1; i++) {
worker.addTask(const CountdownOperation(), 60);
worker.run(const CountdownOperation(), 60);
}
}

Expand Down
21 changes: 9 additions & 12 deletions example/lib/task_manager_view.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,15 @@ class TaskManagerView extends StatelessWidget {
),
const Divider(height: 1, thickness: 1),
Expanded(
child: Container(
// color: color,
child: StreamBuilder(
initialData: tasks(worker),
stream: worker.stream
.map((event) => tasks(event))
.distinct(listEquals),
builder: (context, snapshot) {
final tasks = snapshot.requireData;
return _TaskListView(tasks: tasks);
},
),
child: StreamBuilder(
initialData: tasks(worker),
stream: worker.stream
.map((event) => tasks(event))
.distinct(listEquals),
builder: (context, snapshot) {
final tasks = snapshot.requireData;
return _TaskListView(tasks: tasks);
},
),
)
],
Expand Down
137 changes: 59 additions & 78 deletions lib/src/operation/isolate_operation_context_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,66 @@ part of '../../task_manager.dart';
class IsolateOperationContextImpl<D, R> extends OperationContextImpl<D, R> {
IsolateOperationContextImpl({
required super.initialData,
super.id,
super.identifier,
super.priority,
super.status,
}) {
_receivePort.listen((message) {
debugPrint('receive: $message');
if (message is SendPort) {
_sendPort.complete(message);
message.send('hello background');
} else if (message is _Emit) {
emit(message.data);
} else if (message is Result<D, R>) {
handlerResult(message);
switch (message.runtimeType) {
case SendPort:
_sendPort.complete(message);
break;
case _EmitTaskAction:
emit((message as _EmitTaskAction).data);
case Result:
_handlerResult(message);
break;
default:
}
});
}

@override
void setup(
{D? data, TaskStatus? status, TaskFlag? flag, TaskPriority? priority}) {
super.setup(data: data, status: status, flag: flag, priority: priority);
if (flag == TaskFlag.cancel) {
_sendPort.future.then((value) => value.send(const _Cancel()));
} else if (flag == TaskFlag.pause) {
_sendPort.future.then((value) => value.send(const _Pause()));
void pause() {
super.pause();
_sendPort.future.then((value) => value.send(const _PauseTaskAction()));
}

@override
void cancel() {
super.cancel();
_sendPort.future.then((value) => value.send(const _CancelTaskAction()));
}

@override
Future<ResultType> run(Operation<D, R> operation) async {
status = TaskStatus.running;
_controller.add(this);

try {
final context = wrapper();
final result = await compute(
(message) async {
final operation = message[0] as Operation;
final context = message[1] as IsolateOperationContextImplWrapper;
context.ensureInitialized();
return await operation.run(context);
},
[
operation,
context,
],
);
_handlerResult(result as Result<D, R>);
return result.type;
} catch (e) {
_handlerResult(Result<D, R>.error(e));
return ResultType.error;
}
}

@override
void handlerResult(Result result) {
super.handlerResult(result);
void _handlerResult(Result<D, R> result) {
super._handlerResult(result);
switch (result.type) {
case ResultType.canceled:
case ResultType.completed:
Expand All @@ -53,9 +81,6 @@ class IsolateOperationContextImpl<D, R> extends OperationContextImpl<D, R> {
return IsolateOperationContextImplWrapper<D, R>(
sendPort: _receivePort.sendPort,
initialData: data,
id: id,
identifier: identifier,
priority: priority,
);
}
}
Expand All @@ -64,94 +89,50 @@ class IsolateOperationContextImplWrapper<D, R> extends OperationContext<D, R> {
IsolateOperationContextImplWrapper({
required this.sendPort,
required D initialData,
required this.id,
required this.identifier,
required this.priority,
}) : data = initialData;

final SendPort sendPort;
late final ReceivePort receivePort;

@override
D data;

@override
final String id;

@override
final String? identifier;

@override
final TaskPriority priority;
bool get shouldCancel => _flag == TaskFlag.cancel;

@override
TaskStatus get status => TaskStatus.running;
bool get shouldPause => _flag == TaskFlag.pause;

TaskFlag _flag = TaskFlag.none;

@override
void emit(D data) {
this.data = data;
sendPort.send(_Emit(data));
sendPort.send(_EmitTaskAction(data));
}

void ensureInitialized() {
receivePort = ReceivePort();
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);

receivePort.listen((message) {
if (message is _Cancel) {
if (message is _CancelTaskAction) {
_flag = TaskFlag.cancel;
} else if (message is _Pause) {
} else if (message is _PauseTaskAction) {
_flag = TaskFlag.pause;
}
});
}

@override
bool get shouldCancel => _flag == TaskFlag.cancel;

@override
bool get shouldPause => _flag == TaskFlag.pause;
}

final class _Cancel {
const _Cancel();
final class _CancelTaskAction {
const _CancelTaskAction();
}

final class _Pause {
const _Pause();
final class _PauseTaskAction {
const _PauseTaskAction();
}

final class _Emit {
_Emit(this.data);
final class _EmitTaskAction {
_EmitTaskAction(this.data);
final dynamic data;

@override
String toString() {
return '$data';
}
}

class IsolateTaskImpl<D, R> {
IsolateTaskImpl({required this.operation, required this.context});

factory IsolateTaskImpl.from(TaskImpl<D, R, Operation<D, R>> task) =>
IsolateTaskImpl(
operation: task.operation,
context: (task._context as IsolateOperationContextImpl<D, R>).wrapper(),
);

final Operation<D, R> operation;
final IsolateOperationContextImplWrapper<D, R> context;

Future<Result<D, R>> run() async {
try {
context.ensureInitialized();
final result = await operation.run(context);
return result;
} catch (e) {
return Result<D, R>.error(e);
}
}
}
81 changes: 53 additions & 28 deletions lib/src/operation/operation_context_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,18 @@ part of '../../task_manager.dart';
class OperationContextImpl<D, R> extends OperationContext<D, R> {
OperationContextImpl({
required D initialData,
String? id,
this.identifier,
this.priority = TaskPriority.normal,
this.status = TaskStatus.pending,
}) : data = initialData,
id = id ?? generateIncrementalId('task');
}) : data = initialData;

TaskFlag _flag = TaskFlag.none;
final _controller = StreamController<OperationContextImpl<D, R>>.broadcast();
Completer<R>? _completer;
R? _result;
dynamic _error;

@override
final String id;

@override
final String? identifier;

@override
TaskPriority priority;

@override
TaskStatus status;

@override
Expand Down Expand Up @@ -62,28 +51,64 @@ class OperationContextImpl<D, R> extends OperationContext<D, R> {
_controller.add(this);
}

void setup({
D? data,
TaskStatus? status,
TaskFlag? flag,
TaskPriority? priority,
}) {
if (data != null) {
this.data = data;
}
if (status != null) {
this.status = status;
void cancel() {
switch (status) {
case TaskStatus.running:
_flag = TaskFlag.cancel;
_controller.add(this);
break;
case TaskStatus.pending:
case TaskStatus.paused:
_handlerResult(Result.canceled());
break;
default:
}
if (flag != null) {
_flag = flag;
}

void pause() {
switch (status) {
case TaskStatus.running:
_flag = TaskFlag.pause;
_controller.add(this);
break;
case TaskStatus.pending:
status = TaskStatus.paused;
_controller.add(this);
break;
default:
}
if (priority != null) {
this.priority = priority;
}

void resume() {
switch (status) {
case TaskStatus.paused:
status = TaskStatus.pending;
_controller.add(this);
break;
default:
}
}

void setPriority(TaskPriority priority) {
this.priority = priority;
_controller.add(this);
}

Future<ResultType> run(Operation<D, R> operation) async {
status = TaskStatus.running;
_controller.add(this);

try {
final result = await operation.run(this);
_handlerResult(result);
return result.type;
} catch (e) {
_handlerResult(Result<D, R>.error(e));
return ResultType.error;
}
}

void handlerResult(Result result) {
void _handlerResult(Result<D, R> result) {
switch (result.type) {
case ResultType.paused:
_flag = TaskFlag.none;
Expand Down
Loading

0 comments on commit bd94a1a

Please sign in to comment.