From 5a04758e3d35ff0164341beadff89d585d3caf45 Mon Sep 17 00:00:00 2001 From: Darick Tong <132324914+darkgnotic@users.noreply.github.com> Date: Tue, 31 Dec 2024 17:02:49 +0900 Subject: [PATCH] feat(zero-cache): support invalidation of replication slots via `max_slot_wal_keep_size` (#3464) --- .../src/services/change-streamer/pg/change-source.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/zero-cache/src/services/change-streamer/pg/change-source.ts b/packages/zero-cache/src/services/change-streamer/pg/change-source.ts index 5eb810761..380045aa4 100644 --- a/packages/zero-cache/src/services/change-streamer/pg/change-source.ts +++ b/packages/zero-cache/src/services/change-streamer/pg/change-source.ts @@ -56,7 +56,7 @@ import type {Data, DownstreamChange} from '../change-streamer.js'; import type {DataChange, Identifier, MessageDelete} from '../schema/change.js'; import {AutoResetSignal, type ReplicationConfig} from '../schema/tables.js'; import {replicationSlot} from './initial-sync.js'; -import {fromLexiVersion, toLexiVersion} from './lsn.js'; +import {fromLexiVersion, toLexiVersion, type LSN} from './lsn.js'; import {replicationEventSchema, type DdlUpdateEvent} from './schema/ddl.js'; import {updateShardSchema} from './schema/init.js'; import {getPublicationInfo, type PublishedSchema} from './schema/published.js'; @@ -133,11 +133,17 @@ async function checkAndUpdateUpstream( shard: ShardConfig, ) { const slot = replicationSlot(shard.id); - const result = await db<{pid: string | null}[]>` - SELECT slot_name FROM pg_replication_slots WHERE slot_name = ${slot}`; + const result = await db<{restartLSN: LSN | null}[]>` + SELECT restart_lsn as "restartLSN" FROM pg_replication_slots WHERE slot_name = ${slot}`; if (result.length === 0) { throw new AutoResetSignal(`replication slot ${slot} is missing`); } + const [{restartLSN}] = result; + if (restartLSN === null) { + throw new AutoResetSignal( + `replication slot ${slot} has been invalidated for exceeding the max_slot_wal_keep_size`, + ); + } // Perform any shard schema updates await updateShardSchema(lc, db, { id: shard.id,