Skip to content

Commit

Permalink
fix(api): add reorg handling fixes (#643)
Browse files Browse the repository at this point in the history
* feat(web): add support for dropdown to hold multiple values by option

* fix(web): display rollups with more than one address properly in the rollup filter

* fix(web): simplify dropdown options by including prefix within label props

* fix(api): ensure reorged blocks are correctly reindexed

* feat(db): add address category info fks constraints tx model

* feat(db): create indexes for block number reference fields

* fix(api): remove all block references across the database when marking a block as reorged

* chore: add changesets

* test(db): fix ut
  • Loading branch information
PJColombo authored Nov 25, 2024
1 parent cc62af6 commit 0922d8b
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/brown-emus-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@blobscan/db": patch
---

Added address category info foreign key constraints to the transaction model
5 changes: 5 additions & 0 deletions .changeset/khaki-rules-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@blobscan/api": patch
---

Resolved an issue where blocks flagged as reorged remained marked as reorged after being reindexed
5 changes: 5 additions & 0 deletions .changeset/rotten-bears-bathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@blobscan/db": patch
---

Created indexes for block number reference fields
1 change: 0 additions & 1 deletion apps/web/src/components/Filters/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ export const Filters: FC = function () {
);
});

console.log(rollupOptions);
if (rollupOptions) {
newFilters.rollups = rollupOptions;
}
Expand Down
149 changes: 123 additions & 26 deletions packages/api/src/routers/indexer/handleReorgedSlots.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Prisma } from "@blobscan/db";
import { z } from "@blobscan/zod";

import type { TRPCInnerContext } from "../../context";
import { jwtAuthedProcedure } from "../../procedures";
import { INDEXER_PATH } from "./common";

