From a6e5b62d559813ac69d82488d1024950c3357956 Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Tue, 13 Feb 2024 10:04:11 +0100 Subject: [PATCH] Send an approximate event store size in the setup payload. --- .../framework/api/clientIdentification.kt | 23 +- .../framework/client/SetupPayloadCreator.kt | 231 ++++++++++-------- 2 files changed, 137 insertions(+), 117 deletions(-) diff --git a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/clientIdentification.kt b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/clientIdentification.kt index 417fae1..2535680 100644 --- a/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/clientIdentification.kt +++ b/console-framework-client-api/src/main/java/io/axoniq/console/framework/api/clientIdentification.kt @@ -111,9 +111,9 @@ data class CommandBusInformation( val axonServer: Boolean, val localSegmentType: String?, val context: String?, - val handlerInterceptors: List = emptyList(), - val dispatchInterceptors: List = emptyList(), - val messageSerializer: io.axoniq.console.framework.api.SerializerInformation?, + val handlerInterceptors: List = emptyList(), + val dispatchInterceptors: List = emptyList(), + val messageSerializer: SerializerInformation?, ) data class QueryBusInformation( @@ -121,19 +121,20 @@ data class QueryBusInformation( val axonServer: Boolean, val localSegmentType: String?, val context: String?, - val handlerInterceptors: List = emptyList(), - val dispatchInterceptors: List = emptyList(), - val messageSerializer: io.axoniq.console.framework.api.SerializerInformation?, - val serializer: io.axoniq.console.framework.api.SerializerInformation?, + val handlerInterceptors: List = emptyList(), + val dispatchInterceptors: List = emptyList(), + val messageSerializer: SerializerInformation?, + val serializer: SerializerInformation?, ) data class EventStoreInformation( val type: String, val axonServer: Boolean, val context: String?, - val dispatchInterceptors: List = emptyList(), - val eventSerializer: io.axoniq.console.framework.api.SerializerInformation?, - val snapshotSerializer: io.axoniq.console.framework.api.SerializerInformation?, + val dispatchInterceptors: List = emptyList(), + val eventSerializer: SerializerInformation?, + val snapshotSerializer: SerializerInformation?, + val approximateSize: Long? = null, ) data class ProcessorInformation( @@ -147,7 +148,7 @@ data class ProcessorInformation( val tokenStoreClaimTimeout: Long, val errorHandler: String, val invocationErrorHandler: String, - val interceptors: List, + val interceptors: List, ) data class InterceptorInformation( diff --git a/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt b/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt index 9675c89..a67e1df 100644 --- a/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt +++ b/console-framework-client/src/main/java/io/axoniq/console/framework/client/SetupPayloadCreator.kt @@ -22,8 +22,7 @@ import org.axonframework.commandhandling.CommandBus import org.axonframework.common.ReflectionUtils import org.axonframework.config.Configuration import org.axonframework.config.EventProcessingModule -import org.axonframework.eventhandling.MultiStreamableMessageSource -import org.axonframework.eventhandling.StreamingEventProcessor +import org.axonframework.eventhandling.* import org.axonframework.eventhandling.tokenstore.TokenStore import org.axonframework.eventsourcing.eventstore.EventStore import org.axonframework.messaging.StreamableMessageSource @@ -34,94 +33,94 @@ import java.time.temporal.ChronoUnit import java.time.temporal.TemporalAmount class SetupPayloadCreator( - private val configuration: Configuration, + private val configuration: Configuration, ) { private val eventProcessingConfiguration = configuration.eventProcessingConfiguration() as EventProcessingModule fun createReport(): SetupPayload { val processors = eventProcessingConfiguration.eventProcessors() - .filter { it.value is StreamingEventProcessor } - .map { entry -> - entry.key - } + .filter { it.value is StreamingEventProcessor } + .map { entry -> + entry.key + } return SetupPayload( - commandBus = commandBusInformation(), - queryBus = queryBusInformation(), - eventStore = eventBusInformation(), - processors = processors.map { - val processor = - eventProcessingConfiguration.eventProcessor(it, StreamingEventProcessor::class.java).get() - ProcessorInformation( - name = it, - supportsReset = processor.supportsReset(), - batchSize = processor.getBatchSize(), - messageSourceType = processor.getMessageSource(), - tokenClaimInterval = processor.getTokenClaimInterval(), - tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout(), - errorHandler = eventProcessingConfiguration.errorHandler(it)::class.java.name, - invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(it)::class.java.name, - interceptors = processor.getInterceptors("interceptors"), - tokenStoreType = processor.getStoreTokenStoreType(), - contexts = processor.contexts() + commandBus = commandBusInformation(), + queryBus = queryBusInformation(), + eventStore = eventBusInformation(), + processors = processors.map { + val processor = + eventProcessingConfiguration.eventProcessor(it, StreamingEventProcessor::class.java).get() + ProcessorInformation( + name = it, + supportsReset = processor.supportsReset(), + batchSize = processor.getBatchSize(), + messageSourceType = processor.getMessageSource(), + tokenClaimInterval = processor.getTokenClaimInterval(), + tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout(), + errorHandler = eventProcessingConfiguration.errorHandler(it)::class.java.name, + invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(it)::class.java.name, + interceptors = processor.getInterceptors("interceptors"), + tokenStoreType = processor.getStoreTokenStoreType(), + contexts = processor.contexts() + ) + }, + versions = versionInformation(), + upcasters = upcasters(), + features = SupportedFeatures( + heartbeat = true ) - }, - versions = versionInformation(), - upcasters = upcasters(), - features = SupportedFeatures( - heartbeat = true - ) ) } private fun upcasters(): List { val upcasters = - configuration.upcasterChain().getPropertyValue>>("upcasters") ?: emptyList() + configuration.upcasterChain().getPropertyValue>>("upcasters") ?: emptyList() return upcasters.map { it::class.java.name } } private val dependenciesToCheck = listOf( - "org.axonframework:axon-messaging", - "org.axonframework:axon-configuration", - "org.axonframework:axon-disruptor", - "org.axonframework:axon-eventsourcing", - "org.axonframework:axon-legacy", - "org.axonframework:axon-metrics", - "org.axonframework:axon-micrometer", - "org.axonframework:axon-modelling", - "org.axonframework:axon-server-connector", - "org.axonframework:axon-spring", - "org.axonframework:axon-spring-boot-autoconfigure", - "org.axonframework:axon-spring-boot-starter", - "org.axonframework:axon-tracing-opentelemetry", - "org.axonframework.extensions.amqp:axon-amqp", - "org.axonframework.extensions.jgroups:axon-jgroups", - "org.axonframework.extensions.kafka:axon-kafka", - "org.axonframework.extensions.mongo:axon-mongo", - "org.axonframework.extensions.reactor:axon-reactor", - "org.axonframework.extensions.springcloud:axon-springcloud", - "org.axonframework.extensions.tracing:axon-tracing", - "io.axoniq:axonserver-connector-java", - "io.axoniq.console:console-framework-client", + "org.axonframework:axon-messaging", + "org.axonframework:axon-configuration", + "org.axonframework:axon-disruptor", + "org.axonframework:axon-eventsourcing", + "org.axonframework:axon-legacy", + "org.axonframework:axon-metrics", + "org.axonframework:axon-micrometer", + "org.axonframework:axon-modelling", + "org.axonframework:axon-server-connector", + "org.axonframework:axon-spring", + "org.axonframework:axon-spring-boot-autoconfigure", + "org.axonframework:axon-spring-boot-starter", + "org.axonframework:axon-tracing-opentelemetry", + "org.axonframework.extensions.amqp:axon-amqp", + "org.axonframework.extensions.jgroups:axon-jgroups", + "org.axonframework.extensions.kafka:axon-kafka", + "org.axonframework.extensions.mongo:axon-mongo", + "org.axonframework.extensions.reactor:axon-reactor", + "org.axonframework.extensions.springcloud:axon-springcloud", + "org.axonframework.extensions.tracing:axon-tracing", + "io.axoniq:axonserver-connector-java", + "io.axoniq.console:console-framework-client", ) private fun versionInformation(): Versions { return Versions( - frameworkVersion = resolveVersion("org.axonframework:axon-messaging") ?: "Unknown", - moduleVersions = dependenciesToCheck.map { - io.axoniq.console.framework.api.ModuleVersion( - it, - resolveVersion(it) - ) - } + frameworkVersion = resolveVersion("org.axonframework:axon-messaging") ?: "Unknown", + moduleVersions = dependenciesToCheck.map { + io.axoniq.console.framework.api.ModuleVersion( + it, + resolveVersion(it) + ) + } ) } private fun resolveVersion(dep: String): String? { val (groupId, artifactId) = dep.split(":") return MavenArtifactVersionResolver( - groupId, - artifactId, - this::class.java.classLoader + groupId, + artifactId, + this::class.java.classLoader ).get() } @@ -143,44 +142,64 @@ class SetupPayloadCreator( bus.getPropertyValue("serializer")?.getSerializerType("serializer") } else null return QueryBusInformation( - type = bus::class.java.name, - axonServer = axonServer, - localSegmentType = localSegmentType, - context = context, - handlerInterceptors = handlerInterceptors, - dispatchInterceptors = dispatchInterceptors, - messageSerializer = messageSerializer, - serializer = serializer, + type = bus::class.java.name, + axonServer = axonServer, + localSegmentType = localSegmentType, + context = context, + handlerInterceptors = handlerInterceptors, + dispatchInterceptors = dispatchInterceptors, + messageSerializer = messageSerializer, + serializer = serializer, ) } private fun eventBusInformation(): EventStoreInformation { val bus = configuration.eventBus().unwrapPossiblyDecoratedClass(EventStore::class.java) val axonServer = - bus::class.java.name == "org.axonframework.axonserver.connector.event.axon.AxonServerEventStore" + bus::class.java.name == "org.axonframework.axonserver.connector.event.axon.AxonServerEventStore" val context = if (axonServer) { bus.getPropertyValue("storageEngine")?.getPropertyValue("context") } else null val dispatchInterceptors = bus.getInterceptors("dispatchInterceptors") return EventStoreInformation( - type = bus::class.java.name, - axonServer = axonServer, - context = context, - dispatchInterceptors = dispatchInterceptors, - eventSerializer = bus.getPropertyValue("storageEngine")?.getSerializerType("eventSerializer"), - snapshotSerializer = bus.getPropertyValue("storageEngine")?.getSerializerType("snapshotSerializer"), + type = bus::class.java.name, + axonServer = axonServer, + context = context, + dispatchInterceptors = dispatchInterceptors, + eventSerializer = bus.getPropertyValue("storageEngine")?.getSerializerType("eventSerializer"), + snapshotSerializer = bus.getPropertyValue("storageEngine")?.getSerializerType("snapshotSerializer"), + approximateSize = getApproximateSize(bus) ) } + private fun getApproximateSize(bus: EventBus): Long? = + if (bus is StreamableMessageSource<*>) { + runCatching { + getSizeFromToken(bus.createHeadToken()) + }.getOrElse { null } + } else { + null + } + + private fun getSizeFromToken(token: TrackingToken): Long? = + when (token) { + is GlobalSequenceTrackingToken -> token.globalIndex + is GapAwareTrackingToken -> token.index + is MultiSourceTrackingToken -> token.trackingTokens.values.sumOf { + getSizeFromToken(it) ?: 0 + } + else -> null + } + private fun commandBusInformation(): CommandBusInformation { val bus = configuration.commandBus().unwrapPossiblyDecoratedClass(CommandBus::class.java) val axonServer = bus::class.java.name == "org.axonframework.axonserver.connector.command.AxonServerCommandBus" val localSegmentType = - if (axonServer) bus.getPropertyTypeNested("localSegment", CommandBus::class.java) else null + if (axonServer) bus.getPropertyTypeNested("localSegment", CommandBus::class.java) else null val context = if (axonServer) bus.getPropertyValue("context") else null val handlerInterceptors = if (axonServer) { bus.getPropertyValue("localSegment")?.getInterceptors("handlerInterceptors", "invokerInterceptors") - ?: emptyList() + ?: emptyList() } else { bus.getInterceptors("handlerInterceptors", "invokerInterceptors") } @@ -189,50 +208,50 @@ class SetupPayloadCreator( bus.getPropertyValue("serializer")?.getSerializerType("messageSerializer") } else null return CommandBusInformation( - type = bus::class.java.name, - axonServer = axonServer, - localSegmentType = localSegmentType, - context = context, - handlerInterceptors = handlerInterceptors, - dispatchInterceptors = dispatchInterceptors, - messageSerializer = serializer + type = bus::class.java.name, + axonServer = axonServer, + localSegmentType = localSegmentType, + context = context, + handlerInterceptors = handlerInterceptors, + dispatchInterceptors = dispatchInterceptors, + messageSerializer = serializer ) } private fun Any.getPropertyValue(fieldName: String): T? { val field = ReflectionUtils.fieldsOf(this::class.java).firstOrNull { it.name == fieldName } ?: return null return ReflectionUtils.getMemberValue( - field, - this + field, + this ) } private fun Any.getPropertyType(fieldName: String): String { return ReflectionUtils.getMemberValue( - ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName }, - this + ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName }, + this ).let { it::class.java.name } } private fun Any.getPropertyTypeNested(fieldName: String, clazz: Class): String { return ReflectionUtils.getMemberValue( - ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName }, - this + ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName }, + this ) - .let { it.unwrapPossiblyDecoratedClass(clazz) } - .let { it::class.java.name } + .let { it.unwrapPossiblyDecoratedClass(clazz) } + .let { it::class.java.name } } private fun StreamingEventProcessor.getBatchSize(): Int = getPropertyValue("batchSize") ?: -1 private fun StreamingEventProcessor.getMessageSource(): String = - getPropertyTypeNested("messageSource", EventStore::class.java) + getPropertyTypeNested("messageSource", EventStore::class.java) private fun StreamingEventProcessor.getTokenClaimInterval(): Long = getPropertyValue("tokenClaimInterval") ?: -1 private fun StreamingEventProcessor.getStoreTokenStoreType(): String = - getPropertyTypeNested("tokenStore", TokenStore::class.java) + getPropertyTypeNested("tokenStore", TokenStore::class.java) private fun StreamingEventProcessor.getStoreTokenClaimTimeout(): Long = getPropertyValue("tokenStore") - ?.getPropertyValue("claimTimeout")?.let { it.get(ChronoUnit.SECONDS) * 1000 } ?: -1 + ?.getPropertyValue("claimTimeout")?.let { it.get(ChronoUnit.SECONDS) * 1000 } ?: -1 private fun Any.getInterceptors(vararg fieldNames: String): List { @@ -245,13 +264,13 @@ class SetupPayloadCreator( return emptyList() } return interceptors - .filterNotNull() - .map { - if (it is AxoniqConsoleMeasuringHandlerInterceptor) { - InterceptorInformation(it.subject::class.java.name, true) - } else InterceptorInformation(it::class.java.name, false) - } - .filter { !it.type.startsWith("org.axonframework.eventhandling") } + .filterNotNull() + .map { + if (it is AxoniqConsoleMeasuringHandlerInterceptor) { + InterceptorInformation(it.subject::class.java.name, true) + } else InterceptorInformation(it::class.java.name, false) + } + .filter { !it.type.startsWith("org.axonframework.eventhandling") } } private fun Any.getSerializerType(fieldName: String): SerializerInformation? {