Skip to content

Commit

Permalink
Implement reusable isolate
Browse files Browse the repository at this point in the history
  • Loading branch information
cezres committed Feb 6, 2024
1 parent be52afe commit 4af6a2e
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 79 deletions.
7 changes: 6 additions & 1 deletion example/lib/storage/custom_storage_io.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:convert';
import 'dart:io';

import 'package:example/storage/custom_storage.dart';
import 'package:flutter/material.dart';
import 'package:path/path.dart';
import 'package:task_manager/task_manager.dart';

Expand All @@ -18,7 +19,11 @@ class CustomStorageIOImpl extends CustomStorage {
final file = File(element.path);
final contents = await file.readAsString();
final json = jsonDecode(contents);
yield TaskEntity.fromJson(json);
try {
yield TaskEntity.fromJson(json);
} catch (e) {
debugPrint('Error parsing task: $e');
}
}
}

Expand Down
117 changes: 40 additions & 77 deletions lib/src/operation/isolate_operation_context_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,18 @@ class IsolateOperationContextImpl<D, R> extends OperationContextImpl<D, R> {
required super.initialData,
super.priority,
super.status,
}) {
_receivePort.listen((message) {
switch (message.runtimeType) {
case SendPort:
_sendPort.complete(message);
break;
case _EmitTaskAction:
emit((message as _EmitTaskAction).data);
case Result:
_handlerResult(message);
break;
default:
}
});
}
});

@override
void pause() {
super.pause();
_sendPort.future.then((value) => value.send(const _PauseTaskAction()));
_isolateCallback?.emit(const _PauseTaskAction());
}

@override
void cancel() {
super.cancel();
_sendPort.future.then((value) => value.send(const _CancelTaskAction()));
_isolateCallback?.emit(const _CancelTaskAction());
}

@override
Expand All @@ -39,60 +25,55 @@ class IsolateOperationContextImpl<D, R> extends OperationContextImpl<D, R> {
_controller.add(this);

try {
final context = wrapper();
final result = await compute(
(message) async {
final callback = ReusableIsolate.run(
(message, receive, emit) {
final operation = message[0] as Operation;
final context = message[1] as IsolateOperationContextImplWrapper;
context.ensureInitialized();
final result = await operation.run(context);
return result.tryToTransferableTypedData();
context.ensureInitialized(receive, emit);
return operation.run(context);
},
[
operation,
context,
],
[operation, wrapper()],
);
callback.receive.listen((event) {
emit(_tryFromTransferableTypedData(event));
});

_isolateCallback = callback;
final result = await callback.wait();
_isolateCallback = null;
_handlerResult(result.tryFromTransferableTypedData());
return result.type;
} catch (e) {
_isolateCallback = null;
_handlerResult(Result.error(e));
return ResultType.error;
}
}

@override
void _handlerResult(Result result) {
super._handlerResult(result);
switch (result.type) {
case ResultType.canceled:
case ResultType.completed:
case ResultType.error:
_receivePort.close();
break;
default:
}
}

final ReceivePort _receivePort = ReceivePort();
late Completer<SendPort> _sendPort;
IsolateCallback? _isolateCallback;

IsolateOperationContextImplWrapper<D, R> wrapper() {
_sendPort = Completer<SendPort>();
return IsolateOperationContextImplWrapper<D, R>(
sendPort: _receivePort.sendPort,
initialData: data,
);
return IsolateOperationContextImplWrapper<D, R>(initialData: data);
}
}

class IsolateOperationContextImplWrapper<D, R> extends OperationContext<D, R> {
IsolateOperationContextImplWrapper({
required this.sendPort,
required D initialData,
}) : data = initialData;

final SendPort sendPort;
void Function(dynamic)? _emitter;

void ensureInitialized(Stream receive, void Function(dynamic) emit) {
_emitter = emit;
receive.listen((message) {
if (message is _CancelTaskAction) {
_flag = TaskFlag.cancel;
} else if (message is _PauseTaskAction) {
_flag = TaskFlag.pause;
}
});
}

@override
D data;
Expand All @@ -108,20 +89,7 @@ class IsolateOperationContextImplWrapper<D, R> extends OperationContext<D, R> {
@override
void emit(D data) {
this.data = data;
sendPort.send(_EmitTaskAction(data));
}

void ensureInitialized() {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);

receivePort.listen((message) {
if (message is _CancelTaskAction) {
_flag = TaskFlag.cancel;
} else if (message is _PauseTaskAction) {
_flag = TaskFlag.pause;
}
});
_emitter?.call(_tryToTransferableTypedData(data));
}
}

Expand All @@ -133,21 +101,16 @@ final class _PauseTaskAction {
const _PauseTaskAction();
}

final class _EmitTaskAction {
_EmitTaskAction(dynamic data) {
if (data is Uint8List) {
_data = TransferableTypedData.fromList([data]);
} else {
_data = data;
}
dynamic _tryFromTransferableTypedData(dynamic data) {
if (data is TransferableTypedData) {
return data.materialize().asUint8List();
}
late final dynamic _data;
return data;
}

dynamic get data {
if (_data is TransferableTypedData) {
return _data.materialize().asUint8List();
} else {
return _data;
}
dynamic _tryToTransferableTypedData(dynamic data) {
if (data is Uint8List) {
return TransferableTypedData.fromList([data]);
}
return data;
}
Loading

0 comments on commit 4af6a2e

Please sign in to comment.