diff --git a/src/components/dam/uploadQueue/api/damAssetApi.ts b/src/components/dam/uploadQueue/api/damAssetApi.ts new file mode 100644 index 00000000..d8491484 --- /dev/null +++ b/src/components/dam/uploadQueue/api/damAssetApi.ts @@ -0,0 +1,11 @@ +import type { AxiosInstance } from 'axios' +import type { DocId } from '@/types/common' +import { SYSTEM_CORE_DAM } from '@/services/api/coreDam/assetApi' +import type { AssetDetailItemDto } from '@/types/coreDam/Asset' +import { apiFetchOne } from '@/services/api/apiFetchOne' + +const END_POINT = '/adm/v1/asset' +const ENTITY = 'asset' + +export const fetchAsset = (client: () => AxiosInstance, id: DocId) => + apiFetchOne(client, END_POINT + '/:id', { id }, SYSTEM_CORE_DAM, ENTITY) diff --git a/src/components/dam/uploadQueue/api/damImageApi.ts b/src/components/dam/uploadQueue/api/damImageApi.ts new file mode 100644 index 00000000..7f21cf70 --- /dev/null +++ b/src/components/dam/uploadQueue/api/damImageApi.ts @@ -0,0 +1,109 @@ +import type { AxiosInstance } from 'axios' +import { type UploadQueueItem, UploadQueueItemType } from '@/types/coreDam/UploadQueue' +import type { DocId } from '@/types/common' +import { HTTP_STATUS_CREATED, HTTP_STATUS_OK } from '@/composables/statusCodes' +import { damFileTypeFix } from '@/components/file/composables/fileType' + +const END_POINT = '/adm/v1/image' +const CHUNK_UPLOAD_TIMEOUT = 420 + +export const imageUploadStart = (client: () => AxiosInstance, item: UploadQueueItem) => { + return new Promise((resolve, reject) => { + let url = END_POINT + '/licence/' + item.licenceId + if (item.type === UploadQueueItemType.SlotFile && item.slotName && item.assetId) { + url = END_POINT + '/asset/' + item.assetId + '/slot-name/' + item.slotName + } + client() + .post( + url, + JSON.stringify({ + mimeType: damFileTypeFix(item.file), + size: item.file?.size, + }) + ) + .then((res) => { + if (res.status === HTTP_STATUS_CREATED) { + resolve(res.data) + } else { + // + reject() + } + }) + .catch((err) => { + // + reject(err) + }) + }) +} + +export const imageUploadChunk = ( + client: (timeout?: number) => AxiosInstance, + item: UploadQueueItem, + imageId: DocId, + buffer: string, + size: number, + offset: number, + onUploadProgressCallback: ((progressEvent: any) => void) | undefined = undefined +) => { + return new Promise((resolve, reject) => { + const formData = new FormData() + const url = END_POINT + '/' + imageId + '/chunk' + formData.append('file', buffer) + formData.append( + 'chunk', + JSON.stringify({ + offset: offset, + size: size, + }) + ) + + client(CHUNK_UPLOAD_TIMEOUT) + .post(url, formData, { + cancelToken: + item.chunks[item.currentChunkIndex] && item.chunks[item.currentChunkIndex].cancelTokenSource + ? item.chunks[item.currentChunkIndex].cancelTokenSource.token + : undefined, + headers: { + 'Content-Type': 'multipart/form-data', + }, + onUploadProgress: onUploadProgressCallback, + }) + .then((res) => { + if (res.status === HTTP_STATUS_CREATED) { + resolve(res.data) + } else { + // + reject() + } + }) + .catch((err) => { + // + reject(err) + }) + }) +} + +export const imageUploadFinish = (client: () => AxiosInstance, item: UploadQueueItem, sha: string) => { + return new Promise((resolve, reject) => { + const url = END_POINT + '/' + item.fileId + '/uploaded' + client() + .patch( + url, + JSON.stringify({ + checksum: sha, + }) + ) + .then((res) => { + if (res.status === HTTP_STATUS_OK) { + resolve(res.data) + } else { + // + reject() + } + }) + .catch((err) => { + // + reject(err) + }) + }) +} diff --git a/src/components/dam/uploadQueue/api/uploadApi.ts b/src/components/dam/uploadQueue/api/uploadApi.ts new file mode 100644 index 00000000..b6e0dc2f --- /dev/null +++ b/src/components/dam/uploadQueue/api/uploadApi.ts @@ -0,0 +1,107 @@ +import { DamAssetType } from '@/types/coreDam/Asset' +import { type DamUploadStartResponse, type UploadQueueItem, UploadQueueItemStatus } from '@/types/coreDam/UploadQueue' +import { imageUploadChunk, imageUploadFinish, imageUploadStart } from '@/components/dam/uploadQueue/api/damImageApi' +import type { AxiosInstance } from 'axios' +import type { DocId } from '@/types/common' +import { AssetFileProcessStatus } from '@/types/coreDam/AssetFile' +import { fetchAsset } from '@/components/dam/uploadQueue/api/damAssetApi' +import { useUploadQueuesStore } from '@/components/dam/uploadQueue/uploadQueuesStore' + +const NOTIFICATION_FALLBACK_TIMER_CHECK_SECONDS = 10 +const NOTIFICATION_FALLBACK_MAX_TRIES = 3 + +export const damUploadStart: (client: () => AxiosInstance, item: UploadQueueItem) => Promise = ( + client: () => AxiosInstance, + item: UploadQueueItem +) => { + return new Promise((resolve, reject) => { + if (item.assetType !== DamAssetType.Image) { + reject() + return + } + imageUploadStart(client, item) + .then((res) => { + resolve(res as DamUploadStartResponse) + return + }) + .catch((err) => reject(err)) + }) +} + +export const damUploadChunk = ( + client: () => AxiosInstance, + item: UploadQueueItem, + imageId: DocId, + buffer: string, + size: number, + offset: number, + onUploadProgressCallback: any +) => { + return new Promise((resolve, reject) => { + if (item.assetType !== DamAssetType.Image) { + reject() + return + } + imageUploadChunk(client, item, imageId, buffer, size, offset, onUploadProgressCallback) + .then((res) => { + resolve(res) + }) + .catch((err) => { + reject(err) + }) + }) +} + +export const damUploadFinish = ( + client: () => AxiosInstance, + item: UploadQueueItem, + sha: string, + uploadStatusFallback: boolean +) => { + return new Promise((resolve, reject) => { + if (item.assetType !== DamAssetType.Image) { + reject() + return + } + imageUploadFinish(client, item, sha) + .then((res) => { + item.status = UploadQueueItemStatus.Processing + if (uploadStatusFallback) { + item.notificationFallbackTimer = setTimeout(function () { + notificationFallbackCallback(client, item) + }, calculateFallbackTime(item)) + } + resolve(res) + }) + .catch((err) => reject(err)) + }) +} + +function calculateFallbackTime(item: UploadQueueItem) { + return NOTIFICATION_FALLBACK_TIMER_CHECK_SECONDS * 1000 * item.notificationFallbackTry * item.notificationFallbackTry +} + +async function notificationFallbackCallback(client: () => AxiosInstance, item: UploadQueueItem) { + clearTimeout(item.notificationFallbackTimer) + if (item.status === UploadQueueItemStatus.Uploaded) return + if (item.notificationFallbackTry > NOTIFICATION_FALLBACK_MAX_TRIES) return + if (!item.assetId) return + const asset = await fetchAsset(client, item.assetId) + if (asset && asset.mainFile && asset.mainFile.fileAttributes) { + const uploadQueuesStore = useUploadQueuesStore() + if (asset.mainFile.fileAttributes.status === AssetFileProcessStatus.Processed) { + uploadQueuesStore.queueItemProcessed(asset.id) + return + } else if (asset.mainFile.fileAttributes.status === AssetFileProcessStatus.Duplicate) { + uploadQueuesStore.queueItemDuplicate(asset.id) + return + } else if (asset.mainFile.fileAttributes.status === AssetFileProcessStatus.Failed) { + uploadQueuesStore.queueItemFailed(asset.id, asset.mainFile.fileAttributes.failReason) + return + } + } + item.notificationFallbackTry++ + item.notificationFallbackTimer = setTimeout(function () { + notificationFallbackCallback(client, item) + }, calculateFallbackTime(item)) +} diff --git a/src/components/dam/uploadQueue/uploadQueuesStore.ts b/src/components/dam/uploadQueue/uploadQueuesStore.ts index a2ff975a..bc74388e 100644 --- a/src/components/dam/uploadQueue/uploadQueuesStore.ts +++ b/src/components/dam/uploadQueue/uploadQueuesStore.ts @@ -12,6 +12,8 @@ import { import { useUploadQueueItemFactory } from '@/components/dam/uploadQueue/UploadQueueItemFactory' import { getAssetTypeByMimeType } from '@/components/dam/uploadQueue/mimeTypeHelper' import { useDamConfigState } from '@/components/dam/uploadQueue/damConfigState' +import { useUpload } from '@/components/dam/uploadQueue/uploadService' +import { DamAssetType } from '@/types/coreDam/Asset' const QUEUE_MAX_PARALLEL_UPLOADS = 2 const QUEUE_CHUNK_SIZE = 10485760 @@ -39,7 +41,7 @@ export const useUploadQueuesStore = defineStore('commonUploadQueuesStore', () => const { damConfigExtSystem } = useDamConfigState() for await (const file of files) { const type = getAssetTypeByMimeType(damFileTypeFix(file), damConfigExtSystem.value) - if (!type) continue + if (!type || type !== DamAssetType.Image) continue // only image now const queueItem = createDefault( 'file_' + file.name, UploadQueueItemType.File, @@ -107,19 +109,19 @@ export const useUploadQueuesStore = defineStore('commonUploadQueuesStore', () => } async function queueItemUploadStart(item: UploadQueueItem, queueId: string) { - // const { upload, uploadInit } = useUpload(item, (progress: number, speed: number, estimate: number) => { - // setUploadSpeed(item, progress, speed, estimate) - // }) - // try { - // await uploadInit() - // await upload() - // processUpload(queueId) - // } catch (e) { - // item.error.hasError = true - // item.status = UploadQueueItemStatus.Failed - // recalculateQueueCounts(queueId) - // processUpload(queueId) - // } + const { upload, uploadInit } = useUpload(item, (progress: number, speed: number, estimate: number) => { + setUploadSpeed(item, progress, speed, estimate) + }) + try { + await uploadInit() + await upload() + processUpload(queueId) + } catch (e) { + item.error.hasError = true + item.status = UploadQueueItemStatus.Failed + recalculateQueueCounts(queueId) + processUpload(queueId) + } } function setUploadSpeed(item: UploadQueueItem, progress: number, speed: number, estimate: number) { @@ -128,9 +130,16 @@ export const useUploadQueuesStore = defineStore('commonUploadQueuesStore', () => item.progress.speed = speed } + function queueItemProcessed () {} + function queueItemDuplicate () {} + function queueItemFailed () {} + return { getQueue, getQueueItems, addByFiles, + queueItemProcessed, + queueItemDuplicate, + queueItemFailed, } }) diff --git a/src/components/dam/uploadQueue/uploadService.ts b/src/components/dam/uploadQueue/uploadService.ts new file mode 100644 index 00000000..accfe978 --- /dev/null +++ b/src/components/dam/uploadQueue/uploadService.ts @@ -0,0 +1,247 @@ +import { inject, ref } from 'vue' +import sha1 from 'js-sha1' +import type { CancelTokenSource } from 'axios' +import axios from 'axios' +import { i18n } from '@/plugins/i18n' +import { + type AnzuApiValidationResponseData, + axiosErrorResponseHasValidationData, +} from '@/model/error/AnzuApiValidationError' +import { type UploadQueueItem, UploadQueueItemStatus } from '@/types/coreDam/UploadQueue' +import { NEW_LINE_MARK } from '@/composables/system/alerts' +import { isUndefined } from '@/utils/common' +import { useDamUploadChunkSize } from '@/components/dam/uploadQueue/damUploadChunkSize' +import type { CommonAdminCoreDamOptions } from '@/AnzuSystemsCommonAdmin' +import { CoreDamOptions } from '@/components/injectionKeys' +import { damUploadChunk, damUploadFinish, damUploadStart } from '@/components/dam/uploadQueue/api/uploadApi' +import { useCoreDamOptions } from '@/components/dam/assetSelect/composables/coreDamOptions' + +// const CHUNK_MAX_RETRY = 6 +const CHUNK_MAX_RETRY = 4 +const SPEED_CHECK_INTERVAL = 1000 +const CHUNK_RETRY_INTERVAL = 1000 +const CHUNK_RETRY_MULTIPLY = 3 + +const failUpload = async (queueItem: UploadQueueItem, error: unknown = null) => { + throw error +} + +const finishUpload = async (queueItem: UploadQueueItem, sha: string) => { + const { client } = useCoreDamOptions() + const coreDamOptions = inject(CoreDamOptions, undefined) + return await damUploadFinish(client, queueItem, sha, coreDamOptions?.uploadStatusFallback ?? false) +} + +const handleValidationErrorMessage = (error: Error | any) => { + const { t } = i18n.global || i18n + if (!error || !error.response || !error.response.data) { + // @ts-ignore + return t('system.uploadErrors.unknownError') + } + const data = error.response.data as AnzuApiValidationResponseData + const errorMessages: string[] = [] + for (const [key, values] of Object.entries(data.fields)) { + switch (key) { + case 'size': + errorMessages.push(t('system.uploadErrors.size')) + break + case 'offset': + errorMessages.push(t('system.uploadErrors.offset')) + break + case 'mimeType': + errorMessages.push(t('system.uploadErrors.mimeType')) + break + default: + // @ts-ignore + errorMessages.push(t('system.uploadErrors.systemError') + ': ' + key + ' - ' + values.join(',')) + } + } + return errorMessages.length > 0 ? errorMessages.join(NEW_LINE_MARK) : t('system.uploadErrors.unknownError') +} + +const readFile = async (offset: number, size: number, file: File): Promise<{ data: string; offset: number }> => { + return new Promise((resolve, reject) => { + const partial = file.slice(offset, offset + size) + const reader = new FileReader() + reader.onload = function (e) { + if (e.target?.readyState === FileReader.DONE) { + resolve({ data: e.target.result as string, offset: offset }) + } + } + reader.onerror = function (e) { + reject(e) + } + reader.readAsArrayBuffer(partial) + }) +} + +const sleep = (ms: number) => { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +export function useUpload(queueItem: UploadQueueItem, uploadCallback: any = undefined) { + const coreDamOptions = inject(CoreDamOptions, undefined) + const apiTimeout = coreDamOptions?.apiTimeout ?? 30 + const { client } = useCoreDamOptions() + const fileSize = ref(0) + + const progress = ref(0) + + let speedStack: any[] = [] + let lastTimestamp = 0 + let endTimestamp = 0 + let lastLoaded = 0 + const assetAlgo = sha1.create() + const { updateChunkSize, lastChunkSize } = useDamUploadChunkSize(apiTimeout) + + const getCurrentTimestamp = () => { + return Date.now() / 1000 + } + + // @ts-ignore + function progressCallback(progressEvent) { + const currentStamp = getCurrentTimestamp() + if (lastTimestamp === 0) { + lastTimestamp = currentStamp + + return + } + + const dataSent = lastLoaded > 0 ? progressEvent.loaded - lastLoaded : progressEvent.loaded + lastLoaded = progressEvent.total === progressEvent.loaded ? 0 : progressEvent.loaded + speedStack.push(dataSent / (currentStamp - lastTimestamp)) + + lastTimestamp = currentStamp + } + + const uploadChunk = async (chunkFile: File, offset: number) => { + return new Promise((resolve, reject) => { + if (!queueItem.fileId) { + reject() + return + } + damUploadChunk( + client, + queueItem, + queueItem.fileId, + chunkFile as unknown as string, // todo check + chunkFile.size, + offset, + progressCallback + ) + .then((result) => { + resolve(result) + }) + .catch((exception) => { + reject(exception) + }) + }) + } + + const processAndUploadChunk = async (offset: number): Promise => { + updateChunkSize(queueItem.progress.speed) + let arrayBuffer = await readFile(offset, lastChunkSize.value, queueItem.file!) + let chunkFile = new File([arrayBuffer.data], queueItem.file!.name) + + queueItem.currentChunkIndex = offset + const cancelToken = axios.CancelToken + queueItem.chunks[offset] = { cancelTokenSource: cancelToken.source() } + + let sleepTime = CHUNK_RETRY_INTERVAL + let attempt = 0 + do { + attempt++ + try { + await uploadChunk(chunkFile, offset) + assetAlgo.update(arrayBuffer.data) + + return chunkFile + } catch (error) { + // in error recompute + if (axiosErrorResponseHasValidationData(error as Error)) { + attempt = CHUNK_MAX_RETRY + queueItem.error.message = handleValidationErrorMessage(error) + return Promise.reject(error) + } + + if (updateChunkSize(queueItem.progress.speed)) { + arrayBuffer = await readFile(offset, lastChunkSize.value, queueItem.file!) + chunkFile = new File([arrayBuffer.data], queueItem.file!.name) + } + + await sleep(sleepTime) + attempt === CHUNK_MAX_RETRY - 1 ? (sleepTime = 1) : (sleepTime *= CHUNK_RETRY_MULTIPLY) + } + } while (attempt < CHUNK_MAX_RETRY) + return Promise.reject('Unable to upload chunk, max tries exceeded') + } + + function speedCheck() { + function speedCheckRun() { + speedStack = speedStack.slice(-15) + if (speedStack.length > 0) { + const avgSpeed = Math.ceil(speedStack.reduce((sum, current) => sum + current) / speedStack.length) + const remainingBytes = Math.ceil(fileSize.value * ((100 - progress.value) / 100)) + + uploadCallback(progress.value, avgSpeed, Math.ceil(remainingBytes / avgSpeed)) + } + + if (endTimestamp === 0) { + setTimeout(function () { + speedCheckRun() + }, SPEED_CHECK_INTERVAL) + } + } + + speedCheckRun() + } + + const uploadInit = async () => { + return new Promise((resolve, reject) => { + if (!queueItem.file) { + failUpload(queueItem) + return + } + fileSize.value = queueItem.file ? queueItem.file.size : 0 + queueItem.status = UploadQueueItemStatus.Uploading + damUploadStart(client, queueItem) + .then((res) => { + queueItem.assetId = res.asset + queueItem.fileId = res.id + resolve(queueItem) + }) + .catch((err) => { + reject(err) + }) + }) + } + + const upload = async () => { + if (uploadCallback) { + speedCheck() + } + + const filesize = queueItem.file?.size + if (isUndefined(filesize)) return Promise.reject() + + let i = 0 + while (i < filesize) { + const uploadedChunk = await processAndUploadChunk(i) + i += uploadedChunk.size + progress.value = (i / filesize) * 100 + } + + endTimestamp = Date.now() / 1000 + return await finishUpload(queueItem, assetAlgo.hex()) + } + + return { + uploadInit, + upload, + } +} + +export const uploadStop = (cancelTokenSource: CancelTokenSource) => { + // todo stop speed check + cancelTokenSource.cancel('axios request cancelled') +} diff --git a/src/index.d.ts b/src/index.d.ts new file mode 100644 index 00000000..b07cbfe6 --- /dev/null +++ b/src/index.d.ts @@ -0,0 +1,8 @@ +declare module 'js-sha1' { + interface Hasher { + update: (data: string) => this + hex: () => string + } + + declare function create(): Hasher +} diff --git a/src/lib.ts b/src/lib.ts index 47d82362..a22f574d 100644 --- a/src/lib.ts +++ b/src/lib.ts @@ -302,6 +302,7 @@ import { type UploadQueueItem, UploadQueueItemStatus, UploadQueueItemType, + type DamUploadStartResponse } from '@/types/coreDam/UploadQueue' import type { CustomDataAware, @@ -555,6 +556,7 @@ export { DamDistributionServiceType, DamDistributionStatus, UserAuthType, + DamUploadStartResponse, // FACTORIES useAnzuUserFactory, diff --git a/src/types/coreDam/UploadQueue.ts b/src/types/coreDam/UploadQueue.ts index 4644fe95..9585613a 100644 --- a/src/types/coreDam/UploadQueue.ts +++ b/src/types/coreDam/UploadQueue.ts @@ -83,3 +83,8 @@ export interface UploadQueueItem { notificationFallbackTry: number slotName: string | null } + +export interface DamUploadStartResponse { + id: DocId + asset: DocId +}