Skip to content

Commit

Permalink
add event stre transformations
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrovski committed Jan 14, 2024
1 parent 4f8475b commit b0a4626
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 14 deletions.
31 changes: 31 additions & 0 deletions src/main/kotlin/adapter/in/AdminController.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.holixon.example.axon.event.transformation.adapter.`in`

import io.holixon.example.axon.event.transformation.adapter.out.event.MeasurementsUpdatedEvent
import io.holixon.example.axon.event.transformation.infrastructure.EventCleanup
import org.springframework.http.ResponseEntity
import org.springframework.http.ResponseEntity.noContent
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import java.time.Instant

@RestController
@RequestMapping("/admin")
class AdminController(
val eventCleanup: EventCleanup
) {

@PostMapping(value = ["/cleanup"])
fun cleanup(@RequestBody deleteUntil: Instant): ResponseEntity<Void> {
eventCleanup.deleteEventsUntil(eventFQDN = MeasurementsUpdatedEvent::class.java.name, deleteUntil = deleteUntil)
return noContent().build()
}

@PostMapping(value = ["/compact"])
fun compact(): ResponseEntity<Void> {
eventCleanup.compact()
return noContent().build()
}

}
9 changes: 3 additions & 6 deletions src/main/kotlin/adapter/in/MeasurementController.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.holixon.example.axon.event.transformation.adapter.`in`

