Skip to content

Commit

Permalink
protocol: add mergeFilter to StreamConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
fracek committed Jul 3, 2024
1 parent e2a2e65 commit 06b7c38
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 8 deletions.
25 changes: 25 additions & 0 deletions packages/beaconchain/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,28 @@ export const FilterFromBytes = Schema.transform(

export const filterToBytes = Schema.encodeSync(FilterFromBytes);
export const filterFromBytes = Schema.decodeSync(FilterFromBytes);

export function mergeFilter(a: Filter, b: Filter): Filter {
const header = mergeHeaderFilter(a.header, b.header);
return {
header,
transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])],
validators: [...(a.validators ?? []), ...(b.validators ?? [])],
blobs: [...(a.blobs ?? []), ...(b.blobs ?? [])],
};
}

function mergeHeaderFilter(
a?: HeaderFilter,
b?: HeaderFilter,
): HeaderFilter | undefined {
if (a === undefined) {
return b;
}
if (b === undefined) {
return a;
}
return {
always: a.always || b.always,
};
}
3 changes: 2 additions & 1 deletion packages/beaconchain/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { StreamConfig } from "@apibara/protocol";
import { BlockFromBytes } from "./block";
import { FilterFromBytes } from "./filter";
import { FilterFromBytes, mergeFilter } from "./filter";

export * as proto from "./proto";

Expand All @@ -11,4 +11,5 @@ export * from "./block";
export const BeaconChainStream = new StreamConfig(
FilterFromBytes,
BlockFromBytes,
mergeFilter,
);
113 changes: 112 additions & 1 deletion packages/evm/src/filter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import { encodeEventTopics, pad, parseAbi } from "viem";
import { describe, expect, it } from "vitest";

import { Schema } from "@effect/schema";
import { Filter, LogFilter, filterFromProto, filterToProto } from "./filter";
import {
Filter,
LogFilter,
filterFromProto,
filterToProto,
mergeFilter,
} from "./filter";

const abi = parseAbi([
"event Transfer(address indexed from, address indexed to, uint256 value)",
Expand Down Expand Up @@ -78,3 +84,108 @@ describe("LogFilter", () => {
expect(back).toEqual(filter);
});
});

describe("mergeFilter", () => {
it("returns header.always if any has it", () => {
const fa = mergeFilter({}, { header: { always: true } });
expect(fa).toMatchInlineSnapshot(`
{
"header": {
"always": true,
},
"logs": [],
"transactions": [],
"withdrawals": [],
}
`);
const fb = mergeFilter({ header: { always: true } }, {});
expect(fb).toMatchInlineSnapshot(`
{
"header": {
"always": true,
},
"logs": [],
"transactions": [],
"withdrawals": [],
}
`);
});

it("returns an empty header by default", () => {
const f = mergeFilter({}, {});
expect(f).toMatchInlineSnapshot(`
{
"header": undefined,
"logs": [],
"transactions": [],
"withdrawals": [],
}
`);
});

it("concatenates logs", () => {
const f = mergeFilter(
{ logs: [{ address: "0xAAAAAAAAAAAAAAAAAAAAAA" }] },
{ logs: [{ address: "0xBBBBBBBBBBBBBBBBBBBBBB" }] },
);
expect(f).toMatchInlineSnapshot(`
{
"header": undefined,
"logs": [
{
"address": "0xAAAAAAAAAAAAAAAAAAAAAA",
},
{
"address": "0xBBBBBBBBBBBBBBBBBBBBBB",
},
],
"transactions": [],
"withdrawals": [],
}
`);
});

it("concatenates transactions", () => {
const f = mergeFilter(
{ transactions: [{ from: "0xAAAAAAAAAAAAAAAAAAAAAA" }] },
{ transactions: [{ from: "0xBBBBBBBBBBBBBBBBBBBBBB" }] },
);
expect(f).toMatchInlineSnapshot(`
{
"header": undefined,
"logs": [],
"transactions": [
{
"from": "0xAAAAAAAAAAAAAAAAAAAAAA",
},
{
"from": "0xBBBBBBBBBBBBBBBBBBBBBB",
},
],
"withdrawals": [],
}
`);
});

it("concatenates withdrawals", () => {
const f = mergeFilter(
{ withdrawals: [{ validatorIndex: 1n }] },
{ withdrawals: [{ validatorIndex: 100n }] },
);
expect(f).toMatchInlineSnapshot(`
{
"header": undefined,
"logs": [],
"transactions": [],
"withdrawals": [
{
"validatorIndex": 1n,
},
{
"validatorIndex": 100n,
},
],
}
`);
});
});
25 changes: 25 additions & 0 deletions packages/evm/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,28 @@ export const FilterFromBytes = Schema.transform(

export const filterToBytes = Schema.encodeSync(FilterFromBytes);
export const filterFromBytes = Schema.decodeSync(FilterFromBytes);

export function mergeFilter(a: Filter, b: Filter): Filter {
const header = mergeHeaderFilter(a.header, b.header);
return {
header,
withdrawals: [...(a.withdrawals ?? []), ...(b.withdrawals ?? [])],
logs: [...(a.logs ?? []), ...(b.logs ?? [])],
transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])],
};
}

