feat(zero-cache): make initial-sync parameters configurable
darkgnotic committed Jan 3, 2025
1 parent cb1a9de commit f29af29
Showing 11 changed files with 86 additions and 13 deletions.
13 changes: 13 additions & 0 deletions packages/zero-cache/src/config/zero-config.test.ts
@@ -220,6 +220,19 @@ test('zero-cache --help', () => {
ZERO_STORAGE_DB_TMP_DIR env
tmp directory for IVM operator storage. Leave unset to use os.tmpdir()
--initial-sync-table-copy-workers number default: 5
ZERO_INITIAL_SYNC_TABLE_COPY_WORKERS env
The number of parallel workers used to copy tables during initial sync.
Each worker copies a single table at a time, fetching rows in batches of
rowBatchSize.
--initial-sync-row-batch-size number default: 10000
ZERO_INITIAL_SYNC_ROW_BATCH_SIZE env
The number of rows each table copy worker fetches at a time during
initial sync. This can be increased to speed up initial sync, or decreased
to reduce the amount of heap memory used during initial sync (e.g. for tables
with large rows).
"
`);
});
21 changes: 21 additions & 0 deletions packages/zero-cache/src/config/zero-config.ts
@@ -344,6 +344,27 @@ export const zeroOptions = {
],
},

initialSync: {
tableCopyWorkers: {
type: v.number().default(5),
desc: [
`The number of parallel workers used to copy tables during initial sync.`,
`Each worker copies a single table at a time, fetching rows in batches`,
`of {bold rowBatchSize}.`,
],
},

rowBatchSize: {
type: v.number().default(10000),
desc: [
`The number of rows each table copy worker fetches at a time during`,
`initial sync. This can be increased to speed up initial sync, or decreased`,
`to reduce the amount of heap memory used during initial sync (e.g. for tables`,
`with large rows).`,
],
},
},

tenantID: {
type: v.string().optional(),
desc: ['Passed by multi/main.ts to tag the LogContext of zero-caches'],
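Both options flow through the standard zero-cache config plumbing, so they can be set as CLI flags or environment variables. A minimal sketch of overriding the defaults via the ZERO_* variables documented in the help output above (the values are illustrative, not recommendations):

// Hypothetical overrides, set before zero-cache parses its config.
process.env.ZERO_INITIAL_SYNC_TABLE_COPY_WORKERS = '10';
process.env.ZERO_INITIAL_SYNC_ROW_BATCH_SIZE = '5000';
// Equivalent CLI flags:
//   --initial-sync-table-copy-workers 10 --initial-sync-row-batch-size 5000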
1 change: 1 addition & 0 deletions packages/zero-cache/src/server/change-streamer.ts
@@ -53,6 +53,7 @@ export default async function runWorker(
config.upstream.db,
config.shard,
config.replicaFile,
config.initialSync,
);

changeStreamer = await initializeStreamer(
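For orientation, a condensed sketch of how the new options thread through the stack, as the remaining diffs below show (signatures abridged; lc and config are runWorker's existing arguments):

// config.initialSync → initializeChangeSource → initSyncSchema → initialSync
const {changeSource} = await initializeChangeSource(
  lc,
  config.upstream.db,
  config.shard,
  config.replicaFile,
  config.initialSync, // {tableCopyWorkers, rowBatchSize}
);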
19 changes: 19 additions & 0 deletions packages/zero-cache/src/server/multi/config.test.ts
@@ -50,6 +50,10 @@ test('parse options', () => {
"db": "foo",
"maxConns": 30,
},
"initialSync": {
"rowBatchSize": 10000,
"tableCopyWorkers": 5,
},
"log": {
"format": "text",
"level": "info",
@@ -100,6 +104,8 @@
"ZERO_CHANGE_MAX_CONNS": "1",
"ZERO_CVR_DB": "foo",
"ZERO_CVR_MAX_CONNS": "30",
"ZERO_INITIAL_SYNC_ROW_BATCH_SIZE": "10000",
"ZERO_INITIAL_SYNC_TABLE_COPY_WORKERS": "5",
"ZERO_LOG_FORMAT": "text",
"ZERO_LOG_LEVEL": "info",
"ZERO_PER_USER_MUTATION_LIMIT_WINDOW_MS": "60000",
@@ -358,6 +364,19 @@ test('zero-cache --help', () => {
ZERO_STORAGE_DB_TMP_DIR env
tmp directory for IVM operator storage. Leave unset to use os.tmpdir()
--initial-sync-table-copy-workers number default: 5
ZERO_INITIAL_SYNC_TABLE_COPY_WORKERS env
The number of parallel workers used to copy tables during initial sync.
Each worker copies a single table at a time, fetching rows in batches of
rowBatchSize.
--initial-sync-row-batch-size number default: 10000
ZERO_INITIAL_SYNC_ROW_BATCH_SIZE env
The number of rows each table copy worker fetches at a time during
initial sync. This can be increased to speed up initial sync, or decreased
to reduce the amount of heap memory used during initial sync (e.g. for tables
with large rows).
--tenants-json string optional
ZERO_TENANTS_JSON env
JSON encoding of per-tenant configs for running the server in multi-tenant mode:
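To make the heap tradeoff described in the help text concrete, a rough sizing sketch (the ~1 KiB average row size is an assumption for illustration, not a measured figure):

// Each worker buffers at most one batch of rows at a time:
//   10_000 rows/batch × ~1 KiB/row ≈ ~10 MiB per worker
//   × 5 workers (default)         ≈ ~50 MiB peak during table copy
// Halving ZERO_INITIAL_SYNC_ROW_BATCH_SIZE roughly halves that peak.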
@@ -74,6 +74,7 @@ describe('change-source/pg/end-to-mid-test', () => {
upstreamURI,
{id: SHARD_ID, publications: ['zero_some_public', 'zero_all_test']},
replicaDbFile.path,
{tableCopyWorkers: 5, rowBatchSize: 10000},
)
).changeSource;
const stream = await source.startStream('00');
@@ -81,6 +81,7 @@ describe('change-source/pg', () => {
upstreamURI,
{id: SHARD_ID, publications: ['zero_foo', 'zero_zero']},
replicaDbFile.path,
{tableCopyWorkers: 5, rowBatchSize: 10000},
)
).changeSource;
});
@@ -611,6 +612,7 @@ describe('change-source/pg', () => {
upstreamURI,
{id: SHARD_ID, publications: ['zero_different_publication']},
replicaDbFile.path,
{tableCopyWorkers: 5, rowBatchSize: 10000},
);
} catch (e) {
err = e;
@@ -55,7 +55,7 @@ import type {
import type {Data, DownstreamChange} from '../change-streamer.js';
import type {DataChange, Identifier, MessageDelete} from '../schema/change.js';
import {AutoResetSignal, type ReplicationConfig} from '../schema/tables.js';
import {replicationSlot} from './initial-sync.js';
import {replicationSlot, type InitialSyncOptions} from './initial-sync.js';
import {fromLexiVersion, toLexiVersion, type LSN} from './lsn.js';
import {replicationEventSchema, type DdlUpdateEvent} from './schema/ddl.js';
import {updateShardSchema} from './schema/init.js';
@@ -82,13 +82,15 @@ export async function initializeChangeSource(
upstreamURI: string,
shard: ShardConfig,
replicaDbFile: string,
syncOptions: InitialSyncOptions,
): Promise<{replicationConfig: ReplicationConfig; changeSource: ChangeSource}> {
await initSyncSchema(
lc,
`replica-${shard.id}`,
shard,
replicaDbFile,
upstreamURI,
syncOptions,
);

const replica = new Database(lc, replicaDbFile);
@@ -914,6 +914,7 @@ describe('replicator/initial-sync', () => {
{id: SHARD_ID, publications: c.requestedPublications ?? []},
replica,
getConnectionURI(upstream),
{tableCopyWorkers: 5, rowBatchSize: 10000},
);

const result = await upstream.unsafe(
@@ -1004,7 +1005,10 @@ describe('replicator/initial-sync', () => {

let result;
try {
await initialSync(lc, shardConfig, replica, getConnectionURI(upstream));
await initialSync(lc, shardConfig, replica, getConnectionURI(upstream), {
tableCopyWorkers: 5,
rowBatchSize: 10000,
});
} catch (e) {
result = e;
}
@@ -1025,6 +1029,7 @@
{id, publications: []},
replica,
getConnectionURI(upstream),
{tableCopyWorkers: 5, rowBatchSize: 10000},
);
} catch (e) {
result = e;
@@ -34,6 +34,11 @@ import {
} from './schema/shard.js';
import type {ShardConfig} from './shard-config.js';

export type InitialSyncOptions = {
tableCopyWorkers: number;
rowBatchSize: number;
};

// https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION
const ALLOWED_SHARD_ID_CHARACTERS = /^[a-z0-9_]+$/;

@@ -46,14 +51,16 @@ export async function initialSync(
shard: ShardConfig,
tx: Database,
upstreamURI: string,
syncOptions: InitialSyncOptions,
) {
if (!ALLOWED_SHARD_ID_CHARACTERS.test(shard.id)) {
throw new Error(
'A shard ID may only consist of lower-case letters, numbers, and the underscore character',
);
}
const {tableCopyWorkers: numWorkers, rowBatchSize} = syncOptions;
const upstreamDB = pgClient(lc, upstreamURI, {
max: MAX_WORKERS,
max: numWorkers,
});
const replicationSession = pgClient(lc, upstreamURI, {
// eslint-disable-next-line @typescript-eslint/naming-convention
@@ -71,7 +78,7 @@
await createReplicationSlot(lc, shard.id, replicationSession);

// Run up to numWorkers parallel workers to copy tables at the replication slot's snapshot.
const copiers = startTableCopyWorkers(lc, upstreamDB, snapshot);
const copiers = startTableCopyWorkers(lc, upstreamDB, snapshot, numWorkers);
let published: PublicationInfo;
try {
// Retrieve the published schema at the consistent_point.
@@ -87,7 +94,9 @@
createLiteIndices(tx, indexes);
await Promise.all(
tables.map(table =>
copiers.process(db => copy(lc, table, db, tx).then(() => [])),
copiers.process(db =>
copy(lc, table, db, tx, rowBatchSize).then(() => []),
),
),
);
} finally {
@@ -182,17 +191,13 @@ async function createReplicationSlot(
return slot;
}

// TODO: Consider parameterizing these.
const MAX_WORKERS = 5;
const BATCH_SIZE = 100_000;

function startTableCopyWorkers(
lc: LogContext,
db: PostgresDB,
snapshot: string,
numWorkers: number,
): TransactionPool {
const {init} = importSnapshot(snapshot);
const numWorkers = MAX_WORKERS;
const tableCopiers = new TransactionPool(
lc,
Mode.READONLY,
@@ -223,6 +228,7 @@ async function copy(
table: PublishedTableSpec,
from: PostgresDB,
to: Database,
rowBatchSize: number,
) {
let totalRows = 0;
const tableName = liteTableName(table);
Expand Down Expand Up @@ -252,7 +258,7 @@ async function copy(

lc.info?.(`Starting copy of ${tableName}:`, selectStmt);

const cursor = from.unsafe(selectStmt).cursor(BATCH_SIZE);
const cursor = from.unsafe(selectStmt).cursor(rowBatchSize);
for await (const rows of cursor) {
for (const row of rows) {
insertStmt.run([
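The copy loop relies on the postgres.js cursor API, which yields rows in fixed-size batches so that each worker holds at most rowBatchSize rows in memory. A standalone sketch of the same pattern (connection string and table name are hypothetical):

import postgres from 'postgres';

const sql = postgres('postgres://localhost:5432/upstream'); // hypothetical DSN
const rowBatchSize = 10_000;

// .cursor(n) makes the query an async iterable that yields arrays of up
// to n rows, instead of buffering the entire result set in memory.
for await (const rows of sql`SELECT * FROM my_table`.cursor(rowBatchSize)) {
  for (const row of rows) {
    // write each row to the replica here
  }
}
await sql.end();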
@@ -128,6 +128,7 @@ describe('change-streamer/pg/sync-schema', () => {
{id: SHARD_ID, publications: c.requestedPublications ?? []},
replicaFile.path,
getConnectionURI(upstream),
{tableCopyWorkers: 5, rowBatchSize: 10000},
);

await expectTables(upstream, c.upstreamPostState);
@@ -5,7 +5,7 @@ import {
type IncrementalMigrationMap,
type Migration,
} from '../../../db/migration-lite.js';
import {initialSync} from './initial-sync.js';
import {initialSync, type InitialSyncOptions} from './initial-sync.js';
import type {ShardConfig} from './shard-config.js';

export async function initSyncSchema(
@@ -14,9 +14,11 @@
shard: ShardConfig,
dbPath: string,
upstreamURI: string,
syncOptions: InitialSyncOptions,
): Promise<void> {
const setupMigration: Migration = {
migrateSchema: (log, tx) => initialSync(log, shard, tx, upstreamURI),
migrateSchema: (log, tx) =>
initialSync(log, shard, tx, upstreamURI, syncOptions),
minSafeVersion: 1,
};

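Callers of initSyncSchema now supply the options explicitly; a minimal invocation sketch matching the change-source.ts call site above (names as in that call site, values illustrative):

await initSyncSchema(
  lc,
  `replica-${shard.id}`,
  shard,
  replicaDbFile,
  upstreamURI,
  {tableCopyWorkers: 5, rowBatchSize: 10_000},
);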
