Skip to content

Commit

Permalink
fix(exec): auto-retry on unfound task for read-after-write resilience
Browse files Browse the repository at this point in the history
  • Loading branch information
uladkasach committed Aug 1, 2024
1 parent 23da723 commit 597b2b8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 13 deletions.
48 changes: 35 additions & 13 deletions src/logic/withAsyncTaskExecutionLifecycleExecute.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import {
BadRequestError,
HelpfulError,
UnexpectedCodePathError,
} from '@ehmpathy/error-fns';
import { HelpfulError, UnexpectedCodePathError } from '@ehmpathy/error-fns';
import { HelpfulErrorMetadata } from '@ehmpathy/error-fns/dist/HelpfulError';
import { addSeconds, isBefore, parseISO } from 'date-fns';
import type { LogMethods } from 'simple-leveled-log-methods';
Expand All @@ -13,6 +9,7 @@ import {
SimpleAwsSqsApi,
} from '../domain/constants';
import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask';
import { withRetry } from '../utils/withRetry';
import { AsyncTaskQueueParcel } from './extractTaskParcelFromSqsEvent';

export class SimpleAsyncTaskRetryLaterError extends HelpfulError {
Expand Down Expand Up @@ -86,16 +83,31 @@ export const withAsyncTaskExecutionLifecycleExecute = <
context: C,
): Promise<(O & { task: T }) | (Partial<O> & { task: T })> => {
// try to find the task by unique; it must be defined in db by now
const foundTask = await dao.findByUnique(
const foundTask = await withRetry(
async () => {
// lookup the task
const foundTaskNow = await dao.findByUnique(
{
...(input.task as unknown as U),
},
context,
);

// if not found, throw an error, as this shouldn't have been called for a non-existent task; => drive to dlq if irrecoverable
if (!foundTaskNow)
throw new UnexpectedCodePathError(`task not found by unique`, {
task: input.task,
});

// return the task
return foundTaskNow;
},
{
...(input.task as unknown as U),
// wait 3 seconds before retrying, to attempt to recover from read-after-write out-of-sync issues (e.g., if we tried to search for the task before the db.reader was synced to db.writer)
delay: { seconds: 3 },
log,
},
context,
);
if (!foundTask)
throw new BadRequestError(
`task not found by unique: '${JSON.stringify(input.task)}'`,
);
)();

// define the timeout in seconds; this is how long each attempt could take, max
const attemptTimeoutSeconds = options?.attempt?.timeout.seconds ?? 15 * 60; // default to 15min, a conservative estimate
Expand Down Expand Up @@ -237,6 +249,16 @@ export const withAsyncTaskExecutionLifecycleExecute = <
// otherwise, just return the result with the state of the task now, since this is probably a multi-step task
return { ...result, task: taskNow };
} catch (error) {
log.warn(
'executeTask.progress: caught an error from the execution attempt. marking it as failed. will re-emit the error for sqs retry',
{
task: attemptedTask,
error: {
class: error instanceof Error ? error.name : undefined,
message: error instanceof Error ? error.message : undefined,
},
},
);
await dao.upsert(
{
task: { ...attemptedTask, status: AsyncTaskStatus.FAILED },
Expand Down
26 changes: 26 additions & 0 deletions src/utils/withRetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { UniDuration, sleep } from '@ehmpathy/uni-time';
import { LogMethods } from 'simple-leveled-log-methods';

/**
 * wraps an async function so that it is attempted one more time if the first attempt throws
 *
 * behavior
 * - invokes `logic` with the caller's arguments and resolves with its result if it succeeds
 * - if the first attempt throws, warns via `options.log` (when given), waits `options.delay` (when given), then invokes `logic` once more with the same arguments
 * - if the second attempt also throws, that error propagates to the caller; there is no third attempt
 *
 * note
 * - any thrown value triggers the retry, including non-Error values (e.g., `throw undefined`); the caught value is only inspected to build the log message
 *
 * @param logic the async function to wrap
 * @param options optional settings: `delay` is how long to wait before the retry (no wait if omitted), `log` is where the warning about the first failure is emitted (silent if omitted)
 * @returns a function with the same signature as `logic`
 */
export const withRetry = <T extends (...args: any[]) => Promise<any>>(
  logic: T,
  options?: {
    delay?: UniDuration;
    log?: LogMethods;
  },
): T => {
  return (async (
    ...args: Parameters<T>
  ): Promise<Awaited<ReturnType<T>>> => {
    try {
      return await logic(...args);
    } catch (error: unknown) {
      // narrow before reading `.message`: a thrown value is not guaranteed to be an Error,
      // and reading a property off `undefined`/`null` here would throw from inside the
      // catch handler and skip the retry entirely
      if (options?.log)
        options.log.warn('withRetry.progress: caught an error, will retry', {
          errorMessage: error instanceof Error ? error.message : String(error),
        });

      // give the failure cause time to clear (e.g., replication lag) before the second attempt
      if (options?.delay) await sleep(options.delay);

      // second and final attempt; if this throws, the error reaches the caller
      return await logic(...args);
    }
  }) as T;
};

0 comments on commit 597b2b8

Please sign in to comment.