Skip to content

Commit

Permalink
Add hydrated task
Browse files Browse the repository at this point in the history
  • Loading branch information
cezres committed Feb 4, 2024
1 parent bd94a1a commit 8d2739f
Show file tree
Hide file tree
Showing 15 changed files with 469 additions and 386 deletions.
122 changes: 86 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ task_manager:
## Usage


### Create a task

The following example demonstrates how to create a simple task:
### Run a task

```dart
class ExampleOperation extends Operation<int, String> {
Expand All @@ -53,15 +51,13 @@ void example() async {
final worker = Worker();
worker.maxConcurrencies = 2;
// Add a task
final task = worker.addTask(const ExampleOperation(), 1);
final task = worker.run(const ExampleOperation(), 1);
// Wait for the task to complete
await task.wait(); // Result.completed('Hello World - 1')
await task.wait(); // 'Hello World - 1'
}
```

### Pause or cancel a task

For tasks in progress, you need to check if the operation should be paused or canceled, as shown below:
### Pause task

```dart
class CountdownOperation extends Operation<int, void> {
Expand All @@ -71,7 +67,7 @@ class CountdownOperation extends Operation<int, void> {
FutureOr<Result<int, void>> run(OperationContext<int, void> context) async {
int data = context.data;
while (data > 0) {
await Future.delayed(const Duration(milliseconds: 1000));
await Future.delayed(const Duration(milliseconds: 200));
data -= 1;
/// Check if the operation should be paused or canceled
Expand All @@ -87,50 +83,104 @@ class CountdownOperation extends Operation<int, void> {
}
}
void example() {
task.cancel();
task.pause();
task.resume();
void example() async {
final worker = Worker();
final task = worker.run(const CountdownOperation(), 10, isPaused: true);
expect(task.status, TaskStatus.paused);
task.resume();
expect(task.status, TaskStatus.running);
await Future.delayed(const Duration(milliseconds: 400));
expect(task.status, TaskStatus.running);
task.pause();
await Future.delayed(const Duration(milliseconds: 400));
expect(task.status, TaskStatus.paused);
task.resume();
expect(task.status, TaskStatus.running);
await task.wait();
expect(task.status, TaskStatus.completed);
}
```

### Create hydrated task
### Cancel task

```dart
void example() async {
final worker = Worker();
final task = worker.run(const CountdownOperation(), 10);
expect(task.status, TaskStatus.running);
await Future.delayed(const Duration(milliseconds: 400));
task.cancel();
await task.wait().onError((error, stackTrace) {
debugPrint('Error: $error');
}).whenComplete(() {
expect(task.status, TaskStatus.canceled);
});
}
```

To create a hydrated task, refer to the following code:
### Run a task in isolate

```dart
class ExampleHydratedOperation extends HydratedOperation<int, void> {
const ExampleHydratedOperation();
class CountdownComputeOperation extends CountdownOperation {
const CountdownComputeOperation();
@override
FutureOr<Result<int, void>> run(OperationContext<int, void> context) async {
await Future.delayed(const Duration(seconds: 1));
return Result.completed('Hello World - ${context.data}');
}
bool get compute => true;
}
@override
toJson(int data) {
return data;
}
void example() async {
final worker = Worker();
final task = worker.run(const CountdownComputeOperation(), 10);
_listenTask(task);
expect(task.status, TaskStatus.running);
await task.wait();
expect(task.status, TaskStatus.completed);
}
```

### Run a hydrated task


```dart
class CountdownHydratedOperation extends CountdownOperation
implements HydratedOperation<int, void> {
const CountdownHydratedOperation();
@override
int fromJson(json) {
return json;
}
@override
toJson(int data) {
return data;
}
}
void example() {
StorageManager.registerStorage(CustomStorage());
StorageManager.registerOperation(() => const ExampleHydratedOperation());
}
final storage = CustomStorage();
final worker = HydratedWorker(storage: storage, identifier: 'test');
worker.register(() => const CountdownHydratedOperation());
final task = worker.run(const CountdownHydratedOperation(), 10);
expect(task.status, TaskStatus.running);
await _ensureDataStored();
var list = await storage.readAll('test').toList();
expect(list.length, 1);
await task.wait();
expect(task.status, TaskStatus.completed);
await _ensureDataStored();
list = await storage.readAll('test').toList();
expect(list.length, 0);}
```


