Skip to content

Commit

Permalink
fix(store): prevent writing to state once action handler is unsubscri…
Browse files Browse the repository at this point in the history
…bed (#2231)

In this commit, we update the implementation of action invocation. The key addition is that
we now prevent writing to the state whenever an action handler is unsubscribed (completed or
cancelled). Since we have a "unique" state context object for each action being invoked, we
can set its `setState` and `patchState` functions to no-ops, essentially making them do
nothing when invoked.
  • Loading branch information
arturovt authored Oct 25, 2024
1 parent e347595 commit f22f4cb
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .bundlemonrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
},
{
"path": "./fesm2022/ngxs-store.mjs",
"maxSize": "102kB",
"maxSize": "103kB",
"maxPercentIncrease": 0.5
}
],
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ $ npm install @ngxs/store@dev

### To become next patch version

- ...
- Fix(store): Prevent writing to state once action handler is unsubscribed [#2231](https://github.com/ngxs/store/pull/2231)

### 18.1.4 2024-10-23

Expand Down
3 changes: 1 addition & 2 deletions packages/store/src/actions/action-registry.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Injectable, type OnDestroy } from '@angular/core';
import type { Observable } from 'rxjs';

// action: Instance<ActionType>.
export type ActionHandlerFn = (action: any) => void | Promise<void> | Observable<unknown>;
export type ActionHandlerFn = (action: any) => Observable<unknown>;

