diff --git a/examples/beaconchain-client/src/main.ts b/examples/beaconchain-client/src/main.ts index 0cc10bd..198d336 100644 --- a/examples/beaconchain-client/src/main.ts +++ b/examples/beaconchain-client/src/main.ts @@ -1,3 +1,4 @@ +import assert from "node:assert"; import { BeaconChainStream, Filter } from "@apibara/beaconchain"; import { createClient } from "@apibara/protocol"; import { defineCommand, runMain } from "citty"; @@ -60,6 +61,7 @@ const command = defineCommand({ const maxBlobSize = 131_072; // bytes for (const block of message.data.data) { + assert(block !== null); const transactions = block.transactions ?? []; for (const blob of block.blobs) { if (blob.blob === undefined) continue; diff --git a/examples/evm-client/src/main.ts b/examples/evm-client/src/main.ts index 334bff0..e3fa24c 100644 --- a/examples/evm-client/src/main.ts +++ b/examples/evm-client/src/main.ts @@ -1,3 +1,4 @@ +import assert from "node:assert"; import { EvmStream, Filter } from "@apibara/evm"; import { createClient } from "@apibara/protocol"; import { defineCommand, runMain } from "citty"; @@ -63,6 +64,7 @@ const command = defineCommand({ case "data": { consola.info("Block", message.data.endCursor?.orderKey); for (const block of message.data.data) { + assert(block !== null); consola.info("Block", block.header?.number); for (const log of block.logs ?? []) { // const { args } = decodeEventLog({ diff --git a/examples/starknet-client/src/main.ts b/examples/starknet-client/src/main.ts index 4eec8af..11c12cd 100644 --- a/examples/starknet-client/src/main.ts +++ b/examples/starknet-client/src/main.ts @@ -1,3 +1,4 @@ +import assert from "node:assert"; import { createClient } from "@apibara/protocol"; import { Filter, StarknetStream } from "@apibara/starknet"; import { defineCommand, runMain } from "citty"; @@ -98,6 +99,7 @@ const command = defineCommand({ let events = 0; for (const block of message.data.data) { + assert(block !== null); events += block.events.length ?? 0; consola.info( `Block n=${block.header?.blockNumber} h=${block.header?.blockHash}`, diff --git a/packages/beaconchain/src/block.ts b/packages/beaconchain/src/block.ts index cf8d47e..9cf0f21 100644 --- a/packages/beaconchain/src/block.ts +++ b/packages/beaconchain/src/block.ts @@ -92,13 +92,19 @@ export type Block = typeof Block.Type; export const BlockFromBytes = Schema.transform( Schema.Uint8ArrayFromSelf, - Block, + Schema.NullOr(Block), { strict: false, decode(value) { + if (value.length === 0) { + return null; + } return proto.data.Block.decode(value); }, encode(value) { + if (value === null) { + return new Uint8Array(); + } return proto.data.Block.encode(value).finish(); }, }, diff --git a/packages/beaconchain/src/filter.ts b/packages/beaconchain/src/filter.ts index 704766a..d372685 100644 --- a/packages/beaconchain/src/filter.ts +++ b/packages/beaconchain/src/filter.ts @@ -82,3 +82,28 @@ export const FilterFromBytes = Schema.transform( export const filterToBytes = Schema.encodeSync(FilterFromBytes); export const filterFromBytes = Schema.decodeSync(FilterFromBytes); + +export function mergeFilter(a: Filter, b: Filter): Filter { + const header = mergeHeaderFilter(a.header, b.header); + return { + header, + transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])], + validators: [...(a.validators ?? []), ...(b.validators ?? [])], + blobs: [...(a.blobs ?? []), ...(b.blobs ?? 
[])], + }; +} + +function mergeHeaderFilter( + a?: HeaderFilter, + b?: HeaderFilter, +): HeaderFilter | undefined { + if (a === undefined) { + return b; + } + if (b === undefined) { + return a; + } + return { + always: a.always || b.always, + }; +} diff --git a/packages/beaconchain/src/index.ts b/packages/beaconchain/src/index.ts index ea6cf7f..22c6e78 100644 --- a/packages/beaconchain/src/index.ts +++ b/packages/beaconchain/src/index.ts @@ -1,6 +1,6 @@ import { StreamConfig } from "@apibara/protocol"; import { BlockFromBytes } from "./block"; -import { FilterFromBytes } from "./filter"; +import { FilterFromBytes, mergeFilter } from "./filter"; export * as proto from "./proto"; @@ -11,4 +11,5 @@ export * from "./block"; export const BeaconChainStream = new StreamConfig( FilterFromBytes, BlockFromBytes, + mergeFilter, ); diff --git a/packages/evm/src/block.ts b/packages/evm/src/block.ts index d3fa2bf..92c5470 100644 --- a/packages/evm/src/block.ts +++ b/packages/evm/src/block.ts @@ -148,13 +148,19 @@ export type Block = typeof Block.Type; export const BlockFromBytes = Schema.transform( Schema.Uint8ArrayFromSelf, - Block, + Schema.NullOr(Block), { strict: false, decode(value) { + if (value.length === 0) { + return null; + } return proto.data.Block.decode(value); }, encode(value) { + if (value === null) { + return new Uint8Array(); + } return proto.data.Block.encode(value).finish(); }, }, diff --git a/packages/evm/src/filter.test.ts b/packages/evm/src/filter.test.ts index 9fdae58..bd1f304 100644 --- a/packages/evm/src/filter.test.ts +++ b/packages/evm/src/filter.test.ts @@ -2,7 +2,13 @@ import { encodeEventTopics, pad, parseAbi } from "viem"; import { describe, expect, it } from "vitest"; import { Schema } from "@effect/schema"; -import { Filter, LogFilter, filterFromProto, filterToProto } from "./filter"; +import { + Filter, + LogFilter, + filterFromProto, + filterToProto, + mergeFilter, +} from "./filter"; const abi = parseAbi([ "event Transfer(address indexed from, address indexed to, uint256 value)", @@ -78,3 +84,108 @@ describe("LogFilter", () => { expect(back).toEqual(filter); }); }); + +describe("mergeFilter", () => { + it("returns header.always if any has it", () => { + const fa = mergeFilter({}, { header: { always: true } }); + expect(fa).toMatchInlineSnapshot(` + { + "header": { + "always": true, + }, + "logs": [], + "transactions": [], + "withdrawals": [], + } + `); + const fb = mergeFilter({ header: { always: true } }, {}); + expect(fb).toMatchInlineSnapshot(` + { + "header": { + "always": true, + }, + "logs": [], + "transactions": [], + "withdrawals": [], + } + `); + }); + + it("returns an empty header by default", () => { + const f = mergeFilter({}, {}); + expect(f).toMatchInlineSnapshot(` + { + "header": undefined, + "logs": [], + "transactions": [], + "withdrawals": [], + } + `); + }); + + it("concatenates logs", () => { + const f = mergeFilter( + { logs: [{ address: "0xAAAAAAAAAAAAAAAAAAAAAA" }] }, + { logs: [{ address: "0xBBBBBBBBBBBBBBBBBBBBBB" }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "header": undefined, + "logs": [ + { + "address": "0xAAAAAAAAAAAAAAAAAAAAAA", + }, + { + "address": "0xBBBBBBBBBBBBBBBBBBBBBB", + }, + ], + "transactions": [], + "withdrawals": [], + } + `); + }); + + it("concatenates transactions", () => { + const f = mergeFilter( + { transactions: [{ from: "0xAAAAAAAAAAAAAAAAAAAAAA" }] }, + { transactions: [{ from: "0xBBBBBBBBBBBBBBBBBBBBBB" }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "header": undefined, + "logs": [], + "transactions": 
[ + { + "from": "0xAAAAAAAAAAAAAAAAAAAAAA", + }, + { + "from": "0xBBBBBBBBBBBBBBBBBBBBBB", + }, + ], + "withdrawals": [], + } + `); + }); + + it("concatenates withdrawals", () => { + const f = mergeFilter( + { withdrawals: [{ validatorIndex: 1n }] }, + { withdrawals: [{ validatorIndex: 100n }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "header": undefined, + "logs": [], + "transactions": [], + "withdrawals": [ + { + "validatorIndex": 1n, + }, + { + "validatorIndex": 100n, + }, + ], + } + `); + }); +}); diff --git a/packages/evm/src/filter.ts b/packages/evm/src/filter.ts index 030bb3a..fb520d2 100644 --- a/packages/evm/src/filter.ts +++ b/packages/evm/src/filter.ts @@ -90,3 +90,28 @@ export const FilterFromBytes = Schema.transform( export const filterToBytes = Schema.encodeSync(FilterFromBytes); export const filterFromBytes = Schema.decodeSync(FilterFromBytes); + +export function mergeFilter(a: Filter, b: Filter): Filter { + const header = mergeHeaderFilter(a.header, b.header); + return { + header, + withdrawals: [...(a.withdrawals ?? []), ...(b.withdrawals ?? [])], + logs: [...(a.logs ?? []), ...(b.logs ?? [])], + transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])], + }; +} + +function mergeHeaderFilter( + a?: HeaderFilter, + b?: HeaderFilter, +): HeaderFilter | undefined { + if (a === undefined) { + return b; + } + if (b === undefined) { + return a; + } + return { + always: a.always || b.always, + }; +} diff --git a/packages/evm/src/index.ts b/packages/evm/src/index.ts index 7a6e94d..0a2f5a7 100644 --- a/packages/evm/src/index.ts +++ b/packages/evm/src/index.ts @@ -1,6 +1,6 @@ import { StreamConfig } from "@apibara/protocol"; import { BlockFromBytes } from "./block"; -import { FilterFromBytes } from "./filter"; +import { FilterFromBytes, mergeFilter } from "./filter"; export * as proto from "./proto"; @@ -8,4 +8,8 @@ export * from "./common"; export * from "./filter"; export * from "./block"; -export const EvmStream = new StreamConfig(FilterFromBytes, BlockFromBytes); +export const EvmStream = new StreamConfig( + FilterFromBytes, + BlockFromBytes, + mergeFilter, +); diff --git a/packages/indexer/src/indexer.test.ts b/packages/indexer/src/indexer.test.ts index df2a655..e9f0396 100644 --- a/packages/indexer/src/indexer.test.ts +++ b/packages/indexer/src/indexer.test.ts @@ -5,7 +5,7 @@ import { generateMockMessages, vcr } from "./testing"; import { type MockRet, getMockIndexer } from "./testing/indexer"; describe("Run Test", () => { - const client = new MockClient(messages, [{}]); + const client = new MockClient(generateMockMessages(), [{}]); it("should stream messages", async () => { const sink = vcr(); @@ -16,7 +16,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000000, + "data": "5000000", }, ], "endCursor": { @@ -26,7 +26,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000001, + "data": "5000001", }, ], "endCursor": { @@ -36,7 +36,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000002, + "data": "5000002", }, ], "endCursor": { @@ -46,7 +46,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000003, + "data": "5000003", }, ], "endCursor": { @@ -56,7 +56,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000004, + "data": "5000004", }, ], "endCursor": { @@ -66,7 +66,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000005, + "data": "5000005", }, ], "endCursor": { @@ -76,7 +76,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000006, + "data": 
"5000006", }, ], "endCursor": { @@ -86,7 +86,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000007, + "data": "5000007", }, ], "endCursor": { @@ -96,7 +96,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000008, + "data": "5000008", }, ], "endCursor": { @@ -106,7 +106,7 @@ describe("Run Test", () => { { "data": [ { - "blockNumber": 5000009, + "data": "5000009", }, ], "endCursor": { @@ -117,5 +117,3 @@ describe("Run Test", () => { `); }); }); - -const messages = generateMockMessages(); diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index b9608f0..d7684e9 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -158,6 +158,8 @@ export async function run( } const block = blocks[0]; + // Until we implement factory mode, block should never be null. + assert(block !== null); const output = await tracer.startActiveSpan( "handler", diff --git a/packages/indexer/src/plugins/persistence.test.ts b/packages/indexer/src/plugins/persistence.test.ts index e10d6f6..f364064 100644 --- a/packages/indexer/src/plugins/persistence.test.ts +++ b/packages/indexer/src/plugins/persistence.test.ts @@ -1,5 +1,9 @@ import type { Cursor } from "@apibara/protocol"; -import { type MockBlock, MockClient } from "@apibara/protocol/testing"; +import { + type MockBlock, + MockClient, + type MockFilter, +} from "@apibara/protocol/testing"; import { klona } from "klona/full"; import { open } from "sqlite"; import sqlite3 from "sqlite3"; @@ -88,8 +92,7 @@ describe("Persistence", () => { it("should work with indexer and store cursor of last message", async () => { const client = new MockClient(messages, [{}]); - // biome-ignore lint/complexity/noBannedTypes: - const persistence = sqlitePersistence<{}, MockBlock, MockRet>({ + const persistence = sqlitePersistence({ driver: sqlite3.Database, filename: "file:memdb1?mode=memory&cache=shared", }); diff --git a/packages/indexer/src/testing/helper.ts b/packages/indexer/src/testing/helper.ts index 05ad061..85e53a9 100644 --- a/packages/indexer/src/testing/helper.ts +++ b/packages/indexer/src/testing/helper.ts @@ -1,14 +1,11 @@ -import type { StreamDataResponse } from "@apibara/protocol"; -import type { MockBlock } from "@apibara/protocol/testing"; +import type { MockStreamResponse } from "@apibara/protocol/testing"; -export function generateMockMessages( - count = 10, -): StreamDataResponse[] { +export function generateMockMessages(count = 10): MockStreamResponse[] { return [...Array(count)].map((_, i) => ({ _tag: "data", data: { finality: "accepted", - data: [{ blockNumber: BigInt(5_000_000 + i) }], + data: [{ data: `${5_000_000 + i}` }], endCursor: { orderKey: BigInt(5_000_000 + i) }, }, })); diff --git a/packages/indexer/src/testing/indexer.ts b/packages/indexer/src/testing/indexer.ts index f92d163..a1392f7 100644 --- a/packages/indexer/src/testing/indexer.ts +++ b/packages/indexer/src/testing/indexer.ts @@ -1,25 +1,28 @@ -import { type MockBlock, MockStream } from "@apibara/protocol/testing"; +import { + type MockBlock, + type MockFilter, + MockStream, +} from "@apibara/protocol/testing"; import { createIndexer, defineIndexer } from "../indexer"; import type { IndexerPlugin } from "../plugins"; export const getMockIndexer = ( - // biome-ignore lint/complexity/noBannedTypes: - plugins: ReadonlyArray> = [], + plugins: ReadonlyArray> = [], ) => createIndexer( defineIndexer(MockStream)({ streamUrl: "https://sepolia.ethereum.a5a.ch", finality: "accepted", filter: {}, - transform({ block: { 
blockNumber } }) { - if (!blockNumber) return []; + transform({ block: { data } }) { + if (!data) return []; - return [{ blockNumber: Number(blockNumber) }]; + return [{ data }]; }, plugins, }), ); export type MockRet = { - blockNumber: number; + data: string; }; diff --git a/packages/protocol/proto/testing.proto b/packages/protocol/proto/testing.proto index 85843b6..3f72e02 100644 --- a/packages/protocol/proto/testing.proto +++ b/packages/protocol/proto/testing.proto @@ -2,8 +2,10 @@ syntax = "proto3"; package dna.v2.testing; -message MockFilter {} +message MockFilter { + optional string filter = 1; +} message MockBlock { - uint64 block_number = 1; -} \ No newline at end of file + optional string data = 1; +} diff --git a/packages/protocol/src/client.ts b/packages/protocol/src/client.ts index bcf7646..118754e 100644 --- a/packages/protocol/src/client.ts +++ b/packages/protocol/src/client.ts @@ -88,7 +88,7 @@ export class GrpcClient implements Client { export class StreamDataIterable { constructor( private it: AsyncIterable, - private schema: Schema.Schema, + private schema: Schema.Schema, private options?: StreamDataOptions, ) {} diff --git a/packages/protocol/src/config.ts b/packages/protocol/src/config.ts index 01f9dc2..40660cd 100644 --- a/packages/protocol/src/config.ts +++ b/packages/protocol/src/config.ts @@ -9,7 +9,8 @@ export class StreamConfig { constructor( private filter: Schema.Schema, - private block: Schema.Schema, + private block: Schema.Schema, + public mergeFilter: (a: TFilter, b: TFilter) => TFilter, ) { this.request = StreamDataRequest(this.filter); this.response = StreamDataResponse(this.block); diff --git a/packages/protocol/src/proto/testing.ts b/packages/protocol/src/proto/testing.ts index 07a1720..a5ca2a2 100644 --- a/packages/protocol/src/proto/testing.ts +++ b/packages/protocol/src/proto/testing.ts @@ -5,24 +5,27 @@ // source: testing.proto /* eslint-disable */ -import Long from "long"; import _m0 from "protobufjs/minimal"; export const protobufPackage = "dna.v2.testing"; export interface MockFilter { + readonly filter?: string | undefined; } export interface MockBlock { - readonly blockNumber: bigint; + readonly data?: string | undefined; } function createBaseMockFilter(): MockFilter { - return {}; + return { filter: undefined }; } export const MockFilter = { - encode(_: MockFilter, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + encode(message: MockFilter, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.filter !== undefined) { + writer.uint32(10).string(message.filter); + } return writer; }, @@ -33,6 +36,13 @@ export const MockFilter = { while (reader.pos < end) { const tag = reader.uint32(); switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.filter = reader.string(); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -42,35 +52,36 @@ export const MockFilter = { return message; }, - fromJSON(_: any): MockFilter { - return {}; + fromJSON(object: any): MockFilter { + return { filter: isSet(object.filter) ? globalThis.String(object.filter) : undefined }; }, - toJSON(_: MockFilter): unknown { + toJSON(message: MockFilter): unknown { const obj: any = {}; + if (message.filter !== undefined) { + obj.filter = message.filter; + } return obj; }, create(base?: DeepPartial): MockFilter { return MockFilter.fromPartial(base ?? {}); }, - fromPartial(_: DeepPartial): MockFilter { + fromPartial(object: DeepPartial): MockFilter { const message = createBaseMockFilter() as any; + message.filter = object.filter ?? 
undefined; return message; }, }; function createBaseMockBlock(): MockBlock { - return { blockNumber: BigInt("0") }; + return { data: undefined }; } export const MockBlock = { encode(message: MockBlock, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { - if (message.blockNumber !== BigInt("0")) { - if (BigInt.asUintN(64, message.blockNumber) !== message.blockNumber) { - throw new globalThis.Error("value provided for field message.blockNumber of type uint64 too large"); - } - writer.uint32(8).uint64(message.blockNumber.toString()); + if (message.data !== undefined) { + writer.uint32(10).string(message.data); } return writer; }, @@ -83,11 +94,11 @@ export const MockBlock = { const tag = reader.uint32(); switch (tag >>> 3) { case 1: - if (tag !== 8) { + if (tag !== 10) { break; } - message.blockNumber = longToBigint(reader.uint64() as Long); + message.data = reader.string(); continue; } if ((tag & 7) === 4 || tag === 0) { @@ -99,13 +110,13 @@ export const MockBlock = { }, fromJSON(object: any): MockBlock { - return { blockNumber: isSet(object.blockNumber) ? BigInt(object.blockNumber) : BigInt("0") }; + return { data: isSet(object.data) ? globalThis.String(object.data) : undefined }; }, toJSON(message: MockBlock): unknown { const obj: any = {}; - if (message.blockNumber !== BigInt("0")) { - obj.blockNumber = message.blockNumber.toString(); + if (message.data !== undefined) { + obj.data = message.data; } return obj; }, @@ -115,7 +126,7 @@ export const MockBlock = { }, fromPartial(object: DeepPartial): MockBlock { const message = createBaseMockBlock() as any; - message.blockNumber = object.blockNumber ?? BigInt("0"); + message.data = object.data ?? undefined; return message; }, }; @@ -130,15 +141,6 @@ export type DeepPartial = T extends Builtin ? T : T extends {} ? 
{ [K in keyof T]?: DeepPartial } : Partial; -function longToBigint(long: Long) { - return BigInt(long.toString()); -} - -if (_m0.util.Long !== Long) { - _m0.util.Long = Long as any; - _m0.configure(); -} - function isSet(value: any): boolean { return value !== null && value !== undefined; } diff --git a/packages/protocol/src/stream.test-d.ts b/packages/protocol/src/stream.test-d.ts new file mode 100644 index 0000000..37b68fb --- /dev/null +++ b/packages/protocol/src/stream.test-d.ts @@ -0,0 +1,33 @@ +import { Schema } from "@effect/schema"; +import { test } from "vitest"; + +import { Data } from "./stream"; + +const Inner = Schema.Struct({ + data: Schema.String, +}); + +const Good = Schema.transform(Schema.Uint8ArrayFromSelf, Schema.NullOr(Inner), { + decode(value) { + throw new Error("not implemented"); + }, + encode(value) { + throw new Error("not implemented"); + }, +}); + +const Bad = Schema.transform(Schema.Uint8ArrayFromSelf, Inner, { + decode(value) { + throw new Error("not implemented"); + }, + encode(value) { + throw new Error("not implemented"); + }, +}); + +test("Data", () => { + const GoodData = Data(Good); + + // @ts-expect-error + const BadData = Data(Bad); +}); diff --git a/packages/protocol/src/stream.test.ts b/packages/protocol/src/stream.test.ts index bebef1a..0654296 100644 --- a/packages/protocol/src/stream.test.ts +++ b/packages/protocol/src/stream.test.ts @@ -14,15 +14,22 @@ const InnerData = Schema.Struct({ value: Schema.String, }); -const TestData = Schema.transform(Schema.Uint8ArrayFromSelf, InnerData, { - decode(bytes) { - const value = new TextDecoder().decode(bytes); - return { value }; - }, - encode({ value }) { - return new TextEncoder().encode(value); +const TestData = Schema.transform( + Schema.Uint8ArrayFromSelf, + Schema.NullOr(InnerData), + { + decode(bytes) { + const value = new TextDecoder().decode(bytes); + return { value }; + }, + encode(value) { + if (value === null) { + return new Uint8Array(); + } + return new TextEncoder().encode(value.value); + }, }, -}); +); const TestStreamDataRequest = StreamDataRequest(TestData); diff --git a/packages/protocol/src/stream.ts b/packages/protocol/src/stream.ts index c074926..9f32cb3 100644 --- a/packages/protocol/src/stream.ts +++ b/packages/protocol/src/stream.ts @@ -88,7 +88,9 @@ export const SystemMessage = Schema.Struct({ export type SystemMessage = typeof SystemMessage.Type; -export const Data = (schema: Schema.Schema) => +export const Data = ( + schema: Schema.Schema, +) => Schema.Struct({ _tag: tag("data"), data: Schema.Struct({ @@ -100,7 +102,7 @@ export const Data = (schema: Schema.Schema) => }); export const StreamDataResponse = ( - data: Schema.Schema, + data: Schema.Schema, ) => Schema.Union(Data(data), Invalidate, Heartbeat, SystemMessage); const ResponseWithoutData = Schema.Union(Invalidate, Heartbeat, SystemMessage); @@ -114,7 +116,7 @@ export type StreamDataResponse = cursor?: Cursor | undefined; endCursor?: Cursor | undefined; finality: DataFinality; - data: readonly TA[]; + data: readonly (TA | null)[]; }; }; diff --git a/packages/protocol/src/testing/client.test.ts b/packages/protocol/src/testing/client.test.ts new file mode 100644 index 0000000..663954a --- /dev/null +++ b/packages/protocol/src/testing/client.test.ts @@ -0,0 +1,71 @@ +import { describe, expect, it } from "vitest"; +import { MockClient } from "./client"; + +describe("MockClient", () => { + it("returns a stream of messages", async () => { + const client = new MockClient( + [ + { + _tag: "data", + data: { finality: "finalized", 
data: [{ data: "hello" }] }, + }, + ], + [], + ); + + const output = []; + for await (const m of client.streamData({ filter: [] })) { + output.push(m); + } + + expect(output).toMatchInlineSnapshot(` + [ + { + "_tag": "data", + "data": { + "data": [ + { + "data": "hello", + }, + ], + "finality": "finalized", + }, + }, + ] + `); + }); + + it("supports factory messages", async () => { + const client = new MockClient( + [ + { + _tag: "data", + data: { finality: "finalized", data: [{ data: "hello" }, null] }, + }, + ], + [], + ); + + const output = []; + for await (const m of client.streamData({ filter: [] })) { + output.push(m); + } + + expect(output).toMatchInlineSnapshot(` + [ + { + "_tag": "data", + "data": { + "data": [ + { + "data": "hello", + }, + null, + ], + "finality": "finalized", + }, + }, + ] + `); + }); +}); diff --git a/packages/protocol/src/testing/mock.test.ts b/packages/protocol/src/testing/mock.test.ts new file mode 100644 index 0000000..4649444 --- /dev/null +++ b/packages/protocol/src/testing/mock.test.ts @@ -0,0 +1,35 @@ +import { Schema } from "@effect/schema"; +import { describe, expect, it } from "vitest"; + +import { type MockBlock, MockBlockFromBytes, MockStream } from "./mock"; + +describe("MockBlock", () => { + const encode = Schema.encodeSync(MockBlockFromBytes); + const decode = Schema.decodeSync(MockBlockFromBytes); + + it("can be encoded and decoded", () => { + const block = { data: "hello" } satisfies MockBlock; + + const proto = encode(block); + const back = decode(proto); + + expect(back).toEqual(block); + }); + + it("encodes null as empty data", () => { + const proto = encode(null); + expect(proto).toEqual(new Uint8Array()); + }); + + it("decodes empty data as null", () => { + const block = decode(new Uint8Array()); + expect(block).toBe(null); + }); +}); + +describe("MockStream", () => { + it("allow filters to be merged", () => { + const f = MockStream.mergeFilter({ filter: "hello" }, { filter: "world" }); + expect(f).toEqual({ filter: "helloworld" }); + }); +}); diff --git a/packages/protocol/src/testing/mock.ts b/packages/protocol/src/testing/mock.ts index fa33475..7cefc35 100644 --- a/packages/protocol/src/testing/mock.ts +++ b/packages/protocol/src/testing/mock.ts @@ -1,10 +1,17 @@ import { Schema } from "@effect/schema"; import { StreamConfig } from "../config"; import * as proto from "../proto"; +import { StreamDataResponse } from "../stream"; -export const MockFilter = Schema.transform( +export const MockFilter = Schema.Struct({ + filter: Schema.optional(Schema.String), +}); + +export type MockFilter = typeof MockFilter.Type; + +export const MockFilterFromBytes = Schema.transform( Schema.Uint8ArrayFromSelf, - Schema.Struct({}), + MockFilter, { strict: false, decode(value) { @@ -16,23 +23,49 @@ export const MockFilter = Schema.transform( }, ); -export type MockFilter = typeof MockFilter.Type; +const MockBlock = Schema.Struct({ + data: Schema.optional(Schema.String), +}); + +export type MockBlock = typeof MockBlock.Type; -export const MockBlock = Schema.transform( +export const MockBlockFromBytes = Schema.transform( Schema.Uint8ArrayFromSelf, - Schema.Struct({ - blockNumber: Schema.BigIntFromSelf, - }), + Schema.NullOr(MockBlock), { + strict: false, decode(value) { + if (value.length === 0) { + return null; + } return proto.testing.MockBlock.decode(value); }, encode(value) { - return proto.testing.MockFilter.encode(value).finish(); + if (value === null) { + return new Uint8Array(); + } + return proto.testing.MockBlock.encode(value).finish(); }, }, ); 
-export type MockBlock = typeof MockBlock.Type; +/** For testing, simply concatenate the values of `.filter` */ +function mergeMockFilter(a: MockFilter, b: MockFilter): MockFilter { + let filter = ""; + if (a.filter) { + filter += a.filter; + } + if (b.filter) { + filter += b.filter; + } + return { filter }; +} + +export const MockStream = new StreamConfig( + MockFilterFromBytes, + MockBlockFromBytes, + mergeMockFilter, +); -export const MockStream = new StreamConfig(MockFilter, MockBlock); +export const MockStreamResponse = StreamDataResponse(MockBlockFromBytes); +export type MockStreamResponse = typeof MockStreamResponse.Type; diff --git a/packages/starknet/src/block.ts b/packages/starknet/src/block.ts index ac2ddcc..c35b1a3 100644 --- a/packages/starknet/src/block.ts +++ b/packages/starknet/src/block.ts @@ -473,13 +473,19 @@ export type Block = typeof Block.Type; export const BlockFromBytes = Schema.transform( Schema.Uint8ArrayFromSelf, - Block, + Schema.NullOr(Block), { strict: false, decode(value) { + if (value.length === 0) { + return null; + } return proto.data.Block.decode(value); }, encode(value) { + if (value === null) { + return new Uint8Array(); + } return proto.data.Block.encode(value).finish(); }, }, diff --git a/packages/starknet/src/filter.test.ts b/packages/starknet/src/filter.test.ts index daf06ef..c365766 100644 --- a/packages/starknet/src/filter.test.ts +++ b/packages/starknet/src/filter.test.ts @@ -1,7 +1,13 @@ import { Schema } from "@effect/schema"; import { describe, expect, it } from "vitest"; -import { EventFilter, HeaderFilter, Key, TransactionFilter } from "./filter"; +import { + EventFilter, + HeaderFilter, + Key, + TransactionFilter, + mergeFilter, +} from "./filter"; describe("HeaderFilter", () => { const encode = Schema.encodeSync(HeaderFilter); @@ -461,3 +467,118 @@ describe("TransactionFilter", () => { `); }); }); + +describe("mergeFilter", () => { + it("returns header.always if any has it", () => { + const fa = mergeFilter({}, { header: { always: true } }); + expect(fa).toMatchInlineSnapshot(` + { + "events": [], + "header": { + "always": true, + }, + "messages": [], + "transactions": [], + } + `); + const fb = mergeFilter({ header: { always: true } }, {}); + expect(fb).toMatchInlineSnapshot(` + { + "events": [], + "header": { + "always": true, + }, + "messages": [], + "transactions": [], + } + `); + }); + + it("returns an empty header by default", () => { + const f = mergeFilter({}, {}); + expect(f).toMatchInlineSnapshot(` + { + "events": [], + "header": undefined, + "messages": [], + "transactions": [], + } + `); + }); + + it("concatenates transactions", () => { + const f = mergeFilter( + { + transactions: [{ transactionType: { _tag: "invokeV0", invokeV0: {} } }], + }, + { + transactions: [{ transactionType: { _tag: "invokeV3", invokeV3: {} } }], + }, + ); + expect(f).toMatchInlineSnapshot(` + { + "events": [], + "header": undefined, + "messages": [], + "transactions": [ + { + "transactionType": { + "_tag": "invokeV0", + "invokeV0": {}, + }, + }, + { + "transactionType": { + "_tag": "invokeV3", + "invokeV3": {}, + }, + }, + ], + } + `); + }); + + it("concatenates events", () => { + const f = mergeFilter( + { events: [{ fromAddress: "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" }] }, + { events: [{ fromAddress: "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "events": [ + { + "fromAddress": "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + }, + { + "fromAddress": "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", + }, + ], + "header": 
undefined, + "messages": [], + "transactions": [], + } + `); + }); + + it("concatenates messages", () => { + const f = mergeFilter( + { messages: [{ fromAddress: "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" }] }, + { messages: [{ fromAddress: "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" }] }, + ); + expect(f).toMatchInlineSnapshot(` + { + "events": [], + "header": undefined, + "messages": [ + { + "fromAddress": "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + }, + { + "fromAddress": "0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", + }, + ], + "transactions": [], + } + `); + }); +}); diff --git a/packages/starknet/src/filter.ts b/packages/starknet/src/filter.ts index 43bbbdb..0fafed4 100644 --- a/packages/starknet/src/filter.ts +++ b/packages/starknet/src/filter.ts @@ -197,3 +197,28 @@ export const FilterFromBytes = Schema.transform( export const filterToBytes = Schema.encodeSync(FilterFromBytes); export const filterFromBytes = Schema.decodeSync(FilterFromBytes); + +export function mergeFilter(a: Filter, b: Filter): Filter { + const header = mergeHeaderFilter(a.header, b.header); + return { + header, + transactions: [...(a.transactions ?? []), ...(b.transactions ?? [])], + events: [...(a.events ?? []), ...(b.events ?? [])], + messages: [...(a.messages ?? []), ...(b.messages ?? [])], + }; +} + +function mergeHeaderFilter( + a?: HeaderFilter, + b?: HeaderFilter, +): HeaderFilter | undefined { + if (a === undefined) { + return b; + } + if (b === undefined) { + return a; + } + return { + always: a.always || b.always, + }; +} diff --git a/packages/starknet/src/index.ts b/packages/starknet/src/index.ts index 0227c45..2d74796 100644 --- a/packages/starknet/src/index.ts +++ b/packages/starknet/src/index.ts @@ -1,6 +1,6 @@ import { StreamConfig } from "@apibara/protocol"; import { BlockFromBytes } from "./block"; -import { FilterFromBytes } from "./filter"; +import { FilterFromBytes, mergeFilter } from "./filter"; export * as proto from "./proto"; @@ -8,4 +8,8 @@ export * from "./common"; export * from "./filter"; export * from "./block"; -export const StarknetStream = new StreamConfig(FilterFromBytes, BlockFromBytes); +export const StarknetStream = new StreamConfig( + FilterFromBytes, + BlockFromBytes, + mergeFilter, +);
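Note on nullable blocks: with `BlockFromBytes` now decoding through `Schema.NullOr(Block)`, an empty payload decodes to `null`, so every item in `message.data.data` is typed `Block | null`. The sketch below mirrors the updated example clients; the `Block` import and the choice to assert rather than skip are assumptions for illustration, not part of this diff.

```ts
import assert from "node:assert";
import type { Block } from "@apibara/evm";

// Each item in a data message is now `Block | null`: an empty payload
// decodes to null (reserved for the upcoming factory mode).
function handleBlocks(blocks: readonly (Block | null)[]) {
  for (const block of blocks) {
    // The example clients assert because factory mode is not implemented yet;
    // a more defensive consumer could `continue` on null instead.
    assert(block !== null);
    console.log(block.header?.number, (block.logs ?? []).length);
  }
}
```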
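Note on `mergeFilter`: each chain package (`@apibara/beaconchain`, `@apibara/evm`, `@apibara/starknet`) now exports `mergeFilter(a, b)`, which OR-s `header.always` and concatenates the list sections; `StreamConfig` carries it so the runtime can combine filters, presumably groundwork for the factory mode mentioned in the indexer comment. A usage sketch for the EVM filter, reusing the placeholder values from the tests above:

```ts
import { mergeFilter } from "@apibara/evm";

const merged = mergeFilter(
  { logs: [{ address: "0xAAAAAAAAAAAAAAAAAAAAAA" }] },
  { header: { always: true }, withdrawals: [{ validatorIndex: 1n }] },
);

// merged is:
// {
//   header: { always: true },      // `always` is OR-ed across the two filters
//   logs: [{ address: "0xAAAAAAAAAAAAAAAAAAAAAA" }],
//   withdrawals: [{ validatorIndex: 1n }],
//   transactions: [],              // untouched sections become empty arrays
// }
```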
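Note on `StreamConfig`: the constructor now takes the merge function as a required third argument, and its block schema must decode to `Block | null` (enforced by the new `stream.test-d.ts`). Below is a minimal sketch of a custom stream config under those constraints; the JSON/UTF-8 codec is illustrative only, the real packages go through the generated protobuf types as shown above.

```ts
import { StreamConfig } from "@apibara/protocol";
import { Schema } from "@effect/schema";

const MyFilter = Schema.Struct({ tag: Schema.optional(Schema.String) });
type MyFilter = typeof MyFilter.Type;

const MyBlock = Schema.Struct({ value: Schema.String });

// Filters round-trip through bytes; a plain JSON codec stands in for the
// protobuf-based codecs used by the real chain packages.
const MyFilterFromBytes = Schema.transform(
  Schema.Uint8ArrayFromSelf,
  MyFilter,
  {
    strict: false,
    decode: (bytes) => JSON.parse(new TextDecoder().decode(bytes)),
    encode: (filter) => new TextEncoder().encode(JSON.stringify(filter)),
  },
);

// The block codec must decode to `MyBlock | null`: an empty payload means
// "no block for this filter" (reserved for factory mode).
const MyBlockFromBytes = Schema.transform(
  Schema.Uint8ArrayFromSelf,
  Schema.NullOr(MyBlock),
  {
    strict: false,
    decode: (bytes) =>
      bytes.length === 0 ? null : JSON.parse(new TextDecoder().decode(bytes)),
    encode: (block) =>
      block === null
        ? new Uint8Array()
        : new TextEncoder().encode(JSON.stringify(block)),
  },
);

// The third argument tells consumers of the config how to fold two filters
// into one (compare `mergeMockFilter` in the testing mocks).
export const MyStream = new StreamConfig(
  MyFilterFromBytes,
  MyBlockFromBytes,
  (a: MyFilter, b: MyFilter): MyFilter => ({
    tag: (a.tag ?? "") + (b.tag ?? ""),
  }),
);
```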