Skip to content

Commit

Permalink
Adapt for Multi Tenancy
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerard Klijs committed Aug 7, 2024
1 parent 8d75ff5 commit c21d88c
Showing 1 changed file with 68 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,81 @@ class ProcessorReportCreator(
private val processingConfig: EventProcessingConfiguration,
private val metricsRegistry: ProcessorMetricsRegistry,
) {
companion object {
const val TENANT_SEPARATOR = "~t~"
const val MULTI_TENANT_PROCESSOR_CLASS = "org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor"
}

fun createReport() = ProcessorStatusReport(
processingConfig.eventProcessors()
.map { entry ->
if(entry.value is StreamingEventProcessor) {
val sep = entry.value as StreamingEventProcessor
ProcessorStatus(
entry.key,
entry.value.toProcessingGroupStatuses(),
sep.tokenStoreIdentifier,
sep.toType(),
sep.isRunning,
sep.isError,
sep.maxCapacity(),
sep.processingStatus().filterValues { !it.isErrorState }.size,
sep.processingStatus().map { (_, segment) -> segment.toStatus(entry.key) },
)
} else {
val sep = entry.value as SubscribingEventProcessor
ProcessorStatus(
sep.name,
emptyList(),
"",
ProcessorMode.SUBSCRIBING,
sep.isRunning,
sep.isError,
0,
0,
emptyList(),
)
.flatMap { entry ->
when (val processor = entry.value) {
is StreamingEventProcessor -> listOf(streamingStatus(entry.key, processor))
is SubscribingEventProcessor -> listOf(subscribingStatus(entry.key, processor))
else -> extractTenantProcessorsOrDefault(entry.key, processor)
}
}
)

private fun streamingStatus(name: String, processor: StreamingEventProcessor) =
ProcessorStatus(
name,
processor.toProcessingGroupStatuses(),
processor.tokenStoreIdentifier,
processor.toType(),
processor.isRunning,
processor.isError,
processor.maxCapacity(),
processor.processingStatus().filterValues { !it.isErrorState }.size,
processor.processingStatus().map { (_, segment) -> segment.toStatus(name) },
)

private fun subscribingStatus(name: String, processor: SubscribingEventProcessor) =
ProcessorStatus(
name,
emptyList(),
"",
ProcessorMode.SUBSCRIBING,
processor.isRunning,
processor.isError,
0,
0,
emptyList(),
)

private fun defaultStatus(name: String, processor: EventProcessor) =
ProcessorStatus(
name,
emptyList(),
"",
ProcessorMode.UNKNOWN,
processor.isRunning,
processor.isError,
0,
0,
emptyList(),
)


private fun extractTenantProcessorsOrDefault(name: String, processor: EventProcessor): List<ProcessorStatus> {
return if (processor.javaClass.name == MULTI_TENANT_PROCESSOR_CLASS) {
val segmentsField = ReflectionUtils.fieldsOf(processor.javaClass).first { it.name == "tenantEventProcessorsSegments" }
val segments = ReflectionUtils.getFieldValue(segmentsField, processor) as Map<Any, EventProcessor>
segments.map { toStatus(name, it) }
} else listOf(defaultStatus(name, processor))
}

private fun toStatus(name: String, entry: Map.Entry<Any, EventProcessor>): ProcessorStatus {
val tenantIdField = ReflectionUtils.fieldsOf(entry.key.javaClass).first { it.name == "tenantId" }
val tenantId = ReflectionUtils.getFieldValue(tenantIdField, entry.key) as String
val combinedName = "${name}${TENANT_SEPARATOR}${tenantId}"
return when (val processor = entry.value) {
is StreamingEventProcessor -> streamingStatus(combinedName, processor)
is SubscribingEventProcessor -> subscribingStatus(combinedName, processor)
else -> defaultStatus(combinedName, processor)
}
}

fun createSegmentOverview(processorName: String): SegmentOverview {
val tokenStore = processingConfig.tokenStore(processorName)
val segments = tokenStore.fetchSegments(processorName)
Expand Down

0 comments on commit c21d88c

Please sign in to comment.