Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ordered pipeline behaviour implementation fixes #270 #368

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class MediatorImpl(
handler: RequestHandlerDelegate<TRequest, TResponse>
): TResponse =
pipelineBehaviors
.reversed()
.sortedByDescending { it.order }
.fold(handler) { next, pipeline ->
{ pipeline.handle(request) { next(it) } }
}(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,37 @@ package com.trendyol.kediatr
/**
* Interface to be implemented for a non-blocking pipeline behavior
*
* @since 1.0.12
*/
interface PipelineBehavior {
companion object {
/**
* Useful constant for the highest precedence value.
* @see java.lang.Integer.MIN_VALUE
*/
const val HIGHEST_PRECEDENCE = Int.MIN_VALUE

/**
* Useful constant for the lowest precedence value.
* @see java.lang.Integer.MAX_VALUE
*/
const val LOWEST_PRECEDENCE = Int.MAX_VALUE
}

/**
* Get the order value of this object.
*
* Higher values are interpreted as lower priority. As a consequence,
* the object with the lowest value has the highest priority.
*
* Same order values will result in arbitrary sort positions for the
* affected objects.
* @return the order value
* @see .HIGHEST_PRECEDENCE
*
* @see .LOWEST_PRECEDENCE
*/
val order: Int get() = HIGHEST_PRECEDENCE

/**
* Process to invoke before handling any query, command or notification
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ class MediatorTests : MediatorUseCases() {
ExceptionPipelineBehavior(),
LoggingPipelineBehavior(),
InheritedPipelineBehaviour(),
ParameterizedQueryHandler<Long, String>()
ParameterizedQueryHandler<Long, String>(),
FirstPipelineBehaviour(),
SecondPipelineBehaviour(),
ThirdPipelineBehaviour(),
CommandHandlerThatPassesThroughOrderedPipelineBehaviours(),
QueryHandlerThatPassesThroughOrderedPipelineBehaviours(),
NotificationHandlerThatPassesThroughOrderedPipelineBehaviours()
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,37 @@ abstract class MediatorUseCases : MediatorTestConvention() {
result shouldBe "60"
query.invocationCount() shouldBe 1
}

@Test
fun ordered_pipeline_behaviours_should_be_executed_in_order_for_command() = runTest {
val command = CommandThatPassesThroughOrderedPipelineBehaviours()
testMediator.send(command)
command.visitedPipelines() shouldBe listOf(
FirstPipelineBehaviour::class.simpleName,
SecondPipelineBehaviour::class.simpleName,
ThirdPipelineBehaviour::class.simpleName
)
}

@Test
fun ordered_pipeline_behaviours_should_be_executed_in_order_for_query() = runTest {
val query = QueryThatPassesThroughOrderedPipelineBehaviours()
testMediator.send(query)
query.visitedPipelines() shouldBe listOf(
FirstPipelineBehaviour::class.simpleName,
SecondPipelineBehaviour::class.simpleName,
ThirdPipelineBehaviour::class.simpleName
)
}

@Test
fun ordered_pipeline_behaviours_should_be_executed_in_order_for_notification() = runTest {
val notification = NotificationThatPassesThroughOrderedPipelineBehaviours()
testMediator.publish(notification)
notification.visitedPipelines() shouldBe listOf(
FirstPipelineBehaviour::class.simpleName,
SecondPipelineBehaviour::class.simpleName,
ThirdPipelineBehaviour::class.simpleName
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ class TestCommandWithResultCommandHandler(val mediator: MediatorAccessor) : Comm
}
}

class CommandThatPassesThroughPipelineBehaviours : Command, EnrichedWithMetadata()
class CommandThatPassesThroughPipelineBehaviours :
Command,
EnrichedWithMetadata(),
CanPassLoggingPipelineBehaviour,
CanPassExceptionPipelineBehaviour,
CanPassInheritedPipelineBehaviour

class TestPipelineCommandHandler(
private val mediator: MediatorAccessor
Expand All @@ -163,7 +168,12 @@ class TestPipelineCommandHandler(
}
}

class CommandForWithoutInjectionThatPassesThroughPipelineBehaviours : Command, EnrichedWithMetadata()
class CommandForWithoutInjectionThatPassesThroughPipelineBehaviours :
Command,
EnrichedWithMetadata(),
CanPassLoggingPipelineBehaviour,
CanPassExceptionPipelineBehaviour,
CanPassInheritedPipelineBehaviour

class TestPipelineCommandHandlerWithoutInjection : CommandHandler<CommandForWithoutInjectionThatPassesThroughPipelineBehaviours> {
override suspend fun handle(command: CommandForWithoutInjectionThatPassesThroughPipelineBehaviours) {
Expand Down Expand Up @@ -293,42 +303,135 @@ class ParameterizedQueryHandler<TParam, TResponse> : QueryHandler<ParameterizedQ
/**
* Pipeline Behaviors
*/
interface CanPassExceptionPipelineBehaviour

class ExceptionPipelineBehavior : PipelineBehavior {
override suspend fun <TRequest, TResponse> handle(
request: TRequest,
next: RequestHandlerDelegate<TRequest, TResponse>
): TResponse = try {
when (request) {
is EnrichedWithMetadata -> request.visitedPipeline(this::class.java.simpleName)
is CanPassExceptionPipelineBehaviour -> {
request as EnrichedWithMetadata
request.visitedPipeline(this::class.java.simpleName)
}
}
next(request)
} catch (ex: Exception) {
throw ex
}
}

interface CanPassLoggingPipelineBehaviour

class LoggingPipelineBehavior : PipelineBehavior {
override suspend fun <TRequest, TResponse> handle(
request: TRequest,
next: RequestHandlerDelegate<TRequest, TResponse>
): TResponse {
when (request) {
is EnrichedWithMetadata -> request.visitedPipeline(this::class.java.simpleName)
is CanPassLoggingPipelineBehaviour -> {
request as EnrichedWithMetadata
request.visitedPipeline(this::class.java.simpleName)
}
}
return next(request)
}
}

abstract class MyBasePipelineBehaviour : PipelineBehavior

interface CanPassInheritedPipelineBehaviour

class InheritedPipelineBehaviour : MyBasePipelineBehaviour() {
override suspend fun <TRequest, TResponse> handle(
request: TRequest,
next: RequestHandlerDelegate<TRequest, TResponse>
): TResponse {
when (request) {
is EnrichedWithMetadata -> request.visitedPipeline(this::class.java.simpleName)
is CanPassInheritedPipelineBehaviour -> {
request as EnrichedWithMetadata
request.visitedPipeline(this::class.java.simpleName)
}
}
return next(request)
}
}

interface OrderedPipelineUseCase

class CommandThatPassesThroughOrderedPipelineBehaviours : Command, EnrichedWithMetadata(), OrderedPipelineUseCase

class QueryThatPassesThroughOrderedPipelineBehaviours : Query<String>, EnrichedWithMetadata(), OrderedPipelineUseCase

class NotificationThatPassesThroughOrderedPipelineBehaviours : Notification, EnrichedWithMetadata(), OrderedPipelineUseCase

class CommandHandlerThatPassesThroughOrderedPipelineBehaviours : CommandHandler<CommandThatPassesThroughOrderedPipelineBehaviours> {
override suspend fun handle(command: CommandThatPassesThroughOrderedPipelineBehaviours) {
command.incrementInvocationCount()
}
}

class QueryHandlerThatPassesThroughOrderedPipelineBehaviours : QueryHandler<QueryThatPassesThroughOrderedPipelineBehaviours, String> {
override suspend fun handle(query: QueryThatPassesThroughOrderedPipelineBehaviours): String {
query.incrementInvocationCount()
return "hello"
}
}

class NotificationHandlerThatPassesThroughOrderedPipelineBehaviours :
NotificationHandler<NotificationThatPassesThroughOrderedPipelineBehaviours> {
override suspend fun handle(notification: NotificationThatPassesThroughOrderedPipelineBehaviours) {
notification.incrementInvocationCount()
}
}

class FirstPipelineBehaviour : PipelineBehavior {
override val order: Int = 1

override suspend fun <TRequest, TResponse> handle(
request: TRequest,
next: RequestHandlerDelegate<TRequest, TResponse>
): TResponse {
when (request) {
is OrderedPipelineUseCase -> {
request as EnrichedWithMetadata
request.visitedPipeline(this::class.java.simpleName)
}
}
return next(request)
}
}

class SecondPipelineBehaviour : PipelineBehavior {
override val order: Int = 2

override suspend fun <TRequest, TResponse> handle(
request: TRequest,
next: RequestHandlerDelegate<TRequest, TResponse>
): TResponse {
when (request) {
is OrderedPipelineUseCase -> {
request as EnrichedWithMetadata
request.visitedPipeline(this::class.java.simpleName)
}
}
return next(request)
}
}

class ThirdPipelineBehaviour : PipelineBehavior {
override val order: Int = 3

override suspend fun <TRequest, TResponse> handle(
request: TRequest,
next: RequestHandlerDelegate<TRequest, TResponse>
): TResponse {
when (request) {
is OrderedPipelineUseCase -> {
request as EnrichedWithMetadata
request.visitedPipeline(this::class.java.simpleName)
}
}
return next(request)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@ class MediatorTests : KoinTest, MediatorUseCases() {
modules(
module {
single { KediatRKoin.getMediator() }

// Pipeline behaviours
single { InheritedPipelineBehaviour() }
single { ExceptionPipelineBehavior() }
single { LoggingPipelineBehavior() }
single { FirstPipelineBehaviour() }
single { SecondPipelineBehaviour() }
single { ThirdPipelineBehaviour() }

// Handlers
single { TestCommandHandler(get()) }
single { TestCommandWithResultCommandHandler(get()) } bind CommandWithResultHandler::class
single { TestQueryHandler(get()) } bind QueryHandler::class
Expand All @@ -40,6 +47,11 @@ class MediatorTests : KoinTest, MediatorUseCases() {
single { TestPipelineCommandHandlerWithoutInjection() } bind CommandHandler::class
single { TestPipelineCommandHandlerThatFails() } bind CommandHandler::class
single { ParameterizedQueryHandler<Long, String>() } bind QueryHandler::class
single { CommandHandlerThatPassesThroughOrderedPipelineBehaviours() } bind CommandHandler::class
single { QueryHandlerThatPassesThroughOrderedPipelineBehaviours() } bind QueryHandler::class
single { NotificationHandlerThatPassesThroughOrderedPipelineBehaviours() } bind NotificationHandler::class

// Extra
single<MediatorAccessor> { { get<Mediator>() } }
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,22 @@ class MediatorTests : MediatorUseCases() {

@Produces
fun <T, R> handler22() = ParameterizedQueryHandler<T, R>()

@Produces
fun pipeline4() = FirstPipelineBehaviour()

@Produces
fun pipeline5() = SecondPipelineBehaviour()

@Produces
fun pipeline6() = ThirdPipelineBehaviour()

@Produces
fun handler23() = CommandHandlerThatPassesThroughOrderedPipelineBehaviours()

@Produces
fun handler24() = QueryHandlerThatPassesThroughOrderedPipelineBehaviours()

@Produces
fun handler25() = NotificationHandlerThatPassesThroughOrderedPipelineBehaviours()
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ import org.springframework.context.annotation.*
TestPipelineCommandHandlerWithoutInjection::class,
TestPipelineCommandHandlerThatFails::class,
InheritedPipelineBehaviour::class,
ParameterizedQueryHandler::class
ParameterizedQueryHandler::class,
FirstPipelineBehaviour::class,
SecondPipelineBehaviour::class,
ThirdPipelineBehaviour::class,
CommandHandlerThatPassesThroughOrderedPipelineBehaviours::class,
QueryHandlerThatPassesThroughOrderedPipelineBehaviours::class,
NotificationHandlerThatPassesThroughOrderedPipelineBehaviours::class
]
)
class MediatorTests : MediatorUseCases() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ import org.springframework.context.annotation.*
TestPipelineCommandHandlerWithoutInjection::class,
TestPipelineCommandHandlerThatFails::class,
InheritedPipelineBehaviour::class,
ParameterizedQueryHandler::class
ParameterizedQueryHandler::class,
FirstPipelineBehaviour::class,
SecondPipelineBehaviour::class,
ThirdPipelineBehaviour::class,
CommandHandlerThatPassesThroughOrderedPipelineBehaviours::class,
QueryHandlerThatPassesThroughOrderedPipelineBehaviours::class,
NotificationHandlerThatPassesThroughOrderedPipelineBehaviours::class
]
)
class MediatorTests : MediatorUseCases() {
Expand Down