Skip to content

Commit

Permalink
feat(zero-cache): support invalidation of replication slots via `max_…
Browse files Browse the repository at this point in the history
…slot_wal_keep_size` (#3464)
  • Loading branch information
darkgnotic authored Dec 31, 2024
1 parent 331235b commit 5a04758
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import type {Data, DownstreamChange} from '../change-streamer.js';
import type {DataChange, Identifier, MessageDelete} from '../schema/change.js';
import {AutoResetSignal, type ReplicationConfig} from '../schema/tables.js';
import {replicationSlot} from './initial-sync.js';
import {fromLexiVersion, toLexiVersion} from './lsn.js';
import {fromLexiVersion, toLexiVersion, type LSN} from './lsn.js';
import {replicationEventSchema, type DdlUpdateEvent} from './schema/ddl.js';
import {updateShardSchema} from './schema/init.js';
import {getPublicationInfo, type PublishedSchema} from './schema/published.js';
Expand Down Expand Up @@ -133,11 +133,17 @@ async function checkAndUpdateUpstream(
shard: ShardConfig,
) {
const slot = replicationSlot(shard.id);
const result = await db<{pid: string | null}[]>`
SELECT slot_name FROM pg_replication_slots WHERE slot_name = ${slot}`;
const result = await db<{restartLSN: LSN | null}[]>`
SELECT restart_lsn as "restartLSN" FROM pg_replication_slots WHERE slot_name = ${slot}`;
if (result.length === 0) {
throw new AutoResetSignal(`replication slot ${slot} is missing`);
}
const [{restartLSN}] = result;
if (restartLSN === null) {
throw new AutoResetSignal(
`replication slot ${slot} has been invalidated for exceeding the max_slot_wal_keep_size`,
);
}
// Perform any shard schema updates
await updateShardSchema(lc, db, {
id: shard.id,
Expand Down

0 comments on commit 5a04758

Please sign in to comment.