From 31ba51c130c45d9909f1eca024b82676e266f410 Mon Sep 17 00:00:00 2001
From: Patricio Guerra
Date: Tue, 9 Apr 2024 09:06:52 -0300
Subject: [PATCH] Added checks for disconnections

---
 src/commands/queue-ab-conversion-snapshot.ts | 97 +++++++++++---------
 1 file changed, 53 insertions(+), 44 deletions(-)

diff --git a/src/commands/queue-ab-conversion-snapshot.ts b/src/commands/queue-ab-conversion-snapshot.ts
index 1b44dfc..dd91389 100644
--- a/src/commands/queue-ab-conversion-snapshot.ts
+++ b/src/commands/queue-ab-conversion-snapshot.ts
@@ -5,6 +5,7 @@ import { CliError } from "../bin"
 import { assert } from "../helpers/assert"
 import { queueConversions} from "../helpers/asset-bundles"
 import { StringDecoder } from "string_decoder"
+import { exit } from "process"
 
 // PROCESS AN ENTIRE SNAPSHOT
 
@@ -90,6 +91,7 @@ export default async () => {
   let argStartPosition = args['--start-position']
   let argGrep = args['--grep']
   let shouldSkipUntilStartPosition = argStartPosition || false
+  let lastEntity = ""
 
   for (let i = 0; i < snapshotsCount; i++) {
 
@@ -117,18 +119,9 @@
       }
 
       if (startDate <= entity.entityTimestamp && entity.entityType == snapshot) {
-        await queueConversions(abServers, {
-          entity: {
-            entityId: entity.entityId, authChain: [
-              {
-                type: AuthLinkType.SIGNER,
-                payload: '0x0000000000000000000000000000000000000000',
-                signature: ''
-              }
-            ]
-          }, contentServerUrls: [contentUrl]
-        }, token)
+        await tryRetryQueueConversion(abServers, entity.entityId, contentUrl, token)
+        console.log(` (${i+1}/${snapshotsCount}) [${percent}%]`, entity.entityId, entity.pointers[0])
       }
     } catch (error) {
@@ -161,23 +154,12 @@ const processWorlds = async (abServers : string[], token:string) => {
 
     for (let j = 0; j < scenes.length; j++) {
 
-        const scene = scenes[j]
-
-        console.log(`> [${percent}%]`, name, scene.id)
-
-        await queueConversions(abServers, {
-          entity: {
-            entityId: scene.id, authChain: [
-              {
-                type: AuthLinkType.SIGNER,
-                payload: '0x0000000000000000000000000000000000000000',
-                signature: ''
-              }
-            ]
-          }, contentServerUrls: [worldsContentUrl]
-        }, token)
+      const scene = scenes[j]
+
+      console.log(`> [${percent}%]`, name, scene.id)
+
+      await tryRetryQueueConversion(abServers, scene.id, worldsContentUrl, token)
     }
-  }
 };
@@ -211,32 +193,54 @@
         const percent = (100 * ((j+1) / scenes.length)).toFixed(2)
         console.log(`> [${percent}%]`, world.name, scene.id)
 
-        await queueConversions(abServers, {
-          entity: {
-            entityId: scene.id, authChain: [
-              {
-                type: AuthLinkType.SIGNER,
-                payload: '0x0000000000000000000000000000000000000000',
-                signature: ''
-              }
-            ]
-          }, contentServerUrls: [worldsContentUrl]
-        }, token)
+        await tryRetryQueueConversion(abServers, scene.id, worldsContentUrl, token)
       }
     }
   }
 };
 
-const processSnapshot = async (url:any, processLine: (line:string, index:number) => Promise<void>) => {
+const tryRetryQueueConversion = async (abServers: string[], entityId: string, contentUrl: string, token: string, retryCount: number = 0) => {
+  if (retryCount > 3)
+  {
+    console.log(`> ${abServers} ${entityId} retry count exceeded, please check your connection.`)
+    exit(1)
+  }
+  try {
+    await queueConversions(abServers, {
+      entity: {
+        entityId: entityId, authChain: [
+          {
+            type: AuthLinkType.SIGNER,
+            payload: '0x0000000000000000000000000000000000000000',
+            signature: ''
+          }
+        ]
+      }, contentServerUrls: [contentUrl]
+    }, token)
+  } catch (error)
+  {
+    console.log(`> Unexpected error, retrying in 5 seconds...`)
+    await new Promise(f => setTimeout(f, 5000));
+    await tryRetryQueueConversion(abServers, entityId, contentUrl, token, retryCount+1);
+  }
+}
+
+const processSnapshot = async (url:any, processLine: (line:string, index:number) => Promise<void>, retryCount: number = 0, skipToIndex: number = -1) => {
+  if (retryCount > 3)
+  {
+    console.error(`Failed downloading snapshot with url: ${url} after 3 retries.`);
+    return;
+  }
+
   const decoder = new StringDecoder('utf8');
   let remaining = '';
-
+  let index = 0
   try {
     const response = await fetch(url);
     if (!response.ok) throw new CliError(`Invalid response from ${url}`)
-    let index = 0
+
     for await (const data of response.body) {
       const chunk = decoder.write(data);
       const lines = (remaining + chunk).split('\n');
@@ -248,8 +252,11 @@
       }
 
       for (const line of lines) {
-        if (line.trim().startsWith('{')) {
-          await processLine(line, index)
+        if (skipToIndex < 0 || index >= skipToIndex)
+        {
+          if (line.trim().startsWith('{')) {
+            await processLine(line, index)
+          }
         }
         index++
         process.stdout.write(index + '\r')
@@ -263,6 +270,8 @@
     }
   } catch (error) {
-    console.error(`Failed to download file: ${error}`);
+    console.error(`Failed downloading snapshot ${url}: ${error} \n - Retrying in 5 seconds...`);
+    await new Promise(f => setTimeout(f, 5000));
+    await processSnapshot(url, processLine, retryCount+1, index)
   }
 };
\ No newline at end of file
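
Note on the retry pattern (not part of the patch): both tryRetryQueueConversion and processSnapshot hand-roll the same retry-with-delay loop. The sketch below shows that pattern as a small standalone TypeScript helper, assuming only standard Node/TypeScript APIs; withRetry, maxRetries, delayMs and conversionRequest are illustrative names that do not exist in this repository.

// Minimal sketch of the retry-with-delay behavior used above: run an async action,
// wait 5 seconds between attempts, and give up after a fixed number of retries.
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

async function withRetry<T>(action: () => Promise<T>, maxRetries: number = 3, delayMs: number = 5000): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await action()
    } catch (error) {
      if (attempt >= maxRetries) {
        // Out of retries: surface the last error to the caller instead of exiting the process.
        throw error
      }
      console.log(`> Unexpected error, retrying in ${delayMs / 1000} seconds... (attempt ${attempt + 1}/${maxRetries})`)
      await sleep(delayMs)
    }
  }
}

// Hypothetical usage mirroring a single queued conversion from the patch:
//   await withRetry(() => queueConversions(abServers, conversionRequest, token))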