Skip to content

Commit

Permalink
chore(replicache): Test for subscribe/watch when closed (#2605)
Browse files Browse the repository at this point in the history
This is a test for #2591
  • Loading branch information
arv authored Oct 9, 2024
1 parent 3138e8e commit df92994
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 14 deletions.
26 changes: 13 additions & 13 deletions packages/replicache/src/replicache-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
/** The name of the Replicache database. Populated by {@link ReplicacheOptions#name}. */
readonly name: string;

readonly subscriptions: SubscriptionsManager;
readonly #subscriptions: SubscriptionsManager;
readonly #mutationRecovery: MutationRecovery;

/**
Expand Down Expand Up @@ -423,7 +423,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
'replicache version': version,
});

this.subscriptions = new SubscriptionsManagerImpl(
this.#subscriptions = new SubscriptionsManagerImpl(
this.#queryInternal,
this.#lc,
this.#closeAbortController.signal,
Expand Down Expand Up @@ -701,7 +701,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
this.#pullConnectionLoop.close();
this.#pushConnectionLoop.close();

this.subscriptions.clear();
this.#subscriptions.clear();

await Promise.all(closingPromises);
closingInstances.delete(this.name);
Expand All @@ -724,13 +724,13 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
lc,
syncHead,
clientID,
this.subscriptions,
this.#subscriptions,
FormatVersion.Latest,
);

if (!replayMutations || replayMutations.length === 0) {
// All done.
await this.subscriptions.fire(diffs);
await this.#subscriptions.fire(diffs);
void this.#schedulePersist();
return;
}
Expand All @@ -740,7 +740,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
// TODO(greg): I'm not sure why this was in Replicache#_mutate...
// Ensure that we run initial pending subscribe functions before starting a
// write transaction.
if (this.subscriptions.hasPendingSubscriptionRuns) {
if (this.#subscriptions.hasPendingSubscriptionRuns) {
await Promise.resolve();
}
const {meta} = mutation;
Expand Down Expand Up @@ -1176,7 +1176,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
this.perdag,
clientID,
this.#mutatorRegistry,
this.subscriptions,
this.#subscriptions,
() => this.closed,
FormatVersion.Latest,
);
Expand All @@ -1190,7 +1190,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
}
}
if (diffs !== undefined) {
await this.subscriptions.fire(diffs);
await this.#subscriptions.fire(diffs);
}
}

Expand Down Expand Up @@ -1318,7 +1318,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
}

const {onData, onError, onDone, isEqual} = options;
return this.subscriptions.add(
return this.#subscriptions.add(
new SubscriptionImpl(body, onData, onError, onDone, isEqual),
);
}
Expand Down Expand Up @@ -1347,7 +1347,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
callback: WatchCallbackForOptions<Options>,
options?: Options,
): () => void {
return this.subscriptions.add(
return this.#subscriptions.add(
new WatchSubscription(callback as WatchCallback, options),
);
}
Expand Down Expand Up @@ -1433,7 +1433,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {

// Ensure that we run initial pending subscribe functions before starting a
// write transaction.
if (this.subscriptions.hasPendingSubscriptionRuns) {
if (this.#subscriptions.hasPendingSubscriptionRuns) {
await Promise.resolve();
}

Expand Down Expand Up @@ -1467,15 +1467,15 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
const lastMutationID = await dbWrite.getMutationID();
const diffs = await dbWrite.commitWithDiffs(
DEFAULT_HEAD_NAME,
this.subscriptions,
this.#subscriptions,
);

// Update this after the commit in case the commit fails.
this.lastMutationID = lastMutationID;

// Send is not supposed to reject
this.#pushConnectionLoop.send(false).catch(() => void 0);
await this.subscriptions.fire(diffs);
await this.#subscriptions.fire(diffs);
void this.#schedulePersist();
return result;
} catch (ex) {
Expand Down
27 changes: 26 additions & 1 deletion packages/replicache/src/replicache.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type {Context, LogLevel} from '@rocicorp/logger';
import {resolver} from '@rocicorp/resolver';
import {assert as chaiAssert, expect} from 'chai';
import * as sinon from 'sinon';
import {assert} from '../../shared/src/asserts.js';
import type {JSONValue, ReadonlyJSONValue} from '../../shared/src/json.js';
import {promiseVoid} from '../../shared/src/resolved-promises.js';
import {sleep} from '../../shared/src/sleep.js';
import * as sinon from 'sinon';
import {asyncIterableToArray} from './async-iterable-to-array.js';
import {Write} from './db/write.js';
import {TestMemStore} from './kv/test-mem-store.js';
Expand Down Expand Up @@ -2412,3 +2412,28 @@ test('set with undefined key', async () => {
// eslint-disable-next-line no-sparse-arrays
expect(await set([1, , 2])).instanceOf(TypeError);
});

test('subscribe while closing', async () => {
// This tests that we do not try to open an IndexedDB transaction after the
// database has been closed.
const rep = await replicacheForTesting('subscribe-while-closing', {
mutators: {addData},
});
await rep.mutate.addData({a: 1});
const p = rep.close();
const query = sinon.fake();
const onData = sinon.fake();
const watchCallback = sinon.fake();
const unsubscribe = rep.subscribe(query, onData);
const unwatch = rep.experimentalWatch(watchCallback);

await clock.tickAsync(10);

await p;
unsubscribe();
unwatch();

expect(query.callCount).to.equal(0);
expect(onData.callCount).to.equal(0);
expect(watchCallback.callCount).to.equal(0);
});

0 comments on commit df92994

Please sign in to comment.