Skip to content

Commit

Permalink
Added checks for disconnections
Browse files Browse the repository at this point in the history
  • Loading branch information
Kinerius committed Apr 9, 2024
1 parent 4830132 commit 31ba51c
Showing 1 changed file with 53 additions and 44 deletions.
97 changes: 53 additions & 44 deletions src/commands/queue-ab-conversion-snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { CliError } from "../bin"
import { assert } from "../helpers/assert"
import { queueConversions} from "../helpers/asset-bundles"
import { StringDecoder } from "string_decoder"
import { exit } from "process"

// PROCESS AN ENTIRE SNAPSHOT

Expand Down Expand Up @@ -90,6 +91,7 @@ export default async () => {
let argStartPosition = args['--start-position']
let argGrep = args['--grep']
let shouldSkipUntilStartPosition = argStartPosition || false
let lastEntity = ""

for (let i = 0; i < snapshotsCount; i++)
{
Expand Down Expand Up @@ -117,18 +119,9 @@ export default async () => {
}

if (startDate <= entity.entityTimestamp && entity.entityType == snapshot) {
await queueConversions(abServers, {
entity: {
entityId: entity.entityId, authChain: [
{
type: AuthLinkType.SIGNER,
payload: '0x0000000000000000000000000000000000000000',
signature: ''
}
]
}, contentServerUrls: [contentUrl]
}, token)

await tryRetryQueueConversion(abServers, entity.entityId, contentUrl, token)

console.log(` (${i+1}/${snapshotsCount}) [${percent}%]`, entity.entityId, entity.pointers[0])
}
} catch (error) {
Expand Down Expand Up @@ -161,23 +154,12 @@ const processWorlds = async (abServers : string[], token:string) => {

for (let j = 0; j < scenes.length; j++)
{
const scene = scenes[j]

console.log(`> [${percent}%]`, name, scene.id)

await queueConversions(abServers, {
entity: {
entityId: scene.id, authChain: [
{
type: AuthLinkType.SIGNER,
payload: '0x0000000000000000000000000000000000000000',
signature: ''
}
]
}, contentServerUrls: [worldsContentUrl]
}, token)
const scene = scenes[j]

console.log(`> [${percent}%]`, name, scene.id)

await tryRetryQueueConversion(abServers, scene.id, worldsContentUrl, token)
}

}
};

Expand Down Expand Up @@ -211,32 +193,54 @@ const processWorlds = async (abServers : string[], token:string) => {
const percent = (100 * ((j+1) / scenes.length)).toFixed(2)
console.log(`> [${percent}%]`, world.name, scene.id)

await queueConversions(abServers, {
entity: {
entityId: scene.id, authChain: [
{
type: AuthLinkType.SIGNER,
payload: '0x0000000000000000000000000000000000000000',
signature: ''
}
]
}, contentServerUrls: [worldsContentUrl]
}, token)
await tryRetryQueueConversion(abServers, scene.id, worldsContentUrl, token)
}
}
}

};

const processSnapshot = async (url:any, processLine: (line:string, index:number) => Promise<void>) => {
const tryRetryQueueConversion = async(abServers:string[], entityId:string, contentUrl: string, token:string, retryCount:number = 0 ) => {
if (retryCount > 3)
{
console.log(`> ${abServers} ${entityId} retry count exceeded, please check your connection.`)
exit(1)
}
try {
await queueConversions(abServers, {
entity: {
entityId: entityId, authChain: [
{
type: AuthLinkType.SIGNER,
payload: '0x0000000000000000000000000000000000000000',
signature: ''
}
]
}, contentServerUrls: [contentUrl]
}, token)
} catch (error)
{
console.log(`> Unexpected error, retrying in 5 seconds...`)
await new Promise(f => setTimeout(f, 5000));
tryRetryQueueConversion(abServers, entityId, contentUrl, token, retryCount+1);
}
}

const processSnapshot = async (url:any, processLine: (line:string, index:number) => Promise<void>, retryCount : number = 0, skipToIndex : number = -1) => {
if (retryCount > 3)
{
console.error(`Failed downloading snapshot with url: ${url} after 3 retries.`);
return;
}

const decoder = new StringDecoder('utf8');
let remaining = '';

let index = 0
try {
const response = await fetch(url);
if (!response.ok) throw new CliError(`Invalid response from ${url}`)

let index = 0

for await (const data of response.body) {
const chunk = decoder.write(data);
const lines = (remaining + chunk).split('\n');
Expand All @@ -248,8 +252,11 @@ const processSnapshot = async (url:any, processLine: (line:string, index:number)
}

for (const line of lines) {
if (line.trim().startsWith('{')) {
await processLine(line, index)
if (skipToIndex < 0 || skipToIndex >= index)
{
if (line.trim().startsWith('{')) {
await processLine(line, index)
}
}
index++
process.stdout.write(index + '\r')
Expand All @@ -263,6 +270,8 @@ const processSnapshot = async (url:any, processLine: (line:string, index:number)
}

} catch (error) {
console.error(`Failed to download file: ${error}`);
console.error(`Failed downloading snapshot ${url}: ${error} \n - Retrying in 5 seconds...`);
await new Promise(f => setTimeout(f, 5000));
await processSnapshot(url, processLine, retryCount+1, index)
}
};

0 comments on commit 31ba51c

Please sign in to comment.