Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-79 Copy and delete performance #92

Merged
merged 8 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/helpers/copy.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
const formData = await req.formData();
if (!formData) return {};
const fullDest = formData.get('destination');
const continuationToken = formData.get('continuation-token');

Check warning on line 16 in src/helpers/copy.js

View check run for this annotation

Codecov / codecov/patch

src/helpers/copy.js#L16

Added line #L16 was not covered by tests
const lower = fullDest.slice(1).toLowerCase();
const sanitized = lower.endsWith('/') ? lower.slice(0, -1) : lower;
const destination = sanitized.split('/').slice(1).join('/');
const source = daCtx.key;
return { source, destination };
return { source, destination, continuationToken };

Check warning on line 21 in src/helpers/copy.js

View check run for this annotation

Codecov / codecov/patch

src/helpers/copy.js#L21

Added line #L21 was not covered by tests
}
22 changes: 22 additions & 0 deletions src/helpers/delete.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/

export default async function deleteHelper(req) {
try {
const formData = await req.formData();
if (!formData) return {};
const continuationToken = formData.get('continuation-token');
return { continuationToken };

Check warning on line 18 in src/helpers/delete.js

View check run for this annotation

Codecov / codecov/patch

src/helpers/delete.js#L16-L18

Added lines #L16 - L18 were not covered by tests
} catch {
return {};
}
}
6 changes: 4 additions & 2 deletions src/routes/source.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import putObject from '../storage/object/put.js';
import deleteObjects from '../storage/object/delete.js';

import putHelper from '../helpers/source.js';
import deleteHelper from '../helpers/delete.js';

