diff --git a/.changeset/ninety-books-deliver.md b/.changeset/ninety-books-deliver.md new file mode 100644 index 0000000000..bc3be0b8ca --- /dev/null +++ b/.changeset/ninety-books-deliver.md @@ -0,0 +1,5 @@ +--- +'@penumbra-zone/query': minor +--- + +special case local genesis sync diff --git a/packages/query/src/block-processor.ts b/packages/query/src/block-processor.ts index 0efa41ec25..66e9114e4c 100644 --- a/packages/query/src/block-processor.ts +++ b/packages/query/src/block-processor.ts @@ -45,6 +45,7 @@ import { toPlainMessage } from '@bufbuild/protobuf'; import { getAssetIdFromGasPrices } from '@penumbra-zone/getters/compact-block'; import { getSpendableNoteRecordCommitment } from '@penumbra-zone/getters/spendable-note-record'; import { getSwapRecordCommitment } from '@penumbra-zone/getters/swap-record'; +import { CompactBlock } from '@penumbra-zone/protobuf/penumbra/core/component/compact_block/v1/compact_block_pb'; declare global { // eslint-disable-next-line no-var @@ -69,6 +70,7 @@ interface QueryClientProps { viewServer: ViewServerInterface; numeraires: AssetId[]; stakingAssetId: AssetId; + genesisBlock: CompactBlock | undefined; } const BLANK_TX_SOURCE = new CommitmentSource({ @@ -89,13 +91,22 @@ export class BlockProcessor implements BlockProcessorInterface { private numeraires: AssetId[]; private readonly stakingAssetId: AssetId; private syncPromise: Promise | undefined; - - constructor({ indexedDb, viewServer, querier, numeraires, stakingAssetId }: QueryClientProps) { + private genesisBlock: CompactBlock | undefined; + + constructor({ + indexedDb, + viewServer, + querier, + numeraires, + stakingAssetId, + genesisBlock, + }: QueryClientProps) { this.indexedDb = indexedDb; this.viewServer = viewServer; this.querier = querier; this.numeraires = numeraires; this.stakingAssetId = stakingAssetId; + this.genesisBlock = genesisBlock; } // If sync() is called multiple times concurrently, they'll all wait for @@ -125,6 +136,15 @@ export class BlockProcessor implements BlockProcessorInterface { this.numeraires = numeraires; } + /** + * Sync local state to present. This method will + * - identify current synced height (or `-1n` to represent a 'pre-genesis' state) + * - query remote rpc for the chain's latest block height + * - pre-genesis, initialize validator info + * - pre-genesis, process a local genesis block if provided + * - query remote rpc to begin streaming at the next block + * - iterate + */ private async syncAndStore() { // start at next block, or genesis if height is undefined let currentHeight = (await this.indexedDb.getFullSyncHeight()) ?? -1n; @@ -142,17 +162,17 @@ export class BlockProcessor implements BlockProcessorInterface { { retry: () => true }, ); - // TODO: init validator info in a better way, possibly after batch endpoint - // implemented https://github.com/penumbra-zone/penumbra/issues/4688 + // special case genesis sync if (currentHeight === -1n) { - // In the `for` loop below, we only update validator infos once we've - // reached the latest known epoch. This means that, if a user is syncing - // for the first time, they could experience a broken UI until the latest - // known epoch is reached, since they may have delegation tokens but no - // validator info to go with them. So we'll update validator infos at the - // beginning of sync as well, and force the rest of sync to wait until - // it's done. - void this.updateValidatorInfos(0n); + // initialize validator info at genesis + // TODO: use batch endpoint https://github.com/penumbra-zone/penumbra/issues/4688 + void this.updateValidatorInfos(currentHeight + 1n); + + // begin the chain with local genesis block if provided + if (this.genesisBlock?.height === currentHeight + 1n) { + currentHeight = this.genesisBlock.height; + await this.processBlock(this.genesisBlock, latestKnownBlockHeight); + } } // this is an indefinite stream of the (compact) chain from the network @@ -169,170 +189,175 @@ export class BlockProcessor implements BlockProcessorInterface { throw new Error(`Unexpected block height: ${compactBlock.height} at ${currentHeight}`); } - if (compactBlock.appParametersUpdated) { - await this.indexedDb.saveAppParams(await this.querier.app.appParams()); - } - if (compactBlock.fmdParameters) { - await this.indexedDb.saveFmdParams(compactBlock.fmdParameters); + await this.processBlock(compactBlock, latestKnownBlockHeight); + + // We only query Tendermint for the latest known block height once, when + // the block processor starts running. Once we're caught up, though, the + // chain will of course continue adding blocks, and we'll keep processing + // them. So, we need to update `latestKnownBlockHeight` once we've passed + // it. + if (compactBlock.height > latestKnownBlockHeight) { + latestKnownBlockHeight = compactBlock.height; } - if (compactBlock.gasPrices) { + } + } + + // logic for processing a compact block + private async processBlock(compactBlock: CompactBlock, latestKnownBlockHeight: bigint) { + if (compactBlock.appParametersUpdated) { + await this.indexedDb.saveAppParams(await this.querier.app.appParams()); + } + if (compactBlock.fmdParameters) { + await this.indexedDb.saveFmdParams(compactBlock.fmdParameters); + } + if (compactBlock.gasPrices) { + await this.indexedDb.saveGasPrices({ + ...toPlainMessage(compactBlock.gasPrices), + assetId: toPlainMessage(this.stakingAssetId), + }); + } + if (compactBlock.altGasPrices.length) { + for (const altGas of compactBlock.altGasPrices) { await this.indexedDb.saveGasPrices({ - ...toPlainMessage(compactBlock.gasPrices), - assetId: toPlainMessage(this.stakingAssetId), + ...toPlainMessage(altGas), + assetId: getAssetIdFromGasPrices(altGas), }); } - if (compactBlock.altGasPrices.length) { - for (const altGas of compactBlock.altGasPrices) { - await this.indexedDb.saveGasPrices({ - ...toPlainMessage(altGas), - assetId: getAssetIdFromGasPrices(altGas), - }); - } - } + } - // wasm view server scan - // - decrypts new notes - // - decrypts new swaps - // - updates idb with advice - const scannerWantsFlush = await this.viewServer.scanBlock(compactBlock); - - // flushing is slow, avoid it until - // - wasm says - // - every 1000th block - // - every block at tip - const flushReasons = { - scannerWantsFlush, - interval: compactBlock.height % 1000n === 0n, - new: compactBlock.height > latestKnownBlockHeight, - }; - - const recordsByCommitment = new Map(); - let flush: ScanBlockResult | undefined; - if (Object.values(flushReasons).some(Boolean)) { - flush = this.viewServer.flushUpdates(); - - // in an atomic query, this - // - saves 'sctUpdates' - // - saves new decrypted notes - // - saves new decrypted swaps - // - updates last block synced - await this.indexedDb.saveScanResult(flush); - - // - detect unknown asset types - // - shielded pool for asset metadata - // - or, generate default fallback metadata - // - update idb - await this.identifyNewAssets(flush.newNotes); - - for (const spendableNoteRecord of flush.newNotes) { - recordsByCommitment.set(spendableNoteRecord.noteCommitment!, spendableNoteRecord); - } - for (const swapRecord of flush.newSwaps) { - recordsByCommitment.set(swapRecord.swapCommitment!, swapRecord); - } + // wasm view server scan + // - decrypts new notes + // - decrypts new swaps + // - updates idb with advice + const scannerWantsFlush = await this.viewServer.scanBlock(compactBlock); + + // flushing is slow, avoid it until + // - wasm says + // - every 1000th block + // - every block at tip + const flushReasons = { + scannerWantsFlush, + interval: compactBlock.height % 1000n === 0n, + new: compactBlock.height > latestKnownBlockHeight, + }; + + const recordsByCommitment = new Map(); + let flush: ScanBlockResult | undefined; + if (Object.values(flushReasons).some(Boolean)) { + flush = this.viewServer.flushUpdates(); + + // in an atomic query, this + // - saves 'sctUpdates' + // - saves new decrypted notes + // - saves new decrypted swaps + // - updates last block synced + await this.indexedDb.saveScanResult(flush); + + // - detect unknown asset types + // - shielded pool for asset metadata + // - or, generate default fallback metadata + // - update idb + await this.identifyNewAssets(flush.newNotes); + + for (const spendableNoteRecord of flush.newNotes) { + recordsByCommitment.set(spendableNoteRecord.noteCommitment!, spendableNoteRecord); + } + for (const swapRecord of flush.newSwaps) { + recordsByCommitment.set(swapRecord.swapCommitment!, swapRecord); } + } - // nullifiers on this block may match notes or swaps from db - // - update idb, mark as spent/claimed - // - return nullifiers used in this way - const spentNullifiers = await this.resolveNullifiers( - compactBlock.nullifiers, - compactBlock.height, + // nullifiers on this block may match notes or swaps from db + // - update idb, mark as spent/claimed + // - return nullifiers used in this way + const spentNullifiers = await this.resolveNullifiers( + compactBlock.nullifiers, + compactBlock.height, + ); + + // if a new record involves a state commitment, scan all block tx + if (spentNullifiers.size || recordsByCommitment.size) { + // this is a network query + const blockTx = await this.querier.app.txsByHeight(compactBlock.height); + + // identify tx that involve a new record + // - compare nullifiers + // - compare state commitments + // - collect relevant tx for info generation later + // - if matched by commitment, collect record with recovered source + const { relevantTx, recordsWithSources } = await this.identifyTransactions( + spentNullifiers, + recordsByCommitment, + blockTx, ); - // if a new record involves a state commitment, scan all block tx - if (spentNullifiers.size || recordsByCommitment.size) { - // this is a network query - const blockTx = await this.querier.app.txsByHeight(compactBlock.height); - - // identify tx that involve a new record - // - compare nullifiers - // - compare state commitments - // - collect relevant tx for info generation later - // - if matched by commitment, collect record with recovered source - const { relevantTx, recordsWithSources } = await this.identifyTransactions( - spentNullifiers, - recordsByCommitment, - blockTx, - ); - - // this simply stores the new records with 'rehydrated' sources to idb - // TODO: this is the second time we save these records, after "saveScanResult" - await this.saveRecoveredCommitmentSources(recordsWithSources); - - await this.processTransactions(blockTx); - - // at this point txinfo can be generated and saved. this will resolve - // pending broadcasts, and populate the transaction list. - // - calls wasm for each relevant tx - // - saves to idb - await this.saveTransactions(compactBlock.height, relevantTx); - } + // this simply stores the new records with 'rehydrated' sources to idb + // TODO: this is the second time we save these records, after "saveScanResult" + await this.saveRecoveredCommitmentSources(recordsWithSources); - /** - * This... really isn't great. - * - * You can see above that we're already iterating over flush.newNotes. So - * why don't we put this call to - * `this.maybeUpsertAuctionWithNoteCommitment()` inside that earlier `for` - * loop? - * - * The problem is, we need to call `this.processTransactions()` before - * calling `this.maybeUpsertAuctionWithNoteCommitment()`, because - * `this.processTransactions()` is what saves the auction NFT metadata to - * the database. `this.maybeUpsertAuctionWithNoteCommitment()` depends on - * that auction NFT metadata being saved already to be able to detect - * whether a given note is for an auction NFT; only then will it save the - * note's commitment to the `AUCTIONS` table. - * - * "So why not just move `this.processTransactions()` to before the block - * where we handle `flush.newNotes`?" Because `this.processTransactions()` - * should only run after we've handled `flush.newNotes`, since we depend - * on the result of the flush to determine whether there are transactions - * to process in the first place. It's a catch-22. - * - * This isn't a problem in core because core isn't going back and forth - * between Rust and TypeScript like we are. If and when we move the block - * processor into Rust, this issue should be resolved. - */ - for (const spendableNoteRecord of flush?.newNotes ?? []) { - await this.maybeUpsertAuctionWithNoteCommitment(spendableNoteRecord); - } + await this.processTransactions(blockTx); - // We do not store historical prices, - // so there is no point in saving prices that would already be considered obsolete at the time of saving - const blockInPriceRelevanceThreshold = - compactBlock.height >= latestKnownBlockHeight - BigInt(PRICE_RELEVANCE_THRESHOLDS.default); - - // we can't use third-party price oracles for privacy reasons, - // so we have to get asset prices from swap results during block scans - // and store them locally in indexed-db. - if (blockInPriceRelevanceThreshold && compactBlock.swapOutputs.length) { - await updatePricesFromSwaps( - this.indexedDb, - this.numeraires, - compactBlock.swapOutputs, - compactBlock.height, - ); - } + // at this point txinfo can be generated and saved. this will resolve + // pending broadcasts, and populate the transaction list. + // - calls wasm for each relevant tx + // - saves to idb + await this.saveTransactions(compactBlock.height, relevantTx); + } - // We only query Tendermint for the latest known block height once, when - // the block processor starts running. Once we're caught up, though, the - // chain will of course continue adding blocks, and we'll keep processing - // them. So, we need to update `latestKnownBlockHeight` once we've passed - // it. - if (compactBlock.height > latestKnownBlockHeight) { - latestKnownBlockHeight = compactBlock.height; - } + /** + * This... really isn't great. + * + * You can see above that we're already iterating over flush.newNotes. So + * why don't we put this call to + * `this.maybeUpsertAuctionWithNoteCommitment()` inside that earlier `for` + * loop? + * + * The problem is, we need to call `this.processTransactions()` before + * calling `this.maybeUpsertAuctionWithNoteCommitment()`, because + * `this.processTransactions()` is what saves the auction NFT metadata to + * the database. `this.maybeUpsertAuctionWithNoteCommitment()` depends on + * that auction NFT metadata being saved already to be able to detect + * whether a given note is for an auction NFT; only then will it save the + * note's commitment to the `AUCTIONS` table. + * + * "So why not just move `this.processTransactions()` to before the block + * where we handle `flush.newNotes`?" Because `this.processTransactions()` + * should only run after we've handled `flush.newNotes`, since we depend + * on the result of the flush to determine whether there are transactions + * to process in the first place. It's a catch-22. + * + * This isn't a problem in core because core isn't going back and forth + * between Rust and TypeScript like we are. If and when we move the block + * processor into Rust, this issue should be resolved. + */ + for (const spendableNoteRecord of flush?.newNotes ?? []) { + await this.maybeUpsertAuctionWithNoteCommitment(spendableNoteRecord); + } - const isLastBlockOfEpoch = !!compactBlock.epochRoot; - if (isLastBlockOfEpoch) { - await this.handleEpochTransition(compactBlock.height, latestKnownBlockHeight); - } + // We do not store historical prices, + // so there is no point in saving prices that would already be considered obsolete at the time of saving + const blockInPriceRelevanceThreshold = + compactBlock.height >= latestKnownBlockHeight - BigInt(PRICE_RELEVANCE_THRESHOLDS.default); - if (globalThis.__ASSERT_ROOT__) { - await this.assertRootValid(compactBlock.height); - } + // we can't use third-party price oracles for privacy reasons, + // so we have to get asset prices from swap results during block scans + // and store them locally in indexed-db. + if (blockInPriceRelevanceThreshold && compactBlock.swapOutputs.length) { + await updatePricesFromSwaps( + this.indexedDb, + this.numeraires, + compactBlock.swapOutputs, + compactBlock.height, + ); + } + + const isLastBlockOfEpoch = !!compactBlock.epochRoot; + if (isLastBlockOfEpoch) { + await this.handleEpochTransition(compactBlock.height, latestKnownBlockHeight); + } + + if (globalThis.__ASSERT_ROOT__) { + await this.assertRootValid(compactBlock.height); } }