diff --git a/src/streamed/index.ts b/src/streamed/index.ts index 99f1828b5..80c04eec6 100644 --- a/src/streamed/index.ts +++ b/src/streamed/index.ts @@ -83,9 +83,10 @@ export const DEFAULT_ORDERBOOK_OPTIONS: OrderbookOptions = { * account state. */ export class StreamedOrderbook { - private readonly state: AuctionState; private batch = -1; - private pendingEvents: AnyEvent[] = []; + + private readonly confirmedState: AuctionState; + private latestState?: AuctionState; private invalidState?: InvalidAuctionStateError; @@ -95,7 +96,7 @@ export class StreamedOrderbook { private readonly startBlock: number, private readonly options: OrderbookOptions, ) { - this.state = new AuctionState(options) + this.confirmedState = new AuctionState(options) } /** @@ -132,7 +133,10 @@ export class StreamedOrderbook { * Retrieves the current open orders in the orderbook. */ public getOpenOrders(): IndexedOrder[] { - return this.state.getOrders(this.batch) + this.throwOnInvalidState() + + const state = this.latestState ?? this.confirmedState + return state.getOrders(this.batch) } /** @@ -158,7 +162,7 @@ export class StreamedOrderbook { const events = await this.getPastEvents({ fromBlock, toBlock }) this.options.logger?.debug(`applying ${events.length} past events`) - this.state.applyEvents(events) + this.confirmedState.applyEvents(events) } this.batch = await this.getBatchId(endBlock) } @@ -167,6 +171,9 @@ export class StreamedOrderbook { * Apply new confirmed events to the account state and store the remaining * events that are subject to reorgs into the `pendingEvents` array. * + * @returns The block number up until which the streamed orderbook is up to + * date + * * @remarks * If there is an error retrieving the latest events from the node, then the * account state remains unmodified. This allows the updating orderbook to be @@ -175,35 +182,57 @@ export class StreamedOrderbook { * then the streamed orderbook becomes invalid and can no longer apply new * events as the actual auction state is unknown. */ - public async update(): Promise { - if (this.invalidState) { - throw this.invalidState - } + public async update(): Promise { + this.throwOnInvalidState() - const fromBlock = this.state.nextBlock + const fromBlock = this.confirmedState.nextBlock this.options.logger?.debug(`fetching new events from ${fromBlock}-latest`) const events = await this.getPastEvents({ fromBlock }) - const latestBlock = await this.web3.eth.getBlockNumber() + // NOTE: If the web3 instance is connected to nodes behind a load balancer, + // it is possible that the events were queried on a node that includes an + // additional block to the node that handled the query to the latest block + // number, so use the max of `latestEvents.last().blockNumber` and the + // queried latest block number. + const latestBlock = Math.max( + await this.web3.eth.getBlockNumber(), + events[events.length - 1]?.blockNumber ?? 0, + ) const confirmedBlock = latestBlock - this.options.blockConfirmations - const batch = await this.getBatchId(confirmedBlock) - - const confirmedEventCount = events.findIndex(ev => ev.blockNumber > confirmedBlock) - const confirmedEvents = events.splice(0, confirmedEventCount) - const pendingEvents = events - - if (confirmedEvents.length > 0) { - this.options.logger?.debug(`applying ${confirmedEvents.length} confirmed events until block ${confirmedBlock}`) - try { - this.state.applyEvents(confirmedEvents) - } catch (err) { - this.invalidState = new InvalidAuctionStateError(confirmedBlock, err) - this.options.logger?.error(this.invalidState.message) - throw this.invalidState - } + + this.batch = await this.getBatchId(latestBlock) + if (events.length === 0) { + return latestBlock + } + + const firstLatestEvent = events.findIndex(ev => ev.blockNumber > confirmedBlock) + const confirmedEventCount = firstLatestEvent !== -1 ? firstLatestEvent : events.length + const confirmedEvents = events.slice(0, confirmedEventCount) + const latestEvents = events.slice(confirmedEventCount) + + this.options.logger?.debug(`applying ${confirmedEvents.length} confirmed events until block ${confirmedBlock}`) + try { + this.confirmedState.applyEvents(confirmedEvents) + } catch (err) { + this.invalidState = new InvalidAuctionStateError(confirmedBlock, err) + this.options.logger?.error(this.invalidState.message) + throw this.invalidState } - this.batch = batch - this.pendingEvents = pendingEvents + + this.latestState = undefined + this.options.logger?.debug(`reapplying ${latestEvents.length} latest events until block ${latestBlock}`) + if (latestEvents.length > 0) { + // NOTE: Errors applying latest state are not considered fatal as we can + // still recover from them (since the confirmed state is still valid). If + // applying the latest events fails, just make sure that the `latestEvent` + // property is not set, so that the query methods fall back to using the + // confirmed state. + const newLatestState = this.confirmedState.copy() + newLatestState.applyEvents(latestEvents) + this.latestState = newLatestState + } + + return latestBlock } /** @@ -231,10 +260,25 @@ export class StreamedOrderbook { const BATCH_DURATION = 300 const block = await this.web3.eth.getBlock(blockNumber) - const batch = Math.floor(Number(block.timestamp) / BATCH_DURATION) + // NOTE: Pending or future blocks return null when queried, so approximate + // with system time + const timestamp = block?.timestamp ?? Date.now() + const batch = Math.floor(Number(timestamp) / BATCH_DURATION) return batch } + + /** + * Helper method to check for an unrecoverable invalid state in the current + * streamed orderbook. + * + * @throws If the streamed orderbook is in an invalid state. + */ + private throwOnInvalidState(): void { + if (this.invalidState) { + throw this.invalidState + } + } } /** diff --git a/src/streamed/state.ts b/src/streamed/state.ts index 69d79bf30..82054e675 100644 --- a/src/streamed/state.ts +++ b/src/streamed/state.ts @@ -114,6 +114,26 @@ export class AuctionState { private readonly options: OrderbookOptions, ) {} + /** + * Creates a copy of the auction state that can apply events independently + * without modifying the original state. + */ + public copy(): AuctionState { + const clone = new AuctionState(this.options) + clone.lastBlock = this.lastBlock + clone.tokens.push(...this.tokens) + for (const [user, account] of this.accounts.entries()) { + clone.accounts.set(user, { + balances: new Map(account.balances), + pendingWithdrawals: new Map(account.pendingWithdrawals), + orders: account.orders.map(order => ({ ...order })), + }) + } + clone.lastSolution = this.lastSolution + + return clone + } + /** * Create an object representation of the current account state for JSON * serialization. diff --git a/test/models/streamed/index.spec.ts b/test/models/streamed/index.spec.ts index 418b423b1..7d88bd6dd 100644 --- a/test/models/streamed/index.spec.ts +++ b/test/models/streamed/index.spec.ts @@ -28,15 +28,15 @@ describe("Streamed Orderbook", () => { "ETHEREUM_NODE_URL or INFURA_PROJECT_ID environment variable is required", ) const url = ETHEREUM_NODE_URL || `https://mainnet.infura.io/v3/${INFURA_PROJECT_ID}` + const endBlock = ORDERBOOK_END_BLOCK ? parseInt(ORDERBOOK_END_BLOCK) : undefined + const web3 = new Web3(url) const [viewer] = await deployment(web3, BatchExchangeViewerArtifact) - const endBlock = ORDERBOOK_END_BLOCK ? - parseInt(ORDERBOOK_END_BLOCK) : - await web3.eth.getBlockNumber() - console.debug("==> building streamed orderbook...") const orderbook = await StreamedOrderbook.init(web3, { endBlock, strict: true }) + const targetBlock = endBlock ?? await orderbook.update() + const streamedOrders = orderbook.getOpenOrders().map(order => ({ ...order, sellTokenBalance: new BN(order.sellTokenBalance.toString()), @@ -46,7 +46,7 @@ describe("Streamed Orderbook", () => { })) console.debug("==> querying onchain orderbook...") - const queriedOrders = await getOpenOrders(viewer, 300, endBlock) + const queriedOrders = await getOpenOrders(viewer, 300, targetBlock) console.debug("==> comparing orderbooks...") function toDiffableOrders(orders: IndexedOrder[]): Record> { diff --git a/test/models/streamed/state.spec.ts b/test/models/streamed/state.spec.ts index c2def530f..c5fe9b945 100644 --- a/test/models/streamed/state.spec.ts +++ b/test/models/streamed/state.spec.ts @@ -531,4 +531,100 @@ describe("Account State", () => { ])).to.throw() }) }) + + describe("copy", () => { + it("Does not modify the orginal state.", () => { + const state = auctionState() + state.applyEvents([ + event(1, "TokenListing", { id: "0", token: addr(0) }), + event(2, "Deposit", { + user: addr(0), + token: addr(0), + amount: "100000", + batchId: "1", + }), + event(3, "OrderPlacement", { + owner: addr(0), + index: "0", + buyToken: "0", + sellToken: "0", + validFrom: "0", + validUntil: "9999", + priceNumerator: "100000", + priceDenominator: "100000", + }), + event(4, "SolutionSubmission", { + submitter: addr(0), + burntFees: "10000", + utility: "unused", + disregardedUtility: "unused", + lastAuctionBurntFees: "unsued", + prices: ["unsued"], + tokenIdsForPrice: ["unsued"], + }), + event(5, "WithdrawRequest", { + user: addr(0), + token: addr(0), + amount: "100000", + batchId: "1", + }), + ]) + const original = state.toJSON() + + const copy = state.copy() + copy.applyEvents([ + // add a new token + event(10, "TokenListing", { id: "1", token: addr(1) }), + // add new balance + event(11, "Deposit", { + user: addr(0), + token: addr(0), + amount: "100000", + batchId: "1", + }), + // modify existing balance, clear existing pending withdrawal + event(12, "Withdraw", { + user: addr(0), + token: addr(0), + amount: "100000", + }), + // add new pending withdrawal + event(13, "WithdrawRequest", { + user: addr(0), + token: addr(0), + amount: "1000000", + batchId: "42", + }), + // modify existing order + event(14, "OrderCancellation", { + owner: addr(0), + id: "0", + }), + // add new order + event(15, "OrderPlacement", { + owner: addr(0), + index: "1", + buyToken: "0", + sellToken: "0", + validFrom: "0", + validUntil: "9999", + priceNumerator: "100000", + priceDenominator: "100000", + }), + // modify last solution, add new user + event(16, "SolutionSubmission", { + submitter: addr(1), + burntFees: "10000", + utility: "unused", + disregardedUtility: "unused", + lastAuctionBurntFees: "unsued", + prices: ["unsued"], + tokenIdsForPrice: ["unsued"], + }), + ]) + const after = state.toJSON() + + expect(original).to.deep.equal(after) + }) + }) })