async function invalidateCollab(api, url, env) {
const invPath = `/api/v1/${api}?doc=${url}`;
Expand All @@ -23,8 +24,9 @@ async function invalidateCollab(api, url, env) {
await env.dacollab.fetch(invURL);
}

export async function deleteSource({ env, daCtx }) {
return /* await */ deleteObjects(env, daCtx);
export async function deleteSource({ req, env, daCtx }) {
const details = await deleteHelper(req);
return /* await */ deleteObjects(env, daCtx, details);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the await commented out instead of deleted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not know. I left it there as it was from the original variation of this work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did that 😄
The reason is that I find it problematic that all of a sudden the linter wants you to lose the await on an async function if its called on return. If you change the function later and want to add some statements before returning you might forget to add the await at that point.
So I just added the await in a comment as a reminder.

But if you don't like it feel free to remove 😄

}

export async function postSource({ req, env, daCtx }) {
Expand Down
77 changes: 39 additions & 38 deletions src/storage/object/copy.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,13 @@
*/
import {
S3Client,
ListObjectsV2Command,
CopyObjectCommand,
} from '@aws-sdk/client-s3';

import getS3Config from '../utils/config.js';
import { listCommand } from '../utils/list.js';

function buildInput(org, key) {
return {
Bucket: `${org}-content`,
Prefix: `${key}/`,
};
}
const MAX_KEYS = 900;

export const copyFile = async (client, daCtx, sourceKey, details, isRename) => {
const Key = `${sourceKey.replace(details.source, details.destination)}`;
Expand Down Expand Up @@ -51,46 +46,52 @@
try {
await client.send(new CopyObjectCommand(input));
} catch (e) {
// eslint-disable-next-line no-console
console.log(e.$metadata);
console.log({

Check warning on line 49 in src/storage/object/copy.js

View workflow job for this annotation

GitHub Actions / Running tests (20.x)

Unexpected console statement
code: e.$metadata.httpStatusCode,
dest: Key,
src: `${daCtx.org}-content/${sourceKey}`,
});

Check warning on line 53 in src/storage/object/copy.js

View check run for this annotation

Codecov / codecov/patch

src/storage/object/copy.js#L49-L53

Added lines #L49 - L53 were not covered by tests
}
};

export default async function copyObject(env, daCtx, details, isRename) {
if (details.source === details.destination) {
return { body: '', status: 409 };
}
if (details.source === details.destination) return { body: '', status: 409 };

const config = getS3Config(env);
const client = new S3Client(config);
const input = buildInput(daCtx.org, details.source);

let ContinuationToken;

// The input prefix has a forward slash to prevent (drafts + drafts-new, etc.).
// Which means the list will only pickup children. This adds to the initial list.
const sourceKeys = [details.source, `${details.source}.props`];

do {
try {
const command = new ListObjectsV2Command({ ...input, ContinuationToken });
const resp = await client.send(command);

const { Contents = [], NextContinuationToken } = resp;
sourceKeys.push(...Contents.map(({ Key }) => Key));
ContinuationToken = NextContinuationToken;
} catch (e) {
return { body: '', status: 404 };
}
} while (ContinuationToken);
let sourceKeys;
let remainingKeys = [];
let continuationToken;

await Promise.all(
new Array(1).fill(null).map(async () => {
while (sourceKeys.length) {
await copyFile(client, daCtx, sourceKeys.pop(), details, isRename);
try {
if (details.continuationToken) {
continuationToken = details.continuationToken;
remainingKeys = await env.DA_JOBS.get(continuationToken, { type: 'json' });
sourceKeys = remainingKeys.splice(0, MAX_KEYS);
} else {
let resp = await listCommand(daCtx, details, client);
sourceKeys = resp.sourceKeys;
if (resp.continuationToken) {
continuationToken = `copy-${details.source}-${details.destination}-${crypto.randomUUID()}`;
while (resp.continuationToken) {
resp = await listCommand(daCtx, { continuationToken: resp.continuationToken }, client);
remainingKeys.push(...resp.sourceKeys);
}
}
}),
);
}
await Promise.all(sourceKeys.map(async (key) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know for the client-side operations @auniverseaway is now using a queue to limit the number of concurrent operations. Is that something that could be needed if a very large copy operation is happening?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we stay under 1000 sub-requests, we should be fine. The Queue class on client-side is for when we have 2000+, want to send all immediately, and don't want the browser to choke with too many requests.

await copyFile(client, daCtx, key, details, isRename);
}));

return { status: 204 };
if (remainingKeys.length) {
await env.DA_JOBS.put(continuationToken, JSON.stringify(remainingKeys));
return { body: JSON.stringify({ continuationToken }), status: 206 };
} else if (continuationToken) {
await env.DA_JOBS.delete(continuationToken);
}
return { status: 204 };
} catch (e) {
return { body: '', status: 404 };
}

Check warning on line 96 in src/storage/object/copy.js

View check run for this annotation

Codecov / codecov/patch

src/storage/object/copy.js#L95-L96

Added lines #L95 - L96 were not covered by tests
}
58 changes: 16 additions & 42 deletions src/storage/object/delete.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,11 @@
import {
S3Client,
DeleteObjectCommand,
ListObjectsV2Command,
} from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';

import getS3Config from '../utils/config.js';
import { postObjectVersionWithLabel } from '../version/put.js';

function buildInput(org, key) {
return {
Bucket: `${org}-content`,
Prefix: `${key}/`,
};
}
import { listCommand } from '../utils/list.js';

async function invalidateCollab(api, url, env) {
const invPath = `/api/v1/${api}?doc=${url}`;
Expand All @@ -37,8 +29,9 @@ async function invalidateCollab(api, url, env) {
export async function deleteObject(client, daCtx, Key, env, isMove = false) {
const fname = Key.split('/').pop();

if (fname.includes('.') && !Key.endsWith('.props')) {
await postObjectVersionWithLabel(isMove ? 'Moved' : 'Deleted', env, daCtx);
if (fname.includes('.') && !fname.startsWith('.') && !fname.endsWith('.props')) {
const tmpCtx = { ...daCtx, key: Key }; // For next calls, ctx needs the passed
await postObjectVersionWithLabel(isMove ? 'Moved' : 'Deleted', env, tmpCtx);
}

let resp;
Expand All @@ -59,40 +52,21 @@ export async function deleteObject(client, daCtx, Key, env, isMove = false) {
return resp;
}

export default async function deleteObjects(env, daCtx) {
export default async function deleteObjects(env, daCtx, details) {
const config = getS3Config(env);
const client = new S3Client(config);
const input = buildInput(daCtx.org, daCtx.key);

let ContinuationToken;

// The input prefix has a forward slash to prevent (drafts + drafts-new, etc.).
// Which means the list will only pickup children. This adds to the initial list.
const sourceKeys = [daCtx.key, `${daCtx.key}.props`];

do {
try {
const command = new ListObjectsV2Command({ ...input, ContinuationToken });
const resp = await client.send(command);

const { Contents = [], NextContinuationToken } = resp;
sourceKeys.push(...Contents.map(({ Key }) => Key));

await Promise.all(
new Array(1).fill(null).map(async () => {
while (sourceKeys.length) {
await deleteObject(client, daCtx, sourceKeys.pop(), env);
}
}),
);
try {
const { sourceKeys, continuationToken } = await listCommand(daCtx, details, client);
await Promise.all(sourceKeys.map(async (key) => {
await deleteObject(client, daCtx, key, env);
}));

ContinuationToken = NextContinuationToken;
} catch (e) {
// eslint-disable-next-line no-console
console.log(e);
return { body: '', status: 404 };
if (continuationToken) {
return { body: JSON.stringify({ continuationToken }), status: 206 };
}
} while (ContinuationToken);

return { body: null, status: 204 };
return { status: 204 };
} catch (e) {
return { body: '', status: 404 };
}
}
41 changes: 41 additions & 0 deletions src/storage/utils/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
import {
ListObjectsV2Command,
} from '@aws-sdk/client-s3';

export default function formatList(resp, daCtx) {
function compare(a, b) {
if (a.name < b.name) return -1;
Expand Down Expand Up @@ -69,3 +73,40 @@

return combined.sort(compare);
}

Check warning on line 76 in src/storage/utils/list.js

View check run for this annotation

Codecov / codecov/patch

src/storage/utils/list.js#L76

Added line #L76 was not covered by tests
function buildInput(org, key) {
return {
Bucket: `${org}-content`,
Prefix: `${key}/`,
MaxKeys: 300,
};
}

/**
* Lists a files in a bucket under the specified key.
* @param {DaCtx} daCtx the DA Context
* @param {Object} details contains any prevous Continuation token
* @param s3client
* @return {Promise<{sourceKeys: String[], continuationToken: String}>}
*/
export async function listCommand(daCtx, details, s3client) {
// There's no need to use the list command if the item has an extension
if (daCtx.ext) return { sourceKeys: [daCtx.key] };

const input = buildInput(daCtx.org, daCtx.key);
const { continuationToken } = details;

Check warning on line 98 in src/storage/utils/list.js

View check run for this annotation

Codecov / codecov/patch

src/storage/utils/list.js#L98

Added line #L98 was not covered by tests
// The input prefix has a forward slash to prevent (drafts + drafts-new, etc.).
// Which means the list will only pickup children. This adds to the initial list.
const sourceKeys = [];
if (!continuationToken) sourceKeys.push(daCtx.key, `${daCtx.key}.props`);

const commandInput = { ...input, ContinuationToken: continuationToken };
const command = new ListObjectsV2Command(commandInput);
const resp = await s3client.send(command);

const { Contents = [], NextContinuationToken } = resp;
sourceKeys.push(...Contents.map(({ Key }) => Key));

return { sourceKeys, continuationToken: NextContinuationToken };
}
Loading
Loading