From 242cbb41f98a9bb0495933319366ae6f976f7faf Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Sat, 21 Dec 2024 12:48:48 +0100 Subject: [PATCH] plugin-drizzle: move drizzle sink to placeholder package --- examples/indexer/package.json | 35 --- examples/indexer/src/indexer.ts | 65 ----- examples/indexer/src/instrumentation.ts | 43 ---- examples/indexer/src/main.ts | 40 --- packages/cli/src/runtime/internal/app.ts | 6 +- packages/cli/src/types/config.ts | 2 +- packages/indexer/package.json | 10 - packages/indexer/src/plugins/index.ts | 7 +- .../indexer/src/plugins/persistence.test.ts | 147 ----------- packages/indexer/src/plugins/persistence.ts | 200 --------------- packages/indexer/src/sinks/csv.test.ts | 63 ----- packages/indexer/src/sinks/csv.ts | 166 ------------ .../indexer/src/sinks/drizzle/Int8Range.ts | 52 ---- packages/indexer/src/sinks/drizzle/delete.ts | 44 ---- .../indexer/src/sinks/drizzle/drizzle.test.ts | 238 ------------------ packages/indexer/src/sinks/drizzle/drizzle.ts | 133 ---------- packages/indexer/src/sinks/drizzle/index.ts | 6 - packages/indexer/src/sinks/drizzle/insert.ts | 42 ---- packages/indexer/src/sinks/drizzle/select.ts | 44 ---- .../indexer/src/sinks/drizzle/transaction.ts | 49 ---- packages/indexer/src/sinks/drizzle/update.ts | 70 ------ packages/indexer/src/sinks/drizzle/utils.ts | 103 -------- packages/indexer/src/sinks/sqlite.test.ts | 97 ------- packages/indexer/src/sinks/sqlite.ts | 182 -------------- packages/plugin-drizzle/README.md | 7 + packages/plugin-drizzle/build.config.ts | 11 + packages/plugin-drizzle/package.json | 41 +++ packages/plugin-drizzle/src/index.ts | 5 + .../src/persistence.ts} | 0 packages/plugin-drizzle/src/utils.ts | 6 + .../plugin-drizzle}/tsconfig.json | 0 pnpm-lock.yaml | 82 +++--- pnpm-workspace.yaml | 1 + 33 files changed, 127 insertions(+), 1870 deletions(-) delete mode 100644 examples/indexer/package.json delete mode 100644 examples/indexer/src/indexer.ts delete mode 100644 examples/indexer/src/instrumentation.ts delete mode 100644 examples/indexer/src/main.ts delete mode 100644 packages/indexer/src/plugins/persistence.test.ts delete mode 100644 packages/indexer/src/sinks/csv.test.ts delete mode 100644 packages/indexer/src/sinks/csv.ts delete mode 100644 packages/indexer/src/sinks/drizzle/Int8Range.ts delete mode 100644 packages/indexer/src/sinks/drizzle/delete.ts delete mode 100644 packages/indexer/src/sinks/drizzle/drizzle.test.ts delete mode 100644 packages/indexer/src/sinks/drizzle/drizzle.ts delete mode 100644 packages/indexer/src/sinks/drizzle/index.ts delete mode 100644 packages/indexer/src/sinks/drizzle/insert.ts delete mode 100644 packages/indexer/src/sinks/drizzle/select.ts delete mode 100644 packages/indexer/src/sinks/drizzle/transaction.ts delete mode 100644 packages/indexer/src/sinks/drizzle/update.ts delete mode 100644 packages/indexer/src/sinks/drizzle/utils.ts delete mode 100644 packages/indexer/src/sinks/sqlite.test.ts delete mode 100644 packages/indexer/src/sinks/sqlite.ts create mode 100644 packages/plugin-drizzle/README.md create mode 100644 packages/plugin-drizzle/build.config.ts create mode 100644 packages/plugin-drizzle/package.json create mode 100644 packages/plugin-drizzle/src/index.ts rename packages/{indexer/src/plugins/drizzle-persistence.ts => plugin-drizzle/src/persistence.ts} (100%) create mode 100644 packages/plugin-drizzle/src/utils.ts rename {examples/indexer => packages/plugin-drizzle}/tsconfig.json (100%) diff --git a/examples/indexer/package.json b/examples/indexer/package.json deleted file mode 100644 index 792a934..0000000 --- a/examples/indexer/package.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "name": "example-indexer", - "version": "1.0.0", - "private": true, - "scripts": { - "start": "jiti ./src/main.ts", - "typecheck": "tsc --noEmit", - "lint": "biome check .", - "lint:fix": "pnpm lint --write", - "format": "biome format . --write" - }, - "dependencies": { - "@apibara/evm": "workspace:*", - "@apibara/indexer": "workspace:*", - "@apibara/protocol": "workspace:*", - "@opentelemetry/api": "^1.9.0", - "@opentelemetry/exporter-trace-otlp-proto": "^0.52.0", - "@opentelemetry/resources": "^1.25.0", - "@opentelemetry/sdk-node": "^0.52.0", - "@opentelemetry/sdk-trace-base": "^1.25.0", - "@opentelemetry/semantic-conventions": "^1.25.0", - "better-sqlite3": "^11.5.0", - "citty": "^0.1.6", - "consola": "^3.2.3", - "csv-stringify": "^6.5.0", - "drizzle-orm": "^0.37.0", - "postgres": "^3.4.4", - "viem": "^2.12.4" - }, - "devDependencies": { - "@types/better-sqlite3": "^7.6.11", - "@types/node": "^20.12.12", - "jiti": "^1.21.0" - } -} diff --git a/examples/indexer/src/indexer.ts b/examples/indexer/src/indexer.ts deleted file mode 100644 index 1ac23b4..0000000 --- a/examples/indexer/src/indexer.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { EvmStream } from "@apibara/evm"; -import { defineIndexer, useSink } from "@apibara/indexer"; -import { drizzleSink } from "@apibara/indexer/sinks/drizzle"; -import consola from "consola"; -import { pgTable, serial, text, varchar } from "drizzle-orm/pg-core"; -import { drizzle } from "drizzle-orm/postgres-js"; -import postgres from "postgres"; -import { encodeEventTopics, parseAbi } from "viem"; - -const abi = parseAbi([ - "event Transfer(address indexed from, address indexed to, uint256 value)", -]); - -const users = pgTable("users", { - id: serial("id").primaryKey(), - firstName: text("full_name"), - phone: varchar("phone", { length: 256 }), -}); - -export function createIndexerConfig(streamUrl: string) { - const pgClient = postgres("your_connection_string"); - const db = drizzle(pgClient); - - const sink = drizzleSink({ database: db, tables: [users] }); - - return defineIndexer(EvmStream)({ - streamUrl, - finality: "accepted", - startingCursor: { - orderKey: 5_000_000n, - }, - filter: { - logs: [ - { - strict: true, - topics: encodeEventTopics({ - abi, - eventName: "Transfer", - args: { from: null, to: null }, - }) as `0x${string}`[], - }, - ], - }, - sink, - async transform({ block: { header }, context }) { - const { db } = useSink({ context }); - - await db.insert(users).values([ - { - id: Number(header?.blockNumber), - firstName: `John Doe ${Number(header?.blockNumber)}`, - phone: "+91 1234567890", - }, - ]); - - consola.info("Transforming block", header?.blockNumber); - }, - hooks: { - "handler:after": ({ endCursor }) => { - consola.info("Transformed ", endCursor?.orderKey); - }, - }, - plugins: [], - }); -} diff --git a/examples/indexer/src/instrumentation.ts b/examples/indexer/src/instrumentation.ts deleted file mode 100644 index feeb160..0000000 --- a/examples/indexer/src/instrumentation.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto"; -import { Resource } from "@opentelemetry/resources"; -import * as opentelemetry from "@opentelemetry/sdk-node"; -import { - BatchSpanProcessor, - type SpanProcessor, -} from "@opentelemetry/sdk-trace-base"; -import { SEMRESATTRS_SERVICE_NAME } from "@opentelemetry/semantic-conventions"; -import consola from "consola"; - -let spanProcessor: SpanProcessor | undefined; - -const axiomApiKey = process.env.AXIOM_API_KEY; -const axiomDataset = process.env.AXIOM_DATASET ?? "evm-indexer-demo"; -if (axiomApiKey) { - const exporter = new OTLPTraceExporter({ - url: "https://api.axiom.co/v1/traces", - headers: { - Authorization: `Bearer ${axiomApiKey}`, - "X-Axiom-Dataset": axiomDataset, - }, - }); - spanProcessor = new BatchSpanProcessor(exporter); - consola.info("Sending traces to Axiom", axiomDataset); -} - -const resource = new Resource({ - [SEMRESATTRS_SERVICE_NAME]: "evm-indexer", -}); - -const sdk = new opentelemetry.NodeSDK( - spanProcessor - ? { - spanProcessor, - resource, - } - : { - resource, - }, -); - -consola.info("Starting OpenTelemetry SDK"); -sdk.start(); diff --git a/examples/indexer/src/main.ts b/examples/indexer/src/main.ts deleted file mode 100644 index e931224..0000000 --- a/examples/indexer/src/main.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { createIndexer, run } from "@apibara/indexer"; -import { createClient } from "@apibara/protocol"; -import { defineCommand, runMain } from "citty"; -import consola from "consola"; - -import { createIndexerConfig } from "./indexer"; - -import "./instrumentation"; - -const command = defineCommand({ - meta: { - name: "example-indexer", - version: "1.0.0", - description: "Example showing how to run an indexer", - }, - args: { - stream: { - type: "string", - default: "https://sepolia.ethereum.a5a.ch", - description: "EVM stream URL", - }, - authToken: { - type: "string", - description: "DNA auth token", - }, - }, - async run({ args }) { - consola.info("Connecting to EVM stream", args.stream); - - const indexer = createIndexer(createIndexerConfig(args.stream)); - const client = createClient( - indexer.streamConfig, - indexer.options.streamUrl, - ); - - await run(client, indexer); - }, -}); - -runMain(command); diff --git a/packages/cli/src/runtime/internal/app.ts b/packages/cli/src/runtime/internal/app.ts index fa5ff16..bed70a0 100644 --- a/packages/cli/src/runtime/internal/app.ts +++ b/packages/cli/src/runtime/internal/app.ts @@ -1,9 +1,5 @@ import { createIndexer as _createIndexer } from "@apibara/indexer"; -import { - type ConsolaReporter, - inMemoryPersistence, - logger, -} from "@apibara/indexer/plugins"; +import { type ConsolaReporter, logger, inMemoryPersistence } from "@apibara/indexer/plugins"; import { config } from "#apibara-internal-virtual/config"; import { indexers } from "#apibara-internal-virtual/indexers"; diff --git a/packages/cli/src/types/config.ts b/packages/cli/src/types/config.ts index 5fa138e..66132cf 100644 --- a/packages/cli/src/types/config.ts +++ b/packages/cli/src/types/config.ts @@ -1,4 +1,4 @@ -import type { ConsolaReporter } from "@apibara/indexer/plugins/logger"; +import type { ConsolaReporter } from "@apibara/indexer/plugins"; import type { RollupCommonJSOptions } from "@rollup/plugin-commonjs"; import type { C12InputConfig, diff --git a/packages/indexer/package.json b/packages/indexer/package.json index f5b3ff8..ba8a094 100644 --- a/packages/indexer/package.json +++ b/packages/indexer/package.json @@ -54,11 +54,6 @@ "@types/better-sqlite3": "^7.6.11", "@types/node": "^20.14.0", "@types/pg": "^8.11.10", - "better-sqlite3": "^11.5.0", - "csv-stringify": "^6.5.0", - "drizzle-orm": "^0.37.0", - "pg": "^8.13.1", - "postgres-range": "^1.1.4", "unbuild": "^2.0.0", "vitest": "^1.6.0" }, @@ -73,11 +68,6 @@ "unctx": "^2.3.1" }, "peerDependencies": { - "@electric-sql/pglite": "^0.2.14", - "better-sqlite3": "^11.5.0", - "csv-stringify": "^6.5.0", - "drizzle-orm": "^0.37.0", - "postgres-range": "^1.1.4", "vitest": "^1.6.0" } } diff --git a/packages/indexer/src/plugins/index.ts b/packages/indexer/src/plugins/index.ts index 4916406..934fd51 100644 --- a/packages/indexer/src/plugins/index.ts +++ b/packages/indexer/src/plugins/index.ts @@ -1,8 +1,3 @@ export { defineIndexerPlugin, type IndexerPlugin } from "./config"; -export { - type ConsolaInstance, - type ConsolaReporter, - logger, - useLogger, -} from "./logger"; +export { type ConsolaInstance, type ConsolaReporter, logger, useLogger } from "./logger"; export { inMemoryPersistence } from "./persistence"; diff --git a/packages/indexer/src/plugins/persistence.test.ts b/packages/indexer/src/plugins/persistence.test.ts deleted file mode 100644 index 1b3672d..0000000 --- a/packages/indexer/src/plugins/persistence.test.ts +++ /dev/null @@ -1,147 +0,0 @@ -/* -import type { Cursor } from "@apibara/protocol"; -import { - type MockBlock, - MockClient, - type MockFilter, -} from "@apibara/protocol/testing"; -import Database from "better-sqlite3"; -import { klona } from "klona/full"; -import { describe, expect, it } from "vitest"; -import { run } from "../indexer"; -import { generateMockMessages, getMockIndexer } from "../internal/testing"; - -describe("Persistence", () => { - const initDB = () => { - const db = new Database(":memory:"); - SqlitePersistence.initialize(db); - return db; - }; - - it("should handle storing and updating a cursor & filter", () => { - const db = initDB(); - const store = new SqlitePersistence(db); - - // Assert there's no data - let latest = store.get(); - - expect(latest.cursor).toBeUndefined(); - expect(latest.filter).toBeUndefined(); - - // Insert value - const cursor: Cursor = { - orderKey: 5_000_000n, - }; - const filter: MockFilter = { - filter: "X", - }; - store.put({ cursor, filter }); - - // Check that value was stored - latest = store.get(); - - expect(latest.cursor).toEqual({ - orderKey: 5_000_000n, - uniqueKey: null, - }); - expect(latest.filter).toEqual({ - filter: "X", - }); - - // Update value - const updatedCursor: Cursor = { - orderKey: 5_000_010n, - uniqueKey: "0x1234567890", - }; - const updatedFilter: MockFilter = { - filter: "Y", - }; - - store.put({ cursor: updatedCursor, filter: updatedFilter }); - - // Check that value was updated - latest = store.get(); - - expect(latest.cursor).toEqual({ - orderKey: 5_000_010n, - uniqueKey: "0x1234567890", - }); - expect(latest.filter).toEqual({ - filter: "Y", - }); - - db.close(); - }); - - it("should handle storing and deleting a cursor & filter", () => { - const db = initDB(); - const store = new SqlitePersistence(db); - - // Assert there's no data - let latest = store.get(); - expect(latest.cursor).toBeUndefined(); - expect(latest.filter).toBeUndefined(); - - // Insert value - const cursor: Cursor = { - orderKey: 5_000_000n, - }; - const filter: MockFilter = { - filter: "X", - }; - store.put({ cursor, filter }); - - // Check that value was stored - latest = store.get(); - expect(latest.cursor).toEqual({ - orderKey: 5_000_000n, - uniqueKey: null, - }); - expect(latest.filter).toEqual({ - filter: "X", - }); - - // Delete value - store.del(); - - // Check there's no data - latest = store.get(); - expect(latest.cursor).toBeUndefined(); - expect(latest.filter).toBeUndefined(); - - db.close(); - }); - - it("should work with indexer and store cursor of last message", async () => { - const client = new MockClient((request, options) => { - return messages; - }); - - const db = new Database(":memory:"); - - // create mock indexer with persistence plugin - const indexer = klona( - getMockIndexer({ - plugins: [ - sqlitePersistence({ - database: db, - }), - ], - }), - ); - - await run(client, indexer); - - const store = new SqlitePersistence(db); - - const latest = store.get(); - - expect(latest.cursor).toMatchInlineSnapshot(`undefined`); - - db.close(); - }); -}); - -const messages = generateMockMessages(); - -*/ diff --git a/packages/indexer/src/plugins/persistence.ts b/packages/indexer/src/plugins/persistence.ts index 70aadf4..a3d188b 100644 --- a/packages/indexer/src/plugins/persistence.ts +++ b/packages/indexer/src/plugins/persistence.ts @@ -37,203 +37,3 @@ export function inMemoryPersistence() { }); }); } - -/* -export function sqlitePersistence({ - database, -}: { database: SqliteDatabase }) { - return defineIndexerPlugin((indexer) => { - let store: SqlitePersistence; - - indexer.hooks.hook("run:before", () => { - SqlitePersistence.initialize(database); - - store = new SqlitePersistence(database); - }); - - indexer.hooks.hook("connect:before", ({ request }) => { - const { cursor, filter } = store.get(); - - if (cursor) { - request.startingCursor = cursor; - } - - if (filter) { - request.filter[1] = filter; - } - }); - - indexer.hooks.hook("transaction:commit", ({ endCursor }) => { - if (endCursor) { - store.put({ cursor: endCursor }); - } - }); - - indexer.hooks.hook("connect:factory", ({ request, endCursor }) => { - if (request.filter[1]) { - store.put({ cursor: endCursor, filter: request.filter[1] }); - } - }); - }); -} - -export class SqlitePersistence { - private _getCheckpointQuery: Statement; - private _putCheckpointQuery: Statement< - [string, number, `0x${string}` | undefined] - >; - private _delCheckpointQuery: Statement; - private _getFilterQuery: Statement; - private _updateFilterToBlockQuery: Statement<[number, string]>; - private _insertFilterQuery: Statement<[string, string, number]>; - private _delFilterQuery: Statement; - - constructor(private _db: SqliteDatabase) { - this._getCheckpointQuery = this._db.prepare(statements.getCheckpoint); - this._putCheckpointQuery = this._db.prepare(statements.putCheckpoint); - this._delCheckpointQuery = this._db.prepare(statements.delCheckpoint); - this._getFilterQuery = this._db.prepare(statements.getFilter); - this._updateFilterToBlockQuery = this._db.prepare( - statements.updateFilterToBlock, - ); - this._insertFilterQuery = this._db.prepare(statements.insertFilter); - this._delFilterQuery = this._db.prepare(statements.delFilter); - } - - static initialize(db: SqliteDatabase) { - db.prepare(statements.createCheckpointsTable).run(); - db.prepare(statements.createFiltersTable).run(); - } - - public get(): { cursor?: Cursor; filter?: TFilter } { - const cursor = this._getCheckpoint(); - const filter = this._getFilter(); - - return { cursor, filter }; - } - - public put({ cursor, filter }: { cursor?: Cursor; filter?: TFilter }) { - if (cursor) { - this._putCheckpoint(cursor); - - if (filter) { - this._putFilter(filter, cursor); - } - } - } - - public del() { - this._delCheckpoint(); - this._delFilter(); - } - - // --- CHECKPOINTS TABLE METHODS --- - - private _getCheckpoint(): Cursor | undefined { - const row = this._getCheckpointQuery.get("default"); - - if (!row) return undefined; - - return { orderKey: BigInt(row.order_key), uniqueKey: row.unique_key }; - } - - private _putCheckpoint(cursor: Cursor) { - this._putCheckpointQuery.run( - "default", - Number(cursor.orderKey), - cursor.uniqueKey, - ); - } - - private _delCheckpoint() { - this._delCheckpointQuery.run("default"); - } - - // --- FILTERS TABLE METHODS --- - - private _getFilter(): TFilter | undefined { - const row = this._getFilterQuery.get("default"); - - if (!row) return undefined; - - return deserialize(row.filter) as TFilter; - } - - private _putFilter(filter: TFilter, endCursor: Cursor) { - this._updateFilterToBlockQuery.run(Number(endCursor.orderKey), "default"); - this._insertFilterQuery.run( - "default", - serialize(filter as Record), - Number(endCursor.orderKey), - ); - } - - private _delFilter() { - this._delFilterQuery.run("default"); - } -} - -const statements = { - beginTxn: "BEGIN TRANSACTION", - commitTxn: "COMMIT TRANSACTION", - rollbackTxn: "ROLLBACK TRANSACTION", - createCheckpointsTable: ` - CREATE TABLE IF NOT EXISTS checkpoints ( - id TEXT NOT NULL PRIMARY KEY, - order_key INTEGER NOT NULL, - unique_key TEXT - );`, - createFiltersTable: ` - CREATE TABLE IF NOT EXISTS filters ( - id TEXT NOT NULL, - filter BLOB NOT NULL, - from_block INTEGER NOT NULL, - to_block INTEGER, - PRIMARY KEY (id, from_block) - );`, - getCheckpoint: ` - SELECT * - FROM checkpoints - WHERE id = ?`, - putCheckpoint: ` - INSERT INTO checkpoints (id, order_key, unique_key) - VALUES (?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - order_key = excluded.order_key, - unique_key = excluded.unique_key`, - delCheckpoint: ` - DELETE FROM checkpoints - WHERE id = ?`, - getFilter: ` - SELECT * - FROM filters - WHERE id = ? AND to_block IS NULL`, - updateFilterToBlock: ` - UPDATE filters - SET to_block = ? - WHERE id = ? AND to_block IS NULL`, - insertFilter: ` - INSERT INTO filters (id, filter, from_block) - VALUES (?, ?, ?) - ON CONFLICT(id, from_block) DO UPDATE SET - filter = excluded.filter, - from_block = excluded.from_block`, - delFilter: ` - DELETE FROM filters - WHERE id = ?`, -}; - -export type CheckpointRow = { - id: string; - order_key: number; - unique_key?: `0x${string}`; -}; - -export type FilterRow = { - id: string; - filter: string; - from_block: number; - to_block?: number; -}; - -*/ diff --git a/packages/indexer/src/sinks/csv.test.ts b/packages/indexer/src/sinks/csv.test.ts deleted file mode 100644 index 8ac222f..0000000 --- a/packages/indexer/src/sinks/csv.test.ts +++ /dev/null @@ -1,63 +0,0 @@ -import fs from "node:fs/promises"; -import { - type MockBlock, - MockClient, - type MockFilter, -} from "@apibara/protocol/testing"; -import { afterEach, beforeEach, describe, expect, it } from "vitest"; -import { run } from "../indexer"; -import { generateMockMessages, getMockIndexer } from "../internal/testing"; -import { useSink } from "../sink"; -import { csv } from "./csv"; - -describe("Run Test", () => { - async function cleanup() { - try { - await fs.unlink("test.csv"); - } catch {} - } - - beforeEach(async () => { - await cleanup(); - }); - - afterEach(async () => { - await cleanup(); - }); - - it("should store in csv file via csv sink", async () => { - const client = new MockClient((request, options) => { - return generateMockMessages(); - }); - - const sink = csv({ filepath: "test.csv" }); - await run( - client, - getMockIndexer({ - sink, - override: { - transform: async ({ context, endCursor, block: { data } }) => { - const { writer } = useSink({ context }); - writer.insert([{ data }]); - }, - }, - }), - ); - - const csvData = await fs.readFile("test.csv", "utf-8"); - - expect(csvData).toMatchInlineSnapshot(` - "5000000,5000000 - 5000001,5000001 - 5000002,5000002 - 5000003,5000003 - 5000004,5000004 - 5000005,5000005 - 5000006,5000006 - 5000007,5000007 - 5000008,5000008 - 5000009,5000009 - " - `); - }); -}); diff --git a/packages/indexer/src/sinks/csv.ts b/packages/indexer/src/sinks/csv.ts deleted file mode 100644 index df64a7d..0000000 --- a/packages/indexer/src/sinks/csv.ts +++ /dev/null @@ -1,166 +0,0 @@ -import fs from "node:fs"; -import type { Cursor } from "@apibara/protocol"; -import { type Options, type Stringifier, stringify } from "csv-stringify"; -import { Sink, type SinkCursorParams, type SinkData } from "../sink"; - -export type CsvArgs = { - /** - * csv-stringy options - * @reference https://csv.js.org/stringify/options/ - */ - csvOptions?: Options; - /** - * filepath for your csv file - */ - filepath: string; -}; - -export type CsvSinkOptions = { - /** - * An optional column name used to store the cursor value. If specified, - * the value of this column must match the `endCursor.orderKey` for each row. - */ - cursorColumn?: string; -}; - -type TxnContext = { - buffer: SinkData[]; -}; - -type TxnParams = { - writer: { - insert: (data: SinkData[]) => void; - }; -}; - -const transactionHelper = (context: TxnContext) => { - return { - insert: (data: SinkData[]) => { - context.buffer.push(...data); - }, - }; -}; - -/** - * A sink that writes data to a CSV file. - * - * @example - * - * ```ts - * const sink = csv({ - * filepath: "./data.csv", - * csvOptions: { - * header: true, - * }, - * }); - * - * ... - * async transform({context, endCursor}){ - * const { writer } = useSink(context); - * const insertHelper = writer(endCursor); - * - * insertHelper.insert([ - * { id: 1, name: "John" }, - * { id: 2, name: "Jane" }, - * ]); - * } - * - * ``` - */ -export class CsvSink extends Sink { - constructor( - private _stringifier: Stringifier, - private _config: CsvSinkOptions, - ) { - super(); - } - - private async write({ - data, - endCursor, - }: { data: SinkData[]; endCursor?: Cursor }) { - // adds a "_cursor" property if "cursorColumn" is not specified by user - data = this.processCursorColumn(data, endCursor); - // Insert the data into csv - await this.insertToCSV(data); - } - - async transaction( - { cursor, endCursor, finality }: SinkCursorParams, - cb: (params: TxnParams) => Promise, - ) { - const context: TxnContext = { - buffer: [], - }; - - const writer = transactionHelper(context); - - await cb({ writer }); - await this.write({ data: context.buffer, endCursor }); - } - - async invalidateOnRestart(cursor?: Cursor) { - // No Implementation required - } - - async invalidate(cursor?: Cursor) { - throw new Error("Reorg for CSV Sink is not implemented"); - } - - async finalize(cursor?: Cursor) { - // No Implementation required - } - - private async insertToCSV(data: SinkData[]) { - if (data.length === 0) return; - - return await new Promise((resolve, reject) => { - for (const row of data) { - this._stringifier.write(row, (err) => { - if (err) throw new Error(err.message); - - // resolve when all rows are inserted into csv - if (row === data[data.length - 1]) { - resolve(); - } - }); - } - }); - } - - private processCursorColumn( - data: SinkData[], - endCursor?: Cursor, - ): SinkData[] { - const { cursorColumn } = this._config; - - if ( - cursorColumn && - data.some( - (row) => Number(row[cursorColumn]) !== Number(endCursor?.orderKey), - ) - ) { - throw new Error( - `Mismatch of ${cursorColumn} and Cursor ${Number(endCursor?.orderKey)}`, - ); - } - - if (cursorColumn) { - return data; - } - - return data.map((row) => ({ - ...row, - _cursor: Number(endCursor?.orderKey), - })); - } -} - -export const csv = (args: CsvArgs & CsvSinkOptions) => { - const { csvOptions, filepath, ...sinkOptions } = args; - const stringifier = stringify({ ...csvOptions }); - - const writeStream = fs.createWriteStream(filepath, { flags: "a" }); - stringifier.pipe(writeStream); - return new CsvSink(stringifier, sinkOptions); -}; diff --git a/packages/indexer/src/sinks/drizzle/Int8Range.ts b/packages/indexer/src/sinks/drizzle/Int8Range.ts deleted file mode 100644 index 620a081..0000000 --- a/packages/indexer/src/sinks/drizzle/Int8Range.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { customType } from "drizzle-orm/pg-core"; -import type { Range } from "postgres-range"; -import { - parse as rangeParse, - serialize as rangeSerialize, -} from "postgres-range"; - -type Comparable = string | number; - -type RangeBound = - | T - | { - value: T; - inclusive: boolean; - }; - -export class Int8Range { - constructor(public readonly range: Range) {} - - get start(): RangeBound | null { - return this.range.lower != null - ? { - value: this.range.lower, - inclusive: this.range.isLowerBoundClosed(), - } - : null; - } - - get end(): RangeBound | null { - return this.range.upper != null - ? { - value: this.range.upper, - inclusive: this.range.isUpperBoundClosed(), - } - : null; - } -} - -export const int8range = customType<{ - data: Int8Range; -}>({ - dataType: () => "int8range", - fromDriver: (value: unknown): Int8Range => { - if (typeof value !== "string") { - throw new Error("Expected string"); - } - - const parsed = rangeParse(value, (val) => Number.parseInt(val, 10)); - return new Int8Range(parsed); - }, - toDriver: (value: Int8Range): string => rangeSerialize(value.range), -}); diff --git a/packages/indexer/src/sinks/drizzle/delete.ts b/packages/indexer/src/sinks/drizzle/delete.ts deleted file mode 100644 index 48f7c81..0000000 --- a/packages/indexer/src/sinks/drizzle/delete.ts +++ /dev/null @@ -1,44 +0,0 @@ -import type { Cursor } from "@apibara/protocol"; -import { - type ExtractTablesWithRelations, - type SQL, - type TablesRelationalConfig, - sql, -} from "drizzle-orm"; -import type { - PgQueryResultHKT, - PgTable, - PgTransaction, - PgUpdateBase, - PgUpdateWithout, -} from "drizzle-orm/pg-core"; - -export class DrizzleSinkDelete< - TTable extends PgTable, - TQueryResult extends PgQueryResultHKT, - TFullSchema extends Record = Record, - TSchema extends - TablesRelationalConfig = ExtractTablesWithRelations, -> { - constructor( - private db: PgTransaction, - private table: TTable, - private endCursor?: Cursor, - ) {} - - where( - where: SQL, - ): PgUpdateWithout, false, "where"> { - return this.db - .update(this.table) - .set({ - // @ts-ignore - _cursor: sql`int8range(lower(_cursor), ${Number(this.endCursor?.orderKey!)}, '[)')`, - }) - .where(where) as PgUpdateWithout< - PgUpdateBase, - false, - "where" - >; - } -} diff --git a/packages/indexer/src/sinks/drizzle/drizzle.test.ts b/packages/indexer/src/sinks/drizzle/drizzle.test.ts deleted file mode 100644 index f2c2ff6..0000000 --- a/packages/indexer/src/sinks/drizzle/drizzle.test.ts +++ /dev/null @@ -1,238 +0,0 @@ -import type { Cursor } from "@apibara/protocol"; -import { - type MockBlock, - MockClient, - type MockFilter, -} from "@apibara/protocol/testing"; -import { asc, eq, sql } from "drizzle-orm"; -import { drizzle } from "drizzle-orm/node-postgres"; -import { serial, text } from "drizzle-orm/pg-core"; -import { Client } from "pg"; -import { beforeAll, beforeEach, describe, expect, it } from "vitest"; -import { run } from "../../indexer"; -import { generateMockMessages, getMockIndexer } from "../../internal/testing"; -import { useSink } from "../../sink"; -import type { Int8Range } from "./Int8Range"; -import { drizzleSink } from "./drizzle"; -import { getDrizzleCursor, pgIndexerTable } from "./utils"; - -const testTable = pgIndexerTable("test_table", { - id: serial("id").primaryKey(), - data: text("data"), -}); - -const client = new Client({ - connectionString: "postgres://postgres:postgres@localhost:5432/postgres", -}); - -await client.connect(); - -const db = drizzle(client); - -describe("Drizzle Test", () => { - beforeAll(async () => { - // drop test_table if exists - await db.execute(sql`DROP TABLE IF EXISTS test_table`); - // create test_table with db - await db.execute( - sql`CREATE TABLE test_table (id SERIAL, data TEXT, _cursor INT8RANGE)`, - ); - }); - - beforeEach(async () => { - await db.delete(testTable).execute(); - }); - - it("should insert data", async () => { - const client = new MockClient((request, options) => { - return generateMockMessages(5); - }); - - const sink = drizzleSink({ database: db, tables: [testTable] }); - - const indexer = getMockIndexer({ - sink, - override: { - transform: async ({ context, endCursor, block: { data } }) => { - const { db } = useSink({ context }); - // Insert a new row into the test_table - // The id is set to the current cursor's orderKey - // The data is set to the block data - await db - .insert(testTable) - .values([{ id: Number(endCursor?.orderKey), data }]); - }, - }, - }); - - await run(client, indexer); - - const result = await db.select().from(testTable).orderBy(asc(testTable.id)); - - expect(result).toHaveLength(5); - expect(result[0].data).toBe("5000000"); - expect(result[2].data).toBe("5000002"); - }); - - it("should update data", async () => { - const client = new MockClient((request, options) => { - return generateMockMessages(5); - }); - - const sink = drizzleSink({ database: db, tables: [testTable] }); - - const indexer = getMockIndexer({ - sink, - override: { - transform: async ({ context, endCursor, block: { data } }) => { - const { db } = useSink({ context }); - - // insert data for each message in db - await db - .insert(testTable) - .values([{ id: Number(endCursor?.orderKey), data }]); - - // update data for id 5000002 when orderKey is 5000004 - // this is to test if the update query is working - if (endCursor?.orderKey === 5000004n) { - await db - .update(testTable) - .set({ data: "0000000" }) - .where(eq(testTable.id, 5000002)); - } - }, - }, - }); - - await run(client, indexer); - - const result = await db.select().from(testTable).orderBy(asc(testTable.id)); - - expect(result).toHaveLength(6); - expect( - result.find((r) => r.id === 5000002 && r._cursor?.range.upper === null) - ?.data, - ).toBe("0000000"); - }); - - it("should soft delete data", async () => { - const client = new MockClient((request, options) => { - return generateMockMessages(5); - }); - - const sink = drizzleSink({ database: db, tables: [testTable] }); - - const indexer = getMockIndexer({ - sink, - override: { - transform: async ({ context, endCursor, block: { data } }) => { - const { db } = useSink({ context }); - - // insert data for each message in db - await db - .insert(testTable) - .values([{ id: Number(endCursor?.orderKey), data }]); - - // delete data for id 5000002 when orderKey is 5000004 - // this is to test if the delete query is working - if (endCursor?.orderKey === 5000004n) { - await db.delete(testTable).where(eq(testTable.id, 5000002)); - } - }, - }, - }); - - await run(client, indexer); - - const result = await db.select().from(testTable).orderBy(asc(testTable.id)); - - expect(result).toHaveLength(5); - - // as when you run delete query on a data, it isnt literally deleted from the db, - // instead, we just update the upper bound of that row to the current cursor - // check if the cursor upper bound has been set correctly - // biome-ignore lint/suspicious/noExplicitAny: - expect(((result[2] as any)._cursor as Int8Range).range.upper).toBe(5000004); - }); - - it("should select data", async () => { - const client = new MockClient((request, options) => { - return generateMockMessages(5); - }); - - const sink = drizzleSink({ database: db, tables: [testTable] }); - - let result: (typeof testTable.$inferSelect)[] = []; - - const indexer = getMockIndexer({ - sink, - override: { - transform: async ({ context, endCursor, block: { data } }) => { - const { db } = useSink({ context }); - - // insert data for each message in db - await db - .insert(testTable) - .values([{ id: Number(endCursor?.orderKey), data }]); - - // delete data for id 5000002 when orderKey is 5000004 - // this will update the upper bound of the row with id 5000002 from infinity to 5000004 - // so when we select all rows, row with id 5000002 will not be included - // as when we run select query it should only return rows with upper bound infinity - if (endCursor?.orderKey === 5000003n) { - await db.delete(testTable).where(eq(testTable.id, 5000002)); - } - - // when on last message of mock stream, select all rows from db - if (endCursor?.orderKey === 5000004n) { - result = await db - .select() - .from(testTable) - .orderBy(asc(testTable.id)); - } - }, - }, - }); - - await run(client, indexer); - - expect(result).toHaveLength(4); - expect(result.find((r) => r.id === 5000002)).toBeUndefined(); - // check if all rows are still in db - const allRows = await db.select().from(testTable); - expect(allRows).toHaveLength(5); - }); - - it("should invalidate data correctly", async () => { - const sink = drizzleSink({ database: db, tables: [testTable] }); - - // Insert some test data - await db.insert(testTable).values( - // @ts-ignore - [ - { id: 1, data: "data1", _cursor: getDrizzleCursor([1n, 5n]) }, - { id: 2, data: "data2", _cursor: getDrizzleCursor([2n, 5n]) }, - { id: 3, data: "data3", _cursor: getDrizzleCursor(3n) }, - { id: 4, data: "data4", _cursor: getDrizzleCursor(4n) }, - { id: 5, data: "data5", _cursor: getDrizzleCursor(5n) }, - ], - ); - - // Create a cursor at position 3 - const cursor: Cursor = { orderKey: 3n }; - - // Invalidate data - await sink.invalidate(cursor); - - // Check the results - const result = await db.select().from(testTable).orderBy(asc(testTable.id)); - - expect(result).toHaveLength(3); - // biome-ignore lint/suspicious/noExplicitAny: - expect(((result[0] as any)._cursor as Int8Range).range.upper).toBe(null); - // biome-ignore lint/suspicious/noExplicitAny: - expect(((result[1] as any)._cursor as Int8Range).range.upper).toBe(null); - // biome-ignore lint/suspicious/noExplicitAny: - expect(((result[2] as any)._cursor as Int8Range).range.upper).toBe(null); - }); -}); diff --git a/packages/indexer/src/sinks/drizzle/drizzle.ts b/packages/indexer/src/sinks/drizzle/drizzle.ts deleted file mode 100644 index 79cc8bf..0000000 --- a/packages/indexer/src/sinks/drizzle/drizzle.ts +++ /dev/null @@ -1,133 +0,0 @@ -import type { Cursor } from "@apibara/protocol"; -import { - type ExtractTablesWithRelations, - type TablesRelationalConfig, - gt, - sql, -} from "drizzle-orm"; -import type { - AnyPgTable, - PgDatabase, - PgQueryResultHKT, - PgTableWithColumns, - TableConfig, -} from "drizzle-orm/pg-core"; -import { Sink, type SinkCursorParams } from "../../sink"; -import { DrizzleSinkTransaction } from "./transaction"; - -export type DrizzleSinkTables< - TTableConfig extends Record, -> = { - [K in keyof TTableConfig]: PgTableWithColumns; -}; - -export type DrizzleSinkOptions< - TQueryResult extends PgQueryResultHKT, - TFullSchema extends Record = Record, - TSchema extends - TablesRelationalConfig = ExtractTablesWithRelations, -> = { - /** - * Database instance of drizzle-orm - */ - database: PgDatabase; - tables: AnyPgTable[]; -}; - -/** - * A sink that writes data to a PostgreSQL database using Drizzle ORM. - * - * @example - * - * ```ts - * const sink = drizzle({ - * database: db, - * }); - * - * ... - * async transform({context, endCursor}){ - * const { transaction } = useSink(context); - * const db = transaction(endCursor); - * - * db.insert(users).values([ - * { id: 1, name: "John" }, - * { id: 2, name: "Jane" }, - * ]); - * } - * - * ``` - */ -export class DrizzleSink< - TQueryResult extends PgQueryResultHKT, - TFullSchema extends Record = Record, - TSchema extends - TablesRelationalConfig = ExtractTablesWithRelations, -> extends Sink { - private _db: PgDatabase; - private _tables: AnyPgTable[]; - constructor(options: DrizzleSinkOptions) { - super(); - const { database, tables } = options; - this._db = database; - this._tables = tables; - } - - async transaction( - { cursor, endCursor, finality }: SinkCursorParams, - cb: (params: { - db: DrizzleSinkTransaction; - }) => Promise, - ): Promise { - await this._db.transaction(async (db) => { - await cb({ db: new DrizzleSinkTransaction(db, endCursor) }); - }); - } - - async invalidateOnRestart(cursor?: Cursor) { - await this.invalidate(cursor); - } - - async invalidate(cursor?: Cursor) { - if (cursor?.orderKey === undefined) return; - - await this._db.transaction(async (db) => { - for (const table of this._tables) { - // delete all rows whose lowerbound of "_cursor" (int8range) column is greater than the invalidate cursor - await db - .delete(table) - .where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`)); - // and for rows whose upperbound of "_cursor" (int8range) column is greater than the invalidate cursor, set the upperbound to infinity - await db - .update(table) - .set({ - _cursor: sql`int8range(lower(_cursor), NULL, '[)')`, - }) - .where(gt(sql`upper(_cursor)`, sql`${Number(cursor?.orderKey)}`)); - } - }); - } - - async finalize(cursor?: Cursor) { - if (cursor?.orderKey === undefined) return; - - await this._db.transaction(async (db) => { - for (const table of this._tables) { - // delete all rows where the upper bound of "_cursor" is less than the finalize cursor - await db - .delete(table) - .where(sql`upper(_cursor) < ${Number(cursor?.orderKey)}`); - } - }); - } -} - -export const drizzleSink = < - TQueryResult extends PgQueryResultHKT, - TFullSchema extends Record = Record, - TSchema extends - TablesRelationalConfig = ExtractTablesWithRelations, ->( - args: DrizzleSinkOptions, -) => { - return new DrizzleSink(args); -}; diff --git a/packages/indexer/src/sinks/drizzle/index.ts b/packages/indexer/src/sinks/drizzle/index.ts deleted file mode 100644 index a560244..0000000 --- a/packages/indexer/src/sinks/drizzle/index.ts +++ /dev/null @@ -1,6 +0,0 @@ -export * from "./drizzle"; -export * from "./Int8Range"; -export * from "./utils"; -export * from "./transaction"; -export * from "./update"; -export * from "./delete"; diff --git a/packages/indexer/src/sinks/drizzle/insert.ts b/packages/indexer/src/sinks/drizzle/insert.ts deleted file mode 100644 index 77a6b2b..0000000 --- a/packages/indexer/src/sinks/drizzle/insert.ts +++ /dev/null @@ -1,42 +0,0 @@ -import type { Cursor } from "@apibara/protocol"; -import type { - ExtractTablesWithRelations, - TablesRelationalConfig, -} from "drizzle-orm"; -import type { - PgInsertValue as DrizzleInsertValue, - PgQueryResultHKT, - PgTable, - PgTransaction, -} from "drizzle-orm/pg-core"; -import { type PgInsertValue, getDrizzleCursor } from "./utils"; - -export class DrizzleSinkInsert< - TTable extends PgTable, - TQueryResult extends PgQueryResultHKT, - TFullSchema extends Record = Record, - TSchema extends - TablesRelationalConfig = ExtractTablesWithRelations, -> { - constructor( - private db: PgTransaction, - private table: TTable, - private endCursor?: Cursor, - ) {} - - values(values: PgInsertValue | PgInsertValue[]) { - const originalInsert = this.db.insert(this.table); - const cursoredValues = (Array.isArray(values) ? values : [values]).map( - (v) => { - return { - ...v, - _cursor: getDrizzleCursor(this.endCursor?.orderKey), - }; - }, - ); - - return originalInsert.values( - cursoredValues as DrizzleInsertValue[], - ); - } -} diff --git a/packages/indexer/src/sinks/drizzle/select.ts b/packages/indexer/src/sinks/drizzle/select.ts deleted file mode 100644 index 98e2f96..0000000 --- a/packages/indexer/src/sinks/drizzle/select.ts +++ /dev/null @@ -1,44 +0,0 @@ -import type { Cursor } from "@apibara/protocol"; -import { - type ExtractTablesWithRelations, - type SQL, - type Subquery, - type TablesRelationalConfig, - sql, -} from "drizzle-orm"; -import type { - PgQueryResultHKT, - PgTable, - PgTransaction, - SelectedFields, -} from "drizzle-orm/pg-core"; -import type { PgViewBase } from "drizzle-orm/pg-core/view-base"; - -export class DrizzleSinkSelect< - TSelection extends SelectedFields, - TQueryResult extends PgQueryResultHKT, - TFullSchema extends Record = Record, - TSchema extends - TablesRelationalConfig = ExtractTablesWithRelations, -> { - constructor( - private db: PgTransaction, - private fields?: TSelection, - private endCursor?: Cursor, - ) {} - - from(source: TFrom) { - if (this.fields) { - const originalFrom = this.db.select(this.fields).from(source); - return { - ...originalFrom, - where: (where?: SQL) => { - const combinedWhere = sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`; - return originalFrom.where(combinedWhere); - }, - }; - } - - return this.db.select().from(source).where(sql`upper_inf(_cursor)`); - } -} diff --git a/packages/indexer/src/sinks/drizzle/transaction.ts b/packages/indexer/src/sinks/drizzle/transaction.ts deleted file mode 100644 index 84afabc..0000000 --- a/packages/indexer/src/sinks/drizzle/transaction.ts +++ /dev/null @@ -1,49 +0,0 @@ -import type { Cursor } from "@apibara/protocol"; -import type { - ExtractTablesWithRelations, - TablesRelationalConfig, -} from "drizzle-orm"; -import type { - PgQueryResultHKT, - PgSelectBuilder, - PgTable, - PgTransaction, - SelectedFields, -} from "drizzle-orm/pg-core"; -import { DrizzleSinkDelete } from "./delete"; -import { DrizzleSinkInsert } from "./insert"; -import { DrizzleSinkSelect } from "./select"; -import { DrizzleSinkUpdate } from "./update"; - -export class DrizzleSinkTransaction< - TQueryResult extends PgQueryResultHKT, - TFullSchema extends Record = Record, - TSchema extends - TablesRelationalConfig = ExtractTablesWithRelations, -> { - constructor( - private db: PgTransaction, - private endCursor?: Cursor, - ) {} - - insert(table: TTable) { - return new DrizzleSinkInsert(this.db, table, this.endCursor); - } - - update(table: TTable) { - return new DrizzleSinkUpdate(this.db, table, this.endCursor); - } - - delete(table: TTable) { - return new DrizzleSinkDelete(this.db, table, this.endCursor); - } - - // @ts-ignore - select(): PgSelectBuilder; - select( - fields: TSelection, - ): PgSelectBuilder; - select(fields?: SelectedFields) { - return new DrizzleSinkSelect(this.db, fields, this.endCursor); - } -} diff --git a/packages/indexer/src/sinks/drizzle/update.ts b/packages/indexer/src/sinks/drizzle/update.ts deleted file mode 100644 index 3cea116..0000000 --- a/packages/indexer/src/sinks/drizzle/update.ts +++ /dev/null @@ -1,70 +0,0 @@ -import type { Cursor } from "@apibara/protocol"; -import { - type ExtractTablesWithRelations, - type SQL, - type TablesRelationalConfig, - sql, -} from "drizzle-orm"; -import type { - PgQueryResultHKT, - PgTable, - PgTransaction, - PgUpdateBase, - PgUpdateSetSource, -} from "drizzle-orm/pg-core"; -import type { Int8Range } from "./Int8Range"; -import { getDrizzleCursor } from "./utils"; - -export class DrizzleSinkUpdate< - TTable extends PgTable, - TQueryResult extends PgQueryResultHKT, - TFullSchema extends Record = Record, - TSchema extends - TablesRelationalConfig = ExtractTablesWithRelations, -> { - constructor( - private db: PgTransaction, - private table: TTable, - private endCursor?: Cursor, - ) {} - - set(values: PgUpdateSetSource): PgUpdateBase { - const originalUpdate = this.db.update(this.table); - const originalSet = originalUpdate.set(values); - return { - ...originalSet, - where: async (where: SQL | undefined) => { - // 1. Find and store old versions of matching records - const oldRecords = await this.db - .select() - .from(this.table) - .where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`) - .execute(); - - // 2. Insert old versions with updated upperbound cursor - if (oldRecords.length > 0) { - const oldRecordsWithNewCursor = oldRecords.map((record) => ({ - ...record, - _cursor: getDrizzleCursor([ - BigInt((record._cursor as Int8Range).range.lower!), - this.endCursor?.orderKey, - ]), - })); - - await this.db - .insert(this.table) - .values(oldRecordsWithNewCursor) - .execute(); - } - - // 3. Update matching records with new values and new 'lowerbound' cursor - return originalUpdate - .set({ - ...values, - _cursor: sql`int8range(${Number(this.endCursor?.orderKey!)}, NULL, '[)')`, - } as PgUpdateSetSource) - .where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`); - }, - } as PgUpdateBase; - } -} diff --git a/packages/indexer/src/sinks/drizzle/utils.ts b/packages/indexer/src/sinks/drizzle/utils.ts deleted file mode 100644 index dead2aa..0000000 --- a/packages/indexer/src/sinks/drizzle/utils.ts +++ /dev/null @@ -1,103 +0,0 @@ -import type { - BuildColumns, - BuildExtraConfigColumns, - NotNull, - Placeholder, - SQL, -} from "drizzle-orm"; -import type { - PgColumnBuilderBase, - PgCustomColumnBuilder, - PgTable, - PgTableExtraConfigValue, - PgTableWithColumns, -} from "drizzle-orm/pg-core"; -import { pgTable as drizzlePgTable } from "drizzle-orm/pg-core"; -import range from "postgres-range"; -import { Int8Range, int8range } from "./Int8Range"; - -export type CursorColumnBuilder = NotNull< - PgCustomColumnBuilder<{ - name: "_cursor"; - dataType: "custom"; - columnType: "PgCustomColumn"; - data: Int8Range; - driverParam: undefined; - enumValues: undefined; - generated: undefined; - }> ->; - -// Redefining the type of `pgTable` to include the `_cursor` column. -export type PgIndexerTableWithCursorFn< - TSchema extends string | undefined = undefined, -> = < - TTableName extends string, - TColumnsMap extends Record, ->( - name: TTableName, - columns: TColumnsMap, - extraConfig?: ( - self: BuildExtraConfigColumns< - TTableName, - TColumnsMap & { _cursor: CursorColumnBuilder }, - "pg" - >, - ) => PgTableExtraConfigValue[], -) => PgTableWithColumns<{ - name: TTableName; - schema: TSchema; - columns: BuildColumns< - TTableName, - TColumnsMap & { _cursor: CursorColumnBuilder }, - "pg" - >; - dialect: "pg"; -}>; - -// Same as the drizzle's `PgInsertValue` type, but without the `_cursor` column. -export type PgInsertValue = Omit< - { - [Key in keyof TTable["$inferInsert"]]: - | TTable["$inferInsert"][Key] - | SQL - | Placeholder; - } & {}, - "_cursor" ->; - -export const pgIndexerTable: PgIndexerTableWithCursorFn = ( - name, - columns, - extraConfig?, -) => { - return drizzlePgTable( - name, - { - ...columns, - _cursor: int8range("_cursor").notNull(), - }, - extraConfig, - ); -}; - -export const getDrizzleCursor = ( - cursor_range: [bigint | undefined, bigint | undefined] | bigint | undefined, -) => { - const isArray = Array.isArray(cursor_range); - const [lower, upper] = isArray ? cursor_range : [cursor_range, undefined]; - let isNoUpperBound = false; - if (lower === undefined) { - throw new Error("Lower bound cursor is required"); - } - if (upper === undefined) { - isNoUpperBound = true; - } - return new Int8Range( - new range.Range( - Number(lower), - Number(upper), - range.RANGE_LB_INC | (isNoUpperBound ? range.RANGE_UB_INF : 0), - ), - ); -}; diff --git a/packages/indexer/src/sinks/sqlite.test.ts b/packages/indexer/src/sinks/sqlite.test.ts deleted file mode 100644 index 731e549..0000000 --- a/packages/indexer/src/sinks/sqlite.test.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { - type MockBlock, - MockClient, - type MockFilter, -} from "@apibara/protocol/testing"; -import Database from "better-sqlite3"; -import { describe, expect, it } from "vitest"; -import { run } from "../indexer"; -import { generateMockMessages, getMockIndexer } from "../internal/testing"; -import { useSink } from "../sink"; -import { sqlite } from "./sqlite"; - -describe("Run Test", () => { - it("should store in sqlite db via sqlitesink", async () => { - const client = new MockClient((request, options) => { - return generateMockMessages(); - }); - - const db = new Database(":memory:"); - - db.prepare("DROP TABLE IF EXISTS test;").run(); - - db.prepare( - ` - CREATE TABLE IF NOT EXISTS test ( - data BLOB, - _cursor BIGINT - );`, - ).run(); - - const sink = sqlite({ - database: db, - tableName: "test", - }); - await run( - client, - getMockIndexer({ - sink, - override: { - transform: async ({ context, endCursor, block: { data } }) => { - const { writer } = useSink({ context }); - writer.insert([{ data }]); - }, - }, - }), - ); - - const sinkData = db.prepare("SELECT * FROM test").all(); - - expect(sinkData).toMatchInlineSnapshot(` - [ - { - "_cursor": 5000000, - "data": "5000000", - }, - { - "_cursor": 5000001, - "data": "5000001", - }, - { - "_cursor": 5000002, - "data": "5000002", - }, - { - "_cursor": 5000003, - "data": "5000003", - }, - { - "_cursor": 5000004, - "data": "5000004", - }, - { - "_cursor": 5000005, - "data": "5000005", - }, - { - "_cursor": 5000006, - "data": "5000006", - }, - { - "_cursor": 5000007, - "data": "5000007", - }, - { - "_cursor": 5000008, - "data": "5000008", - }, - { - "_cursor": 5000009, - "data": "5000009", - }, - ] - `); - - db.close(); - }); -}); diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts deleted file mode 100644 index 4cd4830..0000000 --- a/packages/indexer/src/sinks/sqlite.ts +++ /dev/null @@ -1,182 +0,0 @@ -import type { Cursor } from "@apibara/protocol"; -import type { Database as SqliteDatabase } from "better-sqlite3"; -import { Sink, type SinkCursorParams, type SinkData } from "../sink"; - -export type SqliteSinkOptions = { - /** - * Database instance of better-sqlite3 - */ - database: SqliteDatabase; - /** - * The name of the table where data will be inserted. - */ - tableName: string; - /** - * An optional column name used to store the cursor value. If specified, - * the value of this column must match the `endCursor.orderKey` for each row. - */ - cursorColumn?: string; - /** - * An optional configuration to handle conflicts during data insertion. - * - `on`: The column name on which conflicts are detected. - * - `update`: An array of column names to be updated if a conflict occurs. - */ - onConflict?: { on: string; update: string[] }; -}; - -type TxnContext = { - buffer: SinkData[]; -}; - -type TxnParams = { - writer: { - insert: (data: SinkData[]) => void; - }; -}; -const transactionHelper = (context: TxnContext) => { - return { - insert: (data: SinkData[]) => { - context.buffer.push(...data); - }, - }; -}; - -/** - * A sink that writes data to a SQLite database. - * - * @example - * - * ```ts - * const sink = sqlite({ - * database: db, - * tableName: "test", - * }); - * - * ... - * async transform({context, endCursor}){ - * const { writer } = useSink(context); - * const insertHelper = writer(endCursor); - * - * insertHelper.insert([ - * { id: 1, name: "John" }, - * { id: 2, name: "Jane" }, - * ]); - * } - * - * ``` - */ - -export class SqliteSink extends Sink { - private _config: Omit; - private _db: SqliteDatabase; - - constructor(options: SqliteSinkOptions) { - super(); - const { database, ...config } = options; - this._config = config; - this._db = database; - } - - private async write({ - data, - endCursor, - }: { data: SinkData[]; endCursor?: Cursor }) { - data = this.processCursorColumn(data, endCursor); - await this.insertJsonArray(data); - } - - async transaction( - { cursor, endCursor, finality }: SinkCursorParams, - cb: (params: TxnParams) => Promise, - ) { - const context: TxnContext = { - buffer: [], - }; - - const writer = transactionHelper(context); - - await cb({ writer }); - await this.write({ data: context.buffer, endCursor }); - } - - async invalidateOnRestart(cursor?: Cursor) { - await this.invalidate(cursor); - } - - async invalidate(cursor?: Cursor) { - if (cursor?.orderKey === undefined) return; - - const cursorValue = Number(cursor.orderKey); - - const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`; - this._db.prepare(sql).run(cursorValue); - } - - async finalize(cursor?: Cursor) { - // No Implementation required - } - - private async insertJsonArray(data: SinkData[]) { - if (data.length === 0) return; - - // Get columns from the first row of the object array - const columns = Object.keys(data[0]); - const columnNames = columns.join(", "); - const placeholders = columns.map(() => "?").join(", "); - - // Handle onConflict option - const conflictClause = this.buildConflictClause(); - - // Build the SQL insert statement with multiple rows - const insertSQL = `INSERT INTO ${this._config.tableName} (${columnNames}) VALUES `; - const valuePlaceholders = data.map(() => `(${placeholders})`).join(", "); - const statement = insertSQL + valuePlaceholders + conflictClause; - - // Prepare and execute the SQL statement - const values = data.flatMap((row) => columns.map((col) => row[col])); - - this._db.prepare(statement).run(values); - } - - private processCursorColumn( - data: SinkData[], - endCursor?: Cursor, - ): SinkData[] { - const { cursorColumn } = this._config; - - if ( - cursorColumn && - data.some( - (row) => Number(row[cursorColumn]) !== Number(endCursor?.orderKey), - ) - ) { - throw new Error( - `Mismatch of ${cursorColumn} and Cursor ${Number(endCursor?.orderKey)}`, - ); - } - - if (cursorColumn) { - return data; - } - - return data.map((row) => ({ - ...row, - _cursor: Number(endCursor?.orderKey), - })); - } - - private buildConflictClause(): string { - const { on, update } = this._config.onConflict || {}; - if (on && update && update.length > 0) { - const updateColumns = update - .map((col) => `${col}=excluded.${col}`) - .join(", "); - return ` ON CONFLICT(${on}) DO UPDATE SET ${updateColumns}`; - } - return ""; - } -} - -export const sqlite = (args: SqliteSinkOptions) => { - return new SqliteSink(args); -}; diff --git a/packages/plugin-drizzle/README.md b/packages/plugin-drizzle/README.md new file mode 100644 index 0000000..bc93d5a --- /dev/null +++ b/packages/plugin-drizzle/README.md @@ -0,0 +1,7 @@ +# `@apibara/plugin-drizzle` + +TODO + +## Installation + +TODO diff --git a/packages/plugin-drizzle/build.config.ts b/packages/plugin-drizzle/build.config.ts new file mode 100644 index 0000000..9aaddef --- /dev/null +++ b/packages/plugin-drizzle/build.config.ts @@ -0,0 +1,11 @@ +import { defineBuildConfig } from "unbuild"; + +export default defineBuildConfig({ + entries: ["./src/index.ts"], + clean: true, + outDir: "./dist", + declaration: true, + rollup: { + emitCJS: true, + }, +}); diff --git a/packages/plugin-drizzle/package.json b/packages/plugin-drizzle/package.json new file mode 100644 index 0000000..6a87687 --- /dev/null +++ b/packages/plugin-drizzle/package.json @@ -0,0 +1,41 @@ +{ + "name": "@apibara/plugin-drizzle", + "version": "2.0.0-beta.26", + "type": "module", + "files": [ + "dist", + "src", + "README.md" + ], + "main": "./dist/index.mjs", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.mjs", + "require": "./dist/index.cjs", + "default": "./dist/index.mjs" + } + }, + "scripts": { + "build": "unbuild", + "typecheck": "tsc --noEmit", + "lint": "biome check .", + "lint:fix": "pnpm lint --write", + "test": "vitest", + "test:ci": "vitest run" + }, + "devDependencies": { + "@electric-sql/pglite": "^0.2.14", + "@types/node": "^20.14.0", + "unbuild": "^2.0.0", + "vitest": "^1.6.0" + }, + "dependencies": { + "@apibara/indexer": "workspace:*", + "@apibara/protocol": "workspace:*", + "drizzle-orm": "^0.37.0", + "pg": "^8.13.1", + "postgres-range": "^1.1.4" + } +} diff --git a/packages/plugin-drizzle/src/index.ts b/packages/plugin-drizzle/src/index.ts new file mode 100644 index 0000000..a01dbf5 --- /dev/null +++ b/packages/plugin-drizzle/src/index.ts @@ -0,0 +1,5 @@ +import { DrizzleStorageError } from "./utils"; + +export function drizzleStorage() { + throw new DrizzleStorageError("Not implemented"); +} diff --git a/packages/indexer/src/plugins/drizzle-persistence.ts b/packages/plugin-drizzle/src/persistence.ts similarity index 100% rename from packages/indexer/src/plugins/drizzle-persistence.ts rename to packages/plugin-drizzle/src/persistence.ts diff --git a/packages/plugin-drizzle/src/utils.ts b/packages/plugin-drizzle/src/utils.ts new file mode 100644 index 0000000..7af22ac --- /dev/null +++ b/packages/plugin-drizzle/src/utils.ts @@ -0,0 +1,6 @@ +export class DrizzleStorageError extends Error { + constructor(message: string) { + super(message); + this.name = "DrizzleStorageError"; + } +} diff --git a/examples/indexer/tsconfig.json b/packages/plugin-drizzle/tsconfig.json similarity index 100% rename from examples/indexer/tsconfig.json rename to packages/plugin-drizzle/tsconfig.json diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ead51c9..052527b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -523,12 +523,21 @@ importers: '@types/pg': specifier: ^8.11.10 version: 8.11.10 - better-sqlite3: - specifier: ^11.5.0 - version: 11.5.0 - csv-stringify: - specifier: ^6.5.0 - version: 6.5.0 + unbuild: + specifier: ^2.0.0 + version: 2.0.0(typescript@5.6.2) + vitest: + specifier: ^1.6.0 + version: 1.6.0(@types/node@20.14.0) + + packages/plugin-drizzle: + dependencies: + '@apibara/indexer': + specifier: workspace:* + version: link:../indexer + '@apibara/protocol': + specifier: workspace:* + version: link:../protocol drizzle-orm: specifier: ^0.37.0 version: 0.37.0(@electric-sql/pglite@0.2.14)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.1)(postgres@3.4.4) @@ -538,6 +547,35 @@ importers: postgres-range: specifier: ^1.1.4 version: 1.1.4 + devDependencies: + '@electric-sql/pglite': + specifier: ^0.2.14 + version: 0.2.14 + '@types/node': + specifier: ^20.14.0 + version: 20.14.0 + unbuild: + specifier: ^2.0.0 + version: 2.0.0(typescript@5.6.2) + vitest: + specifier: ^1.6.0 + version: 1.6.0(@types/node@20.14.0) + + packages/plugin-mongo: + dependencies: + '@apibara/indexer': + specifier: workspace:* + version: link:../indexer + '@apibara/protocol': + specifier: workspace:* + version: link:../protocol + devDependencies: + '@types/node': + specifier: ^20.14.0 + version: 20.14.0 + mongodb: + specifier: ^6.12.0 + version: 6.12.0 unbuild: specifier: ^2.0.0 version: 2.0.0(typescript@5.6.2) @@ -555,7 +593,7 @@ importers: version: link:../protocol better-sqlite3: specifier: ^9.0.0 - version: 11.5.0 + version: 9.6.0 devDependencies: '@types/better-sqlite3': specifier: ^7.6.11 @@ -610,28 +648,6 @@ importers: specifier: ^1.6.0 version: 1.6.0(@types/node@20.12.13) - packages/sink-mongo: - dependencies: - '@apibara/indexer': - specifier: workspace:* - version: link:../indexer - '@apibara/protocol': - specifier: workspace:* - version: link:../protocol - devDependencies: - '@types/node': - specifier: ^20.14.0 - version: 20.14.0 - mongodb: - specifier: ^6.12.0 - version: 6.12.0 - unbuild: - specifier: ^2.0.0 - version: 2.0.0(typescript@5.6.2) - vitest: - specifier: ^1.6.0 - version: 1.6.0(@types/node@20.14.0) - packages/starknet: dependencies: '@apibara/protocol': @@ -2092,6 +2108,9 @@ packages: better-sqlite3@11.5.0: resolution: {integrity: sha512-e/6eggfOutzoK0JWiU36jsisdWoHOfN9iWiW/SieKvb7SAa6aGNmBM/UKyp+/wWSXpLlWNN8tCPwoDNPhzUvuQ==} + better-sqlite3@9.6.0: + resolution: {integrity: sha512-yR5HATnqeYNVnkaUTf4bOP2dJSnyhP4puJN/QPRyx4YkBEEUxib422n2XzPqDEHjQQqazoYoADdAm5vE15+dAQ==} + binary-extensions@2.3.0: resolution: {integrity: sha512-Ceh+7ox5qe7LJuLHoY0feh3pHuUDHAcRUeyL2VYghZwfpkNIy/+8Ocg0a3UuSoYzavmylwuLWQOf3hl0jjMMIw==} engines: {node: '>=8'} @@ -5216,6 +5235,11 @@ snapshots: bindings: 1.5.0 prebuild-install: 7.1.2 + better-sqlite3@9.6.0: + dependencies: + bindings: 1.5.0 + prebuild-install: 7.1.2 + binary-extensions@2.3.0: {} bindings@1.5.0: diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index ba03beb..81b3965 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -1,3 +1,4 @@ packages: - "packages/*" - "examples/**" + - "!examples/cli" \ No newline at end of file