From 5f5d9c0b463db129bf250afca8a373c7c63ee7e9 Mon Sep 17 00:00:00 2001 From: Bart Date: Thu, 29 Dec 2022 11:37:40 +0100 Subject: [PATCH] Wait on blockchain processing completion when polling (#156) * wait on processing completion * increase polling frequency * log execution time less frequently * refactor to iterable polling, fix concurrency * start/stop scheduling in project lifecycle hooks --- backend/src/core/async-interval-scheduler.ts | 64 +++++++++++++++++++ .../src/flow/services/aggregator.service.ts | 18 ++++-- 2 files changed, 77 insertions(+), 5 deletions(-) create mode 100644 backend/src/core/async-interval-scheduler.ts diff --git a/backend/src/core/async-interval-scheduler.ts b/backend/src/core/async-interval-scheduler.ts new file mode 100644 index 00000000..a2f5d46e --- /dev/null +++ b/backend/src/core/async-interval-scheduler.ts @@ -0,0 +1,64 @@ +import { Logger } from "@nestjs/common"; + +const logger = new Logger("Utils"); + +export type ExecutableCallback = () => Promise; + +export type AsyncIntervalOptions = { + functionToExecute: ExecutableCallback; + name: string; + intervalInMs: number; +}; + +/** + * Runs the provided function in interval. + * Waits for promise completion until rerunning interval. + */ +export class AsyncIntervalScheduler { + private isRunning: boolean; + private runningTimeoutId: NodeJS.Timeout; + private readonly options: AsyncIntervalOptions; + + constructor(options: AsyncIntervalOptions) { + this.options = options; + this.isRunning = false; + } + + async start(): Promise { + if (this.isRunning) { + return; + } + this.isRunning = true; + await this.pollIfRunning(); + } + + stop(): void { + clearTimeout(this.runningTimeoutId); + this.isRunning = false; + } + + private async pollIfRunning() { + const { intervalInMs } = this.options; + while (this.isRunning) { + await this.executeCallback(); + await new Promise((resolve) => { + this.runningTimeoutId = setTimeout(resolve, intervalInMs); + }); + } + } + + private async executeCallback() { + const { functionToExecute, name, intervalInMs } = this.options; + try { + const startTime = new Date(); + await functionToExecute(); + const endTime = new Date(); + const runTimeInMs = endTime.getTime() - startTime.getTime(); + if (runTimeInMs > intervalInMs) { + logger.debug(`${name} took ${runTimeInMs}ms`); + } + } catch (e) { + logger.error(`${name} failed`, e); + } + } +} diff --git a/backend/src/flow/services/aggregator.service.ts b/backend/src/flow/services/aggregator.service.ts index e1c8c949..7eb29ede 100644 --- a/backend/src/flow/services/aggregator.service.ts +++ b/backend/src/flow/services/aggregator.service.ts @@ -1,5 +1,4 @@ import { Injectable, Logger } from "@nestjs/common"; -import { Interval } from "@nestjs/schedule"; import { FlowBlock, FlowCollection, @@ -43,6 +42,7 @@ import { } from "../../processes/managed-process.entity"; import { CommonService } from "../../core/services/common.service"; import { FlowEmulatorService } from "./emulator.service"; +import { AsyncIntervalScheduler } from "../../core/async-interval-scheduler"; type BlockData = { block: FlowBlock; @@ -70,6 +70,8 @@ export class FlowAggregatorService implements ProjectContextLifecycle { private projectContext: ProjectEntity | undefined; private emulatorProcess: ManagedProcessEntity | undefined; private readonly logger = new Logger(FlowAggregatorService.name); + private readonly processingIntervalMs = 500; + private processingScheduler: AsyncIntervalScheduler; constructor( private blockService: BlocksService, @@ -85,9 +87,16 @@ export class FlowAggregatorService implements ProjectContextLifecycle { private configService: FlowConfigService, private processManagerService: ProcessManagerService, private commonService: CommonService - ) {} + ) { + this.processingScheduler = new AsyncIntervalScheduler({ + name: "Blockchain processing", + intervalInMs: this.processingIntervalMs, + functionToExecute: this.processBlockchainData.bind(this), + }); + } onEnterProjectContext(project: ProjectEntity): void { + this.processingScheduler?.start(); this.projectContext = project; this.emulatorProcess = this.processManagerService.get( FlowEmulatorService.processId @@ -103,6 +112,7 @@ export class FlowAggregatorService implements ProjectContextLifecycle { } onExitProjectContext(): void { + this.processingScheduler?.stop(); this.projectContext = undefined; this.processManagerService.removeListener( ProcessManagerEvent.PROCESS_ADDED, @@ -151,9 +161,7 @@ export class FlowAggregatorService implements ProjectContextLifecycle { return latestUnprocessedBlockHeight - (nextBlockHeightToProcess - 1); } - // TODO(milestone-x): Next interval shouldn't start before this function resolves - @Interval(1000) - async fetchDataFromDataSource(): Promise { + async processBlockchainData(): Promise { if (!this.projectContext) { return; }