<details>
<summary>More...</summary>
test
#### test
```dart
final test = 'test';
```
</details>
3 changes: 0 additions & 3 deletions example/lib/countdown_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ class CountdownOperation extends HydratedOperation<int, void> {
@override
String get name => 'Countdown';

@override
bool get compute => true;

@override
FutureOr<Result<int, void>> run(OperationContext<int, void> context) async {
int data = context.data;
Expand Down
13 changes: 8 additions & 5 deletions example/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import 'package:flutter/material.dart';
import 'package:task_manager/task_manager.dart';

void main() async {
StorageManager.registerStorage(CustomStorage.adapter());
StorageManager.registerOperation(() => const CountdownOperation());

runApp(const MyApp());
}

Expand All @@ -17,7 +14,7 @@ class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Task Example',
title: 'Task Manager Example',
debugShowCheckedModeBanner: false,
theme: ThemeData(
colorScheme: ColorScheme.fromSeed(seedColor: Colors.deepPurple),
Expand All @@ -36,13 +33,19 @@ class MyHomePage extends StatefulWidget {
}

class _MyHomePageState extends State<MyHomePage> {
final Worker worker = Worker();
late final HydratedWorker worker;

@override
void initState() {
super.initState();

worker = HydratedWorker(
storage: CustomStorage.adapter(),
identifier: 'default',
);
worker.maxConcurrencies = 2;
worker.register(() => const CountdownOperation());
worker.loadTasks();
}

void _addTasks() {
Expand Down
24 changes: 12 additions & 12 deletions example/lib/storage/custom_storage_io.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import 'package:task_manager/task_manager.dart';

class CustomStorageIOImpl extends CustomStorage {
@override
Stream<TaskEntity> readAll(String identifier) async* {
final directory = await getDirectory(identifier);
Stream<TaskEntity> readAll(String worker) async* {
final directory = await getDirectory(worker);
final list = await directory.list().toList();
for (var element in list) {
if (!element.path.endsWith('json')) {
Expand All @@ -23,41 +23,41 @@ class CustomStorageIOImpl extends CustomStorage {
}

@override
FutureOr<void> write(TaskEntity task, String identifier) async {
return getFile(task.id, identifier).then(
FutureOr<void> write(TaskEntity task, String worker) async {
return getFile(task.id, worker).then(
(value) => value.writeAsString(
json.encode(task.toJson()),
),
);
}

@override
FutureOr<void> delete(String taskId, String identifier) async {
return getFile(taskId, identifier).then((value) async {
FutureOr<void> delete(String taskId, String worker) async {
return getFile(taskId, worker).then((value) async {
if (await value.exists()) {
value.delete();
}
});
}

@override
FutureOr<void> clear(String identifier) {
return getDirectory(identifier).then((value) => value.delete());
FutureOr<void> clear(String worker) {
return getDirectory(worker).then((value) => value.delete());
}

@override
FutureOr<void> close() {}

Future<Directory> getDirectory(String managerIdentifier) async {
final directory = Directory('task_storage/$managerIdentifier');
Future<Directory> getDirectory(String worker) async {
final directory = Directory('task_storage/$worker');
if (!(await directory.exists())) {
await directory.create(recursive: true);
}
return directory;
}

Future<File> getFile(String taskId, String identifier) async {
final directory = await getDirectory(identifier);
Future<File> getFile(String taskId, String worker) async {
final directory = await getDirectory(worker);
final path = join(directory.path, '$taskId.json');
return File(path);
}
Expand Down
14 changes: 7 additions & 7 deletions example/lib/storage/custom_storage_web.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ class CustomStorageWebImpl extends CustomStorage {
}

@override
FutureOr<void> clear(String identifier) async {
FutureOr<void> clear(String worker) async {
await _ready.future;
_database.transaction((transaction) {
return _store.delete(
transaction,
finder: Finder(filter: Filter.equals('identifier', identifier)),
finder: Finder(filter: Filter.equals('worker', worker)),
);
});
}
Expand All @@ -37,7 +37,7 @@ class CustomStorageWebImpl extends CustomStorage {
}

@override
FutureOr<void> delete(String taskId, String identifier) async {
FutureOr<void> delete(String taskId, String worker) async {
await _ready.future;
await _database.transaction((transaction) {
final record = _store.record(taskId);
Expand All @@ -46,13 +46,13 @@ class CustomStorageWebImpl extends CustomStorage {
}

@override
Stream<TaskEntity> readAll(String identifier) {
Stream<TaskEntity> readAll(String worker) {
final controller = StreamController<TaskEntity>();

_ready.future.then((value) {
return _database.transaction((transaction) async {
final finder = Finder(
filter: Filter.equals('identifier', identifier),
filter: Filter.equals('worker', worker),
);
final records = await _store.find(transaction, finder: finder);
for (var element in records) {
Expand All @@ -71,14 +71,14 @@ class CustomStorageWebImpl extends CustomStorage {
}

@override
FutureOr<void> write(TaskEntity task, String identifier) async {
FutureOr<void> write(TaskEntity task, String worker) async {
await _ready.future;
await _database.transaction((transaction) {
final record = _store.record(task.id);
record.put(
transaction,
{
'identifier': identifier,
'worker': worker,
'entity': task.toJson(),
},
);
Expand Down
4 changes: 0 additions & 4 deletions example/lib/task_manager_view.dart
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,4 @@ class _TaskListItem extends StatelessWidget {
return const SizedBox.shrink();
}
}

// Widget _buildProgressView(BuildContext context, {required Task task}) {
// return
// }
}
Loading

0 comments on commit 8d2739f

Please sign in to comment.