Skip to content

Commit

Permalink
protocol: parse empty block data as null (#89)
Browse files Browse the repository at this point in the history
The DNA protocol sends an empty block to signal to the client that it
scanned a
block but didn't find any data.
This happens when the client sends multiple filters in one request.

This PR changes the stream definitions to:

 * Require that the schema returns a nullable type.
 * Propagate changes to all the types across packages.

We also extend `StreamConfig` to include a `mergeFilter` function that
can be
used to merge multiple filters into one.
  • Loading branch information
jaipaljadeja authored Jul 3, 2024
2 parents b07fa77 + 06b7c38 commit 253b046
Show file tree
Hide file tree
Showing 29 changed files with 623 additions and 94 deletions.
2 changes: 2 additions & 0 deletions examples/beaconchain-client/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from "node:assert";
import { BeaconChainStream, Filter } from "@apibara/beaconchain";
import { createClient } from "@apibara/protocol";
import { defineCommand, runMain } from "citty";
Expand Down Expand Up @@ -60,6 +61,7 @@ const command = defineCommand({

const maxBlobSize = 131_072; // bytes
for (const block of message.data.data) {
assert(block !== null);
const transactions = block.transactions ?? [];
for (const blob of block.blobs) {
if (blob.blob === undefined) continue;
Expand Down
2 changes: 2 additions & 0 deletions examples/evm-client/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from "node:assert";
import { EvmStream, Filter } from "@apibara/evm";
import { createClient } from "@apibara/protocol";
import { defineCommand, runMain } from "citty";
Expand Down Expand Up @@ -63,6 +64,7 @@ const command = defineCommand({
case "data": {
consola.info("Block", message.data.endCursor?.orderKey);
for (const block of message.data.data) {
assert(block !== null);
consola.info("Block", block.header?.number);
for (const log of block.logs ?? []) {
// const { args } = decodeEventLog({
Expand Down
2 changes: 2 additions & 0 deletions examples/starknet-client/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from "node:assert";
import { createClient } from "@apibara/protocol";
import { Filter, StarknetStream } from "@apibara/starknet";
import { defineCommand, runMain } from "citty";
Expand Down Expand Up @@ -98,6 +99,7 @@ const command = defineCommand({

let events = 0;
for (const block of message.data.data) {
assert(block !== null);
events += block.events.length ?? 0;
consola.info(
`Block n=${block.header?.blockNumber} h=${block.header?.blockHash}`,
Expand Down
8 changes: 7 additions & 1 deletion packages/beaconchain/src/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,19 @@ export type Block = typeof Block.Type;

export const BlockFromBytes = Schema.transform(
Schema.Uint8ArrayFromSelf,
Block,
Schema.NullOr(Block),
{
strict: false,
decode(value) {
if (value.length === 0) {
return null;
}
return proto.data.Block.decode(value);
},
encode(value) {
if (value === null) {
return new Uint8Array();
}
return proto.data.Block.encode(value).finish();
},
},
Expand Down
25 changes: 25 additions & 0 deletions packages/beaconchain/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,28 @@ export const FilterFromBytes = Schema.transform(

export const filterToBytes = Schema.encodeSync(FilterFromBytes);
export const filterFromBytes = Schema.decodeSync(FilterFromBytes);

export function mergeFilter(a: Filter, b: Filter): Filter {
const header = mergeHeaderFilter(a.header, b.header);
return {
header,
transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])],
validators: [...(a.validators ?? []), ...(b.validators ?? [])],
blobs: [...(a.blobs ?? []), ...(b.blobs ?? [])],
};
}

function mergeHeaderFilter(
a?: HeaderFilter,
b?: HeaderFilter,
): HeaderFilter | undefined {
if (a === undefined) {
return b;
}
if (b === undefined) {
return a;
}
return {
always: a.always || b.always,
};
}
3 changes: 2 additions & 1 deletion packages/beaconchain/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { StreamConfig } from "@apibara/protocol";
import { BlockFromBytes } from "./block";
import { FilterFromBytes } from "./filter";
import { FilterFromBytes, mergeFilter } from "./filter";

export * as proto from "./proto";

Expand All @@ -11,4 +11,5 @@ export * from "./block";
export const BeaconChainStream = new StreamConfig(
FilterFromBytes,
BlockFromBytes,
mergeFilter,
);
8 changes: 7 additions & 1 deletion packages/evm/src/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,19 @@ export type Block = typeof Block.Type;

export const BlockFromBytes = Schema.transform(
Schema.Uint8ArrayFromSelf,
Block,
Schema.NullOr(Block),
{
strict: false,
decode(value) {
if (value.length === 0) {
return null;
}
return proto.data.Block.decode(value);
},
encode(value) {
if (value === null) {
return new Uint8Array();
}
return proto.data.Block.encode(value).finish();
},
},
Expand Down
113 changes: 112 additions & 1 deletion packages/evm/src/filter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import { encodeEventTopics, pad, parseAbi } from "viem";
import { describe, expect, it } from "vitest";

import { Schema } from "@effect/schema";
import { Filter, LogFilter, filterFromProto, filterToProto } from "./filter";
import {
Filter,
LogFilter,
filterFromProto,
filterToProto,
mergeFilter,
} from "./filter";

const abi = parseAbi([
"event Transfer(address indexed from, address indexed to, uint256 value)",
Expand Down Expand Up @@ -78,3 +84,108 @@ describe("LogFilter", () => {
expect(back).toEqual(filter);
});
});

describe("mergeFilter", () => {
it("returns header.always if any has it", () => {
const fa = mergeFilter({}, { header: { always: true } });
expect(fa).toMatchInlineSnapshot(`
{
"header": {
"always": true,
},
"logs": [],
"transactions": [],
"withdrawals": [],
}
`);
const fb = mergeFilter({ header: { always: true } }, {});
expect(fb).toMatchInlineSnapshot(`
{
"header": {
"always": true,
},
"logs": [],
"transactions": [],
"withdrawals": [],
}
`);
});

it("returns an empty header by default", () => {
const f = mergeFilter({}, {});
expect(f).toMatchInlineSnapshot(`
{
"header": undefined,
"logs": [],
"transactions": [],
"withdrawals": [],
}
`);
});

it("concatenates logs", () => {
const f = mergeFilter(
{ logs: [{ address: "0xAAAAAAAAAAAAAAAAAAAAAA" }] },
{ logs: [{ address: "0xBBBBBBBBBBBBBBBBBBBBBB" }] },
);
expect(f).toMatchInlineSnapshot(`
{
"header": undefined,
"logs": [
{
"address": "0xAAAAAAAAAAAAAAAAAAAAAA",
},
{
"address": "0xBBBBBBBBBBBBBBBBBBBBBB",
},
],
"transactions": [],
"withdrawals": [],
}
`);
});

it("concatenates transactions", () => {
const f = mergeFilter(
{ transactions: [{ from: "0xAAAAAAAAAAAAAAAAAAAAAA" }] },
{ transactions: [{ from: "0xBBBBBBBBBBBBBBBBBBBBBB" }] },
);
expect(f).toMatchInlineSnapshot(`
{
"header": undefined,
"logs": [],
"transactions": [
{
"from": "0xAAAAAAAAAAAAAAAAAAAAAA",
},
{
"from": "0xBBBBBBBBBBBBBBBBBBBBBB",
},
],
"withdrawals": [],
}
`);
});

it("concatenates withdrawals", () => {
const f = mergeFilter(
{ withdrawals: [{ validatorIndex: 1n }] },
{ withdrawals: [{ validatorIndex: 100n }] },
);
expect(f).toMatchInlineSnapshot(`
{
"header": undefined,
"logs": [],
"transactions": [],
"withdrawals": [
{
"validatorIndex": 1n,
},
{
"validatorIndex": 100n,
},
],
}
`);
});
});
25 changes: 25 additions & 0 deletions packages/evm/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,28 @@ export const FilterFromBytes = Schema.transform(

export const filterToBytes = Schema.encodeSync(FilterFromBytes);
export const filterFromBytes = Schema.decodeSync(FilterFromBytes);

export function mergeFilter(a: Filter, b: Filter): Filter {
const header = mergeHeaderFilter(a.header, b.header);
return {
header,
withdrawals: [...(a.withdrawals ?? []), ...(b.withdrawals ?? [])],
logs: [...(a.logs ?? []), ...(b.logs ?? [])],
transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])],
};
}

function mergeHeaderFilter(
a?: HeaderFilter,
b?: HeaderFilter,
): HeaderFilter | undefined {
if (a === undefined) {
return b;
}
if (b === undefined) {
return a;
}
return {
always: a.always || b.always,
};
}
8 changes: 6 additions & 2 deletions packages/evm/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { StreamConfig } from "@apibara/protocol";
import { BlockFromBytes } from "./block";
import { FilterFromBytes } from "./filter";
import { FilterFromBytes, mergeFilter } from "./filter";

export * as proto from "./proto";

export * from "./common";
export * from "./filter";
export * from "./block";

export const EvmStream = new StreamConfig(FilterFromBytes, BlockFromBytes);
export const EvmStream = new StreamConfig(
FilterFromBytes,
BlockFromBytes,
mergeFilter,
);
24 changes: 11 additions & 13 deletions packages/indexer/src/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { generateMockMessages, vcr } from "./testing";
import { type MockRet, getMockIndexer } from "./testing/indexer";

describe("Run Test", () => {
const client = new MockClient(messages, [{}]);
const client = new MockClient(generateMockMessages(), [{}]);

it("should stream messages", async () => {
const sink = vcr<MockRet>();
Expand All @@ -16,7 +16,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000000,
"data": "5000000",
},
],
"endCursor": {
Expand All @@ -26,7 +26,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000001,
"data": "5000001",
},
],
"endCursor": {
Expand All @@ -36,7 +36,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000002,
"data": "5000002",
},
],
"endCursor": {
Expand All @@ -46,7 +46,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000003,
"data": "5000003",
},
],
"endCursor": {
Expand All @@ -56,7 +56,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000004,
"data": "5000004",
},
],
"endCursor": {
Expand All @@ -66,7 +66,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000005,
"data": "5000005",
},
],
"endCursor": {
Expand All @@ -76,7 +76,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000006,
"data": "5000006",
},
],
"endCursor": {
Expand All @@ -86,7 +86,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000007,
"data": "5000007",
},
],
"endCursor": {
Expand All @@ -96,7 +96,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000008,
"data": "5000008",
},
],
"endCursor": {
Expand All @@ -106,7 +106,7 @@ describe("Run Test", () => {
{
"data": [
{
"blockNumber": 5000009,
"data": "5000009",
},
],
"endCursor": {
Expand All @@ -117,5 +117,3 @@ describe("Run Test", () => {
`);
});
});

const messages = generateMockMessages();
Loading

0 comments on commit 253b046

Please sign in to comment.