Skip to content

Commit

Permalink
Merge pull request #72 from AxonIQ/feature/event-processors-gen-2
Browse files Browse the repository at this point in the history
Second generation event processors
  • Loading branch information
CodeDrivenMitch authored Jul 15, 2024
2 parents 053f584 + 644f183 commit dd2bfce
Show file tree
Hide file tree
Showing 11 changed files with 438 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.axoniq.console.framework.api

import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import java.net.URLDecoder
import java.net.URLEncoder

Expand All @@ -27,8 +29,8 @@ import java.net.URLEncoder
* @code Bearer WORK_SPACE_ID:ENVIRONMENT_ID:COMPONENT_NAME:NODE_ID:ACCESS_TOKEN}
*/
data class ConsoleClientAuthentication(
val identification: ConsoleClientIdentifier,
val accessToken: String,
val identification: ConsoleClientIdentifier,
val accessToken: String,
) {
fun toBearerToken(): String {
return BEARER_PREFIX + listOf(
Expand All @@ -50,24 +52,24 @@ data class ConsoleClientAuthentication(
if (tokenParts.size == 5) {
val (_, environmentId, applicationName, nodeName, accessToken) = tokenParts
return ConsoleClientAuthentication(
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
nodeName = nodeName.decode()
),
accessToken
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
nodeName = nodeName.decode()
),
accessToken
)
}

assert(tokenParts.size == 4) { TOKEN_ERROR }
val (environmentId, applicationName, nodeName, accessToken) = tokenParts
return ConsoleClientAuthentication(
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
nodeName = nodeName.decode()
),
accessToken
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
nodeName = nodeName.decode()
),
accessToken
)
}

Expand All @@ -87,7 +89,7 @@ data class SetupPayload(
val commandBus: CommandBusInformation,
val queryBus: QueryBusInformation,
val eventStore: EventStoreInformation,
val processors: List<ProcessorInformation>,
val processors: List<EventProcessorInformation>,
val versions: Versions,
val upcasters: List<String>,
)
Expand All @@ -107,48 +109,139 @@ data class ModuleVersion(
)

data class CommandBusInformation(
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
)

data class QueryBusInformation(
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
val serializer: SerializerInformation?,
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
val serializer: SerializerInformation?,
)

data class EventStoreInformation(
val type: String,
val axonServer: Boolean,
val context: String?,
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val eventSerializer: SerializerInformation?,
val snapshotSerializer: SerializerInformation?,
val approximateSize: Long? = null,
)

data class ProcessorInformation(
val name: String,
val messageSourceType: String,
val contexts: List<String>? = emptyList(),
val tokenStoreType: String,
val supportsReset: Boolean,
val batchSize: Int,
val tokenClaimInterval: Long,
val tokenStoreClaimTimeout: Long,
val errorHandler: String,
val invocationErrorHandler: String,
val interceptors: List<InterceptorInformation>,
val type: String,
val axonServer: Boolean,
val context: String?,
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val eventSerializer: SerializerInformation?,
val snapshotSerializer: SerializerInformation?,
val approximateSize: Long? = null,
)


data class EventProcessorInformation(
val name: String,
val processorType: ProcessorType? = null,

val commonProcessorInformation: CommonProcessorInformation? = null,
val subscribingProcessorInformation: SubscribingProcessorInformation? = null,
val streamingInformation: StreamingEventProcessorInformation? = null,
val trackingInformation: TrackingEventProcessorInformation? = null,
val pooledStreamingInformation: PooledStreamingEventProcessorInformation? = null,

@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val messageSourceType: String? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val contexts: List<String>? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val tokenStoreType: String? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val supportsReset: Boolean? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val batchSize: Int? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val tokenClaimInterval: Long? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val tokenStoreClaimTimeout: Long? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val errorHandler: String? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val invocationErrorHandler: String? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val interceptors: List<InterceptorInformation>? = null,
)

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(
JsonSubTypes.Type(value = AxonServerEventStoreMessageSourceInformation::class, name = "AxonServer"),
JsonSubTypes.Type(value = EmbeddedEventStoreMessageSourceInformation::class, name = "Embedded"),
JsonSubTypes.Type(value = MultiStreamableMessageSourceInformation::class, name = "MultiStreamable"),
JsonSubTypes.Type(value = UnspecifiedMessageSourceInformation::class, name = "Unspecified"),
)
interface MessageSourceInformation {
val className: String
}

