Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send an approximate event store size in the setup payload. #49

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,29 +111,30 @@ data class CommandBusInformation(
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val messageSerializer: io.axoniq.console.framework.api.SerializerInformation?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
)

data class QueryBusInformation(
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val messageSerializer: io.axoniq.console.framework.api.SerializerInformation?,
val serializer: io.axoniq.console.framework.api.SerializerInformation?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
val serializer: SerializerInformation?,
)

data class EventStoreInformation(
val type: String,
val axonServer: Boolean,
val context: String?,
val dispatchInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val eventSerializer: io.axoniq.console.framework.api.SerializerInformation?,
val snapshotSerializer: io.axoniq.console.framework.api.SerializerInformation?,
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val eventSerializer: SerializerInformation?,
val snapshotSerializer: SerializerInformation?,
val approximateSize: Long? = null,
)

data class ProcessorInformation(
Expand All @@ -147,7 +148,7 @@ data class ProcessorInformation(
val tokenStoreClaimTimeout: Long,
val errorHandler: String,
val invocationErrorHandler: String,
val interceptors: List<io.axoniq.console.framework.api.InterceptorInformation>,
val interceptors: List<InterceptorInformation>,
)

data class InterceptorInformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import org.axonframework.commandhandling.CommandBus
import org.axonframework.common.ReflectionUtils
import org.axonframework.config.Configuration
import org.axonframework.config.EventProcessingModule
import org.axonframework.eventhandling.MultiStreamableMessageSource
import org.axonframework.eventhandling.StreamingEventProcessor
import org.axonframework.eventhandling.*
import org.axonframework.eventhandling.tokenstore.TokenStore
import org.axonframework.eventsourcing.eventstore.EventStore
import org.axonframework.messaging.StreamableMessageSource
Expand All @@ -34,94 +33,94 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.TemporalAmount

class SetupPayloadCreator(
private val configuration: Configuration,
private val configuration: Configuration,
) {
private val eventProcessingConfiguration = configuration.eventProcessingConfiguration() as EventProcessingModule

fun createReport(): SetupPayload {
val processors = eventProcessingConfiguration.eventProcessors()
.filter { it.value is StreamingEventProcessor }
.map { entry ->
entry.key
}
.filter { it.value is StreamingEventProcessor }
.map { entry ->
entry.key
}
return SetupPayload(
commandBus = commandBusInformation(),
queryBus = queryBusInformation(),
eventStore = eventBusInformation(),
processors = processors.map {
val processor =
eventProcessingConfiguration.eventProcessor(it, StreamingEventProcessor::class.java).get()
ProcessorInformation(
name = it,
supportsReset = processor.supportsReset(),
batchSize = processor.getBatchSize(),
messageSourceType = processor.getMessageSource(),
tokenClaimInterval = processor.getTokenClaimInterval(),
tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout(),
errorHandler = eventProcessingConfiguration.errorHandler(it)::class.java.name,
invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(it)::class.java.name,
interceptors = processor.getInterceptors("interceptors"),
tokenStoreType = processor.getStoreTokenStoreType(),
contexts = processor.contexts()
commandBus = commandBusInformation(),
queryBus = queryBusInformation(),
eventStore = eventBusInformation(),
processors = processors.map {
val processor =
eventProcessingConfiguration.eventProcessor(it, StreamingEventProcessor::class.java).get()
ProcessorInformation(
name = it,
supportsReset = processor.supportsReset(),
batchSize = processor.getBatchSize(),
messageSourceType = processor.getMessageSource(),
tokenClaimInterval = processor.getTokenClaimInterval(),
tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout(),
errorHandler = eventProcessingConfiguration.errorHandler(it)::class.java.name,
invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(it)::class.java.name,
interceptors = processor.getInterceptors("interceptors"),
tokenStoreType = processor.getStoreTokenStoreType(),
contexts = processor.contexts()
)
},
versions = versionInformation(),
upcasters = upcasters(),
features = SupportedFeatures(
heartbeat = true
)
},
versions = versionInformation(),
upcasters = upcasters(),
features = SupportedFeatures(
heartbeat = true
)
)
}

private fun upcasters(): List<String> {
val upcasters =
configuration.upcasterChain().getPropertyValue<List<out Upcaster<*>>>("upcasters") ?: emptyList()
configuration.upcasterChain().getPropertyValue<List<out Upcaster<*>>>("upcasters") ?: emptyList()
return upcasters.map { it::class.java.name }
}

private val dependenciesToCheck = listOf(
"org.axonframework:axon-messaging",
"org.axonframework:axon-configuration",
"org.axonframework:axon-disruptor",
"org.axonframework:axon-eventsourcing",
"org.axonframework:axon-legacy",
"org.axonframework:axon-metrics",
"org.axonframework:axon-micrometer",
"org.axonframework:axon-modelling",
"org.axonframework:axon-server-connector",
"org.axonframework:axon-spring",
"org.axonframework:axon-spring-boot-autoconfigure",
"org.axonframework:axon-spring-boot-starter",
"org.axonframework:axon-tracing-opentelemetry",
"org.axonframework.extensions.amqp:axon-amqp",
"org.axonframework.extensions.jgroups:axon-jgroups",
"org.axonframework.extensions.kafka:axon-kafka",
"org.axonframework.extensions.mongo:axon-mongo",
"org.axonframework.extensions.reactor:axon-reactor",
"org.axonframework.extensions.springcloud:axon-springcloud",
"org.axonframework.extensions.tracing:axon-tracing",
"io.axoniq:axonserver-connector-java",
"io.axoniq.console:console-framework-client",
"org.axonframework:axon-messaging",
"org.axonframework:axon-configuration",
"org.axonframework:axon-disruptor",
"org.axonframework:axon-eventsourcing",
"org.axonframework:axon-legacy",
"org.axonframework:axon-metrics",
"org.axonframework:axon-micrometer",
"org.axonframework:axon-modelling",
"org.axonframework:axon-server-connector",
"org.axonframework:axon-spring",
"org.axonframework:axon-spring-boot-autoconfigure",
"org.axonframework:axon-spring-boot-starter",
"org.axonframework:axon-tracing-opentelemetry",
"org.axonframework.extensions.amqp:axon-amqp",
"org.axonframework.extensions.jgroups:axon-jgroups",
"org.axonframework.extensions.kafka:axon-kafka",
"org.axonframework.extensions.mongo:axon-mongo",
"org.axonframework.extensions.reactor:axon-reactor",
"org.axonframework.extensions.springcloud:axon-springcloud",
"org.axonframework.extensions.tracing:axon-tracing",
"io.axoniq:axonserver-connector-java",
"io.axoniq.console:console-framework-client",
)

private fun versionInformation(): Versions {
return Versions(
frameworkVersion = resolveVersion("org.axonframework:axon-messaging") ?: "Unknown",
moduleVersions = dependenciesToCheck.map {
io.axoniq.console.framework.api.ModuleVersion(
it,
resolveVersion(it)
)
}
frameworkVersion = resolveVersion("org.axonframework:axon-messaging") ?: "Unknown",
moduleVersions = dependenciesToCheck.map {
io.axoniq.console.framework.api.ModuleVersion(
it,
resolveVersion(it)
)
}
)
}

private fun resolveVersion(dep: String): String? {
val (groupId, artifactId) = dep.split(":")
return MavenArtifactVersionResolver(
groupId,
artifactId,
this::class.java.classLoader
groupId,
artifactId,
this::class.java.classLoader
).get()
}

Expand All @@ -143,44 +142,64 @@ class SetupPayloadCreator(
bus.getPropertyValue<Any>("serializer")?.getSerializerType("serializer")
} else null
return QueryBusInformation(
type = bus::class.java.name,
axonServer = axonServer,
localSegmentType = localSegmentType,
context = context,
handlerInterceptors = handlerInterceptors,
dispatchInterceptors = dispatchInterceptors,
messageSerializer = messageSerializer,
serializer = serializer,
type = bus::class.java.name,
axonServer = axonServer,
localSegmentType = localSegmentType,
context = context,
handlerInterceptors = handlerInterceptors,
dispatchInterceptors = dispatchInterceptors,
messageSerializer = messageSerializer,
serializer = serializer,
)
}

private fun eventBusInformation(): EventStoreInformation {
val bus = configuration.eventBus().unwrapPossiblyDecoratedClass(EventStore::class.java)
val axonServer =
bus::class.java.name == "org.axonframework.axonserver.connector.event.axon.AxonServerEventStore"
bus::class.java.name == "org.axonframework.axonserver.connector.event.axon.AxonServerEventStore"
val context = if (axonServer) {
bus.getPropertyValue<Any>("storageEngine")?.getPropertyValue<String>("context")
} else null
val dispatchInterceptors = bus.getInterceptors("dispatchInterceptors")
return EventStoreInformation(
type = bus::class.java.name,
axonServer = axonServer,
context = context,
dispatchInterceptors = dispatchInterceptors,
eventSerializer = bus.getPropertyValue<Any>("storageEngine")?.getSerializerType("eventSerializer"),
snapshotSerializer = bus.getPropertyValue<Any>("storageEngine")?.getSerializerType("snapshotSerializer"),
type = bus::class.java.name,
axonServer = axonServer,
context = context,
dispatchInterceptors = dispatchInterceptors,
eventSerializer = bus.getPropertyValue<Any>("storageEngine")?.getSerializerType("eventSerializer"),
snapshotSerializer = bus.getPropertyValue<Any>("storageEngine")?.getSerializerType("snapshotSerializer"),
approximateSize = getApproximateSize(bus)
)
}

private fun getApproximateSize(bus: EventBus): Long? =
if (bus is StreamableMessageSource<*>) {
runCatching {
getSizeFromToken(bus.createHeadToken())
}.getOrElse { null }
} else {
null
}

private fun getSizeFromToken(token: TrackingToken): Long? =
when (token) {
is GlobalSequenceTrackingToken -> token.globalIndex
is GapAwareTrackingToken -> token.index
is MultiSourceTrackingToken -> token.trackingTokens.values.sumOf {
getSizeFromToken(it) ?: 0
}
else -> null
}
gklijs marked this conversation as resolved.
Show resolved Hide resolved

private fun commandBusInformation(): CommandBusInformation {
val bus = configuration.commandBus().unwrapPossiblyDecoratedClass(CommandBus::class.java)
val axonServer = bus::class.java.name == "org.axonframework.axonserver.connector.command.AxonServerCommandBus"
val localSegmentType =
if (axonServer) bus.getPropertyTypeNested("localSegment", CommandBus::class.java) else null
if (axonServer) bus.getPropertyTypeNested("localSegment", CommandBus::class.java) else null
val context = if (axonServer) bus.getPropertyValue<String>("context") else null
val handlerInterceptors = if (axonServer) {
bus.getPropertyValue<Any>("localSegment")?.getInterceptors("handlerInterceptors", "invokerInterceptors")
?: emptyList()
?: emptyList()
} else {
bus.getInterceptors("handlerInterceptors", "invokerInterceptors")
}
Expand All @@ -189,50 +208,50 @@ class SetupPayloadCreator(
bus.getPropertyValue<Any>("serializer")?.getSerializerType("messageSerializer")
} else null
return CommandBusInformation(
type = bus::class.java.name,
axonServer = axonServer,
localSegmentType = localSegmentType,
context = context,
handlerInterceptors = handlerInterceptors,
dispatchInterceptors = dispatchInterceptors,
messageSerializer = serializer
type = bus::class.java.name,
axonServer = axonServer,
localSegmentType = localSegmentType,
context = context,
handlerInterceptors = handlerInterceptors,
dispatchInterceptors = dispatchInterceptors,
messageSerializer = serializer
)
}

private fun <T> Any.getPropertyValue(fieldName: String): T? {
val field = ReflectionUtils.fieldsOf(this::class.java).firstOrNull { it.name == fieldName } ?: return null
return ReflectionUtils.getMemberValue(
field,
this
field,
this
)
}

private fun Any.getPropertyType(fieldName: String): String {
return ReflectionUtils.getMemberValue<Any>(
ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName },
this
ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName },
this
).let { it::class.java.name }
}

