From 35a2ee28f8a1dacbb6ff2d436ef49c62e7114003 Mon Sep 17 00:00:00 2001 From: Nicholas Rodrigues Lordello Date: Wed, 6 May 2020 17:56:45 +0200 Subject: [PATCH] [Events Orderbook] Implement querying the orderbook at the latest block (#735) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR implements querying the latest open orderbook. This is implemented by keeping a separate "confirmed" auction state, built from events that are part of confirmed blocks, and not subject to reorgs, and a "latest" auction state that is built by cloning the "confirmed" state and applying events in between the confirmed block and the latest block. The "latest" state is implemented to be discardable and rebuilt on every update so that it is not prone to errors stemming from reorgs, nor does complex logic for rolling back events need to be implemented. Unfortunately, the current implementation is slightly inefficient. I created #734 as an issue to capture a potential solution to the problem. This closes #719 ### Test Plan Run the e2e test to ensure the event based orderbook on the latest block matches the onchain queried orderbook: ``` $ yarn test-streamed-orderbook Streamed Orderbook init ==> building streamed orderbook... ==> querying onchain orderbook... ==> comparing orderbooks... ✓ should successfully apply all events and match on-chain orderbook (287940ms) 1 passing (5m) ``` * implement querying the orderbook at the latest block * comments * add unit test for copy * fix array index logic * default to system time for future blocks --- src/streamed/index.ts | 102 +++++++++++++++++++++-------- src/streamed/state.ts | 20 ++++++ test/models/streamed/index.spec.ts | 10 +-- test/models/streamed/state.spec.ts | 96 +++++++++++++++++++++++++++ 4 files changed, 194 insertions(+), 34 deletions(-) diff --git a/src/streamed/index.ts b/src/streamed/index.ts index 99f1828b5..80c04eec6 100644 --- a/src/streamed/index.ts +++ b/src/streamed/index.ts @@ -83,9 +83,10 @@ export const DEFAULT_ORDERBOOK_OPTIONS: OrderbookOptions = { * account state. */ export class StreamedOrderbook { - private readonly state: AuctionState; private batch = -1; - private pendingEvents: AnyEvent[] = []; + + private readonly confirmedState: AuctionState; + private latestState?: AuctionState; private invalidState?: InvalidAuctionStateError; @@ -95,7 +96,7 @@ export class StreamedOrderbook { private readonly startBlock: number, private readonly options: OrderbookOptions, ) { - this.state = new AuctionState(options) + this.confirmedState = new AuctionState(options) } /** @@ -132,7 +133,10 @@ export class StreamedOrderbook { * Retrieves the current open orders in the orderbook. */ public getOpenOrders(): IndexedOrder[] { - return this.state.getOrders(this.batch) + this.throwOnInvalidState() + + const state = this.latestState ?? this.confirmedState + return state.getOrders(this.batch) } /** @@ -158,7 +162,7 @@ export class StreamedOrderbook { const events = await this.getPastEvents({ fromBlock, toBlock }) this.options.logger?.debug(`applying ${events.length} past events`) - this.state.applyEvents(events) + this.confirmedState.applyEvents(events) } this.batch = await this.getBatchId(endBlock) } @@ -167,6 +171,9 @@ export class StreamedOrderbook { * Apply new confirmed events to the account state and store the remaining * events that are subject to reorgs into the `pendingEvents` array. * + * @returns The block number up until which the streamed orderbook is up to + * date + * * @remarks * If there is an error retrieving the latest events from the node, then the * account state remains unmodified. This allows the updating orderbook to be @@ -175,35 +182,57 @@ export class StreamedOrderbook { * then the streamed orderbook becomes invalid and can no longer apply new * events as the actual auction state is unknown. */ - public async update(): Promise { - if (this.invalidState) { - throw this.invalidState - } + public async update(): Promise { + this.throwOnInvalidState() - const fromBlock = this.state.nextBlock + const fromBlock = this.confirmedState.nextBlock this.options.logger?.debug(`fetching new events from ${fromBlock}-latest`) const events = await this.getPastEvents({ fromBlock }) - const latestBlock = await this.web3.eth.getBlockNumber() + // NOTE: If the web3 instance is connected to nodes behind a load balancer, + // it is possible that the events were queried on a node that includes an + // additional block to the node that handled the query to the latest block + // number, so use the max of `latestEvents.last().blockNumber` and the + // queried latest block number. + const latestBlock = Math.max( + await this.web3.eth.getBlockNumber(), + events[events.length - 1]?.blockNumber ?? 0, + ) const confirmedBlock = latestBlock - this.options.blockConfirmations - const batch = await this.getBatchId(confirmedBlock) - - const confirmedEventCount = events.findIndex(ev => ev.blockNumber > confirmedBlock) - const confirmedEvents = events.splice(0, confirmedEventCount) - const pendingEvents = events - - if (confirmedEvents.length > 0) { - this.options.logger?.debug(`applying ${confirmedEvents.length} confirmed events until block ${confirmedBlock}`) - try { - this.state.applyEvents(confirmedEvents) - } catch (err) { - this.invalidState = new InvalidAuctionStateError(confirmedBlock, err) - this.options.logger?.error(this.invalidState.message) - throw this.invalidState - } + + this.batch = await this.getBatchId(latestBlock) + if (events.length === 0) { + return latestBlock + } + + const firstLatestEvent = events.findIndex(ev => ev.blockNumber > confirmedBlock) + const confirmedEventCount = firstLatestEvent !== -1 ? firstLatestEvent : events.length + const confirmedEvents = events.slice(0, confirmedEventCount) + const latestEvents = events.slice(confirmedEventCount) + + this.options.logger?.debug(`applying ${confirmedEvents.length} confirmed events until block ${confirmedBlock}`) + try { + this.confirmedState.applyEvents(confirmedEvents) + } catch (err) { + this.invalidState = new InvalidAuctionStateError(confirmedBlock, err) + this.options.logger?.error(this.invalidState.message) + throw this.invalidState } - this.batch = batch - this.pendingEvents = pendingEvents + + this.latestState = undefined + this.options.logger?.debug(`reapplying ${latestEvents.length} latest events until block ${latestBlock}`) + if (latestEvents.length > 0) { + // NOTE: Errors applying latest state are not considered fatal as we can + // still recover from them (since the confirmed state is still valid). If + // applying the latest events fails, just make sure that the `latestEvent` + // property is not set, so that the query methods fall back to using the + // confirmed state. + const newLatestState = this.confirmedState.copy() + newLatestState.applyEvents(latestEvents) + this.latestState = newLatestState + } + + return latestBlock } /** @@ -231,10 +260,25 @@ export class StreamedOrderbook { const BATCH_DURATION = 300 const block = await this.web3.eth.getBlock(blockNumber) - const batch = Math.floor(Number(block.timestamp) / BATCH_DURATION) + // NOTE: Pending or future blocks return null when queried, so approximate + // with system time + const timestamp = block?.timestamp ?? Date.now() + const batch = Math.floor(Number(timestamp) / BATCH_DURATION) return batch } + + /** + * Helper method to check for an unrecoverable invalid state in the current + * streamed orderbook. + * + * @throws If the streamed orderbook is in an invalid state. + */ + private throwOnInvalidState(): void { + if (this.invalidState) { + throw this.invalidState + } + } } /** diff --git a/src/streamed/state.ts b/src/streamed/state.ts index 69d79bf30..82054e675 100644 --- a/src/streamed/state.ts +++ b/src/streamed/state.ts @@ -114,6 +114,26 @@ export class AuctionState { private readonly options: OrderbookOptions, ) {} + /** + * Creates a copy of the auction state that can apply events independently + * without modifying the original state. + */ + public copy(): AuctionState { + const clone = new AuctionState(this.options) + clone.lastBlock = this.lastBlock + clone.tokens.push(...this.tokens) + for (const [user, account] of this.accounts.entries()) { + clone.accounts.set(user, { + balances: new Map(account.balances), + pendingWithdrawals: new Map(account.pendingWithdrawals), + orders: account.orders.map(order => ({ ...order })), + }) + } + clone.lastSolution = this.lastSolution + + return clone + } + /** * Create an object representation of the current account state for JSON * serialization. diff --git a/test/models/streamed/index.spec.ts b/test/models/streamed/index.spec.ts index 418b423b1..7d88bd6dd 100644 --- a/test/models/streamed/index.spec.ts +++ b/test/models/streamed/index.spec.ts @@ -28,15 +28,15 @@ describe("Streamed Orderbook", () => { "ETHEREUM_NODE_URL or INFURA_PROJECT_ID environment variable is required", ) const url = ETHEREUM_NODE_URL || `https://mainnet.infura.io/v3/${INFURA_PROJECT_ID}` + const endBlock = ORDERBOOK_END_BLOCK ? parseInt(ORDERBOOK_END_BLOCK) : undefined + const web3 = new Web3(url) const [viewer] = await deployment(web3, BatchExchangeViewerArtifact) - const endBlock = ORDERBOOK_END_BLOCK ? - parseInt(ORDERBOOK_END_BLOCK) : - await web3.eth.getBlockNumber() - console.debug("==> building streamed orderbook...") const orderbook = await StreamedOrderbook.init(web3, { endBlock, strict: true }) + const targetBlock = endBlock ?? await orderbook.update() + const streamedOrders = orderbook.getOpenOrders().map(order => ({ ...order, sellTokenBalance: new BN(order.sellTokenBalance.toString()), @@ -46,7 +46,7 @@ describe("Streamed Orderbook", () => { })) console.debug("==> querying onchain orderbook...") - const queriedOrders = await getOpenOrders(viewer, 300, endBlock) + const queriedOrders = await getOpenOrders(viewer, 300, targetBlock) console.debug("==> comparing orderbooks...") function toDiffableOrders(orders: IndexedOrder[]): Record> { diff --git a/test/models/streamed/state.spec.ts b/test/models/streamed/state.spec.ts index c2def530f..c5fe9b945 100644 --- a/test/models/streamed/state.spec.ts +++ b/test/models/streamed/state.spec.ts @@ -531,4 +531,100 @@ describe("Account State", () => { ])).to.throw() }) }) + + describe("copy", () => { + it("Does not modify the orginal state.", () => { + const state = auctionState() + state.applyEvents([ + event(1, "TokenListing", { id: "0", token: addr(0) }), + event(2, "Deposit", { + user: addr(0), + token: addr(0), + amount: "100000", + batchId: "1", + }), + event(3, "OrderPlacement", { + owner: addr(0), + index: "0", + buyToken: "0", + sellToken: "0", + validFrom: "0", + validUntil: "9999", + priceNumerator: "100000", + priceDenominator: "100000", + }), + event(4, "SolutionSubmission", { + submitter: addr(0), + burntFees: "10000", + utility: "unused", + disregardedUtility: "unused", + lastAuctionBurntFees: "unsued", + prices: ["unsued"], + tokenIdsForPrice: ["unsued"], + }), + event(5, "WithdrawRequest", { + user: addr(0), + token: addr(0), + amount: "100000", + batchId: "1", + }), + ]) + const original = state.toJSON() + + const copy = state.copy() + copy.applyEvents([ + // add a new token + event(10, "TokenListing", { id: "1", token: addr(1) }), + // add new balance + event(11, "Deposit", { + user: addr(0), + token: addr(0), + amount: "100000", + batchId: "1", + }), + // modify existing balance, clear existing pending withdrawal + event(12, "Withdraw", { + user: addr(0), + token: addr(0), + amount: "100000", + }), + // add new pending withdrawal + event(13, "WithdrawRequest", { + user: addr(0), + token: addr(0), + amount: "1000000", + batchId: "42", + }), + // modify existing order + event(14, "OrderCancellation", { + owner: addr(0), + id: "0", + }), + // add new order + event(15, "OrderPlacement", { + owner: addr(0), + index: "1", + buyToken: "0", + sellToken: "0", + validFrom: "0", + validUntil: "9999", + priceNumerator: "100000", + priceDenominator: "100000", + }), + // modify last solution, add new user + event(16, "SolutionSubmission", { + submitter: addr(1), + burntFees: "10000", + utility: "unused", + disregardedUtility: "unused", + lastAuctionBurntFees: "unsued", + prices: ["unsued"], + tokenIdsForPrice: ["unsued"], + }), + ]) + const after = state.toJSON() + + expect(original).to.deep.equal(after) + }) + }) })