Expand All @@ -13,6 +15,112 @@ const outputSchema = z.object({

export type HandleReorgedSlotsInput = z.infer<typeof inputSchema>;

const blockSelect = {
  hash: true,
  number: true,
  transactions: {
    select: {
      hash: true,
    },
  },
} satisfies Prisma.BlockSelect;

type BlockPayload = Prisma.BlockGetPayload<{ select: typeof blockSelect }>;

/**
 * Generates a set of Prisma operations to remove references to reorged blocks from the db.
 *
 * When blocks are no longer part of the canonical chain due to a reorg, any references to these
 * blocks must be cleaned up from the relevant tables (`addressCategoryInfo` first-block fields
 * and `blob.firstBlockNumber`) to maintain database integrity.
 *
 * @param prisma - Prisma client used to look up and update the referencing rows.
 * @param reorgedBlocks - Blocks that were dropped from the canonical chain.
 * @returns An array of Prisma operation promises to be executed for database cleanup.
 */
async function generateBlockCleanupOperations(
  prisma: TRPCInnerContext["prisma"],
  reorgedBlocks: BlockPayload[]
) {
  const reorgedBlockNumbers = reorgedBlocks.map((b) => b.number);
  // Set gives O(1) membership checks in the per-row loops below instead of
  // O(n) Array#includes per row.
  const reorgedBlockNumberSet = new Set(reorgedBlockNumbers);

  const [addressCategoryInfos, blobs] = await Promise.all([
    prisma.addressCategoryInfo.findMany({
      where: {
        OR: [
          {
            firstBlockNumberAsSender: {
              in: reorgedBlockNumbers,
            },
          },
          {
            firstBlockNumberAsReceiver: {
              in: reorgedBlockNumbers,
            },
          },
        ],
      },
    }),
    prisma.blob.findMany({
      where: {
        firstBlockNumber: {
          in: reorgedBlockNumbers,
        },
      },
    }),
  ]);

  const referenceRemovalOps = [];

  for (const {
    id,
    firstBlockNumberAsReceiver,
    firstBlockNumberAsSender,
  } of addressCategoryInfos) {
    const data: Prisma.AddressCategoryInfoUpdateInput = {};

    // The OR query above matches rows where EITHER field references a reorged
    // block, so each field must be re-checked individually before nulling it.
    // `!= null` (rather than truthiness) so a legitimate block number 0 is
    // not silently skipped.
    if (
      firstBlockNumberAsSender != null &&
      reorgedBlockNumberSet.has(firstBlockNumberAsSender)
    ) {
      data.firstBlockNumberAsSender = null;
    }

    if (
      firstBlockNumberAsReceiver != null &&
      reorgedBlockNumberSet.has(firstBlockNumberAsReceiver)
    ) {
      data.firstBlockNumberAsReceiver = null;
    }

    const hasBlockReferences = Object.keys(data).length > 0;

    if (hasBlockReferences) {
      referenceRemovalOps.push(
        prisma.addressCategoryInfo.update({
          data,
          where: {
            id,
          },
        })
      );
    }
  }

  for (const { firstBlockNumber, versionedHash } of blobs) {
    // Defensive re-check; the query already restricts to reorged block
    // numbers. `!= null` keeps block number 0 eligible for cleanup.
    if (firstBlockNumber != null && reorgedBlockNumberSet.has(firstBlockNumber)) {
      referenceRemovalOps.push(
        prisma.blob.update({
          where: {
            versionedHash,
          },
          data: {
            firstBlockNumber: null,
          },
        })
      );
    }
  }

  return referenceRemovalOps;
}

export const handleReorgedSlots = jwtAuthedProcedure
.meta({
openapi: {
Expand All @@ -28,42 +136,31 @@ export const handleReorgedSlots = jwtAuthedProcedure
.output(outputSchema)
.mutation(async ({ ctx: { prisma }, input: { reorgedSlots } }) => {
const reorgedBlocks = await prisma.block.findMany({
select: {
hash: true,
transactions: {
select: {
hash: true,
},
},
},
select: blockSelect,
where: {
slot: {
in: reorgedSlots,
},
},
});

const result = await prisma.transactionFork.upsertMany(
reorgedBlocks.flatMap((b) =>
b.transactions.map((tx) => ({
hash: tx.hash,
blockHash: b.hash,
}))
)
const reorgedBlockTxs = reorgedBlocks.flatMap((b) =>
b.transactions.map((tx) => ({
hash: tx.hash,
blockHash: b.hash,
}))
);

let totalUpdatedSlots: number;
const blockReferenceRemovalOps = await generateBlockCleanupOperations(
prisma,
reorgedBlocks
);

if (typeof result === "number") {
totalUpdatedSlots = result;
} else {
totalUpdatedSlots = result.reduce(
(acc, totalSlots) => acc + totalSlots.count,
0
);
}
await prisma.$transaction([
...blockReferenceRemovalOps,
prisma.transactionFork.upsertMany(reorgedBlockTxs),
]);

return {
totalUpdatedSlots,
totalUpdatedSlots: reorgedBlocks.length,
};
});
7 changes: 7 additions & 0 deletions packages/api/src/routers/indexer/indexData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ export const indexData = jwtAuthedProcedure
const dbAddressCategoryInfos = createDBAddressCategoryInfo(dbTxs);

operations.push(
// We may be indexing a block that was marked as a reorg previously,
// so we delete any possible rows from the fork table
prisma.transactionFork.deleteMany({
where: {
blockHash: input.block.hash,
},
}),
prisma.block.upsert({
where: { hash: input.block.hash },
create: {
Expand Down
75 changes: 75 additions & 0 deletions packages/api/test/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,32 @@ describe("Indexer router", async () => {
).resolves.toBeUndefined();
});

it("should reindex a block previously marked as reorged correctly", async () => {
  const { hash: reorgedBlockHash } = INPUT.block;

  // Flag the block as reorged by inserting a fork row for each of its txs
  await authorizedContext.prisma.transactionFork.createMany({
    data: INPUT.transactions.map(({ hash }) => ({
      hash,
      blockHash: reorgedBlockHash,
    })),
  });

  // Index the block again; this should remove its fork rows
  await authorizedCaller.indexer.indexData(INPUT);

  const remainingForkTxs =
    await authorizedContext.prisma.transactionFork.findMany({
      where: { blockHash: reorgedBlockHash },
    });

  expect(remainingForkTxs, "Block still has forked transactions").toEqual([]);
});

testValidError(
"should fail when receiving an empty array of transactions",
async () => {
Expand Down Expand Up @@ -692,6 +718,55 @@ describe("Indexer router", async () => {
expect(transactionForks).toEqual(expectedTransactionForks);
});

it("should clean up references to the reorged blocks", async () => {
  // Fetch the blocks that are about to be marked as reorged so their
  // numbers can be used to look for dangling references afterwards.
  const reorgedBlocks = await authorizedContext.prisma.block.findMany({
    where: {
      slot: {
        in: input.reorgedSlots,
      },
    },
  });

  const reorgedBlockNumbers = reorgedBlocks.map((block) => block.number);

  await authorizedCaller.indexer.handleReorgedSlots(input);

  // Any address category row still pointing at a reorged block (via either
  // first-block-as-sender or first-block-as-receiver) means cleanup missed it.
  const reorgedBlocksAddressCategoryInfos =
    await authorizedContext.prisma.addressCategoryInfo.findMany({
      where: {
        OR: [
          {
            firstBlockNumberAsSender: {
              in: reorgedBlockNumbers,
            },
          },
          {
            firstBlockNumberAsReceiver: {
              in: reorgedBlockNumbers,
            },
          },
        ],
      },
    });
  // Same check for blobs whose first inclusion block was reorged.
  const blobsWithReorgedBlocks =
    await authorizedContext.prisma.blob.findMany({
      where: {
        firstBlockNumber: {
          in: reorgedBlockNumbers,
        },
      },
    });

  expect(
    reorgedBlocksAddressCategoryInfos,
    "Reorged block references in address category records found"
  ).toEqual([]);
  expect(
    blobsWithReorgedBlocks,
    "Reorged block references in blob records found"
  ).toEqual([]);
});

it("should return the number of updated slots", async () => {
const result = await authorizedCaller.indexer.handleReorgedSlots(input);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AddForeignKey
ALTER TABLE "transaction" ADD CONSTRAINT "transaction_from_id_category_fkey" FOREIGN KEY ("from_id", "category") REFERENCES "address_category_info"("address", "category") ON DELETE RESTRICT ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "transaction" ADD CONSTRAINT "transaction_to_id_category_fkey" FOREIGN KEY ("to_id", "category") REFERENCES "address_category_info"("address", "category") ON DELETE RESTRICT ON UPDATE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- CreateIndex
CREATE INDEX "address_category_info_first_block_number_as_receiver_idx" ON "address_category_info"("first_block_number_as_receiver");

-- CreateIndex
CREATE INDEX "address_category_info_first_block_number_as_sender_idx" ON "address_category_info"("first_block_number_as_sender");

-- CreateIndex
CREATE INDEX "blob_first_block_number_idx" ON "blob"("first_block_number");
19 changes: 13 additions & 6 deletions packages/db/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
generator client {
provider = "prisma-client-js"
// rhel-openssl-1.0.x is required by Vercel
binaryTargets = ["native", "rhel-openssl-1.0.x", "linux-musl-openssl-3.0.x"]
binaryTargets = ["native", "rhel-openssl-1.0.x", "linux-musl-openssl-3.0.x"]
previewFeatures = ["tracing", "metrics", "typedSql"]
}

Expand Down Expand Up @@ -83,9 +83,13 @@ model AddressCategoryInfo {
firstBlockNumberAsReceiver Int? @map("first_block_number_as_receiver")
firstBlockNumberAsSender Int? @map("first_block_number_as_sender")
addressEntity Address @relation(fields: [address], references: [address])
addressEntity Address @relation(fields: [address], references: [address])
transactionsAsSender Transaction[] @relation("senderAddressCategoryInfoRelation")
transactionsAsReceiver Transaction[] @relation("receiverAddressCategoryInfoRelation")
@@unique([address, category])
@@index([firstBlockNumberAsReceiver])
@@index([firstBlockNumberAsSender])
@@map("address_category_info")
}

Expand Down Expand Up @@ -114,6 +118,7 @@ model Blob {
dataStorageReferences BlobDataStorageReference[]
transactions BlobsOnTransactions[]
@@index([firstBlockNumber])
@@index([proof])
@@index([insertedAt])
@@map("blob")
Expand Down Expand Up @@ -176,10 +181,12 @@ model Transaction {
updatedAt DateTime @default(now()) @map("updated_at")
decodedFields Json @default("{}") @map("decoded_fields")
blobs BlobsOnTransactions[]
block Block @relation(fields: [blockHash], references: [hash])
from Address @relation("senderAddressRelation", fields: [fromId], references: [address])
to Address @relation("receiverAddressRelation", fields: [toId], references: [address])
blobs BlobsOnTransactions[]
block Block @relation(fields: [blockHash], references: [hash])
from Address @relation("senderAddressRelation", fields: [fromId], references: [address])
to Address @relation("receiverAddressRelation", fields: [toId], references: [address])
fromAddressCategoryInfo AddressCategoryInfo @relation("senderAddressCategoryInfoRelation", fields: [fromId, category], references: [address, category])
toAddressCategoryInfo AddressCategoryInfo @relation("receiverAddressCategoryInfoRelation", fields: [toId, category], references: [address, category])
transactionForks TransactionFork[]
Expand Down
Loading

0 comments on commit 0922d8b

Please sign in to comment.