Skip to content

Commit

Permalink
Cache pipeline behaviours on mediatorImpl (#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan authored Oct 30, 2024
1 parent 7aa3525 commit 94090d9
Showing 1 changed file with 15 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,30 @@ class MediatorImpl(
private val registry: Registry,
private val defaultPublishStrategy: PublishStrategy = StopOnExceptionPublishStrategy()
) : Mediator {
override suspend fun <TQuery : Query<TResponse>, TResponse> send(query: TQuery): TResponse =
processPipeline(
registry.getPipelineBehaviors(),
query
) {
registry.resolveQueryHandler(query.javaClass).handle(query)
}
private val sortedPipelineBehaviors by lazy { registry.getPipelineBehaviors().sortedByDescending { it.order } }

override suspend fun <TCommand : Command> send(command: TCommand) =
processPipeline(
registry.getPipelineBehaviors(),
command
) {
registry.resolveCommandHandler(command.javaClass).handle(command)
}
override suspend fun <TQuery : Query<TResponse>, TResponse> send(
query: TQuery
): TResponse = handle(query) { registry.resolveQueryHandler(query.javaClass).handle(query) }

override suspend fun <TCommand : CommandWithResult<TResult>, TResult> send(command: TCommand): TResult =
processPipeline(
registry.getPipelineBehaviors(),
command
) {
registry.resolveCommandWithResultHandler(command.javaClass).handle(command)
}
override suspend fun <TCommand : Command> send(
command: TCommand
) = handle(command) { registry.resolveCommandHandler(command.javaClass).handle(command) }

override suspend fun <TCommand : CommandWithResult<TResult>, TResult> send(
command: TCommand
): TResult = handle(command) { registry.resolveCommandWithResultHandler(command.javaClass).handle(command) }

override suspend fun <T : Notification> publish(notification: T) = publish(notification, defaultPublishStrategy)

override suspend fun <T : Notification> publish(
notification: T,
publishStrategy: PublishStrategy
) = processPipeline(
registry.getPipelineBehaviors(),
notification
) {
publishStrategy.publish(notification, registry.resolveNotificationHandlers(notification.javaClass))
}
) = handle(notification) { publishStrategy.publish(notification, registry.resolveNotificationHandlers(notification.javaClass)) }

private suspend fun <TRequest, TResponse> processPipeline(
pipelineBehaviors: Collection<PipelineBehavior>,
private suspend fun <TRequest, TResponse> handle(
request: TRequest,
handler: RequestHandlerDelegate<TRequest, TResponse>
): TResponse =
pipelineBehaviors
.sortedByDescending { it.order }
.fold(handler) { next, pipeline ->
{ pipeline.handle(request) { next(it) } }
}(request)
): TResponse = sortedPipelineBehaviors
.fold(handler) { next, pipeline -> { pipeline.handle(request) { next(it) } } }(request)
}

0 comments on commit 94090d9

Please sign in to comment.