Skip to content

Commit

Permalink
Merge pull request #11 from lennybakkalian/dev
Browse files Browse the repository at this point in the history
feat: add signal support for subscriptions and improve typing
  • Loading branch information
lennybakkalian authored Oct 27, 2024
2 parents b5e6f0d + cf0cf85 commit e94724c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 29 deletions.
29 changes: 18 additions & 11 deletions projects/ngx-trpc/src/lib/rxjs-proxy/createRxjsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,33 @@ type ResolverDef = {
errorShape: any;
};

export type Resolver<TDef extends ResolverDef> = (
export type Resolver<TDef extends ResolverDef, TOutput = RxJSObservable<TDef['output']>> = (
input: TDef['input'],
opts?: ProcedureOptions
) => RxJSObservable<TDef['output']>;
) => TOutput;

export type SignalResolver<TDef extends ResolverDef> = (
input: TDef['input'],
opts?: ProcedureOptions
) => Signal<TDef['output']>;
export type SignalResolver<TDef extends ResolverDef> = Resolver<TDef, Signal<TDef['output']>>;

type SubscriptionResolver<TDef extends ResolverDef> = (
type SubscriptionResolver<
TDef extends ResolverDef,
TOutput = RxJSObservable<YieldType<TDef['output']>>
> = (
input: TDef['input'],
opts?: Partial<TRPCSubscriptionObserver<TDef['output'], TRPCClientError<TDef>>> & ProcedureOptions
) => RxJSObservable<YieldType<TDef['output']>>;
) => TOutput;

export type SubscriptionSignalResolver<TDef extends ResolverDef> = SubscriptionResolver<
TDef,
Signal<YieldType<TDef['output']>>
>;

type DecorateProcedure<
TType extends ProcedureType,
TDef extends ResolverDef
> = TType extends 'query'
? {
query: Resolver<TDef>;
createSignal: SignalResolver<TDef>;
querySignal: SignalResolver<TDef>;
}
: TType extends 'mutation'
? {
Expand All @@ -62,6 +67,7 @@ type DecorateProcedure<
: TType extends 'subscription'
? {
subscribe: SubscriptionResolver<TDef>;
subscribeSignal: SubscriptionSignalResolver<TDef>;
}
: never;

Expand All @@ -88,9 +94,10 @@ type DecoratedProcedureRecord<TRouter extends AnyRouter, TRecord extends RouterR

const clientCallTypeMap: Record<keyof DecorateProcedure<any, any>, ProcedureType> = {
query: 'query',
createSignal: 'createSignal',
querySignal: 'querySignal',
mutate: 'mutation',
subscribe: 'subscription'
subscribe: 'subscription',
subscribeSignal: 'subscriptionSignal'
};

/** @internal */
Expand Down
39 changes: 23 additions & 16 deletions projects/ngx-trpc/src/lib/rxjs-proxy/trpc-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {AnyRouter} from '@trpc/server';
import {Observable as TrpcObservable, share} from '@trpc/server/observable';
import {Observable as TrpcObservable} from '@trpc/server/observable';
import {
CreateTRPCClientOptions,
OperationContext,
Expand Down Expand Up @@ -46,7 +46,7 @@ export class TRPCClient<TRouter extends AnyRouter> {
}
});

return trpcObservableToRxJsObservable(opts, chain$.pipe(share()));
return trpcObservableToRxJsObservable(opts, chain$);
}

public query(path: string, input?: unknown, opts?: TRPCRequestOptions) {
Expand All @@ -59,16 +59,8 @@ export class TRPCClient<TRouter extends AnyRouter> {
});
}

public createSignal(path: string, input?: unknown, opts?: TRPCRequestOptions) {
return toSignal(
this.$request({
type: 'query',
path,
input,
context: opts?.context,
signal: opts?.signal
})
);
public querySignal(path: string, input?: unknown, opts?: TRPCRequestOptions) {
return toSignal(this.query(path, input, opts));
}

public mutation(path: string, input?: unknown, opts?: TRPCRequestOptions) {
Expand Down Expand Up @@ -102,6 +94,15 @@ export class TRPCClient<TRouter extends AnyRouter> {
signal: opts?.signal
});
}

public subscriptionSignal(
path: string,
input: unknown,
opts: Partial<TRPCSubscriptionObserver<unknown, TRPCClientError<AnyRouter>>> &
TRPCRequestOptions
) {
return toSignal(this.subscription(path, input, opts));
}
}

function trpcObservableToRxJsObservable<TInput, TOutput>(
Expand All @@ -112,9 +113,9 @@ function trpcObservableToRxJsObservable<TInput, TOutput>(
>
): RxJSObservable<TOutput> {
return new RxJSObservable<TOutput>((subscriber) => {
// create a macroTask as long as the observable is not a subscription
// create a macroTask as long as the request type is not a subscription
// it will be invoked on data or errors
const macroTask = new MacroTask(opts.type !== 'subscription');
const macroTask = new MacroTask(opts.type !== 'subscription', opts.path);

const sub = observable.subscribe({
next: (value) => {
Expand All @@ -132,8 +133,14 @@ function trpcObservableToRxJsObservable<TInput, TOutput>(
break;
}
},
error: (err) => subscriber.error(err),
complete: () => subscriber.complete()
error: (err) => {
subscriber.error(err);
macroTask.invoke();
},
complete: () => {
subscriber.complete();
macroTask.invoke();
}
});
return () => sub.unsubscribe();
});
Expand Down
9 changes: 7 additions & 2 deletions projects/ngx-trpc/src/lib/utils/macro-task.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
declare const Zone: any;

let macroTaskId = 0;

export class MacroTask {
private _macroTask: any;

constructor(public enabled = true) {
constructor(
public enabled = true,
private _name: string
) {
if (typeof Zone === 'undefined') {
// If Zone is not available, just disable the macro task
this.enabled = false;
Expand All @@ -14,7 +19,7 @@ export class MacroTask {
}

this._macroTask = Zone.current.scheduleMacroTask(
`TrpcResolve-${Math.random()}`,
`TrpcResolve-${this._name}-${macroTaskId++}`,
() => {},
{},
() => {}
Expand Down

0 comments on commit e94724c

Please sign in to comment.