From 8743ea9c2cd68b927c0fb05c72dcb0cbd6c8f618 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Fri, 5 Jul 2024 11:31:16 +0100 Subject: [PATCH 1/7] Second generation event processors Changes the structure of event processor information during the setup process, so we can display more information to the user, including: - Subscribing Event processors - Different properties based on event processor type - Improved information about the message source This has all been done in a backwards-compatible way. The backend will be able to handle all versions of clients, and the UI will change based on whether it's generation 1 or 2 --- console-framework-client-api/pom.xml | 2 +- .../framework/api/clientIdentification.kt | 195 +++++++++++++----- .../pom.xml | 2 +- console-framework-client/pom.xml | 2 +- .../framework/client/SetupPayloadCreator.kt | 180 +++++++++++----- pom.xml | 2 +- 6 files changed, 272 insertions(+), 111 deletions(-) diff --git a/console-framework-client-api/pom.xml b/console-framework-client-api/pom.xml index 29e7056..7c60bdd 100644 --- a/console-framework-client-api/pom.xml +++ b/console-framework-client-api/pom.xml @@ -21,7 +21,7 @@ io.axoniq.console console-framework-client-parent - 1.5.2-SNAPSHOT + 1.6.0-SNAPSHOT 4.0.0 diff --git a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/clientIdentification.kt b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/clientIdentification.kt index 2535680..70806ae 100644 --- a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/clientIdentification.kt +++ b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/clientIdentification.kt @@ -16,6 +16,8 @@ package io.axoniq.console.framework.api +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo import java.net.URLDecoder import java.net.URLEncoder @@ -27,8 +29,8 @@ import java.net.URLEncoder * @code Bearer WORK_SPACE_ID:ENVIRONMENT_ID:COMPONENT_NAME:NODE_ID:ACCESS_TOKEN} */ data class ConsoleClientAuthentication( - val identification: ConsoleClientIdentifier, - val accessToken: String, + val identification: ConsoleClientIdentifier, + val accessToken: String, ) { fun toBearerToken(): String { return BEARER_PREFIX + listOf( @@ -50,24 +52,24 @@ data class ConsoleClientAuthentication( if (tokenParts.size == 5) { val (_, environmentId, applicationName, nodeName, accessToken) = tokenParts return ConsoleClientAuthentication( - ConsoleClientIdentifier( - environmentId = environmentId, - applicationName = applicationName.decode(), - nodeName = nodeName.decode() - ), - accessToken + ConsoleClientIdentifier( + environmentId = environmentId, + applicationName = applicationName.decode(), + nodeName = nodeName.decode() + ), + accessToken ) } assert(tokenParts.size == 4) { TOKEN_ERROR } val (environmentId, applicationName, nodeName, accessToken) = tokenParts return ConsoleClientAuthentication( - ConsoleClientIdentifier( - environmentId = environmentId, - applicationName = applicationName.decode(), - nodeName = nodeName.decode() - ), - accessToken + ConsoleClientIdentifier( + environmentId = environmentId, + applicationName = applicationName.decode(), + nodeName = nodeName.decode() + ), + accessToken ) } @@ -87,7 +89,7 @@ data class SetupPayload( val commandBus: CommandBusInformation, val queryBus: QueryBusInformation, val eventStore: EventStoreInformation, - val processors: List, + val processors: List, val versions: Versions, val upcasters: List, ) @@ -107,48 +109,139 @@ data class ModuleVersion( ) data class CommandBusInformation( - val type: String, - val axonServer: Boolean, - val localSegmentType: String?, - val context: String?, - val handlerInterceptors: List = emptyList(), - val dispatchInterceptors: List = emptyList(), - val messageSerializer: SerializerInformation?, + val type: String, + val axonServer: Boolean, + val localSegmentType: String?, + val context: String?, + val handlerInterceptors: List = emptyList(), + val dispatchInterceptors: List = emptyList(), + val messageSerializer: SerializerInformation?, ) data class QueryBusInformation( - val type: String, - val axonServer: Boolean, - val localSegmentType: String?, - val context: String?, - val handlerInterceptors: List = emptyList(), - val dispatchInterceptors: List = emptyList(), - val messageSerializer: SerializerInformation?, - val serializer: SerializerInformation?, + val type: String, + val axonServer: Boolean, + val localSegmentType: String?, + val context: String?, + val handlerInterceptors: List = emptyList(), + val dispatchInterceptors: List = emptyList(), + val messageSerializer: SerializerInformation?, + val serializer: SerializerInformation?, ) data class EventStoreInformation( - val type: String, - val axonServer: Boolean, - val context: String?, - val dispatchInterceptors: List = emptyList(), - val eventSerializer: SerializerInformation?, - val snapshotSerializer: SerializerInformation?, - val approximateSize: Long? = null, -) - -data class ProcessorInformation( - val name: String, - val messageSourceType: String, - val contexts: List? = emptyList(), - val tokenStoreType: String, - val supportsReset: Boolean, - val batchSize: Int, - val tokenClaimInterval: Long, - val tokenStoreClaimTimeout: Long, - val errorHandler: String, - val invocationErrorHandler: String, - val interceptors: List, + val type: String, + val axonServer: Boolean, + val context: String?, + val dispatchInterceptors: List = emptyList(), + val eventSerializer: SerializerInformation?, + val snapshotSerializer: SerializerInformation?, + val approximateSize: Long? = null, +) + + +data class EventProcessorInformation( + val name: String, + val processorType: ProcessorType? = null, + + val commonProcessorInformation: CommonProcessorInformation? = null, + val subscribingProcessorInformation: SubscribingProcessorInformation? = null, + val streamingInformation: StreamingEventProcessorInformation? = null, + val trackingInformation: TrackingEventProcessorInformation? = null, + val pooledStreamingInformation: PooledStreamingEventProcessorInformation? = null, + + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val messageSourceType: String? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val contexts: List? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val tokenStoreType: String? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val supportsReset: Boolean? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val batchSize: Int? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val tokenClaimInterval: Long? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val tokenStoreClaimTimeout: Long? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val errorHandler: String? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val invocationErrorHandler: String? = null, + @Deprecated("Deprecated since version 1.6.0 due to new processor structure") + val interceptors: List? = null, +) + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes( + JsonSubTypes.Type(value = AxonServerEventStoreMessageSourceInformation::class, name = "AxonServer"), + JsonSubTypes.Type(value = EmbeddedEventStoreMessageSourceInformation::class, name = "Embedded"), + JsonSubTypes.Type(value = MultiStreamableMessageSourceInformation::class, name = "MultiStreamable"), + JsonSubTypes.Type(value = UnspecifiedMessageSourceInformation::class, name = "Unspecified"), +) +interface MessageSourceInformation { + val className: String +} + +data class AxonServerEventStoreMessageSourceInformation( + override val className: String, + val contexts: List +) : MessageSourceInformation + +data class EmbeddedEventStoreMessageSourceInformation( + override val className: String, + val optimizeEventConsumption: Boolean?, + val fetchDelay: Long?, + val cachedEvents: Int?, + val cleanupDelay: Long?, + val eventStorageEngineType: String?, +) : MessageSourceInformation + +data class MultiStreamableMessageSourceInformation( + override val className: String, + val sources: List, +) : MessageSourceInformation + +data class UnspecifiedMessageSourceInformation( + override val className: String, +) : MessageSourceInformation + + +enum class ProcessorType { + SUBSCRIBING, + TRACKING, + POOLED_STREAMING, +} + +data class StreamingEventProcessorInformation( + val tokenStoreType: String?, + val batchSize: Int?, + val tokenClaimInterval: Long?, + val tokenStoreClaimTimeout: Long?, + val supportsReset: Boolean?, +) + +data class TrackingEventProcessorInformation( + val maxThreadCount: Int?, + val eventAvailabilityTimeout: Int?, + val storeTokenBeforeProcessing: Boolean?, +) + +data class PooledStreamingEventProcessorInformation( + val maxClaimedSegments: Int?, + val claimExtensionThreshold: Long?, + val coordinatorExtendsClaims: Boolean?, +) + +data class SubscribingProcessorInformation( + val processingStrategy: String, +) + +data class CommonProcessorInformation( + val messageSource: MessageSourceInformation?, + val errorHandler: String?, + val invocationErrorHandler: String?, + val interceptors: List, ) data class InterceptorInformation( diff --git a/console-framework-client-spring-boot-starter/pom.xml b/console-framework-client-spring-boot-starter/pom.xml index 3df389f..815f5dc 100644 --- a/console-framework-client-spring-boot-starter/pom.xml +++ b/console-framework-client-spring-boot-starter/pom.xml @@ -21,7 +21,7 @@ io.axoniq.console console-framework-client-parent - 1.5.2-SNAPSHOT + 1.6.0-SNAPSHOT 4.0.0 diff --git a/console-framework-client/pom.xml b/console-framework-client/pom.xml index 8f5397b..70e5a98 100644 --- a/console-framework-client/pom.xml +++ b/console-framework-client/pom.xml @@ -21,7 +21,7 @@ io.axoniq.console console-framework-client-parent - 1.5.2-SNAPSHOT + 1.6.0-SNAPSHOT 4.0.0 diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt index a67e1df..4fc6aa4 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt @@ -23,7 +23,9 @@ import org.axonframework.common.ReflectionUtils import org.axonframework.config.Configuration import org.axonframework.config.EventProcessingModule import org.axonframework.eventhandling.* +import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor import org.axonframework.eventhandling.tokenstore.TokenStore +import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore import org.axonframework.eventsourcing.eventstore.EventStore import org.axonframework.messaging.StreamableMessageSource import org.axonframework.queryhandling.QueryBus @@ -38,31 +40,13 @@ class SetupPayloadCreator( private val eventProcessingConfiguration = configuration.eventProcessingConfiguration() as EventProcessingModule fun createReport(): SetupPayload { - val processors = eventProcessingConfiguration.eventProcessors() - .filter { it.value is StreamingEventProcessor } - .map { entry -> - entry.key - } + val processors = eventProcessingConfiguration.eventProcessors().keys return SetupPayload( commandBus = commandBusInformation(), queryBus = queryBusInformation(), eventStore = eventBusInformation(), - processors = processors.map { - val processor = - eventProcessingConfiguration.eventProcessor(it, StreamingEventProcessor::class.java).get() - ProcessorInformation( - name = it, - supportsReset = processor.supportsReset(), - batchSize = processor.getBatchSize(), - messageSourceType = processor.getMessageSource(), - tokenClaimInterval = processor.getTokenClaimInterval(), - tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout(), - errorHandler = eventProcessingConfiguration.errorHandler(it)::class.java.name, - invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(it)::class.java.name, - interceptors = processor.getInterceptors("interceptors"), - tokenStoreType = processor.getStoreTokenStoreType(), - contexts = processor.contexts() - ) + processors = processors.mapNotNull { + toProcessor(it) }, versions = versionInformation(), upcasters = upcasters(), @@ -72,6 +56,119 @@ class SetupPayloadCreator( ) } + private fun toProcessor(name: String): EventProcessorInformation? { + val processor = eventProcessingConfiguration.eventProcessor(name, EventProcessor::class.java).orElse(null) + return when (processor) { + is PooledStreamingEventProcessor -> toPooledStreamingProcessorInformation(processor) + is TrackingEventProcessor -> toTrackingProcessorInformation(processor) + is SubscribingEventProcessor -> toSubscribingProcessorInformation(processor) + else -> null + } + } + + private fun toSubscribingProcessorInformation(processor: SubscribingEventProcessor): EventProcessorInformation { + return EventProcessorInformation( + name = processor.name, + processorType = ProcessorType.SUBSCRIBING, + commonProcessorInformation = commonProcessorInformation(processor), + subscribingProcessorInformation = SubscribingProcessorInformation( + processingStrategy = processor.getPropertyType("processingStrategy") + ) + ) + } + + private fun commonProcessorInformation(processor: EventProcessor) = + CommonProcessorInformation( + messageSource = toMessageSource(processor.getPropertyValue("messageSource")), + errorHandler = eventProcessingConfiguration.errorHandler(processor.name)::class.java.name, + invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(processor.name)::class.java.name, + interceptors = processor.getInterceptors("interceptors"), + ) + + private fun toPooledStreamingProcessorInformation(processor: PooledStreamingEventProcessor): EventProcessorInformation { + return EventProcessorInformation( + name = processor.name, + processorType = ProcessorType.POOLED_STREAMING, + commonProcessorInformation = commonProcessorInformation(processor), + streamingInformation = streamingEventProcessorInformation(processor), + pooledStreamingInformation = PooledStreamingEventProcessorInformation( + maxClaimedSegments = processor.getPropertyValue("maxClaimedSegments"), + claimExtensionThreshold = processor.getPropertyValue("claimExtensionThreshold"), + coordinatorExtendsClaims = processor.getPropertyValue("coordinator")?.getPropertyValue("coordinatorExtendsClaims") + ), + ) + } + + private fun toTrackingProcessorInformation(processor: TrackingEventProcessor): EventProcessorInformation { + return EventProcessorInformation( + name = processor.name, + processorType = ProcessorType.TRACKING, + commonProcessorInformation = commonProcessorInformation(processor), + streamingInformation = streamingEventProcessorInformation(processor), + trackingInformation = TrackingEventProcessorInformation( + maxThreadCount = processor.getPropertyValue("maxThreadCount"), + eventAvailabilityTimeout = processor.getPropertyValue("eventAvailabilityTimeout"), + storeTokenBeforeProcessing = processor.getPropertyValue("storeTokenBeforeProcessing"), + ), + ) + } + + private fun streamingEventProcessorInformation(processor: StreamingEventProcessor) = StreamingEventProcessorInformation( + batchSize = processor.getPropertyValue("batchSize"), + tokenClaimInterval = processor.getPropertyValue("tokenClaimInterval"), + tokenStoreType = processor.getPropertyType("tokenStore", TokenStore::class.java), + supportsReset = processor.supportsReset(), + tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout("tokenStore"), + ) + + private fun toMessageSource(messageSource: StreamableMessageSource<*>?): MessageSourceInformation { + if (messageSource == null) { + return UnspecifiedMessageSourceInformation("Unknown") + } + return when { + messageSource is MultiStreamableMessageSource -> MultiStreamableMessageSourceInformation( + messageSource::class.java.name, + messageSource.getPropertyValue>>("eventStreams")?.map { toMessageSource(it) } + ?: emptyList() + ) + + messageSource is EmbeddedEventStore -> createEmbeddedMessageSourceInformation(messageSource) + messageSource::class.java.simpleName == "AxonServerEventStore" -> createAxonServerMessageSourceInfoFromStore(messageSource) + messageSource::class.java.simpleName == "AxonIQEventStorageEngine" -> createAxonServerMessageSourceInfoFromStorageEngine(messageSource) + else -> UnspecifiedMessageSourceInformation(messageSource::class.java.name) + } + } + + private fun createAxonServerMessageSourceInfoFromStorageEngine(messageSource: StreamableMessageSource<*>): MessageSourceInformation { + val context = messageSource.getPropertyValue("context") + + return AxonServerEventStoreMessageSourceInformation( + messageSource::class.java.name, + listOfNotNull(context) + ) + } + + private fun createAxonServerMessageSourceInfoFromStore(messageSource: StreamableMessageSource<*>): MessageSourceInformation { + val context = messageSource.getPropertyValue("storageEngine")?.getPropertyValue("context") + + return AxonServerEventStoreMessageSourceInformation( + messageSource::class.java.name, + listOfNotNull(context) + ) + } + + private fun createEmbeddedMessageSourceInformation(messageSource: EmbeddedEventStore): MessageSourceInformation { + return EmbeddedEventStoreMessageSourceInformation( + className = messageSource::class.java.name, + optimizeEventConsumption = messageSource.getPropertyValue("optimizeEventConsumption"), + fetchDelay = messageSource.getPropertyValue("producer")?.getPropertyValue("fetchDelayNanos")?.let { it / 1_000_000 }, + cachedEvents = messageSource.getPropertyValue("producer")?.getPropertyValue("cachedEvents"), + cleanupDelay = messageSource.getPropertyValue("cleanupDelayMillis"), + eventStorageEngineType = messageSource.getPropertyType("storageEngine") + + ) + } + private fun upcasters(): List { val upcasters = configuration.upcasterChain().getPropertyValue>>("upcasters") ?: emptyList() @@ -127,7 +224,7 @@ class SetupPayloadCreator( private fun queryBusInformation(): QueryBusInformation { val bus = configuration.queryBus().unwrapPossiblyDecoratedClass(QueryBus::class.java) val axonServer = bus::class.java.name == "org.axonframework.axonserver.connector.query.AxonServerQueryBus" - val localSegmentType = if (axonServer) bus.getPropertyTypeNested("localSegment", QueryBus::class.java) else null + val localSegmentType = if (axonServer) bus.getPropertyType("localSegment", QueryBus::class.java) else null val context = if (axonServer) bus.getPropertyValue("context") else null val handlerInterceptors = if (axonServer) { bus.getPropertyValue("localSegment")?.getInterceptors("handlerInterceptors") ?: emptyList() @@ -188,6 +285,7 @@ class SetupPayloadCreator( is MultiSourceTrackingToken -> token.trackingTokens.values.sumOf { getSizeFromToken(it) ?: 0 } + else -> null } @@ -195,7 +293,7 @@ class SetupPayloadCreator( val bus = configuration.commandBus().unwrapPossiblyDecoratedClass(CommandBus::class.java) val axonServer = bus::class.java.name == "org.axonframework.axonserver.connector.command.AxonServerCommandBus" val localSegmentType = - if (axonServer) bus.getPropertyTypeNested("localSegment", CommandBus::class.java) else null + if (axonServer) bus.getPropertyType("localSegment", CommandBus::class.java) else null val context = if (axonServer) bus.getPropertyValue("context") else null val handlerInterceptors = if (axonServer) { bus.getPropertyValue("localSegment")?.getInterceptors("handlerInterceptors", "invokerInterceptors") @@ -233,7 +331,7 @@ class SetupPayloadCreator( ).let { it::class.java.name } } - private fun Any.getPropertyTypeNested(fieldName: String, clazz: Class): String { + private fun Any.getPropertyType(fieldName: String, clazz: Class): String { return ReflectionUtils.getMemberValue( ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName }, this @@ -242,16 +340,8 @@ class SetupPayloadCreator( .let { it::class.java.name } } - private fun StreamingEventProcessor.getBatchSize(): Int = getPropertyValue("batchSize") ?: -1 - private fun StreamingEventProcessor.getMessageSource(): String = - getPropertyTypeNested("messageSource", EventStore::class.java) - - private fun StreamingEventProcessor.getTokenClaimInterval(): Long = getPropertyValue("tokenClaimInterval") ?: -1 - private fun StreamingEventProcessor.getStoreTokenStoreType(): String = - getPropertyTypeNested("tokenStore", TokenStore::class.java) - - private fun StreamingEventProcessor.getStoreTokenClaimTimeout(): Long = getPropertyValue("tokenStore") - ?.getPropertyValue("claimTimeout")?.let { it.get(ChronoUnit.SECONDS) * 1000 } ?: -1 + private fun Any.getStoreTokenClaimTimeout(fieldName: String): Long? = getPropertyValue(fieldName) + ?.getPropertyValue("claimTimeout")?.let { it.get(ChronoUnit.SECONDS) * 1000 } private fun Any.getInterceptors(vararg fieldNames: String): List { @@ -280,28 +370,6 @@ class SetupPayloadCreator( } return SerializerInformation(serializer::class.java.name, false) } - - private fun StreamingEventProcessor.contexts(): List { - val messageSource = getPropertyValue>("messageSource") ?: return emptyList() - val sources = if (messageSource is MultiStreamableMessageSource) { - messageSource.getPropertyValue>>("eventStreams") ?: emptyList() - } else { - listOf(messageSource) - } - return sources.mapNotNull { toContext(it) }.distinct() - } - - private fun toContext(it: StreamableMessageSource<*>): String? { - if (it::class.java.simpleName == "AxonServerEventStore") { - return it.getPropertyValue("storageEngine")?.getPropertyValue("context") - } - if (it::class.java.simpleName == "AxonIQEventStorageEngine") { - return it.getPropertyValue("context") - } - // Fallback - return it.getPropertyValue("context") - } - } diff --git a/pom.xml b/pom.xml index b3fe637..4e11770 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ io.axoniq.console console-framework-client-parent - 1.5.2-SNAPSHOT + 1.6.0-SNAPSHOT console-framework-client-api From cd7bcfe49d6211d241d3d06b9eacb8c72f358910 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Sat, 6 Jul 2024 12:08:47 +0100 Subject: [PATCH 2/7] Fix for AxonServerMessageSource --- .../framework/client/SetupPayloadCreator.kt | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt index 4fc6aa4..5ae1975 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt @@ -79,7 +79,7 @@ class SetupPayloadCreator( private fun commonProcessorInformation(processor: EventProcessor) = CommonProcessorInformation( - messageSource = toMessageSource(processor.getPropertyValue("messageSource")), + messageSource = toMessageSource(processor, processor.getPropertyValue("messageSource")), errorHandler = eventProcessingConfiguration.errorHandler(processor.name)::class.java.name, invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(processor.name)::class.java.name, interceptors = processor.getInterceptors("interceptors"), @@ -121,19 +121,20 @@ class SetupPayloadCreator( tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout("tokenStore"), ) - private fun toMessageSource(messageSource: StreamableMessageSource<*>?): MessageSourceInformation { + private fun toMessageSource(processor: EventProcessor, messageSource: StreamableMessageSource<*>?): MessageSourceInformation { if (messageSource == null) { return UnspecifiedMessageSourceInformation("Unknown") } return when { messageSource is MultiStreamableMessageSource -> MultiStreamableMessageSourceInformation( messageSource::class.java.name, - messageSource.getPropertyValue>>("eventStreams")?.map { toMessageSource(it) } + messageSource.getPropertyValue>>("eventStreams")?.map { toMessageSource(processor, it) } ?: emptyList() ) messageSource is EmbeddedEventStore -> createEmbeddedMessageSourceInformation(messageSource) messageSource::class.java.simpleName == "AxonServerEventStore" -> createAxonServerMessageSourceInfoFromStore(messageSource) + messageSource::class.java.simpleName == "AxonServerMessageSource" -> createAxonServerMessageSourceInfoFromMessageSource(messageSource) messageSource::class.java.simpleName == "AxonIQEventStorageEngine" -> createAxonServerMessageSourceInfoFromStorageEngine(messageSource) else -> UnspecifiedMessageSourceInformation(messageSource::class.java.name) } @@ -148,6 +149,15 @@ class SetupPayloadCreator( ) } + private fun createAxonServerMessageSourceInfoFromMessageSource(messageSource: StreamableMessageSource<*>): MessageSourceInformation { + val context = messageSource.getPropertyValue("eventStorageEngine")?.getPropertyValue("context") + + return AxonServerEventStoreMessageSourceInformation( + messageSource::class.java.name, + listOfNotNull(context) + ) + } + private fun createAxonServerMessageSourceInfoFromStore(messageSource: StreamableMessageSource<*>): MessageSourceInformation { val context = messageSource.getPropertyValue("storageEngine")?.getPropertyValue("context") From 83e9d33aff249673b858cd61e2ae95e1f66cba39 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Sat, 6 Jul 2024 15:14:39 +0100 Subject: [PATCH 3/7] Register child messages as part of the parent when handled in the same thread --- .../framework/api/metrics/HandlerType.kt | 10 +-- .../console/framework/api/metrics/Metric.kt | 73 ++++++++++--------- .../AxoniqConsoleProcessorInterceptor.kt | 6 +- .../AxoniqConsoleHandlerEnhancerDefinition.kt | 11 +-- .../messaging/AxoniqConsoleSpanFactory.kt | 48 +++++++++--- .../framework/messaging/SpanMatcher.kt | 3 + .../console/framework/messaging/extensions.kt | 7 +- 7 files changed, 96 insertions(+), 62 deletions(-) diff --git a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/HandlerType.kt b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/HandlerType.kt index 626b092..103a603 100644 --- a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/HandlerType.kt +++ b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/HandlerType.kt @@ -16,9 +16,9 @@ package io.axoniq.console.framework.api.metrics -enum class HandlerType { - Origin, - EventProcessor, - Aggregate, - Message, +enum class HandlerType(val shortIdentifier: String) { + Origin("o"), + EventProcessor("ep"), + Aggregate("ag"), + Message("m"), } diff --git a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt index 6a04ded..cbe2424 100644 --- a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt +++ b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt @@ -16,14 +16,14 @@ package io.axoniq.console.framework.api.metrics -import io.axoniq.console.framework.api.metrics.MetricTargetType.* +import io.axoniq.console.framework.api.metrics.MetricTargetType.AGGREGATE +import io.axoniq.console.framework.api.metrics.MetricTargetType.HANDLER interface Metric { val type: MetricType val identifier: String val description: String - val breakDownMetrics: List val targetTypes: List @@ -31,51 +31,58 @@ interface Metric { get() = "${type.metricPrefix}_${identifier}" } +data class ChildHandlerMetric( + val handler: HandlerStatisticsMetricIdentifier +) : Metric { + override val type: MetricType = MetricType.TIMER + override val description: String = "Time it took for the event handler to process an event as subscriber" + override val identifier: String = "${handler.type.shortIdentifier}_\"${handler.component}\"_\"${handler.message.type}\"_\"${handler.message.name}\"" + override val targetTypes: List = listOf(HANDLER) + +} + data class UserHandlerInterceptorMetric( - override val type: MetricType = MetricType.TIMER, - override val identifier: String, - override val description: String = "User defined metric", - override val breakDownMetrics: List = emptyList(), - override val targetTypes: List = listOf(HANDLER, AGGREGATE), + override val type: MetricType = MetricType.TIMER, + override val identifier: String, + override val description: String = "User defined metric", + override val targetTypes: List = listOf(HANDLER, AGGREGATE), ) : Metric enum class PreconfiguredMetric( - override val type: MetricType, - override val identifier: String, - override val description: String, - override val breakDownMetrics: List = listOf(), - override val targetTypes: List, + override val type: MetricType, + override val identifier: String, + override val description: String, + override val targetTypes: List, ) : Metric { MESSAGE_HANDLER_TIME( - type = MetricType.TIMER, - identifier = "handler", - description = "Time it took for the actual user-made handler function to be invoked", - targetTypes = listOf(AGGREGATE, HANDLER), + type = MetricType.TIMER, + identifier = "handler", + description = "Time it took for the actual user-made handler function to be invoked", + targetTypes = listOf(AGGREGATE, HANDLER), ), AGGREGATE_LOCK_TIME( - type = MetricType.TIMER, - identifier = "aggregate_lock", - description = "Time it took for a command to acquire a lock for the aggregate identifier", - targetTypes = listOf(AGGREGATE, HANDLER), + type = MetricType.TIMER, + identifier = "aggregate_lock", + description = "Time it took for a command to acquire a lock for the aggregate identifier", + targetTypes = listOf(AGGREGATE, HANDLER), ), AGGREGATE_LOAD_TIME( - type = MetricType.TIMER, - identifier = "aggregate_load", - description = "Time it took for a command to load the target aggregate", - targetTypes = listOf(AGGREGATE, HANDLER), - breakDownMetrics = listOf(AGGREGATE_LOCK_TIME) + type = MetricType.TIMER, + identifier = "aggregate_load", + description = "Time it took for a command to load the target aggregate", + targetTypes = listOf(AGGREGATE, HANDLER), ), EVENT_COMMIT_TIME( - type = MetricType.TIMER, - identifier = "event_commit", - description = "Time it took to commit events to the event store", - targetTypes = listOf(AGGREGATE, HANDLER) + type = MetricType.TIMER, + identifier = "event_commit", + description = "Time it took to commit events to the event store", + targetTypes = listOf(AGGREGATE, HANDLER) ), AGGREGATE_EVENTS_SIZE( - type = MetricType.COUNTER, - identifier = "agg_events_size", - description = "Amount of events loaded during aggregate initialization", - targetTypes = listOf(AGGREGATE) + type = MetricType.COUNTER, + identifier = "agg_events_size", + description = "Amount of events loaded during aggregate initialization", + targetTypes = listOf(AGGREGATE) ) ; diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/AxoniqConsoleProcessorInterceptor.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/AxoniqConsoleProcessorInterceptor.kt index 0b022d8..a1d8258 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/AxoniqConsoleProcessorInterceptor.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/AxoniqConsoleProcessorInterceptor.kt @@ -16,7 +16,7 @@ package io.axoniq.console.framework.eventprocessor.metrics -import io.axoniq.console.framework.messaging.CONSOLE_PROCESSING_GROUP +import io.axoniq.console.framework.messaging.AxoniqConsoleSpanFactory import org.axonframework.eventhandling.EventMessage import org.axonframework.messaging.InterceptorChain import org.axonframework.messaging.Message @@ -41,7 +41,9 @@ class AxoniqConsoleProcessorInterceptor( return interceptorChain.proceed() } try { - unitOfWork.resources()[CONSOLE_PROCESSING_GROUP] = processorName + AxoniqConsoleSpanFactory.onTopLevelSpanIfActive { + it.reportProcessorName(processorName) + } val message = unitOfWork.message if (message is EventMessage) { val segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1 diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/AxoniqConsoleHandlerEnhancerDefinition.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/AxoniqConsoleHandlerEnhancerDefinition.kt index 7155aa3..b383adc 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/AxoniqConsoleHandlerEnhancerDefinition.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/AxoniqConsoleHandlerEnhancerDefinition.kt @@ -16,7 +16,6 @@ package io.axoniq.console.framework.messaging -import io.axoniq.console.framework.computeIfAbsentWithRetry import org.axonframework.common.Priority import org.axonframework.config.ProcessingGroup import org.axonframework.messaging.Message @@ -25,9 +24,6 @@ import org.axonframework.messaging.annotation.MessageHandlingMember import org.axonframework.messaging.annotation.WrappedMessageHandlingMember import org.axonframework.messaging.unitofwork.CurrentUnitOfWork -internal const val CONSOLE_DECLARING_CLASS = "___axoniqConsoleDeclaringClass" -internal const val CONSOLE_PROCESSING_GROUP = "___axoniqConsoleProcessor" - @Priority((Int.MIN_VALUE * 0.95).toInt()) class AxoniqConsoleHandlerEnhancerDefinition : HandlerEnhancerDefinition { @@ -45,20 +41,17 @@ class AxoniqConsoleHandlerEnhancerDefinition : HandlerEnhancerDefinition { return super.handle(message, target) } val uow = CurrentUnitOfWork.get() - uow.resources()[CONSOLE_DECLARING_CLASS] = declaringClassName - uow.resources().computeIfAbsentWithRetry(CONSOLE_PROCESSING_GROUP) { processingGroup } - val start = System.nanoTime() try { val result = super.handle(message, target) AxoniqConsoleSpanFactory.onTopLevelSpanIfActive { - it.registerHandler(uow.extractHandler(), System.nanoTime() - start) + it.registerHandler(uow.extractHandler(declaringClassName, processingGroup), System.nanoTime() - start) } return result } catch (e: Exception) { AxoniqConsoleSpanFactory.onTopLevelSpanIfActive { it.recordException(e) - it.registerHandler(uow.extractHandler(), System.nanoTime() - start) + it.registerHandler(uow.extractHandler(declaringClassName, processingGroup), System.nanoTime() - start) } throw e } diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/AxoniqConsoleSpanFactory.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/AxoniqConsoleSpanFactory.kt index 728ffab..e71bc10 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/AxoniqConsoleSpanFactory.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/AxoniqConsoleSpanFactory.kt @@ -37,11 +37,7 @@ class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherP private val CURRENT_MESSAGE_ID = ThreadLocal() fun onTopLevelSpanIfActive(block: (MeasuringConsoleSpan) -> Unit) { - if (CURRENT_MESSAGE_ID.get() == null) { - return - } - - ACTIVE_ROOT_SPANS[CURRENT_MESSAGE_ID.get()]?.let { + currentSpan()?.let { try { block(it) } catch (e: Exception) { @@ -49,11 +45,17 @@ class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherP } } } + + fun currentSpan(): MeasuringConsoleSpan? { + return CURRENT_MESSAGE_ID.get()?.let { ACTIVE_ROOT_SPANS[it] } + } } inner class MeasuringConsoleSpan(private val messageId: String) : Span { + var processorName: String? = null private var timeStarted: Long? = null private var transactionSuccessful = true + private var parentMessageId: String? = null // Fields that should be set by the handler enhancer private var handlerMetricIdentifier: HandlerStatisticsMetricIdentifier? = null @@ -76,12 +78,18 @@ class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherP } fun registerMetricValue(metric: Metric, value: Long) { - val actualValue = value - metric.breakDownMetrics.sumOf { metrics[it] ?: 0 } - metrics[metric] = actualValue + metrics[metric] = value + } + + private fun registerChildHandler(identifier: HandlerStatisticsMetricIdentifier, value: Long) { + metrics[ChildHandlerMetric(identifier)] = value } override fun start(): Span { logger.trace("Starting span for message id $messageId") + if(CURRENT_MESSAGE_ID.get() != null) { + parentMessageId = CURRENT_MESSAGE_ID.get() + } ACTIVE_ROOT_SPANS[messageId] = this CURRENT_MESSAGE_ID.set(messageId) timeStarted = System.nanoTime() @@ -93,11 +101,17 @@ class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherP override fun end() { val end = System.nanoTime() + if(parentMessageId != null) { + CURRENT_MESSAGE_ID.set(parentMessageId) + } else { + CURRENT_MESSAGE_ID.remove() + } ACTIVE_ROOT_SPANS.remove(messageId) - CURRENT_MESSAGE_ID.remove() - logger.trace("Ending span for message id $messageId = $handlerMetricIdentifier") if (handlerMetricIdentifier == null || timeStarted == null) return + if(parentMessageId != null) { + ACTIVE_ROOT_SPANS[parentMessageId]?.registerChildHandler(handlerMetricIdentifier!!, end - timeStarted!!) + } CurrentUnitOfWork.map { it.onCleanup { report(end) } }.orElseGet { @@ -131,6 +145,10 @@ class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherP transactionSuccessful = false return this } + + fun reportProcessorName(processorName: String) { + this.processorName = processorName + } } override fun createRootTrace(operationNameSupplier: Supplier): Span { @@ -175,6 +193,9 @@ class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherP override fun createInternalSpan(operationNameSupplier: Supplier, message: Message<*>): Span { val name = operationNameSupplier.get() + if (spanMatcherPredicateMap[SpanMatcher.SUBCRIBING_EVENT_HANDLER]!!.test(name)) { + return startEvenIfActive(message) + } if (spanMatcherPredicateMap[SpanMatcher.MESSAGE_START]!!.test(name)) { return startIfNotActive(message) } @@ -183,6 +204,9 @@ class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherP override fun createChildHandlerSpan(operationNameSupplier: Supplier, message: Message<*>, vararg linkedParents: Message<*>?): Span { val name = operationNameSupplier.get() + if (spanMatcherPredicateMap[SpanMatcher.SUBCRIBING_EVENT_HANDLER]!!.test(name)) { + return startEvenIfActive(message) + } if (spanMatcherPredicateMap[SpanMatcher.MESSAGE_START]!!.test(name)) { return startIfNotActive(message) } @@ -206,6 +230,12 @@ class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherP } } + private fun startEvenIfActive(message: Message<*>): Span { + return ACTIVE_ROOT_SPANS.computeIfAbsentWithRetry(message.identifier) { + MeasuringConsoleSpan(message.identifier) + } + } + class TimeRecordingSpan(private val metric: Metric) : Span { init { assert(metric.type == MetricType.TIMER) diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/SpanMatcher.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/SpanMatcher.kt index 3c999ee..91a52a4 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/SpanMatcher.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/SpanMatcher.kt @@ -45,6 +45,9 @@ enum class SpanMatcher(val pre49Predicate: Predicate, val from49Predicat COMMIT( Predicate { name: String -> name.endsWith(".commit") }, Predicate { name: String -> name == "EventBus.commitEvents" }), + SUBCRIBING_EVENT_HANDLER( + Predicate { name: String -> name.startsWith("SubscribingEventProcessor[") && name.endsWith(".process")}, + Predicate { name: String -> name == "EventProcessor.process" }), MESSAGE_START( Predicate { name: String -> name.endsWith("Bus.handle") diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/extensions.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/extensions.kt index 413e2e1..b31a493 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/extensions.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/messaging/extensions.kt @@ -55,15 +55,14 @@ fun Message<*>.toInformation() = MessageIdentifier( fun String.toSimpleName() = split(".").last() -fun UnitOfWork<*>.extractHandler(): HandlerStatisticsMetricIdentifier? = try { - val processingGroup = resources()[CONSOLE_PROCESSING_GROUP] as? String? +fun UnitOfWork<*>.extractHandler(declaringClassName: String, processingGroup: String?): HandlerStatisticsMetricIdentifier? = try { val isAggregate = message is CommandMessage<*> && isAggregateLifecycleActive() val isProcessor = processingGroup != null val component = when { isAggregate -> (AggregateLifecycle.describeCurrentScope() as AggregateScopeDescriptor).type - isProcessor -> processingGroup - else -> resources()[CONSOLE_DECLARING_CLASS] as String? + isProcessor -> processingGroup ?: AxoniqConsoleSpanFactory.currentSpan()?.processorName + else -> declaringClassName } val type = when { isAggregate -> HandlerType.Aggregate From 346f860cb066a923d0518328cd6d33a044044956 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Wed, 10 Jul 2024 10:34:37 +0100 Subject: [PATCH 4/7] Add subscribing event processors to the report --- .../framework/api/eventProcessorApi.kt | 1 + .../eventprocessor/ProcessorReportCreator.kt | 41 +++++++++++++------ 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt index a7ac0f4..6fba2e2 100644 --- a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt +++ b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt @@ -25,6 +25,7 @@ data class ProcessorStatusReport( enum class ProcessorMode { POOLED, TRACKING, + SUBSCRIBING, UNKNOWN, } diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt index e1b12f1..8aaaec3 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt @@ -24,6 +24,7 @@ import org.axonframework.eventhandling.* import org.axonframework.eventhandling.deadletter.DeadLetteringEventHandlerInvoker import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue + class ProcessorReportCreator( private val processingConfig: EventProcessingConfiguration, private val metricsRegistry: ProcessorMetricsRegistry, @@ -31,20 +32,34 @@ class ProcessorReportCreator( fun createReport() = ProcessorStatusReport( processingConfig.eventProcessors() - .filter { it.value is StreamingEventProcessor } .map { entry -> - val sep = entry.value as StreamingEventProcessor - ProcessorStatus( - entry.key, - entry.value.toProcessingGroupStatuses(), - sep.tokenStoreIdentifier, - sep.toType(), - sep.isRunning, - sep.isError, - sep.maxCapacity(), - sep.processingStatus().filterValues { !it.isErrorState }.size, - sep.processingStatus().map { (_, segment) -> segment.toStatus(entry.key) }, - ) + if(entry.value is StreamingEventProcessor) { + val sep = entry.value as StreamingEventProcessor + ProcessorStatus( + entry.key, + entry.value.toProcessingGroupStatuses(), + sep.tokenStoreIdentifier, + sep.toType(), + sep.isRunning, + sep.isError, + sep.maxCapacity(), + sep.processingStatus().filterValues { !it.isErrorState }.size, + sep.processingStatus().map { (_, segment) -> segment.toStatus(entry.key) }, + ) + } else { + val sep = entry.value as SubscribingEventProcessor + ProcessorStatus( + sep.name, + emptyList(), + "", + ProcessorMode.SUBSCRIBING, + sep.isRunning, + sep.isError, + 0, + 0, + emptyList(), + ) + } } ) From 0d628dd50ccdb60faf3ab2f80b8caf21d1506be3 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Wed, 10 Jul 2024 10:35:32 +0100 Subject: [PATCH 5/7] Remove aggregate metrics from handler --- .../java/io/axoniq/console/framework/api/metrics/Metric.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt index cbe2424..7c9ee70 100644 --- a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt +++ b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt @@ -64,13 +64,13 @@ enum class PreconfiguredMetric( type = MetricType.TIMER, identifier = "aggregate_lock", description = "Time it took for a command to acquire a lock for the aggregate identifier", - targetTypes = listOf(AGGREGATE, HANDLER), + targetTypes = listOf(AGGREGATE), ), AGGREGATE_LOAD_TIME( type = MetricType.TIMER, identifier = "aggregate_load", description = "Time it took for a command to load the target aggregate", - targetTypes = listOf(AGGREGATE, HANDLER), + targetTypes = listOf(AGGREGATE), ), EVENT_COMMIT_TIME( type = MetricType.TIMER, From f86a365b5f60c04c8fa08fc12d136b3d48ecdc19 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Mon, 15 Jul 2024 08:41:22 +0100 Subject: [PATCH 6/7] Add position to API --- .../framework/api/eventProcessorApi.kt | 64 ++++++++++--------- .../eventprocessor/ProcessorReportCreator.kt | 2 + 2 files changed, 35 insertions(+), 31 deletions(-) diff --git a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt index 6fba2e2..72112b6 100644 --- a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt +++ b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt @@ -19,7 +19,7 @@ package io.axoniq.console.framework.api import java.time.Instant data class ProcessorStatusReport( - val processors: List + val processors: List ) enum class ProcessorMode { @@ -30,38 +30,40 @@ enum class ProcessorMode { } data class ProcessorStatus( - val name: String, - val processingGroups: List, - val tokenStoreIdentifier: String, - val mode: ProcessorMode, - val started: Boolean, - val error: Boolean, - val segmentCapacity: Int, - val activeSegments: Int, - val segments: List, + val name: String, + val processingGroups: List, + val tokenStoreIdentifier: String, + val mode: ProcessorMode, + val started: Boolean, + val error: Boolean, + val segmentCapacity: Int, + val activeSegments: Int, + val segments: List, ) data class ProcessingGroupStatus( - val name: String, - val dlqSize: Long?, + val name: String, + val dlqSize: Long?, ) data class SegmentStatus( - val segment: Int, - val mergeableSegment: Int, - val mask: Int = -1, - val oneOf: Int, - val caughtUp: Boolean, - val error: Boolean, - val errorType: String?, - val errorMessage: String?, - val ingestLatency: Double?, - val commitLatency: Double?, + val segment: Int, + val mergeableSegment: Int, + val mask: Int = -1, + val oneOf: Int, + val caughtUp: Boolean, + val error: Boolean, + val errorType: String?, + val errorMessage: String?, + val ingestLatency: Double?, + val commitLatency: Double?, + val position: Long? = -1, + val resetPosition: Long? = -1, ) data class ProcessorSegmentId( - val processorName: String, - val segmentId: Int + val processorName: String, + val segmentId: Int ) enum class ResetDecisions { @@ -69,18 +71,18 @@ enum class ResetDecisions { } data class ResetDecision( - val processorName: String, - val decision: ResetDecisions, - val from: Instant? = null + val processorName: String, + val decision: ResetDecisions, + val from: Instant? = null ) data class SegmentOverview( - val segments: List + val segments: List ) data class SegmentDetails( - val segment: Int, - val mergeableSegment: Int, - val mask: Int, + val segment: Int, + val mergeableSegment: Int, + val mask: Int, ) \ No newline at end of file diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt index 8aaaec3..7e9d645 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt @@ -91,6 +91,8 @@ class ProcessorReportCreator( errorMessage = this.error?.message, ingestLatency = metricsRegistry.ingestLatencyForProcessor(name, this.segment.segmentId).getValue(), commitLatency = metricsRegistry.commitLatencyForProcessor(name, this.segment.segmentId).getValue(), + position = this.currentPosition?.orElse(-1) ?: -1, + resetPosition = this.resetPosition?.orElse(-1) ?: -1, ) private fun EventProcessor.toProcessingGroupStatuses(): List = From 644f183a5718d1375da8a0a50538d4b53593eb79 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Mon, 15 Jul 2024 08:41:27 +0100 Subject: [PATCH 7/7] Revert "Remove aggregate metrics from handler" This reverts commit 0d628dd50ccdb60faf3ab2f80b8caf21d1506be3. --- .../java/io/axoniq/console/framework/api/metrics/Metric.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt index 7c9ee70..cbe2424 100644 --- a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt +++ b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/metrics/Metric.kt @@ -64,13 +64,13 @@ enum class PreconfiguredMetric( type = MetricType.TIMER, identifier = "aggregate_lock", description = "Time it took for a command to acquire a lock for the aggregate identifier", - targetTypes = listOf(AGGREGATE), + targetTypes = listOf(AGGREGATE, HANDLER), ), AGGREGATE_LOAD_TIME( type = MetricType.TIMER, identifier = "aggregate_load", description = "Time it took for a command to load the target aggregate", - targetTypes = listOf(AGGREGATE), + targetTypes = listOf(AGGREGATE, HANDLER), ), EVENT_COMMIT_TIME( type = MetricType.TIMER,