-
Notifications
You must be signed in to change notification settings - Fork 44
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Replace tarball routes with a scheduled Lambda
Fixes #1289.
- Loading branch information
Showing
7 changed files
with
186 additions
and
96 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
/*! | ||
* Copyright © 2023 United States Government as represented by the | ||
* Administrator of the National Aeronautics and Space Administration. | ||
* All Rights Reserved. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
import type { Circular } from '~/routes/circulars/circulars.lib' | ||
|
||
export interface CircularAction<T = any> { | ||
initialize: () => Promise<T> | ||
action: (circulars: Circular[], context: T) => Promise<void> | ||
finalize: (context: T) => Promise<void> | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/*! | ||
* Copyright © 2023 United States Government as represented by the | ||
* Administrator of the National Aeronautics and Space Administration. | ||
* All Rights Reserved. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
import { tables } from '@architect/functions' | ||
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' | ||
import { paginateScan } from '@aws-sdk/lib-dynamodb' | ||
|
||
import type { CircularAction } from './circularAction' | ||
import { jsonUploadAction, txtUploadAction } from './uploadTar' | ||
import { type Circular } from '~/routes/circulars/circulars.lib' | ||
|
||
async function mapCirculars(...actions: CircularAction[]) { | ||
const contexts = await Promise.all( | ||
actions.map((action) => action.initialize()) | ||
) | ||
for await (const circulars of getAllRecords()) { | ||
await Promise.all( | ||
actions.map(({ action }, i) => action(circulars, contexts[i])) | ||
) | ||
} | ||
await Promise.all(actions.map(({ finalize }, i) => finalize(contexts[i]))) | ||
} | ||
|
||
async function* getAllRecords() { | ||
const db = await tables() | ||
const client = db._doc as unknown as DynamoDBDocument | ||
const TableName = db.name('circulars') | ||
const pages = paginateScan({ client }, { TableName }) | ||
|
||
for await (const page of pages) { | ||
yield page.Items as Circular[] | ||
} | ||
} | ||
|
||
// FIXME: must use module.exports here for OpenTelemetry shim to work correctly. | ||
// See https://dev.to/heymarkkop/how-to-solve-cannot-redefine-property-handler-on-aws-lambda-3j67 | ||
module.exports.handler = async () => { | ||
await mapCirculars(jsonUploadAction, txtUploadAction) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/*! | ||
* Copyright © 2023 United States Government as represented by the | ||
* Administrator of the National Aeronautics and Space Administration. | ||
* All Rights Reserved. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3' | ||
import { createReadableStreamFromReadable } from '@remix-run/node' | ||
import { Readable } from 'stream' | ||
import type { Pack } from 'tar-stream' | ||
import { pack as tarPack } from 'tar-stream' | ||
|
||
import type { CircularAction } from './circularAction' | ||
import { getEnvOrDie } from '~/lib/env.server' | ||
import type { Circular } from '~/routes/circulars/circulars.lib' | ||
import { formatCircular } from '~/routes/circulars/circulars.lib' | ||
|
||
interface TarContext { | ||
pack: Pack | ||
tarStream: Readable | ReadableStream<Uint8Array> | ||
fileType: string | ||
} | ||
|
||
const formatters: Record<string, (circular: Circular) => string> = { | ||
json({ sub, ...circular }) { | ||
return JSON.stringify(circular, null, 2) | ||
}, | ||
txt: formatCircular, | ||
} | ||
|
||
const s3 = new S3Client({}) | ||
const Bucket = getEnvOrDie('ARC_STATIC_BUCKET') | ||
|
||
async function uploadStream(tarContext: TarContext) { | ||
await s3.send( | ||
new PutObjectCommand({ | ||
Bucket, | ||
Key: `circulars.archive.${tarContext.fileType}.tar`, | ||
Body: tarContext.tarStream, | ||
}) | ||
) | ||
} | ||
|
||
export async function makeTarFile({ | ||
pack, | ||
circulars, | ||
fileType, | ||
}: { | ||
pack: Pack | ||
circulars: Circular[] | ||
fileType: 'txt' | 'json' | ||
}) { | ||
const formatter = formatters[fileType] | ||
for (const circular of circulars) { | ||
const name = `archive.${fileType}/${circular.circularId}.${fileType}` | ||
pack.entry({ name }, formatter(circular)).end() | ||
} | ||
} | ||
|
||
export async function setupTar() { | ||
const tarStream = new Readable({ | ||
read() {}, | ||
}) | ||
|
||
const pack = tarPack() | ||
|
||
pack.on('error', (err: Error) => { | ||
tarStream.emit('error', err) | ||
}) | ||
|
||
pack.on('data', (chunk: Uint8Array) => { | ||
tarStream.push(chunk) | ||
}) | ||
|
||
pack.on('end', () => { | ||
tarStream.push(null) | ||
}) | ||
|
||
return { pack, tarStream } as TarContext | ||
} | ||
|
||
export async function finalizeTar(context: TarContext) { | ||
context.pack.finalize() | ||
const readableTar = createReadableStreamFromReadable( | ||
Readable.from(context.tarStream as Readable) | ||
) | ||
await uploadStream({ | ||
pack: context.pack, | ||
tarStream: readableTar, | ||
fileType: context.fileType, | ||
}) | ||
} | ||
|
||
export async function uploadTxtTar(circulars: Circular[], context: TarContext) { | ||
return await makeTarFile({ | ||
pack: context.pack, | ||
circulars, | ||
fileType: 'txt', | ||
}) | ||
} | ||
|
||
export async function uploadJsonTar( | ||
circulars: Circular[], | ||
context: TarContext | ||
) { | ||
return await makeTarFile({ | ||
pack: context.pack, | ||
circulars, | ||
fileType: 'json', | ||
}) | ||
} | ||
|
||
export const jsonUploadAction: CircularAction<TarContext> = { | ||
action: uploadJsonTar, | ||
initialize: setupTar, | ||
finalize: finalizeTar, | ||
} | ||
|
||
export const txtUploadAction: CircularAction<TarContext> = { | ||
action: uploadTxtTar, | ||
initialize: setupTar, | ||
finalize: finalizeTar, | ||
} |