private fun Any.getPropertyTypeNested(fieldName: String, clazz: Class<out Any>): String {
return ReflectionUtils.getMemberValue<Any>(
ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName },
this
ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName },
this
)
.let { it.unwrapPossiblyDecoratedClass(clazz) }
.let { it::class.java.name }
.let { it.unwrapPossiblyDecoratedClass(clazz) }
.let { it::class.java.name }
}

private fun StreamingEventProcessor.getBatchSize(): Int = getPropertyValue("batchSize") ?: -1
private fun StreamingEventProcessor.getMessageSource(): String =
getPropertyTypeNested("messageSource", EventStore::class.java)
getPropertyTypeNested("messageSource", EventStore::class.java)

private fun StreamingEventProcessor.getTokenClaimInterval(): Long = getPropertyValue("tokenClaimInterval") ?: -1
private fun StreamingEventProcessor.getStoreTokenStoreType(): String =
getPropertyTypeNested("tokenStore", TokenStore::class.java)
getPropertyTypeNested("tokenStore", TokenStore::class.java)

private fun StreamingEventProcessor.getStoreTokenClaimTimeout(): Long = getPropertyValue<Any>("tokenStore")
?.getPropertyValue<TemporalAmount>("claimTimeout")?.let { it.get(ChronoUnit.SECONDS) * 1000 } ?: -1
?.getPropertyValue<TemporalAmount>("claimTimeout")?.let { it.get(ChronoUnit.SECONDS) * 1000 } ?: -1


private fun Any.getInterceptors(vararg fieldNames: String): List<InterceptorInformation> {
Expand All @@ -245,13 +264,13 @@ class SetupPayloadCreator(
return emptyList()
}
return interceptors
.filterNotNull()
.map {
if (it is AxoniqConsoleMeasuringHandlerInterceptor) {
InterceptorInformation(it.subject::class.java.name, true)
} else InterceptorInformation(it::class.java.name, false)
}
.filter { !it.type.startsWith("org.axonframework.eventhandling") }
.filterNotNull()
.map {
if (it is AxoniqConsoleMeasuringHandlerInterceptor) {
InterceptorInformation(it.subject::class.java.name, true)
} else InterceptorInformation(it::class.java.name, false)
}
.filter { !it.type.startsWith("org.axonframework.eventhandling") }
}

private fun Any.getSerializerType(fieldName: String): SerializerInformation? {
Expand Down