diff --git a/src/main/kotlin/adapter/in/AdminController.kt b/src/main/kotlin/adapter/in/AdminController.kt new file mode 100644 index 0000000..65c2813 --- /dev/null +++ b/src/main/kotlin/adapter/in/AdminController.kt @@ -0,0 +1,31 @@ +package io.holixon.example.axon.event.transformation.adapter.`in` + +import io.holixon.example.axon.event.transformation.adapter.out.event.MeasurementsUpdatedEvent +import io.holixon.example.axon.event.transformation.infrastructure.EventCleanup +import org.springframework.http.ResponseEntity +import org.springframework.http.ResponseEntity.noContent +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RestController +import java.time.Instant + +@RestController +@RequestMapping("/admin") +class AdminController( + val eventCleanup: EventCleanup +) { + + @PostMapping(value = ["/cleanup"]) + fun cleanup(@RequestBody deleteUntil: Instant): ResponseEntity { + eventCleanup.deleteEventsUntil(eventFQDN = MeasurementsUpdatedEvent::class.java.name, deleteUntil = deleteUntil) + return noContent().build() + } + + @PostMapping(value = ["/compact"]) + fun compact(): ResponseEntity { + eventCleanup.compact() + return noContent().build() + } + +} diff --git a/src/main/kotlin/adapter/in/MeasurementController.kt b/src/main/kotlin/adapter/in/MeasurementController.kt index d16fa6f..f2f9417 100644 --- a/src/main/kotlin/adapter/in/MeasurementController.kt +++ b/src/main/kotlin/adapter/in/MeasurementController.kt @@ -1,9 +1,6 @@ package io.holixon.example.axon.event.transformation.adapter.`in` -import io.holixon.example.axon.event.transformation.application.port.`in`.Measurement -import io.holixon.example.axon.event.transformation.application.port.`in`.RetrieveMeasurementsInPort -import io.holixon.example.axon.event.transformation.application.port.`in`.SensorMeasurements -import io.holixon.example.axon.event.transformation.application.port.`in`.UpdateMeasurementInPort +import io.holixon.example.axon.event.transformation.application.port.`in`.* import org.springframework.http.MediaType import org.springframework.http.ResponseEntity import org.springframework.http.ResponseEntity.noContent @@ -18,7 +15,7 @@ import java.time.ZoneOffset class MeasurementController( val updateMeasurementInPort: UpdateMeasurementInPort, val retrieveMeasurementsInPort: RetrieveMeasurementsInPort, - val batchUpdater: BatchUpdater + val generateRandomBatchUpdateInPort: GenerateRandomBatchUpdateInPort ) { @GetMapping(value = ["/sensor/{sensorId}"], produces = [MediaType.APPLICATION_JSON_VALUE]) @@ -59,7 +56,7 @@ class MeasurementController( @PostMapping(value = ["/batch"]) fun batchUpdate(): ResponseEntity { - batchUpdater.runBatchUpdate() + generateRandomBatchUpdateInPort.runBatchUpdate() return noContent().build() } } diff --git a/src/main/kotlin/application/port/in/GenerateRandomBatchUpdateInPort.kt b/src/main/kotlin/application/port/in/GenerateRandomBatchUpdateInPort.kt new file mode 100644 index 0000000..aec11d0 --- /dev/null +++ b/src/main/kotlin/application/port/in/GenerateRandomBatchUpdateInPort.kt @@ -0,0 +1,5 @@ +package io.holixon.example.axon.event.transformation.application.port.`in` + +interface GenerateRandomBatchUpdateInPort { + fun runBatchUpdate(sensorId: String = "temp1") +} diff --git a/src/main/kotlin/adapter/in/BatchUpdater.kt b/src/main/kotlin/application/usecase/GenerateRandomBatchUpdateUseCase.kt similarity index 68% rename from src/main/kotlin/adapter/in/BatchUpdater.kt rename to src/main/kotlin/application/usecase/GenerateRandomBatchUpdateUseCase.kt index d21da1c..484169c 100644 --- a/src/main/kotlin/adapter/in/BatchUpdater.kt +++ b/src/main/kotlin/application/usecase/GenerateRandomBatchUpdateUseCase.kt @@ -1,5 +1,6 @@ -package io.holixon.example.axon.event.transformation.adapter.`in` +package io.holixon.example.axon.event.transformation.application.usecase +import io.holixon.example.axon.event.transformation.application.port.`in`.GenerateRandomBatchUpdateInPort import io.holixon.example.axon.event.transformation.application.port.`in`.Measurement import io.holixon.example.axon.event.transformation.application.port.`in`.SensorMeasurements import io.holixon.example.axon.event.transformation.application.port.`in`.UpdateMeasurementInPort @@ -8,11 +9,11 @@ import java.time.LocalDate import kotlin.random.Random @Component -class BatchUpdater( +class GenerateRandomBatchUpdateUseCase( val updateMeasurementInPort: UpdateMeasurementInPort -) { +) : GenerateRandomBatchUpdateInPort { - fun runBatchUpdate(sensorId: String = "temp1") { + override fun runBatchUpdate(sensorId: String) { (0L..365L).forEach { offset -> val updateDate = LocalDate.now().plusDays(offset) generateForecast(sensorId, updateDate) @@ -24,12 +25,9 @@ class BatchUpdater( SensorMeasurements( sensorId = sensorId, values = (0L..365L).associate { forecastOffset -> - updateDate.plusDays(forecastOffset) to Measurement(randomFloat(), "°C") + updateDate.plusDays(forecastOffset) to Measurement(Random.nextFloat() * 30, "°C") } ) ) - } - - private fun randomFloat(): Float = Random.nextFloat() * 30 } diff --git a/src/main/kotlin/infrastructure/AutoClosableAxonServerConnection.kt b/src/main/kotlin/infrastructure/AutoClosableAxonServerConnection.kt new file mode 100644 index 0000000..9063746 --- /dev/null +++ b/src/main/kotlin/infrastructure/AutoClosableAxonServerConnection.kt @@ -0,0 +1,32 @@ +package io.holixon.example.axon.event.transformation.infrastructure + +import io.axoniq.axonserver.connector.AxonServerConnection +import io.axoniq.axonserver.connector.AxonServerConnectionFactory +import io.axoniq.axonserver.connector.impl.ServerAddress +import mu.KLogging +import java.io.Closeable + +/** + * Auto-closable version of AxonServer Connection. + */ +class AutoClosableAxonServerConnection( + private val connection: AxonServerConnection +) : Closeable, AxonServerConnection by connection { + + companion object : KLogging() { + fun connect(axonServerHost: String, axonServerPort: Int = 8124, componentName: String, context: String): AutoClosableAxonServerConnection { + val factory = AxonServerConnectionFactory.forClient(componentName) + .routingServers(ServerAddress(axonServerHost, axonServerPort)) + .build() + val connection = factory.connect(context) + logger.info { "Connected to $axonServerHost:$axonServerPort@$context" } + return AutoClosableAxonServerConnection(connection) + } + } + + override fun close() { + if (connection.isConnected) { + connection.disconnect() + } + } +} diff --git a/src/main/kotlin/infrastructure/EventCleanup.kt b/src/main/kotlin/infrastructure/EventCleanup.kt new file mode 100644 index 0000000..d32dbf2 --- /dev/null +++ b/src/main/kotlin/infrastructure/EventCleanup.kt @@ -0,0 +1,74 @@ +package io.holixon.example.axon.event.transformation.infrastructure + +import io.axoniq.axonserver.connector.AxonServerConnection +import io.axoniq.axonserver.connector.event.transformation.EventTransformation +import io.axoniq.axonserver.connector.event.transformation.event.EventSources +import mu.KLogging +import org.axonframework.axonserver.connector.AxonServerConfiguration +import org.springframework.stereotype.Component +import java.time.Instant + +@Component +class EventCleanup( + axonServerConfiguration: AxonServerConfiguration +) { + + private val connect = axonServerConfiguration.routingServers().first() + private val context = axonServerConfiguration.context + + companion object : KLogging() + + fun deleteEventsUntil(eventFQDN: String, deleteUntil: Instant, firstToken: Long = 0, lastToken: Long = -1L) { + AutoClosableAxonServerConnection + .connect(connect.hostName, connect.grpcPort, this.javaClass.simpleName, this.context).use { connection -> + + // define the range of events + val last = if (lastToken == -1L) { + connection.eventChannel().lastToken.get() + } else { + lastToken + } + + try { + EventSources + .range({ connection.eventChannel() }, firstToken, last) + .filter { eventWithToken -> + logger.debug { "Event: ${eventWithToken.event.payload.type}" } + eventWithToken.event.payload.type == eventFQDN && eventWithToken.event.timestamp < deleteUntil.toEpochMilli() + } + .transform("Deleting events until $deleteUntil") { event, appender -> appender.deleteEvent(event.token) } + .execute { connection.eventTransformationChannel() } + .get() + } catch (e: Exception) { + logger.error(e) { "Error executing event transformation" } + } finally { + connection.ensureNoActiveTransformations() + } + } + } + + fun compact() { + AutoClosableAxonServerConnection + .connect(connect.hostName, connect.grpcPort, this.javaClass.simpleName, this.context).use { connection -> + connection.ensureNoActiveTransformations() + connection + .eventTransformationChannel() + .startCompacting() + .get() + } + } + + fun AxonServerConnection.ensureNoActiveTransformations() { + this.eventTransformationChannel().transformations().get().forEach { + logger.debug { "Found event transformation: ${it.id()}, ${it.description()}, ${it.state()}, ${it.lastSequence()}" } + if (it.state() == EventTransformation.State.ACTIVE) { + // cancelling the active one + this.eventTransformationChannel().activeTransformation().get().cancel().get() + logger.warn { "Cancelled active transformation ${it.id()}, ${it.description()}, ${it.state()}, ${it.lastSequence()}" } + } + } + } + +} + +