Skip to content

Commit

Permalink
refactor(Stream): use more common Types.TupleOf instead of Stream.Dyn…
Browse files Browse the repository at this point in the history
…amicTuple (#2958)
  • Loading branch information
dilame authored and gcanti committed Jul 8, 2024
1 parent 24f3a75 commit 4dbadc3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .changeset/stream-tuple.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

refactor(Stream): use new built-in `Types.TupleOf` instead of `Stream.DynamicTuple` and deprecate it
16 changes: 9 additions & 7 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import type * as Emit from "./StreamEmit.js"
import type * as HaltStrategy from "./StreamHaltStrategy.js"
import type * as Take from "./Take.js"
import type * as Tracer from "./Tracer.js"
import type { Covariant, NoInfer } from "./Types.js"
import type { Covariant, NoInfer, TupleOf } from "./Types.js"
import type * as Unify from "./Unify.js"

/**
Expand Down Expand Up @@ -146,13 +146,15 @@ export declare namespace Stream {
/**
* @since 2.0.0
* @category models
* @deprecated use Types.TupleOf instead
*/
export type DynamicTuple<T, N extends number> = N extends N ? number extends N ? Array<T> : DynamicTupleOf<T, N, []>
: never

/**
* @since 2.0.0
* @category models
* @deprecated use Types.TupleOf instead
*/
export type DynamicTupleOf<T, N extends number, R extends Array<unknown>> = R["length"] extends N ? R
: DynamicTupleOf<T, N, [T, ...R]>
Expand Down Expand Up @@ -359,12 +361,12 @@ export const broadcast: {
maximumLag: number
): <A, E, R>(
self: Stream<A, E, R>
) => Effect.Effect<Stream.DynamicTuple<Stream<A, E>, N>, never, Scope.Scope | R>
) => Effect.Effect<TupleOf<N, Stream<A, E>>, never, Scope.Scope | R>
<A, E, R, N extends number>(
self: Stream<A, E, R>,
n: N,
maximumLag: number
): Effect.Effect<Stream.DynamicTuple<Stream<A, E>, N>, never, Scope.Scope | R>
): Effect.Effect<TupleOf<N, Stream<A, E>>, never, Scope.Scope | R>
} = internal.broadcast