function mergeHeaderFilter(
a?: HeaderFilter,
b?: HeaderFilter,
): HeaderFilter | undefined {
if (a === undefined) {
return b;
}
if (b === undefined) {
return a;
}
return {
always: a.always || b.always,
};
}
8 changes: 6 additions & 2 deletions packages/evm/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { StreamConfig } from "@apibara/protocol";
import { BlockFromBytes } from "./block";
import { FilterFromBytes } from "./filter";
import { FilterFromBytes, mergeFilter } from "./filter";

export * as proto from "./proto";

export * from "./common";
export * from "./filter";
export * from "./block";

export const EvmStream = new StreamConfig(FilterFromBytes, BlockFromBytes);
export const EvmStream = new StreamConfig(
FilterFromBytes,
BlockFromBytes,
mergeFilter,
);
1 change: 1 addition & 0 deletions packages/protocol/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export class StreamConfig<TFilter, TBlock> {
constructor(
private filter: Schema.Schema<TFilter, Uint8Array, never>,
private block: Schema.Schema<TBlock | null, Uint8Array, never>,
public mergeFilter: (a: TFilter, b: TFilter) => TFilter,
) {
this.request = StreamDataRequest(this.filter);
this.response = StreamDataResponse(this.block);
Expand Down
9 changes: 8 additions & 1 deletion packages/protocol/src/testing/mock.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Schema } from "@effect/schema";
import { describe, expect, it } from "vitest";

import { type MockBlock, MockBlockFromBytes } from "./mock";
import { type MockBlock, MockBlockFromBytes, MockStream } from "./mock";

describe("MockBlock", () => {
const encode = Schema.encodeSync(MockBlockFromBytes);
Expand All @@ -26,3 +26,10 @@ describe("MockBlock", () => {
expect(block).toBe(null);
});
});

describe("MockStream", () => {
it("allow filters to be merged", () => {
const f = MockStream.mergeFilter({ filter: "hello" }, { filter: "world" });
expect(f).toEqual({ filter: "helloworld" });
});
});
13 changes: 13 additions & 0 deletions packages/protocol/src/testing/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,22 @@ export const MockBlockFromBytes = Schema.transform(
},
);

/** For testing, simply concatenate the values of `.filter` */
function mergeMockFilter(a: MockFilter, b: MockFilter): MockFilter {
let filter = "";
if (a.filter) {
filter += a.filter;
}
if (b.filter) {
filter += b.filter;
}
return { filter };
}

export const MockStream = new StreamConfig(
MockFilterFromBytes,
MockBlockFromBytes,
mergeMockFilter,
);

export const MockStreamResponse = StreamDataResponse(MockBlockFromBytes);
Expand Down
123 changes: 122 additions & 1 deletion packages/starknet/src/filter.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { Schema } from "@effect/schema";
import { describe, expect, it } from "vitest";

import { EventFilter, HeaderFilter, Key, TransactionFilter } from "./filter";
import {
EventFilter,
HeaderFilter,
Key,
TransactionFilter,
mergeFilter,
} from "./filter";

describe("HeaderFilter", () => {
const encode = Schema.encodeSync(HeaderFilter);
Expand Down Expand Up @@ -461,3 +467,118 @@ describe("TransactionFilter", () => {
`);
});
});

describe("mergeFilter", () => {
it("returns header.always if any has it", () => {
const fa = mergeFilter({}, { header: { always: true } });
expect(fa).toMatchInlineSnapshot(`
{
"events": [],
"header": {
"always": true,
},
"messages": [],
"transactions": [],
}
`);
const fb = mergeFilter({ header: { always: true } }, {});
expect(fb).toMatchInlineSnapshot(`
{
"events": [],
"header": {
"always": true,
},
"messages": [],
"transactions": [],
}
`);
});

it("returns an empty header by default", () => {
const f = mergeFilter({}, {});
expect(f).toMatchInlineSnapshot(`
{
"events": [],
"header": undefined,
"messages": [],
"transactions": [],
}
`);
});

it("concatenates transactions", () => {
const f = mergeFilter(
{
transactions: [{ transactionType: { _tag: "invokeV0", invokeV0: {} } }],
},
{
transactions: [{ transactionType: { _tag: "invokeV3", invokeV3: {} } }],
},
);
expect(f).toMatchInlineSnapshot(`
{
"events": [],
"header": undefined,
"messages": [],
"transactions": [
{
"transactionType": {
"_tag": "invokeV0",
"invokeV0": {},
},
},
{
"transactionType": {
"_tag": "invokeV3",
"invokeV3": {},
},
},
],
}
`);
});

it("concatenates events", () => {
const f = mergeFilter(
{ events: [{ fromAddress: "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" }] },
{ events: [{ fromAddress: "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" }] },
);
expect(f).toMatchInlineSnapshot(`
{
"events": [
{
"fromAddress": "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
},
{
"fromAddress": "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
},
],
"header": undefined,
"messages": [],
"transactions": [],
}
`);
});

it("concatenates messages", () => {
const f = mergeFilter(
{ messages: [{ fromAddress: "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" }] },
{ messages: [{ fromAddress: "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" }] },
);
expect(f).toMatchInlineSnapshot(`
{
"events": [],
"header": undefined,
"messages": [
{
"fromAddress": "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
},
{
"fromAddress": "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
},
],
"transactions": [],
}
`);
});
});
Loading

0 comments on commit 06b7c38

Please sign in to comment.