Skip to content

Commit

Permalink
fix(mutex): ensure mutex blocks only upto max lambda duration
Browse files Browse the repository at this point in the history
  • Loading branch information
uladkasach committed Aug 19, 2024
1 parent 13da8cd commit 62dca67
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 3 deletions.
157 changes: 157 additions & 0 deletions src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { asUniDateTime, subDuration } from '@ehmpathy/uni-time';
import { DomainEntity, getUniqueIdentifier, serialize } from 'domain-objects';
import { getError, given, then, when } from 'test-fns';
import { HasMetadata } from 'type-fns';
Expand Down Expand Up @@ -135,4 +136,160 @@ describe('withAsyncTaskExecutionLifecycleExecute', () => {
},
);
});

given(
'a task class with an in memory dao, sqs driver, and mutex keys',
() => {
interface AsyncTaskSyncProspects {
updatedAt?: Date;
forAgentUuid: string;
perOpportunityUuid: string;
status: AsyncTaskStatus;
}
class AsyncTaskSyncProspects
extends DomainEntity<AsyncTaskSyncProspects>
implements AsyncTaskSyncProspects
{
public static unique = ['perOpportunityUuid'] as const;
public static mutex = ['forAgentUuid'] as const;
}
const database: Record<string, HasMetadata<AsyncTaskSyncProspects>> = {};
const daoAsyncTaskSyncProspects = {
upsert: jest.fn(
async (input: {
task: AsyncTaskSyncProspects;
mockUpdatedAt?: Date;
}): Promise<HasMetadata<AsyncTaskSyncProspects>> => {
const withMetadata = new AsyncTaskSyncProspects({
...input.task,
updatedAt: input.mockUpdatedAt ?? new Date(),
}) as HasMetadata<AsyncTaskSyncProspects>;
database[serialize(getUniqueIdentifier(withMetadata))] =
withMetadata;
return withMetadata;
},
),
findByUnique: jest.fn(
async (input: {
perOpportunityUuid: string;
}): Promise<null | HasMetadata<AsyncTaskSyncProspects>> =>
database[
serialize({ perOpportunityUuid: input.perOpportunityUuid })
] ?? null,
),
findByMutex: jest.fn(
async (input: { forAgentUuid: string; status: AsyncTaskStatus }) =>
Object.values(database).filter(
(task) =>
task.forAgentUuid === input.forAgentUuid &&
task.status === input.status,
),
),
};
const executeInnerLogicMock = jest.fn();
const sqsSendMessageMock = jest.fn();
const execute = withAsyncTaskExecutionLifecycleExecute(
async ({ task }) => {
executeInnerLogicMock({ on: task });
return {
task: await daoAsyncTaskSyncProspects.upsert({
task: { ...task, status: AsyncTaskStatus.FULFILLED },
}),
};
},
{
dao: daoAsyncTaskSyncProspects,
log: console,
api: {
sqs: {
sendMessage: sqsSendMessageMock,
},
},
},
);

beforeEach(() => {
executeInnerLogicMock.mockReset();
sqsSendMessageMock.mockReset();
});

when('asked to execute on a queued task', () => {
then('it should successfully attempt to execute', async () => {
const task = await daoAsyncTaskSyncProspects.upsert({
task: new AsyncTaskSyncProspects({
forAgentUuid: uuid(),
perOpportunityUuid: uuid(),
status: AsyncTaskStatus.QUEUED,
}),
});
const result = await execute({ task });
expect(executeInnerLogicMock).toHaveBeenCalledTimes(1);
expect(result.task.status).toEqual(AsyncTaskStatus.FULFILLED);
});
});

when(
'asked to execute on a queued task with an attempted mutex peer',
() => {
then(
'it should retry later since mutex is reserved by peer',
async () => {
const task = await daoAsyncTaskSyncProspects.upsert({
task: new AsyncTaskSyncProspects({
forAgentUuid: uuid(),
perOpportunityUuid: uuid(),
status: AsyncTaskStatus.QUEUED,
}),
});
await daoAsyncTaskSyncProspects.upsert({
task: new AsyncTaskSyncProspects({
forAgentUuid: task.forAgentUuid,
perOpportunityUuid: uuid(),
status: AsyncTaskStatus.ATTEMPTED, // in attempted status
}),
});

const error = await getError(execute({ task }));
expect(error.message).toContain(
'this error was thrown to ensure this task is retried later',
);
expect(error.message).toContain(
'mutex lock is reserved by at least one other task currently being attempted',
);
},
);
},
);

when(
'asked to execute on a queued task with an attempted mutex peer which was last updated over 17 min ago',
() => {
then('it should successfully attempt to execute', async () => {
const task = await daoAsyncTaskSyncProspects.upsert({
task: new AsyncTaskSyncProspects({
forAgentUuid: uuid(),
perOpportunityUuid: uuid(),
status: AsyncTaskStatus.QUEUED,
}),
});
await daoAsyncTaskSyncProspects.upsert({
task: new AsyncTaskSyncProspects({
forAgentUuid: task.forAgentUuid,
perOpportunityUuid: uuid(),
status: AsyncTaskStatus.ATTEMPTED, // in attempted status
}),
mockUpdatedAt: new Date(
subDuration(asUniDateTime(new Date()), {
minutes: 17,
}),
),
});
const result = await execute({ task });
expect(executeInnerLogicMock).toHaveBeenCalledTimes(1);
expect(result.task.status).toEqual(AsyncTaskStatus.FULFILLED);
});
},
);
},
);
});
27 changes: 24 additions & 3 deletions src/logic/withAsyncTaskExecutionLifecycleExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,37 @@ export const withAsyncTaskExecutionLifecycleExecute = <
);

// check whether there are mutually exclusive tasks in flight
const mutexActiveTasks = await dao.findByMutex(
const mutexTasks = await dao.findByMutex(
{ ...(foundTask as unknown as M), status: AsyncTaskStatus.ATTEMPTED },
context,
);
if (mutexActiveTasks.length)

// filter them down to the ones that are active (lambdas have durations w/ max of up to 15min)
const mutexTasksStillPotentiallyActive = mutexTasks.filter((task) => {
const updatedAtLast =
typeof task.updatedAt === 'string'
? parseISO(task.updatedAt)
: task.updatedAt;
if (!updatedAtLast)
throw new UnexpectedCodePathError(
'task did not have an .updatedAt attribute. this is required for reliable async-tasks',
{ mutexTaskFound: task },
);
const attemptTimeoutAt = addSeconds(
updatedAtLast,
attemptTimeoutSeconds, // default to 15 min
);
const now = new Date();
return isBefore(now, attemptTimeoutAt);
});

// retry the task if there are any
if (mutexTasksStillPotentiallyActive.length)
return await retryLater(
`this task's mutex lock is reserved by at least one other task currently being attempted by a different invocation`,
{
mutexKeys,
mutexActiveTasks,
mutexTasksStillPotentiallyActive,
},
);
}
Expand Down

0 comments on commit 62dca67

Please sign in to comment.