Skip to content

Commit

Permalink
Fix getting the aggregate size. This will help getting the size when …
Browse files Browse the repository at this point in the history
…either snapshotting or caching is used.
  • Loading branch information
Gerard Klijs committed Feb 12, 2024
1 parent ba7d659 commit f3b35a9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023. AxonIQ B.V.
* Copyright (c) 2022-2024. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@ import org.axonframework.config.ConfigurerModule
import org.axonframework.eventsourcing.EventSourcingRepository
import org.axonframework.eventsourcing.eventstore.EventStore
import org.axonframework.modelling.command.Repository
import kotlin.reflect.full.superclasses

class AxoniqConsoleAggregateConfigurerModule : ConfigurerModule {
override fun configureModule(configurer: Configurer) {
Expand All @@ -32,19 +33,24 @@ class AxoniqConsoleAggregateConfigurerModule : ConfigurerModule {
it.findModules(AggregateConfiguration::class.java).forEach { ac ->
val repo = ac.repository().unwrapPossiblyDecoratedClass(Repository::class.java)
if (repo is EventSourcingRepository) {
val field =
ReflectionUtils.fieldsOf(repo::class.java).firstOrNull { f -> f.name == "eventStore" }
if (field != null) {
val current = ReflectionUtils.getFieldValue<EventStore>(field, repo)
if (current !is AxoniqConsoleWrappedEventStore) {
ReflectionUtils.setFieldValue(field, repo, AxoniqConsoleWrappedEventStore(current))
}
wrapIfPresentAndNotWrapped(repo, repo::class.java)
repo::class.superclasses.forEach { superClass ->
wrapIfPresentAndNotWrapped(repo, superClass.java)
}
}
}
}
}
}


private fun wrapIfPresentAndNotWrapped(repo: EventSourcingRepository<*>, clazz: Class<*>) {
val field =
ReflectionUtils.fieldsOf(clazz).firstOrNull { f -> f.name == "eventStore" }
if (field != null) {
val current = ReflectionUtils.getFieldValue<EventStore>(field, repo)
if (current !is AxoniqConsoleWrappedEventStore) {
ReflectionUtils.setFieldValue(field, repo, AxoniqConsoleWrappedEventStore(current))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023. AxonIQ B.V.
* Copyright (c) 2022-2024. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,12 +54,22 @@ class AxoniqConsoleWrappedEventStore(
return delegate.openStream(trackingToken)
}

override fun readEvents(p0: String): DomainEventStream {
val result = delegate.readEvents(p0)
override fun readEvents(aggregateIdentifier: String): DomainEventStream {
val result = delegate.readEvents(aggregateIdentifier)
val events = result.asStream().map { it }.collect(Collectors.toList())
onTopLevelSpanIfActive {
it.registerMetricValue(PreconfiguredMetric.AGGREGATE_EVENTS_SIZE, events.size.toLong())
}
return DomainEventStream.of(events)
}

override fun readEvents(aggregateIdentifier: String, firstSequenceNumber: Long): DomainEventStream {
val result = delegate.readEvents(aggregateIdentifier, firstSequenceNumber)
val events = result.asStream().map { it }.collect(Collectors.toList())
val size = events.lastOrNull()?.sequenceNumber?.plus(1) ?: 0
onTopLevelSpanIfActive {
it.registerMetricValue(PreconfiguredMetric.AGGREGATE_EVENTS_SIZE, size)
}
return DomainEventStream.of(events)
}
}

0 comments on commit f3b35a9

Please sign in to comment.