Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to the CrashlyticsWorker #6143

Merged
merged 4 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertThrows;

import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.TaskCompletionSource;
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.concurrent.TestOnlyExecutors;
import java.io.IOException;
Expand Down Expand Up @@ -421,6 +422,258 @@ public void submitTaskWhenThreadPoolFull() {
assertThrows(TimeoutException.class, () -> Tasks.await(task, 30, TimeUnit.MILLISECONDS));
}

@Test
public void submitTaskThatReturnsWithContinuation() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(
() -> Tasks.forResult(1337),
task -> Tasks.forResult(Integer.toString(task.getResult())));

assertThat(Tasks.await(result)).isEqualTo("1337");
}

@Test
public void submitTaskThatThrowsWithContinuation() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(
() -> Tasks.forException(new IndexOutOfBoundsException("Sometimes we look too far.")),
task -> {
if (task.getException() != null) {
return Tasks.forResult("Task threw.");
}
return Tasks.forResult("I dunno how I got here?");
});

assertThat(Tasks.await(result)).isEqualTo("Task threw.");
}

@Test
public void submitTaskWithContinuationThatThrows() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(
() -> Tasks.forResult(7), task -> Tasks.forException(new IOException()));

ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(result));

assertThat(thrown).hasCauseThat().isInstanceOf(IOException.class);

// Verify the worker still executes tasks after the continuation threw.
assertThat(Tasks.await(crashlyticsWorker.submit(() -> 42))).isEqualTo(42);
}

@Test
public void submitTaskThatCancelsWithContinuation() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(
Tasks::forCanceled,
task -> Tasks.forResult(task.isCanceled() ? "Task cancelled." : "What?"));

assertThat(Tasks.await(result)).isEqualTo("Task cancelled.");
}

@Test
public void submitTaskWithContinuationThatCancels() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(() -> Tasks.forResult(7), task -> Tasks.forCanceled());

assertThrows(CancellationException.class, () -> Tasks.await(result));

// Verify the worker still executes tasks after the continuation was cancelled.
assertThat(Tasks.await(crashlyticsWorker.submit(() -> "jk"))).isEqualTo("jk");
}

@Test
public void submitTaskWithContinuationExecutesInOrder() throws Exception {
// The integers added to the list represent the order they should be executed in.
List<Integer> list = new ArrayList<>();

// Start the chain which adds 1, then kicks off tasks to add 6 & 7 later, but adds 2 before
// executing the newly added tasks in the continuation.
crashlyticsWorker.submitTask(
() -> {
list.add(1);

// Sleep to give time for the tasks 3, 4, 5 to be submitted.
sleep(300);

// We added the 1 and will add 2 in the continuation. And 3, 4, 5 have been submitted.
crashlyticsWorker.submit(() -> list.add(6));
crashlyticsWorker.submit(() -> list.add(7));

return Tasks.forResult(1);
},
task -> {
// When the task 1 completes the next number to add is 2. Because all the other tasks are
// just submitted, not executed yet.
list.add(2);
return Tasks.forResult("a");
});

// Submit tasks to add 3, 4, 5 since we just added 1 and know a continuation will add the 2.
crashlyticsWorker.submit(() -> list.add(3));
crashlyticsWorker.submit(() -> list.add(4));
crashlyticsWorker.submit(() -> list.add(5));

crashlyticsWorker.await();

// Verify the list is complete and in order.
assertThat(list).isInOrder();
assertThat(list).hasSize(7);
}

@Test
public void raceReturnsFirstResult() throws Exception {
// Create 2 tasks on different workers to race.
Task<String> task1 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submit(
() -> {
sleep(200);
return "first";
});
Task<String> task2 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submit(
() -> {
sleep(400);
return "slow";
});

Task<String> task = crashlyticsWorker.race(task1, task2);
String result = Tasks.await(task);

assertThat(result).isEqualTo("first");
}

@Test
public void raceReturnsFirstException() {
// Create 2 tasks on different workers to race.
Task<String> task1 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(200);
return Tasks.forException(new ArithmeticException());
});
Task<String> task2 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(400);
return Tasks.forException(new IllegalStateException());
});

Task<String> task = crashlyticsWorker.race(task1, task2);
ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(task));

// The first task throws an ArithmeticException.
assertThat(thrown).hasCauseThat().isInstanceOf(ArithmeticException.class);
}

@Test
public void raceFirstCancelsReturnsSecondResult() throws Exception {
// Create 2 tasks on different workers to race.
Task<String> task1 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(200);
return Tasks.forCanceled();
});
Task<String> task2 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(400);
return Tasks.forResult("I am slow but didn't cancel.");
});

Task<String> task = crashlyticsWorker.race(task1, task2);
String result = Tasks.await(task);

assertThat(result).isEqualTo("I am slow but didn't cancel.");
}

@Test
public void raceBothCancel() {
// Create 2 tasks on different workers to race.
Task<String> task1 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(200);
return Tasks.forCanceled();
});
Task<String> task2 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(400);
return Tasks.forCanceled();
});

Task<String> task = crashlyticsWorker.race(task1, task2);

// Both cancelled, so cancel the race result.
assertThrows(CancellationException.class, () -> Tasks.await(task));
}

@Test
public void raceTasksOnSameWorker() throws Exception {
// Create 2 tasks on this worker to race.
Task<String> task1 =
crashlyticsWorker.submit(
() -> {
sleep(200);
return "first";
});
Task<String> task2 =
crashlyticsWorker.submit(
() -> {
sleep(300);
return "second";
});

Task<String> task = crashlyticsWorker.race(task1, task2);
String result = Tasks.await(task);

// The first task is submitted to this worker first, so will always be first.
assertThat(result).isEqualTo("first");
}

