Skip to content

Commit

Permalink
feat(requeue): add native sqs requeue capability to prevent error log…
Browse files Browse the repository at this point in the history
…s on retryLater
  • Loading branch information
uladkasach committed Jul 6, 2024
1 parent d8b716a commit ca83c9b
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 39 deletions.
52 changes: 48 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"postversion": "git push origin HEAD --tags --no-verify"
},
"dependencies": {
"@ehmpathy/error-fns": "1.0.2",
"@ehmpathy/error-fns": "1.3.0",
"date-fns": "2.30.0",
"simple-in-memory-queue": "1.1.7",
"uuid": "9.0.0"
Expand Down
15 changes: 13 additions & 2 deletions src/domain/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,24 @@ export type AsyncTaskDaoContext = Record<string, any> | void;
export interface AsyncTaskDao<
T extends AsyncTask,
U extends Partial<T>,
M extends Partial<T>,
M extends Partial<T> & { status: AsyncTaskStatus },
C extends AsyncTaskDaoContext,
> {
findByMutex?: (
input: M & { status: AsyncTaskStatus.ATTEMPTED }, // needs to be able to search by mutex keys + status
input: M & { status: AsyncTaskStatus }, // needs to be able to search by mutex keys + status
context: C,
) => Promise<HasMetadata<T>[]>;
findByUnique: (input: U, context: C) => Promise<HasMetadata<T> | null>;
upsert: (input: { task: T }, context: C) => Promise<HasMetadata<T>>;
}

/**
* the shape of a simple aws sqs api
*/
export type SimpleAwsSqsApi = {
sendMessage: (input: {
queueUrl: string;
messageBody: string;
delaySeconds?: number;
}) => Promise<void>;
};
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { extractTaskFromSqsEvent } from './logic/extractTaskFromSqsEvent';
export { extractTaskParcelFromSqsEvent } from './logic/extractTaskParcelFromSqsEvent';
export {
withAsyncTaskExecutionLifecycleEnqueue,
SimpleAsyncTaskSqsQueueContract,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
import { UnexpectedCodePathError } from '@ehmpathy/error-fns';
import type { SQSEvent } from 'aws-lambda';
import { HasMetadata } from 'type-fns';

import { SimpleAsyncTaskEnqueueMetadata } from './withAsyncTaskExecutionLifecycleEnqueue';

/**
* a queue parcel contains both the task and meta about how the task was enqueued
*/
export interface AsyncTaskQueueParcel<T extends Record<string, any>> {
/**
* the task contained within the parcel
*/
task: HasMetadata<T>;

/**
* metadata about how this parcel was enqueued
*/
meta?: SimpleAsyncTaskEnqueueMetadata;
}

/**
* method to extract an async task from an sqs event sent to an aws-lambda
* method to extract an async task parcel from an sqs event sent to an aws-lambda
*/
export const extractTaskFromSqsEvent = <T>(event: SQSEvent) => {
export const extractTaskParcelFromSqsEvent = <T extends Record<string, any>>(
event: SQSEvent,
) => {
// extract the body
if (event.Records.length > 1)
throw new UnexpectedCodePathError(
Expand All @@ -19,15 +39,15 @@ export const extractTaskFromSqsEvent = <T>(event: SQSEvent) => {
// });
// if (requeued) return { delayed: true }; // stop here if we requeued the message due to sqs scheduling

// parse the body into task
// parse the body
const message = event.Records[0]!.body;
const body = JSON.parse(message) as { task: T };
const body = JSON.parse(message) as AsyncTaskQueueParcel<T>;
if (!body.task)
throw new UnexpectedCodePathError(
'could not find task on sqs message body',
{
body,
},
);
return body.task;
return { task: body.task, meta: body.meta };
};
72 changes: 61 additions & 11 deletions src/logic/withAsyncTaskExecutionLifecycleEnqueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import { UnexpectedCodePathError } from '@ehmpathy/error-fns';
import type { LogMethods } from 'simple-leveled-log-methods';
import { HasMetadata, isAFunction } from 'type-fns';

import { AsyncTaskDao, AsyncTaskDaoContext } from '../domain/constants';
import { uuid } from '../deps';
import {
AsyncTaskDao,
AsyncTaskDaoContext,
SimpleAwsSqsApi,
} from '../domain/constants';
import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask';

/**
Expand All @@ -14,15 +19,49 @@ import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask';
*/
export type SimpleAsyncTaskSqsQueueContract = {
type: 'SQS';
api: {
sendMessage: (input: {
queueUrl: string;
messageBody: string;
}) => Promise<void>;
};
api: SimpleAwsSqsApi;
url: string | (() => Promise<string>);
};

/**
* simple-async-task metadata embedded in the queue message
*/
export type SimpleAsyncTaskSqsEnqueueMetadata = {
/**
* the type of queue this task was enqueued to
*
* usecase
* - type narrow
*/
queueType: 'SQS';

/**
* the queue that the message was originally enqueued to
*/
queueUrl: string;

/**
* a uuid assigned to this message upon enqueue
*
* usecase
* - traceability
*/
enqueueUuid: string;

/**
* the number of times this sqs.message was requeued, post enqueue
*
* usecase
* - infiniloop prevention
*/
requeueDepth: number;
};

/**
* the different types of metadata that are available enqueue
*/
export type SimpleAsyncTaskEnqueueMetadata = SimpleAsyncTaskSqsEnqueueMetadata;

/**
* a simple, generic, contract for async-tasks queued via any queue
*
Expand Down Expand Up @@ -56,7 +95,7 @@ export type SimpleAsyncTaskAnyQueueContract<T> = {
export const withAsyncTaskExecutionLifecycleEnqueue = <
T extends AsyncTask,
U extends Partial<T>,
M extends Partial<T>,
M extends Partial<T> & { status: AsyncTaskStatus },
C extends AsyncTaskDaoContext,
I extends U,
>({
Expand Down Expand Up @@ -131,11 +170,22 @@ export const withAsyncTaskExecutionLifecycleEnqueue = <
});
await (async () => {
// support sqs queues natively
if (queue.type === 'SQS')
if (queue.type === 'SQS') {
const queueUrl = isAFunction(queue.url) ? await queue.url() : queue.url;
const meta: SimpleAsyncTaskSqsEnqueueMetadata = {
queueType: 'SQS',
queueUrl,
enqueueUuid: uuid(),
requeueDepth: 0,
};
return await queue.api.sendMessage({
queueUrl: isAFunction(queue.url) ? await queue.url() : queue.url,
messageBody: JSON.stringify({ task: taskToQueue }),
queueUrl,
messageBody: JSON.stringify({
task: taskToQueue,
meta,
}),
});
}

// otherwise, assume it has a generic queue contract
if (queue.push) return await queue.push(taskToQueue);
Expand Down
Loading

0 comments on commit ca83c9b

Please sign in to comment.