data class AxonServerEventStoreMessageSourceInformation(
override val className: String,
val contexts: List<String>
) : MessageSourceInformation

data class EmbeddedEventStoreMessageSourceInformation(
override val className: String,
val optimizeEventConsumption: Boolean?,
val fetchDelay: Long?,
val cachedEvents: Int?,
val cleanupDelay: Long?,
val eventStorageEngineType: String?,
) : MessageSourceInformation

data class MultiStreamableMessageSourceInformation(
override val className: String,
val sources: List<MessageSourceInformation>,
) : MessageSourceInformation

data class UnspecifiedMessageSourceInformation(
override val className: String,
) : MessageSourceInformation


enum class ProcessorType {
SUBSCRIBING,
TRACKING,
POOLED_STREAMING,
}

data class StreamingEventProcessorInformation(
val tokenStoreType: String?,
val batchSize: Int?,
val tokenClaimInterval: Long?,
val tokenStoreClaimTimeout: Long?,
val supportsReset: Boolean?,
)

data class TrackingEventProcessorInformation(
val maxThreadCount: Int?,
val eventAvailabilityTimeout: Int?,
val storeTokenBeforeProcessing: Boolean?,
)

data class PooledStreamingEventProcessorInformation(
val maxClaimedSegments: Int?,
val claimExtensionThreshold: Long?,
val coordinatorExtendsClaims: Boolean?,
)

data class SubscribingProcessorInformation(
val processingStrategy: String,
)

data class CommonProcessorInformation(
val messageSource: MessageSourceInformation?,
val errorHandler: String?,
val invocationErrorHandler: String?,
val interceptors: List<InterceptorInformation>,
)

data class InterceptorInformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,70 @@ package io.axoniq.console.framework.api
import java.time.Instant

data class ProcessorStatusReport(
val processors: List<ProcessorStatus>
val processors: List<ProcessorStatus>
)

enum class ProcessorMode {
POOLED,
TRACKING,
SUBSCRIBING,
UNKNOWN,
}

data class ProcessorStatus(
val name: String,
val processingGroups: List<ProcessingGroupStatus>,
val tokenStoreIdentifier: String,
val mode: ProcessorMode,
val started: Boolean,
val error: Boolean,
val segmentCapacity: Int,
val activeSegments: Int,
val segments: List<SegmentStatus>,
val name: String,
val processingGroups: List<ProcessingGroupStatus>,
val tokenStoreIdentifier: String,
val mode: ProcessorMode,
val started: Boolean,
val error: Boolean,
val segmentCapacity: Int,
val activeSegments: Int,
val segments: List<SegmentStatus>,
)

data class ProcessingGroupStatus(
val name: String,
val dlqSize: Long?,
val name: String,
val dlqSize: Long?,
)

data class SegmentStatus(
val segment: Int,
val mergeableSegment: Int,
val mask: Int = -1,
val oneOf: Int,
val caughtUp: Boolean,
val error: Boolean,
val errorType: String?,
val errorMessage: String?,
val ingestLatency: Double?,
val commitLatency: Double?,
val segment: Int,
val mergeableSegment: Int,
val mask: Int = -1,
val oneOf: Int,
val caughtUp: Boolean,
val error: Boolean,
val errorType: String?,
val errorMessage: String?,
val ingestLatency: Double?,
val commitLatency: Double?,
val position: Long? = -1,
val resetPosition: Long? = -1,
)

data class ProcessorSegmentId(
val processorName: String,
val segmentId: Int
val processorName: String,
val segmentId: Int
)

enum class ResetDecisions {
HEAD, TAIL, FROM
}

data class ResetDecision(
val processorName: String,
val decision: ResetDecisions,
val from: Instant? = null
val processorName: String,
val decision: ResetDecisions,
val from: Instant? = null
)

data class SegmentOverview(
val segments: List<SegmentDetails>
val segments: List<SegmentDetails>
)

data class SegmentDetails(

val segment: Int,
val mergeableSegment: Int,
val mask: Int,
val segment: Int,
val mergeableSegment: Int,
val mask: Int,
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package io.axoniq.console.framework.api.metrics

enum class HandlerType {
Origin,
EventProcessor,
Aggregate,
Message,
enum class HandlerType(val shortIdentifier: String) {
Origin("o"),
EventProcessor("ep"),
Aggregate("ag"),
Message("m"),
}
Loading

0 comments on commit dd2bfce

Please sign in to comment.