Skip to content

Commit

Permalink
Add a wrapped event scheduler to add information for the message flow.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerard Klijs committed Jul 4, 2024
1 parent 364a690 commit d7046f8
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package io.axoniq.console.framework.starter

import io.axoniq.console.framework.AxoniqConsoleConfigurerModule
import io.axoniq.console.framework.messaging.AxoniqConsoleWrappedEventScheduler
import io.axoniq.console.framework.messaging.HandlerMetricsRegistry
import io.axoniq.console.framework.messaging.SpanMatcher.Companion.getSpanMatcherPredicateMap
import io.axoniq.console.framework.messaging.SpanMatcherPredicateMap
import org.axonframework.config.ConfigurerModule
import org.axonframework.eventhandling.scheduling.EventScheduler
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.BeanPostProcessor
import org.springframework.boot.autoconfigure.AutoConfiguration
Expand Down Expand Up @@ -51,9 +54,7 @@ class AxoniqConsoleAutoConfiguration {
logger.warn("The credentials for the connection to AxonIQ Console don't have the right format. Please provide them as instructed through the 'axoniq.console.credentials' property.")
return ConfigurerModule { }
}
val applicationName = (properties.applicationName?.trim()?.ifEmpty { null })
?: (applicationContext.applicationName.trim().ifEmpty { null })
?: (applicationContext.id?.removeSuffix("-1"))
val applicationName = getApplicationName(properties, applicationContext)
if (applicationName == null) {
logger.warn("Was unable to determine your application's name. Please provide it through the 'axoniq.console.application-name' property.")
return ConfigurerModule { }
Expand Down Expand Up @@ -86,10 +87,40 @@ class AxoniqConsoleAutoConfiguration {
@Bean
@ConditionalOnProperty("axoniq.console.credentials", matchIfMissing = false)
fun axoniqConsoleSpanFactoryPostProcessor(
spanMatcherPredicateMap: SpanMatcherPredicateMap
spanMatcherPredicateMap: SpanMatcherPredicateMap,
configuration: org.axonframework.config.Configuration,
properties: AxoniqConsoleSpringProperties,
applicationContext: ApplicationContext
): BeanPostProcessor = object : BeanPostProcessor {
override fun postProcessAfterInitialization(bean: Any, beanName: String): Any {
return PostProcessHelper.enhance(bean, beanName, spanMatcherPredicateMap)
return when {
bean is EventScheduler -> enhanceEventScheduler(bean, configuration, properties, applicationContext)
else -> PostProcessHelper.enhance(bean, beanName, spanMatcherPredicateMap)
}
}
}

private fun enhanceEventScheduler(
eventScheduler: EventScheduler,
configuration: org.axonframework.config.Configuration,
properties: AxoniqConsoleSpringProperties,
applicationContext: ApplicationContext
): EventScheduler {
return if (eventScheduler is AxoniqConsoleWrappedEventScheduler) {
eventScheduler
} else {
getApplicationName(properties, applicationContext)?.let {
AxoniqConsoleWrappedEventScheduler(eventScheduler, configuration.getComponent(HandlerMetricsRegistry::class.java), it)
} ?: eventScheduler
}
}

private fun getApplicationName(
properties: AxoniqConsoleSpringProperties,
applicationContext: ApplicationContext
): String? {
return (properties.applicationName?.trim()?.ifEmpty { null })
?: (applicationContext.applicationName.trim().ifEmpty { null })
?: (applicationContext.id?.removeSuffix("-1"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.axoniq.console.framework.eventprocessor.metrics.ProcessorMetricsRegistry;
import io.axoniq.console.framework.messaging.AxoniqConsoleDispatchInterceptor;
import io.axoniq.console.framework.messaging.AxoniqConsoleSpanFactory;
import io.axoniq.console.framework.messaging.AxoniqConsoleWrappedEventScheduler;
import io.axoniq.console.framework.messaging.HandlerMetricsRegistry;
import io.axoniq.console.framework.messaging.SpanMatcher;
import io.axoniq.console.framework.messaging.SpanMatcherPredicateMap;
Expand All @@ -41,12 +42,14 @@
import org.axonframework.config.Configuration;
import org.axonframework.config.Configurer;
import org.axonframework.config.ConfigurerModule;
import org.axonframework.eventhandling.scheduling.EventScheduler;
import org.axonframework.tracing.SpanFactory;
import org.jetbrains.annotations.NotNull;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -76,6 +79,7 @@ public class AxoniqConsoleConfigurerModule implements ConfigurerModule {
private final ExecutorService managementTaskExecutor;
private final boolean configureSpanFactory;
private final SpanMatcherPredicateMap spanMatcherPredicateMap;
private final EventScheduler eventScheduler;

/**
* Creates the {@link AxoniqConsoleConfigurerModule} with the given {@code builder}.
Expand All @@ -96,6 +100,7 @@ protected AxoniqConsoleConfigurerModule(Builder builder) {
this.managementTaskExecutor = builder.managementTaskExecutor;
this.configureSpanFactory = !builder.disableSpanFactoryInConfiguration;
this.spanMatcherPredicateMap = builder.spanMatcherPredicateMap;
this.eventScheduler = builder.eventScheduler;
}

/**
Expand Down Expand Up @@ -205,6 +210,15 @@ public void configureModule(@NotNull Configurer configurer) {
configurer.registerComponent(SpanFactory.class, c -> new AxoniqConsoleSpanFactory(spanMatcherPredicateMap));
}

if (Objects.nonNull(eventScheduler)) {
configurer.registerComponent(
EventScheduler.class,
c -> new AxoniqConsoleWrappedEventScheduler(
eventScheduler,
c.getComponent(HandlerMetricsRegistry.class),
applicationName));
}

configurer.onInitialize(c -> {
c.getComponent(ServerProcessorReporter.class);
c.getComponent(RSocketProcessorResponder.class);
Expand Down Expand Up @@ -257,6 +271,7 @@ public static class Builder {

private ExecutorService managementTaskExecutor;
private Integer managementMaxThreadPoolSize = 5;
private EventScheduler eventScheduler;

/**
* Constructor to instantiate a {@link Builder} based on the fields contained in the
Expand Down Expand Up @@ -315,9 +330,9 @@ public Builder dlqMode(AxoniqConsoleDlqMode dlqMode) {
}

/**
* Adding a key to the diagnostics whitelist. Will only be used in combination with setting the {@code dlqMode} to
* {@link AxoniqConsoleDlqMode#LIMITED}. It will filter the diagnostics, and only show the ones included in the
* whitelist.
* Adding a key to the diagnostics whitelist. Will only be used in combination with setting the {@code dlqMode}
* to {@link AxoniqConsoleDlqMode#LIMITED}. It will filter the diagnostics, and only show the ones included in
* the whitelist.
*
* @param key The value to add to the whitelist
* @return The builder for fluent interfacing
Expand Down Expand Up @@ -435,6 +450,18 @@ public Builder secure(boolean secure) {
return this;
}

/**
* Set the event scheduler, this will be wrapped to be able to include these messages in the message flow.
*
* @param eventScheduler the event scheduler to use
* @return The builder for fluent interfacing
*/
public Builder eventScheduler(EventScheduler eventScheduler) {
BuilderUtils.assertNonNull(eventScheduler, "Event scheduler must be non-null");
this.eventScheduler = eventScheduler;
return this;
}

/**
* Builds the {@link AxoniqConsoleConfigurerModule} based on the fields set in this {@link Builder}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) 2022-2024. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.console.framework.messaging

import io.axoniq.console.framework.api.AxoniqConsoleMessageOrigin
import io.axoniq.console.framework.api.metrics.DispatcherStatisticIdentifier
import io.axoniq.console.framework.api.metrics.HandlerStatisticsMetricIdentifier
import io.axoniq.console.framework.api.metrics.HandlerType
import io.axoniq.console.framework.api.metrics.MessageIdentifier
import org.axonframework.eventhandling.EventMessage
import org.axonframework.eventhandling.scheduling.EventScheduler
import org.axonframework.eventhandling.scheduling.ScheduleToken
import org.axonframework.lifecycle.Lifecycle
import org.axonframework.lifecycle.Lifecycle.LifecycleRegistry
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork
import java.time.Duration
import java.time.Instant

/**
* Used to be able to report the Dispatcher for events send via a scheduler.
* The original configured scheduler will become the delegate of this class.
*/
class AxoniqConsoleWrappedEventScheduler(
private val delegate: EventScheduler,
private val registry: HandlerMetricsRegistry,
private val componentName: String,
) : EventScheduler, Lifecycle {
override fun schedule(triggerDateTime: Instant, event: Any): ScheduleToken {
registerEvent(event)
return delegate.schedule(triggerDateTime, event)
}

override fun schedule(triggerDuration: Duration, event: Any): ScheduleToken {
registerEvent(event)
return delegate.schedule(triggerDuration, event)
}

override fun cancelSchedule(scheduleToken: ScheduleToken?) {
delegate.cancelSchedule(scheduleToken)
}

override fun reschedule(scheduleToken: ScheduleToken, triggerDuration: Duration, event: Any): ScheduleToken {
return delegate.reschedule(scheduleToken, triggerDuration, event)
}

override fun reschedule(scheduleToken: ScheduleToken, triggerDateTime: Instant, event: Any): ScheduleToken {
return delegate.reschedule(scheduleToken, triggerDateTime, event)
}

override fun shutdown() {
delegate.shutdown()
}

override fun registerLifecycleHandlers(lifecycleRegistry: LifecycleRegistry) {
if (delegate is Lifecycle) {
delegate.registerLifecycleHandlers(lifecycleRegistry)
}
}

private fun registerEvent(event: Any) {
if (!CurrentUnitOfWork.isStarted()) {
// Determine the origin of the handler
val origin = when {
event is EventMessage<*> && event.payloadType?.javaClass?.isAnnotationPresent(AxoniqConsoleMessageOrigin::class.java) == true -> event.payloadType.getAnnotation(AxoniqConsoleMessageOrigin::class.java).name
event.javaClass.isAnnotationPresent(AxoniqConsoleMessageOrigin::class.java) -> event.javaClass.getAnnotation(AxoniqConsoleMessageOrigin::class.java).name
else -> componentName
}
reportMessageDispatchedFromOrigin(origin, event)
} else {
reportMessageDispatchedFromHandler(CurrentUnitOfWork.get().message.payload.javaClass.simpleName, event)
}

}

private fun reportMessageDispatchedFromOrigin(originName: String, event: Any) {
registry.registerMessageDispatchedDuringHandling(
DispatcherStatisticIdentifier(HandlerStatisticsMetricIdentifier(
type = HandlerType.Origin,
component = originName,
message = MessageIdentifier("Dispatcher", originName)), event.toInformation())
)
}

private fun reportMessageDispatchedFromHandler(handlerName: String, event: Any) {
registry.registerMessageDispatchedDuringHandling(
DispatcherStatisticIdentifier(HandlerStatisticsMetricIdentifier(
type = HandlerType.Message,
component = handlerName,
message = MessageIdentifier("Dispatcher", handlerName)), event.toInformation())
)
}

private fun Any.toInformation() = MessageIdentifier(
EventMessage::class.java.simpleName,
when (this) {
is EventMessage<*> -> this.payloadType.name.toSimpleName()
else -> this.javaClass.name.toSimpleName()
}
)
}

0 comments on commit d7046f8

Please sign in to comment.