Skip to content

Commit

Permalink
[Events Orderbook] Implement querying the orderbook at the latest blo…
Browse files Browse the repository at this point in the history
…ck (#735)

This PR implements querying the latest open orderbook.

This is implemented by keeping a separate "confirmed" auction state, built from events that are part of confirmed blocks, and not subject to reorgs, and a "latest" auction state that is built by cloning the "confirmed" state and applying events in between the confirmed block and the latest block.

The "latest" state is implemented to be discardable and rebuilt on every update so that it is not prone to errors stemming from reorgs, nor does complex logic for rolling back events need to be implemented. Unfortunately, the current implementation is slightly inefficient. I created #734 as an issue to capture a potential solution to the problem.

This closes #719 

### Test Plan

Run the e2e test to ensure the event based orderbook on the latest block matches the onchain queried orderbook:
```
$ yarn test-streamed-orderbook

  Streamed Orderbook
    init
==> building streamed orderbook...
==> querying onchain orderbook...
==> comparing orderbooks...
      ✓ should successfully apply all events and match on-chain orderbook (287940ms)


  1 passing (5m)
```


* implement querying the orderbook at the latest block

* comments

* add unit test for copy

* fix array index logic

* default to system time for future blocks
  • Loading branch information
nlordell authored May 6, 2020
1 parent 630705e commit 35a2ee2
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 34 deletions.
102 changes: 73 additions & 29 deletions src/streamed/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ export const DEFAULT_ORDERBOOK_OPTIONS: OrderbookOptions = {
* account state.
*/
export class StreamedOrderbook {
private readonly state: AuctionState;
private batch = -1;
private pendingEvents: AnyEvent<BatchExchange>[] = [];

private readonly confirmedState: AuctionState;
private latestState?: AuctionState;

private invalidState?: InvalidAuctionStateError;

Expand All @@ -95,7 +96,7 @@ export class StreamedOrderbook {
private readonly startBlock: number,
private readonly options: OrderbookOptions,
) {
this.state = new AuctionState(options)
this.confirmedState = new AuctionState(options)
}

/**
Expand Down Expand Up @@ -132,7 +133,10 @@ export class StreamedOrderbook {
* Retrieves the current open orders in the orderbook.
*/
public getOpenOrders(): IndexedOrder<bigint>[] {
return this.state.getOrders(this.batch)
this.throwOnInvalidState()

const state = this.latestState ?? this.confirmedState
return state.getOrders(this.batch)
}

/**
Expand All @@ -158,7 +162,7 @@ export class StreamedOrderbook {
const events = await this.getPastEvents({ fromBlock, toBlock })

this.options.logger?.debug(`applying ${events.length} past events`)
this.state.applyEvents(events)
this.confirmedState.applyEvents(events)
}
this.batch = await this.getBatchId(endBlock)
}
Expand All @@ -167,6 +171,9 @@ export class StreamedOrderbook {
* Apply new confirmed events to the account state and store the remaining
* events that are subject to reorgs into the `pendingEvents` array.
*
* @returns The block number up until which the streamed orderbook is up to
* date
*
* @remarks
* If there is an error retrieving the latest events from the node, then the
* account state remains unmodified. This allows the updating orderbook to be
Expand All @@ -175,35 +182,57 @@ export class StreamedOrderbook {
* then the streamed orderbook becomes invalid and can no longer apply new
* events as the actual auction state is unknown.
*/
public async update(): Promise<void> {
if (this.invalidState) {
throw this.invalidState
}
public async update(): Promise<number> {
this.throwOnInvalidState()

const fromBlock = this.state.nextBlock
const fromBlock = this.confirmedState.nextBlock
this.options.logger?.debug(`fetching new events from ${fromBlock}-latest`)
const events = await this.getPastEvents({ fromBlock })

const latestBlock = await this.web3.eth.getBlockNumber()
// NOTE: If the web3 instance is connected to nodes behind a load balancer,
// it is possible that the events were queried on a node that includes an
// additional block to the node that handled the query to the latest block
// number, so use the max of `latestEvents.last().blockNumber` and the
// queried latest block number.
const latestBlock = Math.max(
await this.web3.eth.getBlockNumber(),
events[events.length - 1]?.blockNumber ?? 0,
)
const confirmedBlock = latestBlock - this.options.blockConfirmations
const batch = await this.getBatchId(confirmedBlock)

const confirmedEventCount = events.findIndex(ev => ev.blockNumber > confirmedBlock)
const confirmedEvents = events.splice(0, confirmedEventCount)
const pendingEvents = events

if (confirmedEvents.length > 0) {
this.options.logger?.debug(`applying ${confirmedEvents.length} confirmed events until block ${confirmedBlock}`)
try {
this.state.applyEvents(confirmedEvents)
} catch (err) {
this.invalidState = new InvalidAuctionStateError(confirmedBlock, err)
this.options.logger?.error(this.invalidState.message)
throw this.invalidState
}

this.batch = await this.getBatchId(latestBlock)
if (events.length === 0) {
return latestBlock
}

const firstLatestEvent = events.findIndex(ev => ev.blockNumber > confirmedBlock)
const confirmedEventCount = firstLatestEvent !== -1 ? firstLatestEvent : events.length
const confirmedEvents = events.slice(0, confirmedEventCount)
const latestEvents = events.slice(confirmedEventCount)

this.options.logger?.debug(`applying ${confirmedEvents.length} confirmed events until block ${confirmedBlock}`)
try {
this.confirmedState.applyEvents(confirmedEvents)
} catch (err) {
this.invalidState = new InvalidAuctionStateError(confirmedBlock, err)
this.options.logger?.error(this.invalidState.message)
throw this.invalidState
}
this.batch = batch
this.pendingEvents = pendingEvents

this.latestState = undefined
this.options.logger?.debug(`reapplying ${latestEvents.length} latest events until block ${latestBlock}`)
if (latestEvents.length > 0) {
// NOTE: Errors applying latest state are not considered fatal as we can
// still recover from them (since the confirmed state is still valid). If
// applying the latest events fails, just make sure that the `latestEvent`
// property is not set, so that the query methods fall back to using the
// confirmed state.
const newLatestState = this.confirmedState.copy()
newLatestState.applyEvents(latestEvents)
this.latestState = newLatestState
}

return latestBlock
}

/**
Expand Down Expand Up @@ -231,10 +260,25 @@ export class StreamedOrderbook {
const BATCH_DURATION = 300

const block = await this.web3.eth.getBlock(blockNumber)
const batch = Math.floor(Number(block.timestamp) / BATCH_DURATION)
// NOTE: Pending or future blocks return null when queried, so approximate
// with system time
const timestamp = block?.timestamp ?? Date.now()
const batch = Math.floor(Number(timestamp) / BATCH_DURATION)

return batch
}

/**
* Helper method to check for an unrecoverable invalid state in the current
* streamed orderbook.
*
* @throws If the streamed orderbook is in an invalid state.
*/
private throwOnInvalidState(): void {
if (this.invalidState) {
throw this.invalidState
}
}
}

/**
Expand Down
20 changes: 20 additions & 0 deletions src/streamed/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ export class AuctionState {
private readonly options: OrderbookOptions,
) {}

/**
* Creates a copy of the auction state that can apply events independently
* without modifying the original state.
*/
public copy(): AuctionState {
const clone = new AuctionState(this.options)
clone.lastBlock = this.lastBlock
clone.tokens.push(...this.tokens)
for (const [user, account] of this.accounts.entries()) {
clone.accounts.set(user, {
balances: new Map(account.balances),
pendingWithdrawals: new Map(account.pendingWithdrawals),
orders: account.orders.map(order => ({ ...order })),
})
}
clone.lastSolution = this.lastSolution

return clone
}

/**
* Create an object representation of the current account state for JSON
* serialization.
Expand Down
10 changes: 5 additions & 5 deletions test/models/streamed/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ describe("Streamed Orderbook", () => {
"ETHEREUM_NODE_URL or INFURA_PROJECT_ID environment variable is required",
)
const url = ETHEREUM_NODE_URL || `https://mainnet.infura.io/v3/${INFURA_PROJECT_ID}`
const endBlock = ORDERBOOK_END_BLOCK ? parseInt(ORDERBOOK_END_BLOCK) : undefined

const web3 = new Web3(url)
const [viewer] = await deployment<BatchExchangeViewer>(web3, BatchExchangeViewerArtifact)

const endBlock = ORDERBOOK_END_BLOCK ?
parseInt(ORDERBOOK_END_BLOCK) :
await web3.eth.getBlockNumber()

console.debug("==> building streamed orderbook...")
const orderbook = await StreamedOrderbook.init(web3, { endBlock, strict: true })
const targetBlock = endBlock ?? await orderbook.update()

const streamedOrders = orderbook.getOpenOrders().map(order => ({
...order,
sellTokenBalance: new BN(order.sellTokenBalance.toString()),
Expand All @@ -46,7 +46,7 @@ describe("Streamed Orderbook", () => {
}))

console.debug("==> querying onchain orderbook...")
const queriedOrders = await getOpenOrders(viewer, 300, endBlock)
const queriedOrders = await getOpenOrders(viewer, 300, targetBlock)

console.debug("==> comparing orderbooks...")
function toDiffableOrders<T>(orders: IndexedOrder<T>[]): Record<string, IndexedOrder<T>> {
Expand Down
96 changes: 96 additions & 0 deletions test/models/streamed/state.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,100 @@ describe("Account State", () => {
])).to.throw()
})
})

describe("copy", () => {
it("Does not modify the orginal state.", () => {
const state = auctionState()
state.applyEvents([
event(1, "TokenListing", { id: "0", token: addr(0) }),
event(2, "Deposit", {
user: addr(0),
token: addr(0),
amount: "100000",
batchId: "1",
}),
event(3, "OrderPlacement", {
owner: addr(0),
index: "0",
buyToken: "0",
sellToken: "0",
validFrom: "0",
validUntil: "9999",
priceNumerator: "100000",
priceDenominator: "100000",
}),
event(4, "SolutionSubmission", {
submitter: addr(0),
burntFees: "10000",
utility: "unused",
disregardedUtility: "unused",
lastAuctionBurntFees: "unsued",
prices: ["unsued"],
tokenIdsForPrice: ["unsued"],
}),
event(5, "WithdrawRequest", {
user: addr(0),
token: addr(0),
amount: "100000",
batchId: "1",
}),
])
const original = state.toJSON()

const copy = state.copy()
copy.applyEvents([
// add a new token
event(10, "TokenListing", { id: "1", token: addr(1) }),
// add new balance
event(11, "Deposit", {
user: addr(0),
token: addr(0),
amount: "100000",
batchId: "1",
}),
// modify existing balance, clear existing pending withdrawal
event(12, "Withdraw", {
user: addr(0),
token: addr(0),
amount: "100000",
}),
// add new pending withdrawal
event(13, "WithdrawRequest", {
user: addr(0),
token: addr(0),
amount: "1000000",
batchId: "42",
}),
// modify existing order
event(14, "OrderCancellation", {
owner: addr(0),
id: "0",
}),
// add new order
event(15, "OrderPlacement", {
owner: addr(0),
index: "1",
buyToken: "0",
sellToken: "0",
validFrom: "0",
validUntil: "9999",
priceNumerator: "100000",
priceDenominator: "100000",
}),
// modify last solution, add new user
event(16, "SolutionSubmission", {
submitter: addr(1),
burntFees: "10000",
utility: "unused",
disregardedUtility: "unused",
lastAuctionBurntFees: "unsued",
prices: ["unsued"],
tokenIdsForPrice: ["unsued"],
}),
])
const after = state.toJSON()

expect(original).to.deep.equal(after)
})
})
})

0 comments on commit 35a2ee2

Please sign in to comment.