import io.holixon.example.axon.event.transformation.application.port.`in`.Measurement
import io.holixon.example.axon.event.transformation.application.port.`in`.RetrieveMeasurementsInPort
import io.holixon.example.axon.event.transformation.application.port.`in`.SensorMeasurements
import io.holixon.example.axon.event.transformation.application.port.`in`.UpdateMeasurementInPort
import io.holixon.example.axon.event.transformation.application.port.`in`.*
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.http.ResponseEntity.noContent
Expand All @@ -18,7 +15,7 @@ import java.time.ZoneOffset
class MeasurementController(
val updateMeasurementInPort: UpdateMeasurementInPort,
val retrieveMeasurementsInPort: RetrieveMeasurementsInPort,
val batchUpdater: BatchUpdater
val generateRandomBatchUpdateInPort: GenerateRandomBatchUpdateInPort
) {

@GetMapping(value = ["/sensor/{sensorId}"], produces = [MediaType.APPLICATION_JSON_VALUE])
Expand Down Expand Up @@ -59,7 +56,7 @@ class MeasurementController(

@PostMapping(value = ["/batch"])
fun batchUpdate(): ResponseEntity<Void> {
batchUpdater.runBatchUpdate()
generateRandomBatchUpdateInPort.runBatchUpdate()
return noContent().build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.holixon.example.axon.event.transformation.application.port.`in`

interface GenerateRandomBatchUpdateInPort {
fun runBatchUpdate(sensorId: String = "temp1")
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.holixon.example.axon.event.transformation.adapter.`in`
package io.holixon.example.axon.event.transformation.application.usecase

import io.holixon.example.axon.event.transformation.application.port.`in`.GenerateRandomBatchUpdateInPort
import io.holixon.example.axon.event.transformation.application.port.`in`.Measurement
import io.holixon.example.axon.event.transformation.application.port.`in`.SensorMeasurements
import io.holixon.example.axon.event.transformation.application.port.`in`.UpdateMeasurementInPort
Expand All @@ -8,11 +9,11 @@ import java.time.LocalDate
import kotlin.random.Random

@Component
class BatchUpdater(
class GenerateRandomBatchUpdateUseCase(
val updateMeasurementInPort: UpdateMeasurementInPort
) {
) : GenerateRandomBatchUpdateInPort {

fun runBatchUpdate(sensorId: String = "temp1") {
override fun runBatchUpdate(sensorId: String) {
(0L..365L).forEach { offset ->
val updateDate = LocalDate.now().plusDays(offset)
generateForecast(sensorId, updateDate)
Expand All @@ -24,12 +25,9 @@ class BatchUpdater(
SensorMeasurements(
sensorId = sensorId,
values = (0L..365L).associate { forecastOffset ->
updateDate.plusDays(forecastOffset) to Measurement(randomFloat(), "°C")
updateDate.plusDays(forecastOffset) to Measurement(Random.nextFloat() * 30, "°C")
}
)
)

}

private fun randomFloat(): Float = Random.nextFloat() * 30
}
32 changes: 32 additions & 0 deletions src/main/kotlin/infrastructure/AutoClosableAxonServerConnection.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.holixon.example.axon.event.transformation.infrastructure

import io.axoniq.axonserver.connector.AxonServerConnection
import io.axoniq.axonserver.connector.AxonServerConnectionFactory
import io.axoniq.axonserver.connector.impl.ServerAddress
import mu.KLogging
import java.io.Closeable

/**
* Auto-closable version of AxonServer Connection.
*/
class AutoClosableAxonServerConnection(
private val connection: AxonServerConnection
) : Closeable, AxonServerConnection by connection {

companion object : KLogging() {
fun connect(axonServerHost: String, axonServerPort: Int = 8124, componentName: String, context: String): AutoClosableAxonServerConnection {
val factory = AxonServerConnectionFactory.forClient(componentName)
.routingServers(ServerAddress(axonServerHost, axonServerPort))
.build()
val connection = factory.connect(context)
logger.info { "Connected to $axonServerHost:$axonServerPort@$context" }
return AutoClosableAxonServerConnection(connection)
}
}

override fun close() {
if (connection.isConnected) {
connection.disconnect()
}
}
}
74 changes: 74 additions & 0 deletions src/main/kotlin/infrastructure/EventCleanup.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.holixon.example.axon.event.transformation.infrastructure

import io.axoniq.axonserver.connector.AxonServerConnection
import io.axoniq.axonserver.connector.event.transformation.EventTransformation
import io.axoniq.axonserver.connector.event.transformation.event.EventSources
import mu.KLogging
import org.axonframework.axonserver.connector.AxonServerConfiguration
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class EventCleanup(
axonServerConfiguration: AxonServerConfiguration
) {

private val connect = axonServerConfiguration.routingServers().first()
private val context = axonServerConfiguration.context

companion object : KLogging()

fun deleteEventsUntil(eventFQDN: String, deleteUntil: Instant, firstToken: Long = 0, lastToken: Long = -1L) {
AutoClosableAxonServerConnection
.connect(connect.hostName, connect.grpcPort, this.javaClass.simpleName, this.context).use { connection ->

// define the range of events
val last = if (lastToken == -1L) {
connection.eventChannel().lastToken.get()
} else {
lastToken
}

try {
EventSources
.range({ connection.eventChannel() }, firstToken, last)
.filter { eventWithToken ->
logger.debug { "Event: ${eventWithToken.event.payload.type}" }
eventWithToken.event.payload.type == eventFQDN && eventWithToken.event.timestamp < deleteUntil.toEpochMilli()
}
.transform("Deleting events until $deleteUntil") { event, appender -> appender.deleteEvent(event.token) }
.execute { connection.eventTransformationChannel() }
.get()
} catch (e: Exception) {
logger.error(e) { "Error executing event transformation" }
} finally {
connection.ensureNoActiveTransformations()
}
}
}

fun compact() {
AutoClosableAxonServerConnection
.connect(connect.hostName, connect.grpcPort, this.javaClass.simpleName, this.context).use { connection ->
connection.ensureNoActiveTransformations()
connection
.eventTransformationChannel()
.startCompacting()
.get()
}
}

fun AxonServerConnection.ensureNoActiveTransformations() {
this.eventTransformationChannel().transformations().get().forEach {
logger.debug { "Found event transformation: ${it.id()}, ${it.description()}, ${it.state()}, ${it.lastSequence()}" }
if (it.state() == EventTransformation.State.ACTIVE) {
// cancelling the active one
this.eventTransformationChannel().activeTransformation().get().cancel().get()
logger.warn { "Cancelled active transformation ${it.id()}, ${it.description()}, ${it.state()}, ${it.lastSequence()}" }
}
}
}

}


0 comments on commit b0a4626

Please sign in to comment.