diff --git a/packages/beaconchain/src/filter.ts b/packages/beaconchain/src/filter.ts index 704766a..d372685 100644 --- a/packages/beaconchain/src/filter.ts +++ b/packages/beaconchain/src/filter.ts @@ -82,3 +82,28 @@ export const FilterFromBytes = Schema.transform( export const filterToBytes = Schema.encodeSync(FilterFromBytes); export const filterFromBytes = Schema.decodeSync(FilterFromBytes); + +export function mergeFilter(a: Filter, b: Filter): Filter { + const header = mergeHeaderFilter(a.header, b.header); + return { + header, + transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])], + validators: [...(a.validators ?? []), ...(b.validators ?? [])], + blobs: [...(a.blobs ?? []), ...(b.blobs ?? [])], + }; +} + +function mergeHeaderFilter( + a?: HeaderFilter, + b?: HeaderFilter, +): HeaderFilter | undefined { + if (a === undefined) { + return b; + } + if (b === undefined) { + return a; + } + return { + always: a.always || b.always, + }; +} diff --git a/packages/beaconchain/src/index.ts b/packages/beaconchain/src/index.ts index ea6cf7f..22c6e78 100644 --- a/packages/beaconchain/src/index.ts +++ b/packages/beaconchain/src/index.ts @@ -1,6 +1,6 @@ import { StreamConfig } from "@apibara/protocol"; import { BlockFromBytes } from "./block"; -import { FilterFromBytes } from "./filter"; +import { FilterFromBytes, mergeFilter } from "./filter"; export * as proto from "./proto"; @@ -11,4 +11,5 @@ export * from "./block"; export const BeaconChainStream = new StreamConfig( FilterFromBytes, BlockFromBytes, + mergeFilter, ); diff --git a/packages/evm/src/filter.test.ts b/packages/evm/src/filter.test.ts index 9fdae58..bd1f304 100644 --- a/packages/evm/src/filter.test.ts +++ b/packages/evm/src/filter.test.ts @@ -2,7 +2,13 @@ import { encodeEventTopics, pad, parseAbi } from "viem"; import { describe, expect, it } from "vitest"; import { Schema } from "@effect/schema"; -import { Filter, LogFilter, filterFromProto, filterToProto } from "./filter"; +import { + Filter, + LogFilter, + filterFromProto, + filterToProto, + mergeFilter, +} from "./filter"; const abi = parseAbi([ "event Transfer(address indexed from, address indexed to, uint256 value)", @@ -78,3 +84,108 @@ describe("LogFilter", () => { expect(back).toEqual(filter); }); }); + +describe("mergeFilter", () => { + it("returns header.always if any has it", () => { + const fa = mergeFilter({}, { header: { always: true } }); + expect(fa).toMatchInlineSnapshot(` + { + "header": { + "always": true, + }, + "logs": [], + "transactions": [], + "withdrawals": [], + } + `); + const fb = mergeFilter({ header: { always: true } }, {}); + expect(fb).toMatchInlineSnapshot(` + { + "header": { + "always": true, + }, + "logs": [], + "transactions": [], + "withdrawals": [], + } + `); + }); + + it("returns an empty header by default", () => { + const f = mergeFilter({}, {}); + expect(f).toMatchInlineSnapshot(` + { + "header": undefined, + "logs": [], + "transactions": [], + "withdrawals": [], + } + `); + }); + + it("concatenates logs", () => { + const f = mergeFilter( + { logs: [{ address: "0xAAAAAAAAAAAAAAAAAAAAAA" }] }, + { logs: [{ address: "0xBBBBBBBBBBBBBBBBBBBBBB" }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "header": undefined, + "logs": [ + { + "address": "0xAAAAAAAAAAAAAAAAAAAAAA", + }, + { + "address": "0xBBBBBBBBBBBBBBBBBBBBBB", + }, + ], + "transactions": [], + "withdrawals": [], + } + `); + }); + + it("concatenates transactions", () => { + const f = mergeFilter( + { transactions: [{ from: "0xAAAAAAAAAAAAAAAAAAAAAA" }] }, + { transactions: [{ from: "0xBBBBBBBBBBBBBBBBBBBBBB" }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "header": undefined, + "logs": [], + "transactions": [ + { + "from": "0xAAAAAAAAAAAAAAAAAAAAAA", + }, + { + "from": "0xBBBBBBBBBBBBBBBBBBBBBB", + }, + ], + "withdrawals": [], + } + `); + }); + + it("concatenates withdrawals", () => { + const f = mergeFilter( + { withdrawals: [{ validatorIndex: 1n }] }, + { withdrawals: [{ validatorIndex: 100n }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "header": undefined, + "logs": [], + "transactions": [], + "withdrawals": [ + { + "validatorIndex": 1n, + }, + { + "validatorIndex": 100n, + }, + ], + } + `); + }); +}); diff --git a/packages/evm/src/filter.ts b/packages/evm/src/filter.ts index 030bb3a..fb520d2 100644 --- a/packages/evm/src/filter.ts +++ b/packages/evm/src/filter.ts @@ -90,3 +90,28 @@ export const FilterFromBytes = Schema.transform( export const filterToBytes = Schema.encodeSync(FilterFromBytes); export const filterFromBytes = Schema.decodeSync(FilterFromBytes); + +export function mergeFilter(a: Filter, b: Filter): Filter { + const header = mergeHeaderFilter(a.header, b.header); + return { + header, + withdrawals: [...(a.withdrawals ?? []), ...(b.withdrawals ?? [])], + logs: [...(a.logs ?? []), ...(b.logs ?? [])], + transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])], + }; +} + +function mergeHeaderFilter( + a?: HeaderFilter, + b?: HeaderFilter, +): HeaderFilter | undefined { + if (a === undefined) { + return b; + } + if (b === undefined) { + return a; + } + return { + always: a.always || b.always, + }; +} diff --git a/packages/evm/src/index.ts b/packages/evm/src/index.ts index 7a6e94d..0a2f5a7 100644 --- a/packages/evm/src/index.ts +++ b/packages/evm/src/index.ts @@ -1,6 +1,6 @@ import { StreamConfig } from "@apibara/protocol"; import { BlockFromBytes } from "./block"; -import { FilterFromBytes } from "./filter"; +import { FilterFromBytes, mergeFilter } from "./filter"; export * as proto from "./proto"; @@ -8,4 +8,8 @@ export * from "./common"; export * from "./filter"; export * from "./block"; -export const EvmStream = new StreamConfig(FilterFromBytes, BlockFromBytes); +export const EvmStream = new StreamConfig( + FilterFromBytes, + BlockFromBytes, + mergeFilter, +); diff --git a/packages/protocol/src/config.ts b/packages/protocol/src/config.ts index 5c58c08..40660cd 100644 --- a/packages/protocol/src/config.ts +++ b/packages/protocol/src/config.ts @@ -10,6 +10,7 @@ export class StreamConfig { constructor( private filter: Schema.Schema, private block: Schema.Schema, + public mergeFilter: (a: TFilter, b: TFilter) => TFilter, ) { this.request = StreamDataRequest(this.filter); this.response = StreamDataResponse(this.block); diff --git a/packages/protocol/src/testing/mock.test.ts b/packages/protocol/src/testing/mock.test.ts index aa38be7..4649444 100644 --- a/packages/protocol/src/testing/mock.test.ts +++ b/packages/protocol/src/testing/mock.test.ts @@ -1,7 +1,7 @@ import { Schema } from "@effect/schema"; import { describe, expect, it } from "vitest"; -import { type MockBlock, MockBlockFromBytes } from "./mock"; +import { type MockBlock, MockBlockFromBytes, MockStream } from "./mock"; describe("MockBlock", () => { const encode = Schema.encodeSync(MockBlockFromBytes); @@ -26,3 +26,10 @@ describe("MockBlock", () => { expect(block).toBe(null); }); }); + +describe("MockStream", () => { + it("allow filters to be merged", () => { + const f = MockStream.mergeFilter({ filter: "hello" }, { filter: "world" }); + expect(f).toEqual({ filter: "helloworld" }); + }); +}); diff --git a/packages/protocol/src/testing/mock.ts b/packages/protocol/src/testing/mock.ts index 5fff155..7cefc35 100644 --- a/packages/protocol/src/testing/mock.ts +++ b/packages/protocol/src/testing/mock.ts @@ -49,9 +49,22 @@ export const MockBlockFromBytes = Schema.transform( }, ); +/** For testing, simply concatenate the values of `.filter` */ +function mergeMockFilter(a: MockFilter, b: MockFilter): MockFilter { + let filter = ""; + if (a.filter) { + filter += a.filter; + } + if (b.filter) { + filter += b.filter; + } + return { filter }; +} + export const MockStream = new StreamConfig( MockFilterFromBytes, MockBlockFromBytes, + mergeMockFilter, ); export const MockStreamResponse = StreamDataResponse(MockBlockFromBytes); diff --git a/packages/starknet/src/filter.test.ts b/packages/starknet/src/filter.test.ts index daf06ef..c365766 100644 --- a/packages/starknet/src/filter.test.ts +++ b/packages/starknet/src/filter.test.ts @@ -1,7 +1,13 @@ import { Schema } from "@effect/schema"; import { describe, expect, it } from "vitest"; -import { EventFilter, HeaderFilter, Key, TransactionFilter } from "./filter"; +import { + EventFilter, + HeaderFilter, + Key, + TransactionFilter, + mergeFilter, +} from "./filter"; describe("HeaderFilter", () => { const encode = Schema.encodeSync(HeaderFilter); @@ -461,3 +467,118 @@ describe("TransactionFilter", () => { `); }); }); + +describe("mergeFilter", () => { + it("returns header.always if any has it", () => { + const fa = mergeFilter({}, { header: { always: true } }); + expect(fa).toMatchInlineSnapshot(` + { + "events": [], + "header": { + "always": true, + }, + "messages": [], + "transactions": [], + } + `); + const fb = mergeFilter({ header: { always: true } }, {}); + expect(fb).toMatchInlineSnapshot(` + { + "events": [], + "header": { + "always": true, + }, + "messages": [], + "transactions": [], + } + `); + }); + + it("returns an empty header by default", () => { + const f = mergeFilter({}, {}); + expect(f).toMatchInlineSnapshot(` + { + "events": [], + "header": undefined, + "messages": [], + "transactions": [], + } + `); + }); + + it("concatenates transactions", () => { + const f = mergeFilter( + { + transactions: [{ transactionType: { _tag: "invokeV0", invokeV0: {} } }], + }, + { + transactions: [{ transactionType: { _tag: "invokeV3", invokeV3: {} } }], + }, + ); + expect(f).toMatchInlineSnapshot(` + { + "events": [], + "header": undefined, + "messages": [], + "transactions": [ + { + "transactionType": { + "_tag": "invokeV0", + "invokeV0": {}, + }, + }, + { + "transactionType": { + "_tag": "invokeV3", + "invokeV3": {}, + }, + }, + ], + } + `); + }); + + it("concatenates events", () => { + const f = mergeFilter( + { events: [{ fromAddress: "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" }] }, + { events: [{ fromAddress: "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "events": [ + { + "fromAddress": "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + }, + { + "fromAddress": "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", + }, + ], + "header": undefined, + "messages": [], + "transactions": [], + } + `); + }); + + it("concatenates messages", () => { + const f = mergeFilter( + { messages: [{ fromAddress: "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" }] }, + { messages: [{ fromAddress: "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "events": [], + "header": undefined, + "messages": [ + { + "fromAddress": "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + }, + { + "fromAddress": "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", + }, + ], + "transactions": [], + } + `); + }); +}); diff --git a/packages/starknet/src/filter.ts b/packages/starknet/src/filter.ts index 43bbbdb..0fafed4 100644 --- a/packages/starknet/src/filter.ts +++ b/packages/starknet/src/filter.ts @@ -197,3 +197,28 @@ export const FilterFromBytes = Schema.transform( export const filterToBytes = Schema.encodeSync(FilterFromBytes); export const filterFromBytes = Schema.decodeSync(FilterFromBytes); + +export function mergeFilter(a: Filter, b: Filter): Filter { + const header = mergeHeaderFilter(a.header, b.header); + return { + header, + transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])], + events: [...(a.events ?? []), ...(b.events ?? [])], + messages: [...(a.messages ?? []), ...(b.messages ?? [])], + }; +} + +function mergeHeaderFilter( + a?: HeaderFilter, + b?: HeaderFilter, +): HeaderFilter | undefined { + if (a === undefined) { + return b; + } + if (b === undefined) { + return a; + } + return { + always: a.always || b.always, + }; +} diff --git a/packages/starknet/src/index.ts b/packages/starknet/src/index.ts index 0227c45..2d74796 100644 --- a/packages/starknet/src/index.ts +++ b/packages/starknet/src/index.ts @@ -1,6 +1,6 @@ import { StreamConfig } from "@apibara/protocol"; import { BlockFromBytes } from "./block"; -import { FilterFromBytes } from "./filter"; +import { FilterFromBytes, mergeFilter } from "./filter"; export * as proto from "./proto"; @@ -8,4 +8,8 @@ export * from "./common"; export * from "./filter"; export * from "./block"; -export const StarknetStream = new StreamConfig(FilterFromBytes, BlockFromBytes); +export const StarknetStream = new StreamConfig( + FilterFromBytes, + BlockFromBytes, + mergeFilter, +);