@Test
public void raceTaskOneOnSameWorkerAnotherNeverCompletes() throws Exception {
// Create a task on this worker, and another that never completes, to race.
Task<String> task1 = crashlyticsWorker.submit(() -> "first");
Task<String> task2 = new TaskCompletionSource<String>().getTask();

Task<String> task = crashlyticsWorker.race(task1, task2);
String result = Tasks.await(task);

assertThat(result).isEqualTo("first");
}

@Test
public void raceTaskOneOnSameWorkerAnotherOtherThatCompletesFirst() throws Exception {
// Add a decoy task to the worker to take up some time.
crashlyticsWorker.submitTask(
() -> {
sleep(200);
return Tasks.forResult(null);
});

// Create a task on this worker, and another, to race.
Task<String> task1 = crashlyticsWorker.submit(() -> "same worker");
TaskCompletionSource<String> task2 = new TaskCompletionSource<>();
task2.trySetResult("other");

Task<String> task = crashlyticsWorker.race(task1, task2.getTask());
String result = Tasks.await(task);

// The other tasks completes first because the first task is queued up later on the worker.
assertThat(result).isEqualTo("other");
}

private static void sleep(long millis) {
try {
Thread.sleep(millis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package com.google.firebase.crashlytics.internal;

import androidx.annotation.VisibleForTesting;
import com.google.android.gms.tasks.CancellationTokenSource;
import com.google.android.gms.tasks.Continuation;
import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.TaskCompletionSource;
import com.google.android.gms.tasks.Tasks;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Helper for executing tasks sequentially on the given executor service.
Expand Down Expand Up @@ -57,9 +62,12 @@ public ExecutorService getExecutor() {
/**
* Submits a <code>Callable</code> task for asynchronous execution on the executor.
*
* <p>A blocking callable will block an underlying thread.
*
* <p>Returns a <code>Task</code> which will be resolved upon successful completion of the
* callable, or throws an <code>ExecutionException</code> if the callable throws an exception.
*/
@CanIgnoreReturnValue
mrober marked this conversation as resolved.
Show resolved Hide resolved
public <T> Task<T> submit(Callable<T> callable) {
synchronized (tailLock) {
// Do not propagate a cancellation.
Expand All @@ -76,9 +84,12 @@ public <T> Task<T> submit(Callable<T> callable) {
/**
* Submits a <code>Runnable</code> task for asynchronous execution on the executor.
*
* <p>A blocking runnable will block an underlying thread.
*
* <p>Returns a <code>Task</code> which will be resolved with null upon successful completion of
* the runnable, or throws an <code>ExecutionException</code> if the runnable throws an exception.
*/
@CanIgnoreReturnValue
themiswang marked this conversation as resolved.
Show resolved Hide resolved
public Task<Void> submit(Runnable runnable) {
synchronized (tailLock) {
// Do not propagate a cancellation.
Expand Down Expand Up @@ -108,6 +119,7 @@ public Task<Void> submit(Runnable runnable) {
* returned by the callable, throws an <code>ExecutionException</code> if the callable throws an
* exception, or throws a <code>CancellationException</code> if the task is cancelled.
*/
@CanIgnoreReturnValue
public <T> Task<T> submitTask(Callable<Task<T>> callable) {
synchronized (tailLock) {
// Chain the new callable task onto the queue's tail, regardless of cancellation.
Expand All @@ -117,6 +129,60 @@ public <T> Task<T> submitTask(Callable<Task<T>> callable) {
}
}

/**
* Submits a <code>Callable</code> <code>Task</code> followed by a <code>Continuation</code> for
* asynchronous execution on the executor.
*
* <p>This is useful for submitting a task that must be immediately followed by another task,
* regardless of more tasks being submitted in parallel. For example, settings.
*
* <p>Returns a <code>Task</code> which will be resolved upon successful completion of the Task
* returned by the callable and continued by the continuation, throws an <code>ExecutionException
* </code> if either task throws an exception, or throws a <code>CancellationException</code> if
* either task is cancelled.
*/
@CanIgnoreReturnValue
public <T, R> Task<R> submitTask(
Callable<Task<T>> callable, Continuation<T, Task<R>> continuation) {
synchronized (tailLock) {
// Chain the new callable task and continuation onto the queue's tail.
Task<R> result =
tail.continueWithTask(executor, task -> callable.call())
.continueWithTask(executor, continuation);
tail = result;
return result;
}
}

/**
* Returns a task that is resolved when either of the given tasks is resolved.
*
* <p>When both tasks are cancelled, the returned task will be cancelled.
*/
public <T> Task<T> race(Task<T> task1, Task<T> task2) {
CancellationTokenSource cancellation = new CancellationTokenSource();
TaskCompletionSource<T> result = new TaskCompletionSource<>(cancellation.getToken());

AtomicBoolean otherTaskCancelled = new AtomicBoolean(false);

Continuation<T, Task<Void>> continuation =
task -> {
if (task.isSuccessful()) {
result.trySetResult(task.getResult());
} else if (task.getException() != null) {
result.trySetException(task.getException());
} else if (otherTaskCancelled.getAndSet(true)) {
cancellation.cancel();
}
return Tasks.forResult(null);
};

task1.continueWithTask(executor, continuation);
task2.continueWithTask(executor, continuation);

return result.getTask();
}

/**
* Blocks until all current pending tasks have completed, up to 30 seconds. Useful for testing.
*
Expand Down
Loading