/**
Expand Down Expand Up @@ -396,12 +398,12 @@ export const broadcastedQueues: {
maximumLag: number
): <A, E, R>(
self: Stream<A, E, R>
) => Effect.Effect<Stream.DynamicTuple<Queue.Dequeue<Take.Take<A, E>>, N>, never, R | Scope.Scope>
) => Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, R | Scope.Scope>
<A, E, R, N extends number>(
self: Stream<A, E, R>,
n: N,
maximumLag: number
): Effect.Effect<Stream.DynamicTuple<Queue.Dequeue<Take.Take<A, E>>, N>, never, Scope.Scope | R>
): Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>
} = internal.broadcastedQueues

/**
Expand Down Expand Up @@ -893,15 +895,15 @@ export const distributedWith: {
}
): <E, R>(
self: Stream<A, E, R>
) => Effect.Effect<Stream.DynamicTuple<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, N>, never, Scope.Scope | R>
) => Effect.Effect<TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>, never, Scope.Scope | R>
<A, E, R, N extends number>(
self: Stream<A, E, R>,
options: {
readonly size: N
readonly maximumLag: number
readonly decide: (a: A) => Effect.Effect<Predicate<number>>
}
): Effect.Effect<Stream.DynamicTuple<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, N>, never, Scope.Scope | R>
): Effect.Effect<TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>, never, Scope.Scope | R>
} = internal.distributedWith

/**
Expand Down
29 changes: 13 additions & 16 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import * as HaltStrategy from "../StreamHaltStrategy.js"
import type * as Take from "../Take.js"
import type * as Tracer from "../Tracer.js"
import * as Tuple from "../Tuple.js"
import type { NoInfer } from "../Types.js"
import type { NoInfer, TupleOf } from "../Types.js"
import * as channel from "./channel.js"
import * as channelExecutor from "./channel/channelExecutor.js"
import * as MergeStrategy from "./channel/mergeStrategy.js"
Expand Down Expand Up @@ -685,25 +685,22 @@ export const broadcast = dual<
maximumLag: number
) => <A, E, R>(
self: Stream.Stream<A, E, R>
) => Effect.Effect<Stream.Stream.DynamicTuple<Stream.Stream<A, E>, N>, never, Scope.Scope | R>,
) => Effect.Effect<TupleOf<N, Stream.Stream<A, E>>, never, Scope.Scope | R>,
<A, E, R, N extends number>(
self: Stream.Stream<A, E, R>,
n: N,
maximumLag: number
) => Effect.Effect<Stream.Stream.DynamicTuple<Stream.Stream<A, E>, N>, never, Scope.Scope | R>
) => Effect.Effect<TupleOf<N, Stream.Stream<A, E>>, never, Scope.Scope | R>
>(3, <A, E, R, N extends number>(
self: Stream.Stream<A, E, R>,
n: N,
maximumLag: number
): Effect.Effect<Stream.Stream.DynamicTuple<Stream.Stream<A, E>, N>, never, Scope.Scope | R> =>
): Effect.Effect<TupleOf<N, Stream.Stream<A, E>>, never, Scope.Scope | R> =>
pipe(
self,
broadcastedQueues(n, maximumLag),
Effect.map((tuple) =>
tuple.map((queue) => flattenTake(fromQueue(queue, { shutdown: true }))) as Stream.Stream.DynamicTuple<
Stream.Stream<A, E>,
N
>
tuple.map((queue) => flattenTake(fromQueue(queue, { shutdown: true }))) as TupleOf<N, Stream.Stream<A, E>>
)
))

Expand Down Expand Up @@ -733,21 +730,21 @@ export const broadcastedQueues = dual<
maximumLag: number
) => <A, E, R>(
self: Stream.Stream<A, E, R>
) => Effect.Effect<Stream.Stream.DynamicTuple<Queue.Dequeue<Take.Take<A, E>>, N>, never, Scope.Scope | R>,
) => Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>,
<A, E, R, N extends number>(
self: Stream.Stream<A, E, R>,
n: N,
maximumLag: number
) => Effect.Effect<Stream.Stream.DynamicTuple<Queue.Dequeue<Take.Take<A, E>>, N>, never, Scope.Scope | R>
) => Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>
>(3, <A, E, R, N extends number>(
self: Stream.Stream<A, E, R>,
n: N,
maximumLag: number
): Effect.Effect<Stream.Stream.DynamicTuple<Queue.Dequeue<Take.Take<A, E>>, N>, never, Scope.Scope | R> =>
): Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R> =>
Effect.flatMap(PubSub.bounded<Take.Take<A, E>>(maximumLag), (pubsub) =>
pipe(
Effect.all(Array.from({ length: n }, () => PubSub.subscribe(pubsub))) as Effect.Effect<
Stream.Stream.DynamicTuple<Queue.Dequeue<Take.Take<A, E>>, N>,
TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>,
never,
R
>,
Expand Down Expand Up @@ -1764,7 +1761,7 @@ export const distributedWith = dual<
) => <E, R>(
self: Stream.Stream<A, E, R>
) => Effect.Effect<
Stream.Stream.DynamicTuple<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, N>,
TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>,
never,
Scope.Scope | R
>,
Expand All @@ -1776,7 +1773,7 @@ export const distributedWith = dual<
readonly decide: (a: A) => Effect.Effect<Predicate<number>>
}
) => Effect.Effect<
Stream.Stream.DynamicTuple<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, N>,
TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>,
never,
Scope.Scope | R
>
Expand All @@ -1790,7 +1787,7 @@ export const distributedWith = dual<
readonly decide: (a: A) => Effect.Effect<Predicate<number>>
}
): Effect.Effect<
Stream.Stream.DynamicTuple<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, N>,
TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>,
never,
Scope.Scope | R
> =>
Expand Down Expand Up @@ -1829,7 +1826,7 @@ export const distributedWith = dual<
Deferred.succeed(deferred, (a: A) =>
Effect.map(options.decide(a), (f) => (key: number) => pipe(f(mappings.get(key)!)))),
Effect.as(
Array.from(queues) as Stream.Stream.DynamicTuple<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, N>
Array.from(queues) as TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>
)
)
})
Expand Down

0 comments on commit 4dbadc3

Please sign in to comment.