From f29af291de2db9b0cf7e8dbe18cb4a18faa454e8 Mon Sep 17 00:00:00 2001
From: Darick Tong
Date: Fri, 3 Jan 2025 11:54:32 +0900
Subject: [PATCH] feat(zero-cache): make initial-sync parameters configurable

---
 .../zero-cache/src/config/zero-config.test.ts | 13 ++++++++++
 packages/zero-cache/src/config/zero-config.ts | 21 ++++++++++++++++
 .../zero-cache/src/server/change-streamer.ts  |  1 +
 .../src/server/multi/config.test.ts           | 19 +++++++++++++++
 .../pg/change-source.end-to-mid.pg-test.ts    |  1 +
 .../pg/change-source.pg-test.ts               |  2 ++
 .../change-streamer/pg/change-source.ts       |  4 +++-
 .../pg/initial-sync.pg-test.ts                |  7 +++++-
 .../change-streamer/pg/initial-sync.ts        | 24 ++++++++++++-------
 .../change-streamer/pg/sync-schema.pg-test.ts |  1 +
 .../change-streamer/pg/sync-schema.ts         |  6 +++--
 11 files changed, 86 insertions(+), 13 deletions(-)

diff --git a/packages/zero-cache/src/config/zero-config.test.ts b/packages/zero-cache/src/config/zero-config.test.ts
index 03f985f945..2cd2bb9580 100644
--- a/packages/zero-cache/src/config/zero-config.test.ts
+++ b/packages/zero-cache/src/config/zero-config.test.ts
@@ -220,6 +220,19 @@ test('zero-cache --help', () => {
       ZERO_STORAGE_DB_TMP_DIR env
       tmp directory for IVM operator storage. Leave unset to use os.tmpdir()
 
+    --initial-sync-table-copy-workers number                       default: 5
+      ZERO_INITIAL_SYNC_TABLE_COPY_WORKERS env
+      The number of parallel workers used to copy tables during initial sync.
+      Each worker copies a single table at a time, fetching rows in batches
+      of rowBatchSize.
+
+    --initial-sync-row-batch-size number                           default: 10000
+      ZERO_INITIAL_SYNC_ROW_BATCH_SIZE env
+      The number of rows each table copy worker fetches at a time during
+      initial sync. This can be increased to speed up initial sync, or decreased
+      to reduce the amount of heap memory used during initial sync (e.g. for tables
+      with large rows).
+
     "
   `);
 });
diff --git a/packages/zero-cache/src/config/zero-config.ts b/packages/zero-cache/src/config/zero-config.ts
index d3a4e55730..47ca4f2f76 100644
--- a/packages/zero-cache/src/config/zero-config.ts
+++ b/packages/zero-cache/src/config/zero-config.ts
@@ -344,6 +344,27 @@ export const zeroOptions = {
     ],
   },
 
+  initialSync: {
+    tableCopyWorkers: {
+      type: v.number().default(5),
+      desc: [
+        `The number of parallel workers used to copy tables during initial sync.`,
+        `Each worker copies a single table at a time, fetching rows in batches`,
+        `of {bold rowBatchSize}.`,
+      ],
+    },
+
+    rowBatchSize: {
+      type: v.number().default(10000),
+      desc: [
+        `The number of rows each table copy worker fetches at a time during`,
+        `initial sync. This can be increased to speed up initial sync, or decreased`,
+        `to reduce the amount of heap memory used during initial sync (e.g. for tables`,
+        `with large rows).`,
+      ],
+    },
+  },
+
   tenantID: {
     type: v.string().optional(),
     desc: ['Passed by multi/main.ts to tag the LogContext of zero-caches'],
diff --git a/packages/zero-cache/src/server/change-streamer.ts b/packages/zero-cache/src/server/change-streamer.ts
index 99c4ea0c1e..ecfb68f08d 100644
--- a/packages/zero-cache/src/server/change-streamer.ts
+++ b/packages/zero-cache/src/server/change-streamer.ts
@@ -53,6 +53,7 @@ export default async function runWorker(
     config.upstream.db,
     config.shard,
     config.replicaFile,
+    config.initialSync,
   );
 
   changeStreamer = await initializeStreamer(
diff --git a/packages/zero-cache/src/server/multi/config.test.ts b/packages/zero-cache/src/server/multi/config.test.ts
index 5622946fb4..0d8ec7e624 100644
--- a/packages/zero-cache/src/server/multi/config.test.ts
+++ b/packages/zero-cache/src/server/multi/config.test.ts
@@ -50,6 +50,10 @@ test('parse options', () => {
         "db": "foo",
         "maxConns": 30,
       },
+      "initialSync": {
+        "rowBatchSize": 10000,
+        "tableCopyWorkers": 5,
+      },
       "log": {
         "format": "text",
         "level": "info",
@@ -100,6 +104,8 @@ test('parse options', () => {
       "ZERO_CHANGE_MAX_CONNS": "1",
       "ZERO_CVR_DB": "foo",
       "ZERO_CVR_MAX_CONNS": "30",
+      "ZERO_INITIAL_SYNC_ROW_BATCH_SIZE": "10000",
+      "ZERO_INITIAL_SYNC_TABLE_COPY_WORKERS": "5",
       "ZERO_LOG_FORMAT": "text",
       "ZERO_LOG_LEVEL": "info",
       "ZERO_PER_USER_MUTATION_LIMIT_WINDOW_MS": "60000",
@@ -358,6 +364,19 @@ test('zero-cache --help', () => {
       ZERO_STORAGE_DB_TMP_DIR env
       tmp directory for IVM operator storage. Leave unset to use os.tmpdir()
 
+    --initial-sync-table-copy-workers number                       default: 5
+      ZERO_INITIAL_SYNC_TABLE_COPY_WORKERS env
+      The number of parallel workers used to copy tables during initial sync.
+      Each worker copies a single table at a time, fetching rows in batches
+      of rowBatchSize.
+
+    --initial-sync-row-batch-size number                           default: 10000
+      ZERO_INITIAL_SYNC_ROW_BATCH_SIZE env
+      The number of rows each table copy worker fetches at a time during
+      initial sync. This can be increased to speed up initial sync, or decreased
+      to reduce the amount of heap memory used during initial sync (e.g. for tables
+      with large rows).
+
     --tenants-json string                                          optional
       ZERO_TENANTS_JSON env
       JSON encoding of per-tenant configs for running the server in multi-tenant mode:
diff --git a/packages/zero-cache/src/services/change-streamer/pg/change-source.end-to-mid.pg-test.ts b/packages/zero-cache/src/services/change-streamer/pg/change-source.end-to-mid.pg-test.ts
index 690425f67a..2913febbb0 100644
--- a/packages/zero-cache/src/services/change-streamer/pg/change-source.end-to-mid.pg-test.ts
+++ b/packages/zero-cache/src/services/change-streamer/pg/change-source.end-to-mid.pg-test.ts
@@ -74,6 +74,7 @@ describe('change-source/pg/end-to-mid-test', () => {
         upstreamURI,
         {id: SHARD_ID, publications: ['zero_some_public', 'zero_all_test']},
         replicaDbFile.path,
+        {tableCopyWorkers: 5, rowBatchSize: 10000},
       )
     ).changeSource;
     const stream = await source.startStream('00');
diff --git a/packages/zero-cache/src/services/change-streamer/pg/change-source.pg-test.ts b/packages/zero-cache/src/services/change-streamer/pg/change-source.pg-test.ts
index 917768acd3..24bbb62a8a 100644
--- a/packages/zero-cache/src/services/change-streamer/pg/change-source.pg-test.ts
+++ b/packages/zero-cache/src/services/change-streamer/pg/change-source.pg-test.ts
@@ -81,6 +81,7 @@ describe('change-source/pg', () => {
         upstreamURI,
         {id: SHARD_ID, publications: ['zero_foo', 'zero_zero']},
         replicaDbFile.path,
+        {tableCopyWorkers: 5, rowBatchSize: 10000},
       )
     ).changeSource;
   });
@@ -611,6 +612,7 @@ describe('change-source/pg', () => {
         upstreamURI,
         {id: SHARD_ID, publications: ['zero_different_publication']},
         replicaDbFile.path,
+        {tableCopyWorkers: 5, rowBatchSize: 10000},
       );
     } catch (e) {
       err = e;
diff --git a/packages/zero-cache/src/services/change-streamer/pg/change-source.ts b/packages/zero-cache/src/services/change-streamer/pg/change-source.ts
index 8ac173530e..df74d327ba 100644
--- a/packages/zero-cache/src/services/change-streamer/pg/change-source.ts
+++ b/packages/zero-cache/src/services/change-streamer/pg/change-source.ts
@@ -55,7 +55,7 @@ import type {
 import type {Data, DownstreamChange} from '../change-streamer.js';
 import type {DataChange, Identifier, MessageDelete} from '../schema/change.js';
 import {AutoResetSignal, type ReplicationConfig} from '../schema/tables.js';
-import {replicationSlot} from './initial-sync.js';
+import {replicationSlot, type InitialSyncOptions} from './initial-sync.js';
 import {fromLexiVersion, toLexiVersion, type LSN} from './lsn.js';
 import {replicationEventSchema, type DdlUpdateEvent} from './schema/ddl.js';
 import {updateShardSchema} from './schema/init.js';
@@ -82,12 +82,14 @@ export async function initializeChangeSource(
   upstreamURI: string,
   shard: ShardConfig,
   replicaDbFile: string,
+  syncOptions: InitialSyncOptions,
 ): Promise<{replicationConfig: ReplicationConfig; changeSource: ChangeSource}> {
   await initSyncSchema(
     lc,
     'change-streamer',
     shard,
     replicaDbFile,
     upstreamURI,
+    syncOptions,
   );
   const replica = new Database(lc, replicaDbFile);
diff --git a/packages/zero-cache/src/services/change-streamer/pg/initial-sync.pg-test.ts b/packages/zero-cache/src/services/change-streamer/pg/initial-sync.pg-test.ts
index 9b58644fb5..79d7245ac1 100644
--- a/packages/zero-cache/src/services/change-streamer/pg/initial-sync.pg-test.ts
+++ b/packages/zero-cache/src/services/change-streamer/pg/initial-sync.pg-test.ts
@@ -914,6 +914,7 @@ describe('replicator/initial-sync', () => {
         {id: SHARD_ID, publications: c.requestedPublications ?? []},
        replica,
         getConnectionURI(upstream),
+        {tableCopyWorkers: 5, rowBatchSize: 10000},
       );
 
       const result = await upstream.unsafe(
@@ -1004,7 +1005,10 @@ describe('replicator/initial-sync', () => {
 
     let result;
     try {
-      await initialSync(lc, shardConfig, replica, getConnectionURI(upstream));
+      await initialSync(lc, shardConfig, replica, getConnectionURI(upstream), {
+        tableCopyWorkers: 5,
+        rowBatchSize: 10000,
+      });
     } catch (e) {
       result = e;
     }
@@ -1025,6 +1029,7 @@
         {id, publications: []},
         replica,
         getConnectionURI(upstream),
+        {tableCopyWorkers: 5, rowBatchSize: 10000},
       );
     } catch (e) {
       result = e;
diff --git a/packages/zero-cache/src/services/change-streamer/pg/initial-sync.ts b/packages/zero-cache/src/services/change-streamer/pg/initial-sync.ts
index 68140bc5a9..8619b12525 100644
--- a/packages/zero-cache/src/services/change-streamer/pg/initial-sync.ts
+++ b/packages/zero-cache/src/services/change-streamer/pg/initial-sync.ts
@@ -34,6 +34,11 @@ import {
 } from './schema/shard.js';
 import type {ShardConfig} from './shard-config.js';
 
+export type InitialSyncOptions = {
+  tableCopyWorkers: number;
+  rowBatchSize: number;
+};
+
 // https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION
 const ALLOWED_SHARD_ID_CHARACTERS = /^[a-z0-9_]+$/;
 
@@ -46,14 +51,16 @@ export async function initialSync(
   shard: ShardConfig,
   tx: Database,
   upstreamURI: string,
+  syncOptions: InitialSyncOptions,
 ) {
   if (!ALLOWED_SHARD_ID_CHARACTERS.test(shard.id)) {
     throw new Error(
       'A shard ID may only consist of lower-case letters, numbers, and the underscore character',
     );
   }
+  const {tableCopyWorkers: numWorkers, rowBatchSize} = syncOptions;
   const upstreamDB = pgClient(lc, upstreamURI, {
-    max: MAX_WORKERS,
+    max: numWorkers,
   });
   const replicationSession = pgClient(lc, upstreamURI, {
     // eslint-disable-next-line @typescript-eslint/naming-convention
@@ -71,7 +78,7 @@
   await createReplicationSlot(lc, shard.id, replicationSession);
 
-  // Run up to MAX_WORKERS to copy of tables at the replication slot's snapshot.
-  const copiers = startTableCopyWorkers(lc, upstreamDB, snapshot);
+  // Run up to tableCopyWorkers workers to copy tables at the replication slot's snapshot.
+  const copiers = startTableCopyWorkers(lc, upstreamDB, snapshot, numWorkers);
   let published: PublicationInfo;
   try {
     // Retrieve the published schema at the consistent_point.
@@ -87,7 +94,9 @@
     createLiteIndices(tx, indexes);
     await Promise.all(
       tables.map(table =>
-        copiers.process(db => copy(lc, table, db, tx).then(() => [])),
+        copiers.process(db =>
+          copy(lc, table, db, tx, rowBatchSize).then(() => []),
+        ),
       ),
     );
   } finally {
@@ -182,17 +191,13 @@
   return slot;
 }
 
-// TODO: Consider parameterizing these.
-const MAX_WORKERS = 5;
-const BATCH_SIZE = 100_000;
-
 function startTableCopyWorkers(
   lc: LogContext,
   db: PostgresDB,
   snapshot: string,
+  numWorkers: number,
 ): TransactionPool {
   const {init} = importSnapshot(snapshot);
-  const numWorkers = MAX_WORKERS;
   const tableCopiers = new TransactionPool(
     lc,
     Mode.READONLY,
@@ -223,6 +228,7 @@ async function copy(
   table: PublishedTableSpec,
   from: PostgresDB,
   to: Database,
+  rowBatchSize: number,
 ) {
   let totalRows = 0;
   const tableName = liteTableName(table);
@@ -252,7 +258,7 @@
 
   lc.info?.(`Starting copy of ${tableName}:`, selectStmt);
 
-  const cursor = from.unsafe(selectStmt).cursor(BATCH_SIZE);
+  const cursor = from.unsafe(selectStmt).cursor(rowBatchSize);
   for await (const rows of cursor) {
     for (const row of rows) {
       insertStmt.run([
diff --git a/packages/zero-cache/src/services/change-streamer/pg/sync-schema.pg-test.ts b/packages/zero-cache/src/services/change-streamer/pg/sync-schema.pg-test.ts
index e5b2a11bb6..9fe9a4ae65 100644
--- a/packages/zero-cache/src/services/change-streamer/pg/sync-schema.pg-test.ts
+++ b/packages/zero-cache/src/services/change-streamer/pg/sync-schema.pg-test.ts
@@ -128,6 +128,7 @@ describe('change-streamer/pg/sync-schema', () => {
         {id: SHARD_ID, publications: c.requestedPublications ?? []},
         replicaFile.path,
         getConnectionURI(upstream),
+        {tableCopyWorkers: 5, rowBatchSize: 10000},
       );
 
       await expectTables(upstream, c.upstreamPostState);
diff --git a/packages/zero-cache/src/services/change-streamer/pg/sync-schema.ts b/packages/zero-cache/src/services/change-streamer/pg/sync-schema.ts
index 8129894602..11ae6cfc2b 100644
--- a/packages/zero-cache/src/services/change-streamer/pg/sync-schema.ts
+++ b/packages/zero-cache/src/services/change-streamer/pg/sync-schema.ts
@@ -5,7 +5,7 @@ import {
   type IncrementalMigrationMap,
   type Migration,
 } from '../../../db/migration-lite.js';
-import {initialSync} from './initial-sync.js';
+import {initialSync, type InitialSyncOptions} from './initial-sync.js';
 import type {ShardConfig} from './shard-config.js';
 
 export async function initSyncSchema(
@@ -14,9 +14,11 @@
   shard: ShardConfig,
   dbPath: string,
   upstreamURI: string,
+  syncOptions: InitialSyncOptions,
 ): Promise<void> {
   const setupMigration: Migration = {
-    migrateSchema: (log, tx) => initialSync(log, shard, tx, upstreamURI),
+    migrateSchema: (log, tx) =>
+      initialSync(log, shard, tx, upstreamURI, syncOptions),
     minSafeVersion: 1,
   };
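
Usage sketch (illustrative, not part of the diff): the new knobs are plain fields on an
InitialSyncOptions object, so a caller wires them through exactly as change-streamer.ts
does above. The names lc, upstreamURI, shard, and replicaDbFile stand in for the caller's
own context, and the values simply mirror the new defaults (5 workers, 10000-row batches);
only the InitialSyncOptions shape and the initializeChangeSource signature come from the
patch itself.

    import {initializeChangeSource} from './services/change-streamer/pg/change-source.js';
    import type {InitialSyncOptions} from './services/change-streamer/pg/initial-sync.js';

    // Equivalent to setting ZERO_INITIAL_SYNC_TABLE_COPY_WORKERS and
    // ZERO_INITIAL_SYNC_ROW_BATCH_SIZE (or the corresponding --initial-sync-* flags).
    const syncOptions: InitialSyncOptions = {
      tableCopyWorkers: 5, // each worker copies one table at a time
      rowBatchSize: 10_000, // rows per cursor fetch; lower this for tables with large rows
    };

    // Assumes an enclosing async function providing lc, upstreamURI, shard, replicaDbFile.
    const {changeSource} = await initializeChangeSource(
      lc, // LogContext
      upstreamURI, // config.upstream.db
      shard, // config.shard
      replicaDbFile, // config.replicaFile
      syncOptions, // config.initialSync (the parameter this patch adds)
    );

Raising tableCopyWorkers also raises the `max` connection count of the upstream pool,
since the pool is sized to match the workers, so both values trade initial-sync speed
against upstream load and zero-cache heap usage.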