diff --git a/README.md b/README.md index 873f380..06ab941 100644 --- a/README.md +++ b/README.md @@ -5,17 +5,44 @@ [![sponsored](https://img.shields.io/badge/sponsoredBy-Holisticon-RED.svg)](https://holisticon.de/) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.holixon.example.axon-event-transformation/axon-event-transformation-example/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.holixon.example.axon-event-transformation/axon-event-transformation-example) +## Into + +The example demonstrates the usage of the event store transformation API and focuses on deletion of "stale" events. + +## Use case + +Consider a system storing temperature forecast delivered by some external system. The forecast is generated periodically and the system is representing the actual forecast. +Apart from the historical information (which can be for example used to tune the forecast model), the system holds current (= latest) forecast and can be queried for the +weather on a certain date. After the delivery of the new forecast, the previous forecast is considered as "stale" and doesn't need to be stored. + +## The API + +The system allows to insert a new temperature value at a single date and query temperature on certain date. For *simulation purposes* the system provides a fake endpoint +for generation of an entire forecast for the next year (taking random values for the temperature). + +## The mission + +Try to use event store transformation API to delete stale events and reduce the size of the event store - both for purposes of disk usage and the amount of events used on replays. + ## Experiment +- A sensor is created (1 event) +- The first batch of 365 updates (starting from today for the next year) for 365 days from update date to the future is sent +- The second batch of 365 updates (starting from today for the next year) for 365 days from update date to the future is sent +- All 365 events from the first batch are deleted + +### Preparation + +- Axon server is configured to use small segments. Size of event file set to 1000000 bytes (977kb). + ### Setup -- Size of event file set to 1000000 bytes (977kb). - We consider only event storage (no snapshot files, no nindex files, no global-index-00000.xref) -- The size of the snapshot file can be set-up via property +- The size of the snapshot file can be set-up via property separately. - The size of the index file seems to be 2097152 bytes (approx 2mb) for event segment files of default size (256mb) and for a drastically reduced event segment size of 977k. -- The size of global-index-00000.xref is 64mb - -Event payload: +- The size of global-index-00000.xref is 64mb. +- Events use Jackson serializer storing payload as JSON. +- Event payload is then represented like this: ```json { @@ -29,12 +56,13 @@ Event payload: "value": 23.188648, "unit": "°C" }, + ... another 363 values for dates } } ``` -A single JSON payload consumes 16415 bytes in total (netto) -Based on file names of event files (00000000000000000175.events, 00000000000000000233.events) 58 update events match into a 977kb file resulting in 16845 bytes (brutto). +- A single JSON payload consumes 16415 bytes in total (netto). +- Based on file names of event files (00000000000000000175.events, 00000000000000000233.events) 58 update events match into a 977kb file resulting in 16845 bytes (brutto). 356 event consume 16845 * 365 = 6148425 brutto (splitting), resulting in not quite full 7 segments per batch run. @@ -107,3 +135,14 @@ Based on file names of event files (00000000000000000175.events, 000000000000000 977K Jan 17 22:48 00000000000000000639.events 977K Jan 17 22:48 00000000000000000697.events +## How to run + +- Start Axon Server via `docker-compose up` +- Build application with mvn (`./mvnw clean package`) +- Start application `java -jar target/axon-event-transformation-example-0.0.1-SNAPSHOT.jar` and check the console [Admin console](http://localhost:8024) +- Open browser and navigate to [Swagger UI](http://localhost:8080/swagger-ui/index.html) +- Run a batch update (HTTP POST to `/measurements/batch`) and check files in `.docker/events/default` +- Run a batch update for the second time (HTTP POST to `/measurements/batch`) and check files in `.docker/events/default` +- Copy the `start` value of from the HTTP response of the second batch run +- Run cleanup, by providing the value from the previous step as a string payload (HTTP POST to `/admin/cleanup`) and check files in `.docker/events/default` +- Run compact (HTTP POST to `/admin/compact`) and check files in `.docker/events/default` diff --git a/pom.xml b/pom.xml index b6c1d74..7cfeebb 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,7 @@ 5.2.1 3.24.2 + 3.1.2 @@ -27,7 +28,7 @@ org.springframework.boot spring-boot-dependencies - 3.1.2 + ${spring-boot.version} import pom @@ -139,6 +140,19 @@ org.apache.maven.plugins maven-enforcer-plugin + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + package + + repackage + + + + diff --git a/src/main/kotlin/SensorApplication.kt b/src/main/kotlin/SensorApplication.kt index 5e33d37..ffc6df9 100644 --- a/src/main/kotlin/SensorApplication.kt +++ b/src/main/kotlin/SensorApplication.kt @@ -4,6 +4,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication fun main(args: Array) { + System.setProperty("disable-axoniq-console-message", "true") runApplication(*args).let { Unit } } diff --git a/src/main/kotlin/application/usecase/GenerateRandomBatchUpdateUseCase.kt b/src/main/kotlin/application/usecase/GenerateRandomBatchUpdateUseCase.kt index 484169c..913fb59 100644 --- a/src/main/kotlin/application/usecase/GenerateRandomBatchUpdateUseCase.kt +++ b/src/main/kotlin/application/usecase/GenerateRandomBatchUpdateUseCase.kt @@ -4,7 +4,9 @@ import io.holixon.example.axon.event.transformation.application.port.`in`.Genera import io.holixon.example.axon.event.transformation.application.port.`in`.Measurement import io.holixon.example.axon.event.transformation.application.port.`in`.SensorMeasurements import io.holixon.example.axon.event.transformation.application.port.`in`.UpdateMeasurementInPort +import mu.KLogging import org.springframework.stereotype.Component +import java.time.Instant import java.time.LocalDate import kotlin.random.Random @@ -13,11 +15,15 @@ class GenerateRandomBatchUpdateUseCase( val updateMeasurementInPort: UpdateMeasurementInPort ) : GenerateRandomBatchUpdateInPort { + companion object: KLogging() + override fun runBatchUpdate(sensorId: String) { + logger.info { "Starting update batch..." } (0L..365L).forEach { offset -> val updateDate = LocalDate.now().plusDays(offset) generateForecast(sensorId, updateDate) } + logger.info { "Batch update complete." } } fun generateForecast(sensorId: String, updateDate: LocalDate) { diff --git a/src/main/kotlin/infrastructure/EventCleanup.kt b/src/main/kotlin/infrastructure/EventCleanup.kt index b0f7124..fe4a0b9 100644 --- a/src/main/kotlin/infrastructure/EventCleanup.kt +++ b/src/main/kotlin/infrastructure/EventCleanup.kt @@ -22,6 +22,7 @@ class EventCleanup( companion object : KLogging() fun deleteEventsUntil(eventFQDN: String, deleteUntil: Instant, firstToken: Long = 0, lastToken: Long = -1L) { + logger.info { "Deleting all events of type $eventFQDN older than $deleteUntil..." } AutoClosableAxonServerConnection .connect(connect.hostName, connect.grpcPort, this.javaClass.simpleName, this.context).use { connection -> @@ -50,9 +51,11 @@ class EventCleanup( connection.ensureNoActiveTransformations() } } + logger.info { "Deleting complete." } } fun compact() { + logger.info { "Compacting event store..." } AutoClosableAxonServerConnection .connect(connect.hostName, connect.grpcPort, this.javaClass.simpleName, this.context).use { connection -> connection.ensureNoActiveTransformations() @@ -61,6 +64,7 @@ class EventCleanup( .startCompacting() .get() } + logger.info { "Compacting complete." } } fun AxonServerConnection.ensureNoActiveTransformations() {