@Injectable({ providedIn: 'root' })
export class NgxsActionRegistry implements OnDestroy {
Expand Down
55 changes: 41 additions & 14 deletions packages/store/src/internal/state-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import {
filter,
map,
mergeMap,
shareReplay,
takeUntil
takeUntil,
finalize,
Observable
} from 'rxjs';

import { NgxsConfig } from '../symbols';
Expand Down Expand Up @@ -260,9 +261,9 @@ export class StateFactory implements OnDestroy {
/**
* Invoke actions on the states.
*/
invokeActions(action: any) {
private invokeActions(action: any): Observable<unknown[]> {
const type = getActionTypeFromInstance(action)!;
const results = [];
const results: Observable<unknown>[] = [];

// Determines whether the dispatched action has been handled, this is assigned
// to `true` within the below `for` loop if any `actionMetas` has been found.
Expand All @@ -277,7 +278,7 @@ export class StateFactory implements OnDestroy {
try {
result = actionHandler(action);
} catch (e) {
result = throwError(e);
result = throwError(() => e);
}

results.push(result);
Expand All @@ -297,7 +298,7 @@ export class StateFactory implements OnDestroy {
}

if (!results.length) {
results.push(of({}));
results.push(of(undefined));
}

return forkJoin(results);
Expand Down Expand Up @@ -344,7 +345,8 @@ export class StateFactory implements OnDestroy {
const { dispatched$ } = this._actions;
for (const actionType of Object.keys(actions)) {
const actionHandlers = actions[actionType].map(actionMeta => {
// action: Instance<ActionType>
const cancelable = !!actionMeta.options.cancelUncompleted;

return (action: any) => {
const stateContext = this._stateContextFactory.createStateContext(path);

Expand All @@ -365,27 +367,47 @@ export class StateFactory implements OnDestroy {
mergeMap((value: any) => {
if (ɵisPromise(value)) {
return from(value);
}
if (isObservable(value)) {
} else if (isObservable(value)) {
return value;
} else {
return of(value);
}
return of(value);
}),
// If this observable has completed without emitting any values,
// we wouldn't want to complete the entire chain of actions.
// If any observable completes, then the action will be canceled.
// For instance, if any action handler had a statement like
// `handler(ctx) { return EMPTY; }`, then the action would be canceled.
// See https://github.com/ngxs/store/issues/1568
defaultIfEmpty({})
// Note that we actually don't care about the return type; we only care
// about emission, and thus `undefined` is applicable by the framework.
defaultIfEmpty(undefined)
);

if (actionMeta.options.cancelUncompleted) {
result = result.pipe(takeUntil(dispatched$.pipe(ofActionDispatched(action))));
if (cancelable) {
const notifier$ = dispatched$.pipe(ofActionDispatched(action));
result = result.pipe(takeUntil(notifier$));
}

result = result.pipe(
// Note that we use the `finalize` operator only when the action handler
// returns an observable. If the action handler is synchronous, we do not
// need to set the state context functions to `noop`, as the absence of a
// return value indicates no asynchronous functionality. If the handler's
// result is unsubscribed (either because the observable has completed or it
// was unsubscribed by `takeUntil` due to a new action being dispatched),
// we prevent writing to the state context.
finalize(() => {
stateContext.setState = noop;
stateContext.patchState = noop;
})
);
} else {
result = of({}).pipe(shareReplay());
// If the action handler is synchronous and returns nothing (`void`), we
// still have to convert the result to a synchronous observable.
result = of(undefined);
}

return result;
};
});
Expand All @@ -396,3 +418,8 @@ export class StateFactory implements OnDestroy {
}
}
}

// This is used to replace `setState` and `patchState` once the action
// handler has been unsubscribed or completed, to prevent writing
// to the state context.
function noop() {}
22 changes: 22 additions & 0 deletions packages/store/tests/helpers/promise-test-helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export function createPromiseTestHelper<T = void>() {
type MarkResolvedFn = (result: T | PromiseLike<T>) => void;
type MarkRejectedFn = (reason?: any) => void;
let resolveFn: MarkResolvedFn = () => {};
let rejectFn: MarkRejectedFn = () => {};

const promise = new Promise<T>((resolve, reject) => {
resolveFn = resolve;
rejectFn = reject;
});
return {
promise,
markPromiseResolved(...args: Parameters<MarkResolvedFn>) {
resolveFn(...args);
resolveFn = () => {};
},
markPromiseRejected(reason?: any) {
rejectFn(reason);
rejectFn = () => {};
}
};
}
128 changes: 128 additions & 0 deletions packages/store/tests/issues/canceling-promises.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { Injectable } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { State, Action, Store, provideStore, StateContext } from '@ngxs/store';

import { createPromiseTestHelper } from '../helpers/promise-test-helper';

describe('Canceling promises (preventing state writes)', () => {
const recorder: string[] = [];

class IncrementWithAwait {
static readonly type = 'Increment with await';
}

class IncrementWithThen {
static readonly type = 'Increment with then';
}

const { promise: promiseAwaitReady, markPromiseResolved: markPromiseAwaitReady } =
createPromiseTestHelper();

const { promise: promiseThenReady, markPromiseResolved: markPromiseThenReady } =
createPromiseTestHelper();

@State<number>({
name: 'counter',
defaults: 0
})
@Injectable()
class CounterState {
@Action(IncrementWithAwait, { cancelUncompleted: true })
async incrementWithAwait(ctx: StateContext<number>) {
recorder.push('before promise await ready');
await promiseAwaitReady;
recorder.push('after promise await ready');
ctx.setState(value => value + 1);
recorder.push(`value: ${ctx.getState()}`);
}

@Action(IncrementWithThen, { cancelUncompleted: true })
incrementWithThen(ctx: StateContext<number>) {
recorder.push('before promise then ready');
return promiseThenReady.then(() => {
recorder.push('after promise then ready');
ctx.setState(value => value + 1);
recorder.push(`value: ${ctx.getState()}`);
});
}
}

beforeEach(() => {
recorder.length = 0;

TestBed.configureTestingModule({
providers: [provideStore([CounterState])]
});
});

it('canceling promises using `await`', async () => {
// Arrange
const store = TestBed.inject(Store);

// Act
store.dispatch(new IncrementWithAwait());

// Assert
expect(recorder).toEqual(['before promise await ready']);

// Act (dispatch another action to cancel the previous one)
// The promise is not resolved yet, as thus `await` is not executed.
store.dispatch(new IncrementWithAwait());

// Assert
expect(recorder).toEqual(['before promise await ready', 'before promise await ready']);

// Act
markPromiseAwaitReady();
await promiseAwaitReady;

// Assert
expect(store.snapshot()).toEqual({ counter: 1 });
expect(recorder).toEqual([
'before promise await ready',
'before promise await ready',
// Note that once the promise is resolved, the await has been executed,
// and both microtasks have also been executed (`recorder.push(...)` is a
// microtask because it is created by `await`).
'after promise await ready',
// Value has not been updated in the state.
'value: 0',
'after promise await ready',
'value: 1'
]);
});

it('canceling promises using `then(...)`', async () => {
// Arrange
const store = TestBed.inject(Store);

// Act
store.dispatch(new IncrementWithThen());

// Assert
expect(recorder).toEqual(['before promise then ready']);

// Act (dispatch another action to cancel the previous one)
// The promise is not resolved yet, as thus `then(...)` is not executed.
store.dispatch(new IncrementWithThen());

// Assert
expect(recorder).toEqual(['before promise then ready', 'before promise then ready']);

// Act
markPromiseThenReady();
await promiseThenReady;

// Assert
expect(store.snapshot()).toEqual({ counter: 1 });
expect(recorder).toEqual([
'before promise then ready',
'before promise then ready',
'after promise then ready',
// Value has not been updated in the state.
'value: 0',
'after promise then ready',
'value: 1'
]);
});
});

0 comments on commit f22f4cb

Please sign in to comment.