From 2806076d48803d3dd39789eeb71d1ec75ef70f86 Mon Sep 17 00:00:00 2001
From: kyscott18 <43524469+kyscott18@users.noreply.github.com>
Date: Mon, 18 Nov 2024 16:58:17 -0500
Subject: [PATCH] fix: OOM issues with memory optimized flush (#1251)

* start copy
* use temp table to update
* improved flush
* cleanup
* cleanup
* cleanup
* pglite
* cleanup
* chore: changeset
* nits
---
 .changeset/clean-months-end.md                |   5 +
 packages/core/package.json                    |   2 +
 packages/core/src/common/options.ts           |   4 +-
 packages/core/src/database/index.ts           |   4 +
 .../core/src/indexing-store/historical.ts     | 329 ++++++++++++------
 pnpm-lock.yaml                                |  29 +-
 6 files changed, 259 insertions(+), 114 deletions(-)
 create mode 100644 .changeset/clean-months-end.md

diff --git a/.changeset/clean-months-end.md b/.changeset/clean-months-end.md
new file mode 100644
index 000000000..1b4636648
--- /dev/null
+++ b/.changeset/clean-months-end.md
@@ -0,0 +1,5 @@
+---
+"@ponder/core": patch
+---
+
+Improved memory usage during historical indexing.
diff --git a/packages/core/package.json b/packages/core/package.json
index 4e2a41bfc..ab34324c3 100644
--- a/packages/core/package.json
+++ b/packages/core/package.json
@@ -67,6 +67,7 @@
     "kysely-pglite": "^0.6.0",
     "pg": "^8.11.3",
     "pg-connection-string": "^2.6.2",
+    "pg-copy-streams": "^6.0.6",
     "picocolors": "^1.0.0",
     "pino": "^8.16.2",
     "prom-client": "^15.0.0",
@@ -81,6 +82,7 @@
     "@types/glob": "^8.1.0",
     "@types/node": "^20.10.0",
     "@types/pg": "^8.10.9",
+    "@types/pg-copy-streams": "^1.2.5",
     "@types/react": "^18.2.38",
     "@viem/anvil": "^0.0.6",
     "@wagmi/cli": "^1.5.2",
diff --git a/packages/core/src/common/options.ts b/packages/core/src/common/options.ts
index f22793e67..fad33c173 100644
--- a/packages/core/src/common/options.ts
+++ b/packages/core/src/common/options.ts
@@ -103,14 +103,14 @@ export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => {
 
     factoryAddressCountThreshold: 1_000,
 
-    // v8.getHeapStatistics().heap_size_limit / 4, bucketed closest to 128, 256, 512, 1024, 2048 mB
+    // v8.getHeapStatistics().heap_size_limit / 8, bucketed closest to 128, 256, 512, 1024, 2048 mB
     indexingCacheMaxBytes:
       2 **
       Math.min(
         Math.max(
           Math.round(
             Math.log2(
-              v8.getHeapStatistics().heap_size_limit / 1_024 / 1_024 / 4,
+              v8.getHeapStatistics().heap_size_limit / 1_024 / 1_024 / 8,
             ),
           ),
           7,
diff --git a/packages/core/src/database/index.ts b/packages/core/src/database/index.ts
index 128886068..d895e87f1 100644
--- a/packages/core/src/database/index.ts
+++ b/packages/core/src/database/index.ts
@@ -45,6 +45,8 @@ import prometheus from "prom-client";
 import { HeadlessKysely } from "./kysely.js";
 
 export type Database = {
+  dialect: "pglite" | "postgres";
+  driver: PGliteDriver | PostgresDriver;
   qb: QueryBuilder;
   drizzle: Drizzle;
   migrateSync(): Promise<void>;
@@ -452,6 +454,8 @@
   };
 
   const database = {
+    dialect: dialect === "postgres" ? "postgres" : "pglite",
"postgres" : "pglite", + driver, qb, drizzle, async migrateSync() { diff --git a/packages/core/src/indexing-store/historical.ts b/packages/core/src/indexing-store/historical.ts index 611758255..5ae0532d3 100644 --- a/packages/core/src/indexing-store/historical.ts +++ b/packages/core/src/indexing-store/historical.ts @@ -1,3 +1,5 @@ +import { Readable } from "node:stream"; +import { pipeline } from "node:stream/promises"; import type { Common } from "@/common/common.js"; import { BigIntSerializationError, @@ -18,6 +20,8 @@ import { import { getColumnCasing } from "@/drizzle/kit/index.js"; import { encodeCheckpoint, zeroCheckpoint } from "@/utils/checkpoint.js"; import { prettyPrint } from "@/utils/print.js"; +import type { PGlite } from "@electric-sql/pglite"; +import { createQueue } from "@ponder/common"; import { type Column, type QueryWithTypings, @@ -27,11 +31,12 @@ import { and, eq, getTableColumns, - sql, + getTableName, } from "drizzle-orm"; import { type PgTable, getTableConfig } from "drizzle-orm/pg-core"; import { drizzle } from "drizzle-orm/pg-proxy"; -import { createQueue } from "../../../common/src/queue.js"; +import type { Pool } from "pg"; +import copy from "pg-copy-streams"; import { type IndexingStore, parseSqlError } from "./index.js"; enum EntryType { @@ -279,6 +284,40 @@ export const createHistoricalIndexingStore = ({ } }; + const getCopyCSV = ( + table: Table, + entries: (InsertEntry | UpdateEntry)["row"][], + ) => { + const columns = Object.entries(getTableColumns(table)); + let result = ""; + + while (entries.length > 0) { + const entry = entries.pop()!; + for (let j = 0; j < columns.length; j++) { + const [columnName, column] = columns[j]!; + const value = entry[columnName]; + + if (value === null || value === undefined) { + result += "\\N"; + } else if (column.mapToDriverValue === undefined) { + result += `"${String(value).replace(/"/g, '""')}"`; + } else { + const mappedValue = column.mapToDriverValue(value); + if (mappedValue === null || mappedValue === undefined) { + result += "\\N"; + } else { + result += `"${String(mappedValue).replace(/"/g, '""')}"`; + } + } + // Add comma if not last column + if (j < columns.length - 1) result += ","; + } + + result += "\n"; + } + return result; + }; + const getBytes = (value: unknown) => { // size of metadata let size = 13; @@ -753,133 +792,207 @@ export const createHistoricalIndexingStore = ({ ), async flush() { await queue.add(async () => { - const promises: Promise[] = []; + let cacheSize = 0; + for (const c of cache.values()) cacheSize += c.size; + + const flushIndex = + totalCacheOps - + cacheSize * (1 - common.options.indexingCacheFlushRatio); + const shouldDelete = cacheBytes > maxBytes; + if (shouldDelete) isDatabaseEmpty = false; + + const promises: Promise[] = []; for (const [table, tableCache] of cache) { - const entries = Array.from(tableCache.values()); + const insertValues: InsertEntry["row"][] = []; + const updateValues: UpdateEntry["row"][] = []; + + for (const [key, entry] of tableCache) { + if (entry.type === EntryType.INSERT) { + insertValues.push(entry.row); + } + + if (entry.type === EntryType.UPDATE) { + updateValues.push(entry.row); + } - const batchSize = Math.round( - common.options.databaseMaxQueryParameters / - Object.keys(getTableColumns(table)).length, - ); + if (shouldDelete && entry.operationIndex < flushIndex) { + tableCache.delete(key); + cacheBytes -= entry.bytes; + } - const insertValues = entries - .filter((e) => e.type === EntryType.INSERT) - .map((e) => e.row); + entry.type = 
+          }
 
-          const updateValues = entries
-            .filter((e) => e.type === EntryType.UPDATE)
-            .map((e) => e.row);
+          const insertSize = insertValues.length;
+          const updateSize = updateValues.length;
+          // `insertValues` and `updateValues` are mutated, so that the entries may be garbage collected
+          const insertCSV = getCopyCSV(table, insertValues);
+          const updateCSV = getCopyCSV(table, updateValues);
 
-          if (insertValues.length > 0) {
+          // Steps for flushing "insert" entries:
+          // 1. Copy into target table
+          if (insertSize > 0) {
             common.logger.debug({
               service: "indexing",
-              msg: `Inserting ${insertValues.length} cached '${tableNameCache.get(table)}' rows into the database`,
+              msg: `Inserting ${insertSize} cached '${tableNameCache.get(table)}' rows into the database`,
             });
 
-            for (
-              let i = 0, len = insertValues.length;
-              i < len;
-              i += batchSize
-            ) {
-              promises.push(
-                database.qb.user.wrap(
-                  {
-                    method: `${tableNameCache.get(table)}.flush()`,
-                  },
-                  async () =>
-                    await database.drizzle
-                      .insert(table)
-                      .values(insertValues.slice(i, i + batchSize))
-                      .catch((_error) => {
-                        const error = _error as Error;
-                        common.logger.error({
-                          service: "indexing",
-                          msg: "Internal error occurred while flushing cache. Please report this error here: https://github.com/ponder-sh/ponder/issues",
-                        });
-                        throw new FlushError(error.message);
-                      }),
-                ),
-              );
-            }
+            promises.push(
+              database.qb.user.wrap(
+                { method: `${tableNameCache.get(table)}.flush()` },
+                async () => {
+                  if (database.dialect === "pglite") {
+                    try {
+                      const client = (database.driver as { instance: PGlite })
+                        .instance;
+
+                      await client.query(
+                        `COPY "${getTableConfig(table).schema ?? "public"}"."${getTableName(table)}" FROM '/dev/blob' WITH (FORMAT csv)`,
+                        [],
+                        {
+                          blob: new Blob([insertCSV]),
+                        },
+                      );
+                    } catch (_error) {
+                      const error = _error as Error;
+                      common.logger.error({
+                        service: "indexing",
+                        msg: "Internal error occurred while flushing cache. Please report this error here: https://github.com/ponder-sh/ponder/issues",
+                      });
+                      throw new FlushError(error.message);
+                    }
+                  } else {
+                    const client = await (
+                      database.driver as { internal: Pool }
+                    ).internal.connect();
+
+                    try {
+                      await pipeline(
+                        Readable.from(insertCSV),
+                        client.query(
+                          copy.from(
+                            `COPY "${getTableConfig(table).schema ?? "public"}"."${getTableName(table)}" FROM STDIN WITH (FORMAT csv)`,
+                          ),
+                        ),
+                      );
+                    } catch (_error) {
+                      const error = _error as Error;
+                      common.logger.error({
+                        service: "indexing",
+                        msg: "Internal error occurred while flushing cache. Please report this error here: https://github.com/ponder-sh/ponder/issues",
+                      });
+                      throw new FlushError(error.message);
+                    } finally {
+                      client.release();
+                    }
+                  }
+                },
+              ),
+            );
           }
 
-          if (updateValues.length > 0) {
+          // Steps for flushing "update" entries:
+          // 1. Create temp table
+          // 2. Copy into temp table
+          // 3. Update target table with data from temp table
+          // 4. Drop temp table
+          if (updateSize > 0) {
             common.logger.debug({
               service: "indexing",
-              msg: `Updating ${updateValues.length} cached '${tableNameCache.get(table)}' records in the database`,
+              msg: `Updating ${updateSize} cached '${tableNameCache.get(table)}' rows in the database`,
             });
 
             const primaryKeys = primaryKeysCache.get(table)!;
-
-            const set: { [column: string]: SQL } = {};
-            for (const [columnName, column] of Object.entries(
-              getTableColumns(table),
-            )) {
-              set[columnName] = sql.raw(
-                `excluded."${getColumnCasing(column, "snake_case")}"`,
-              );
-            }
-
-            for (
-              let i = 0, len = updateValues.length;
-              i < len;
-              i += batchSize
-            ) {
-              promises.push(
-                database.qb.user.wrap(
-                  {
-                    method: `${tableNameCache.get(table)}.flush()`,
-                  },
-                  async () =>
-                    await database.drizzle
-                      .insert(table)
-                      .values(updateValues.slice(i, i + batchSize))
-                      .onConflictDoUpdate({
-                        // @ts-ignore
-                        target: primaryKeys.map(({ js }) => table[js]),
-                        set,
-                      })
-                      .catch((_error) => {
-                        const error = _error as Error;
-                        common.logger.error({
-                          service: "indexing",
-                          msg: "Internal error occurred while flushing cache. Please report this error here: https://github.com/ponder-sh/ponder/issues",
-                        });
-                        throw new FlushError(error.message);
-                      }),
-                ),
-              );
-            }
+            const set = Object.values(getTableColumns(table))
+              .map(
+                (column) =>
+                  `"${getColumnCasing(column, "snake_case")}" = source."${getColumnCasing(column, "snake_case")}"`,
+              )
+              .join(",\n");
+
+            const createTempTableQuery = `
+CREATE TEMP TABLE "${tableNameCache.get(table)}" AS
+SELECT * FROM "${getTableConfig(table).schema ?? "public"}"."${getTableName(table)}"
+WITH NO DATA;
+`;
+
+            const updateQuery = `
+UPDATE "${getTableConfig(table).schema ?? "public"}"."${getTableName(table)}" as target
+SET ${set}
+FROM "${tableNameCache.get(table)}" as source
+WHERE ${primaryKeys.map(({ sql }) => `target."${sql}" = source."${sql}"`).join(" AND ")};
+`;
+
+            const dropTempTableQuery = `DROP TABLE IF EXISTS "${tableNameCache.get(table)}"`;
+
+            promises.push(
+              database.qb.user.wrap(
+                { method: `${tableNameCache.get(table)}.flush()` },
+                async () => {
+                  if (database.dialect === "pglite") {
+                    try {
+                      const client = (database.driver as { instance: PGlite })
+                        .instance;
+
+                      await client.query(createTempTableQuery);
+
+                      await client.query(
+                        `COPY "${tableNameCache.get(table)}" FROM '/dev/blob' WITH (FORMAT csv)`,
+                        [],
+                        {
+                          blob: new Blob([updateCSV]),
+                        },
+                      );
+
+                      await client.query(updateQuery);
+                      await client.query(dropTempTableQuery);
+                    } catch (_error) {
+                      const error = _error as Error;
+                      common.logger.error({
+                        service: "indexing",
+                        msg: "Internal error occurred while flushing cache. Please report this error here: https://github.com/ponder-sh/ponder/issues",
+                      });
+                      throw new FlushError(error.message);
+                    }
+                  } else {
+                    const client = await (
+                      database.driver as { internal: Pool }
+                    ).internal.connect();
+
+                    try {
+                      await client.query(createTempTableQuery);
+
+                      await pipeline(
+                        Readable.from(updateCSV),
+                        client.query(
+                          copy.from(
+                            `COPY "${tableNameCache.get(table)}" FROM STDIN WITH (FORMAT csv)`,
+                          ),
+                        ),
+                      );
+
+                      await client.query(updateQuery);
+                      // temp table is not dropped automatically when using pg.Pool
+                      await client.query(dropTempTableQuery);
+                    } catch (_error) {
+                      const error = _error as Error;
+                      common.logger.error({
+                        service: "indexing",
+                        msg: "Internal error occurred while flushing cache. Please report this error here: https://github.com/ponder-sh/ponder/issues",
+                      });
+                      throw new FlushError(error.message);
+                    } finally {
+                      client.release();
+                    }
+                  }
+                },
+              ),
+            );
           }
         }
 
         await Promise.all(promises);
-
-        let cacheSize = 0;
-
-        for (const c of cache.values()) cacheSize += c.size;
-
-        const flushIndex =
-          totalCacheOps -
-          cacheSize * (1 - common.options.indexingCacheFlushRatio);
-
-        const shouldDelete = cacheBytes > maxBytes;
-
-        for (const tableCache of cache.values()) {
-          for (const [key, entry] of tableCache) {
-            entry.type = EntryType.FIND;
-
-            if (shouldDelete && entry.operationIndex < flushIndex) {
-              tableCache.delete(key);
-              cacheBytes -= entry.bytes;
-            }
-          }
-        }
-
-        if (shouldDelete) {
-          isDatabaseEmpty = false;
-        }
       });
     },
     isCacheFull() {
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 87356f4e2..10ed42b59 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -557,7 +557,7 @@ importers:
     dependencies:
       forge-std:
         specifier: github:foundry-rs/forge-std
-        version: https://codeload.github.com/foundry-rs/forge-std/tar.gz/da591f56d8884c5824c0c1b3103fbcfd81123c4c
+        version: https://codeload.github.com/foundry-rs/forge-std/tar.gz/0e7097750918380d84dd3cfdef595bee74dabb70
 
   examples/with-foundry/ponder:
     dependencies:
@@ -797,6 +797,9 @@
       pg-connection-string:
         specifier: ^2.6.2
         version: 2.6.2
+      pg-copy-streams:
+        specifier: ^6.0.6
+        version: 6.0.6
       picocolors:
         specifier: ^1.0.0
         version: 1.0.0
@@ -834,6 +837,9 @@
       '@types/pg':
         specifier: ^8.10.9
        version: 8.10.9
+      '@types/pg-copy-streams':
+        specifier: ^1.2.5
+        version: 1.2.5
       '@types/react':
         specifier: ^18.2.38
         version: 18.2.46
@@ -3120,6 +3126,9 @@
   '@types/pbkdf2@3.1.2':
     resolution: {integrity: sha512-uRwJqmiXmh9++aSu1VNEn3iIxWOhd8AHXNSdlaLfdAAdSTY9jYVeGWnzejM3dvrkbqE3/hyQkQQ29IFATEGlew==}
 
+  '@types/pg-copy-streams@1.2.5':
+    resolution: {integrity: sha512-7D6/GYW2uHIaVU6S/5omI+6RZnwlZBpLQDZAH83xX1rjxAOK0f6/deKyyUTewxqts145VIGn6XWYz1YGf50G5g==}
+
   '@types/pg@8.10.9':
     resolution: {integrity: sha512-UksbANNE/f8w0wOMxVKKIrLCbEMV+oM1uKejmwXr39olg4xqcfBDbXxObJAt6XxHbDa4XTKOlUEcEltXDX+XLQ==}
@@ -5009,8 +5018,8 @@
   forever-agent@0.6.1:
     resolution: {integrity: sha512-j0KLYPhm6zeac4lz3oJ3o65qvgQCcPubiyotZrXqEaG4hNagNYO8qdlUrX5vwqv9ohqeT/Z3j6+yW067yWWdUw==}
 
-  forge-std@https://codeload.github.com/foundry-rs/forge-std/tar.gz/da591f56d8884c5824c0c1b3103fbcfd81123c4c:
-    resolution: {tarball: https://codeload.github.com/foundry-rs/forge-std/tar.gz/da591f56d8884c5824c0c1b3103fbcfd81123c4c}
+  forge-std@https://codeload.github.com/foundry-rs/forge-std/tar.gz/0e7097750918380d84dd3cfdef595bee74dabb70:
+    resolution: {tarball: https://codeload.github.com/foundry-rs/forge-std/tar.gz/0e7097750918380d84dd3cfdef595bee74dabb70}
     version: 1.9.4
 
   form-data@2.3.3:
@@ -6986,6 +6995,9 @@
   pg-connection-string@2.6.2:
     resolution: {integrity: sha512-ch6OwaeaPYcova4kKZ15sbJ2hKb/VP48ZD2gE7i1J+L4MspCtBMAx8nMgz7bksc7IojCIIWuEhHibSMFH8m8oA==}
 
+  pg-copy-streams@6.0.6:
+    resolution: {integrity: sha512-Z+Dd2C2NIDTsjyFKmc6a9QLlpM8tjpERx+43RSx0WmL7j3uNChERi3xSvZUL0hWJ1oRUn4S3fhyt3apdSrTyKQ==}
+
   pg-int8@1.0.1:
     resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==}
     engines: {node: '>=4.0.0'}
@@ -11039,6 +11051,11 @@
   '@types/pbkdf2@3.1.2':
     dependencies:
       '@types/node': 20.11.24
 
+  '@types/pg-copy-streams@1.2.5':
+    dependencies:
+      '@types/node': 20.11.24
+      '@types/pg': 8.10.9
+
   '@types/pg@8.10.9':
     dependencies:
       '@types/node': 20.11.24
@@ -13269,7 +13286,7 @@
 
   forever-agent@0.6.1: {}
 
-  forge-std@https://codeload.github.com/foundry-rs/forge-std/tar.gz/da591f56d8884c5824c0c1b3103fbcfd81123c4c: {}
+  forge-std@https://codeload.github.com/foundry-rs/forge-std/tar.gz/0e7097750918380d84dd3cfdef595bee74dabb70: {}
 
   form-data@2.3.3:
     dependencies:
@@ -15909,6 +15926,10 @@
 
   pg-connection-string@2.6.2: {}
 
+  pg-copy-streams@6.0.6:
+    dependencies:
+      obuf: 1.1.2
+
   pg-int8@1.0.1: {}
 
   pg-numeric@1.0.2: {}
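
Note on the options.ts change above: the default indexingCacheMaxBytes is now an eighth of the V8 heap limit (previously a quarter), snapped to the nearest power-of-two bucket between 128 MB (2^7) and 2048 MB (2^11). A minimal sketch of the same arithmetic, with the upper clamp of 11 inferred from the comment since the hunk above is truncated at "7,":

    import v8 from "node:v8";

    // Heap limit in MB, e.g. ~4096 for a 4 GB heap.
    const heapLimitMb = v8.getHeapStatistics().heap_size_limit / 1_024 / 1_024;

    // An eighth of the heap, rounded to the closest power-of-two bucket in [2^7, 2^11] MB.
    const exponent = Math.min(Math.max(Math.round(Math.log2(heapLimitMb / 8)), 7), 11);
    const indexingCacheMaxBytes = 2 ** exponent * 1_024 * 1_024;

For a 4 GB heap: 4096 / 8 = 512, log2(512) = 9, so the cache is capped at 2^9 = 512 MB.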
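The rewritten insert path streams a CSV payload through Postgres COPY instead of building batched multi-row INSERTs, so no large parameter arrays or query strings accumulate in memory. A standalone sketch of the same technique with pg and pg-copy-streams; the accounts table, its columns, and the connection setup are hypothetical:

    import { Readable } from "node:stream";
    import { pipeline } from "node:stream/promises";
    import pg from "pg";
    import copy from "pg-copy-streams";

    const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });

    // Mirror getCopyCSV: NULL becomes \N, everything else is quoted with inner quotes doubled.
    const rows: (string | null)[][] = [["0xabc", "100"], ["0xdef", null]];
    const csv = rows
      .map((row) =>
        row.map((v) => (v === null ? "\\N" : `"${v.replace(/"/g, '""')}"`)).join(","),
      )
      .join("\n");

    const client = await pool.connect();
    try {
      // client.query(copy.from(...)) returns a writable stream; pipeline drains the CSV into it.
      await pipeline(
        Readable.from(csv),
        client.query(copy.from(`COPY "accounts" FROM STDIN WITH (FORMAT csv)`)),
      );
    } finally {
      client.release();
    }

Readable.from emits a string argument as a single chunk rather than iterating it character by character, which is why the flush code can hand the whole CSV string to pipeline directly.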
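The update path drops INSERT ... ON CONFLICT DO UPDATE entirely: pending rows are COPYed into a session-local temp table and applied with one set-based UPDATE ... FROM joined on the primary key, so each row is written once and no bind parameters are built. In SQL terms, for a hypothetical accounts(address, balance) table with address as the primary key and a distinct temp-table name chosen here for clarity, the flush amounts to:

    -- 1. Create temp table (clone the target's shape, no rows)
    CREATE TEMP TABLE "accounts_tmp" AS
    SELECT * FROM "public"."accounts"
    WITH NO DATA;

    -- 2. Copy into temp table
    COPY "accounts_tmp" FROM STDIN WITH (FORMAT csv);

    -- 3. Update target table with data from temp table
    UPDATE "public"."accounts" as target
    SET "address" = source."address", "balance" = source."balance"
    FROM "accounts_tmp" as source
    WHERE target."address" = source."address";

    -- 4. Drop temp table; pooled connections are reused, so the temp table
    -- would otherwise linger for the rest of the session
    DROP TABLE IF EXISTS "accounts_tmp";

The patch itself reuses the target's unqualified name for the temp table, relying on Postgres resolving unqualified names against pg_temp before the public schema, which is why its UPDATE schema-qualifies the target while the FROM clause resolves to the temp copy.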