Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Second generation event processors #72

Merged
merged 7 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion console-framework-client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.axoniq.console</groupId>
<artifactId>console-framework-client-parent</artifactId>
<version>1.5.2-SNAPSHOT</version>
<version>1.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.axoniq.console.framework.api

import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import java.net.URLDecoder
import java.net.URLEncoder

Expand All @@ -27,8 +29,8 @@ import java.net.URLEncoder
* @code Bearer WORK_SPACE_ID:ENVIRONMENT_ID:COMPONENT_NAME:NODE_ID:ACCESS_TOKEN}
*/
data class ConsoleClientAuthentication(
val identification: ConsoleClientIdentifier,
val accessToken: String,
val identification: ConsoleClientIdentifier,
val accessToken: String,
) {
fun toBearerToken(): String {
return BEARER_PREFIX + listOf(
Expand All @@ -50,24 +52,24 @@ data class ConsoleClientAuthentication(
if (tokenParts.size == 5) {
val (_, environmentId, applicationName, nodeName, accessToken) = tokenParts
return ConsoleClientAuthentication(
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
nodeName = nodeName.decode()
),
accessToken
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
nodeName = nodeName.decode()
),
accessToken
)
}

assert(tokenParts.size == 4) { TOKEN_ERROR }
val (environmentId, applicationName, nodeName, accessToken) = tokenParts
return ConsoleClientAuthentication(
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
nodeName = nodeName.decode()
),
accessToken
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
nodeName = nodeName.decode()
),
accessToken
)
}

Expand All @@ -87,7 +89,7 @@ data class SetupPayload(
val commandBus: CommandBusInformation,
val queryBus: QueryBusInformation,
val eventStore: EventStoreInformation,
val processors: List<ProcessorInformation>,
val processors: List<EventProcessorInformation>,
val versions: Versions,
val upcasters: List<String>,
)
Expand All @@ -107,48 +109,139 @@ data class ModuleVersion(
)

data class CommandBusInformation(
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
)

data class QueryBusInformation(
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
val serializer: SerializerInformation?,
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
val serializer: SerializerInformation?,
)

data class EventStoreInformation(
val type: String,
val axonServer: Boolean,
val context: String?,
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val eventSerializer: SerializerInformation?,
val snapshotSerializer: SerializerInformation?,
val approximateSize: Long? = null,
)

data class ProcessorInformation(
val name: String,
val messageSourceType: String,
val contexts: List<String>? = emptyList(),
val tokenStoreType: String,
val supportsReset: Boolean,
val batchSize: Int,
val tokenClaimInterval: Long,
val tokenStoreClaimTimeout: Long,
val errorHandler: String,
val invocationErrorHandler: String,
val interceptors: List<InterceptorInformation>,
val type: String,
val axonServer: Boolean,
val context: String?,
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val eventSerializer: SerializerInformation?,
val snapshotSerializer: SerializerInformation?,
val approximateSize: Long? = null,
)


data class EventProcessorInformation(
val name: String,
val processorType: ProcessorType? = null,

val commonProcessorInformation: CommonProcessorInformation? = null,
val subscribingProcessorInformation: SubscribingProcessorInformation? = null,
val streamingInformation: StreamingEventProcessorInformation? = null,
val trackingInformation: TrackingEventProcessorInformation? = null,
val pooledStreamingInformation: PooledStreamingEventProcessorInformation? = null,

@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val messageSourceType: String? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val contexts: List<String>? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val tokenStoreType: String? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val supportsReset: Boolean? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val batchSize: Int? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val tokenClaimInterval: Long? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val tokenStoreClaimTimeout: Long? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val errorHandler: String? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val invocationErrorHandler: String? = null,
@Deprecated("Deprecated since version 1.6.0 due to new processor structure")
val interceptors: List<InterceptorInformation>? = null,
)

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(
JsonSubTypes.Type(value = AxonServerEventStoreMessageSourceInformation::class, name = "AxonServer"),
JsonSubTypes.Type(value = EmbeddedEventStoreMessageSourceInformation::class, name = "Embedded"),
JsonSubTypes.Type(value = MultiStreamableMessageSourceInformation::class, name = "MultiStreamable"),
JsonSubTypes.Type(value = UnspecifiedMessageSourceInformation::class, name = "Unspecified"),
)
interface MessageSourceInformation {
val className: String
}

data class AxonServerEventStoreMessageSourceInformation(
override val className: String,
val contexts: List<String>
) : MessageSourceInformation

data class EmbeddedEventStoreMessageSourceInformation(
override val className: String,
val optimizeEventConsumption: Boolean?,
val fetchDelay: Long?,
val cachedEvents: Int?,
val cleanupDelay: Long?,
val eventStorageEngineType: String?,
) : MessageSourceInformation

data class MultiStreamableMessageSourceInformation(
override val className: String,
val sources: List<MessageSourceInformation>,
) : MessageSourceInformation

data class UnspecifiedMessageSourceInformation(
override val className: String,
) : MessageSourceInformation


enum class ProcessorType {
SUBSCRIBING,
TRACKING,
POOLED_STREAMING,
}

data class StreamingEventProcessorInformation(
val tokenStoreType: String?,
val batchSize: Int?,
val tokenClaimInterval: Long?,
val tokenStoreClaimTimeout: Long?,
val supportsReset: Boolean?,
)

data class TrackingEventProcessorInformation(
val maxThreadCount: Int?,
val eventAvailabilityTimeout: Int?,
val storeTokenBeforeProcessing: Boolean?,
)

data class PooledStreamingEventProcessorInformation(
val maxClaimedSegments: Int?,
val claimExtensionThreshold: Long?,
val coordinatorExtendsClaims: Boolean?,
)

data class SubscribingProcessorInformation(
val processingStrategy: String,
)

data class CommonProcessorInformation(
val messageSource: MessageSourceInformation?,
val errorHandler: String?,
val invocationErrorHandler: String?,
val interceptors: List<InterceptorInformation>,
)

data class InterceptorInformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,70 @@ package io.axoniq.console.framework.api
import java.time.Instant

data class ProcessorStatusReport(
val processors: List<ProcessorStatus>
val processors: List<ProcessorStatus>
)

enum class ProcessorMode {
POOLED,
TRACKING,
SUBSCRIBING,
UNKNOWN,
}

data class ProcessorStatus(
val name: String,
val processingGroups: List<ProcessingGroupStatus>,
val tokenStoreIdentifier: String,
val mode: ProcessorMode,
val started: Boolean,
val error: Boolean,
val segmentCapacity: Int,
val activeSegments: Int,
val segments: List<SegmentStatus>,
val name: String,
val processingGroups: List<ProcessingGroupStatus>,
val tokenStoreIdentifier: String,
val mode: ProcessorMode,
val started: Boolean,
val error: Boolean,
val segmentCapacity: Int,
val activeSegments: Int,
val segments: List<SegmentStatus>,
)

data class ProcessingGroupStatus(
val name: String,
val dlqSize: Long?,
val name: String,
val dlqSize: Long?,
)

data class SegmentStatus(
val segment: Int,
val mergeableSegment: Int,
val mask: Int = -1,
val oneOf: Int,
val caughtUp: Boolean,
val error: Boolean,
val errorType: String?,
val errorMessage: String?,
val ingestLatency: Double?,
val commitLatency: Double?,
val segment: Int,
val mergeableSegment: Int,
val mask: Int = -1,
val oneOf: Int,
val caughtUp: Boolean,
val error: Boolean,
val errorType: String?,
val errorMessage: String?,
val ingestLatency: Double?,
val commitLatency: Double?,
val position: Long? = -1,
val resetPosition: Long? = -1,
)

data class ProcessorSegmentId(
val processorName: String,
val segmentId: Int
val processorName: String,
val segmentId: Int
)

enum class ResetDecisions {
HEAD, TAIL, FROM
}

data class ResetDecision(
val processorName: String,
val decision: ResetDecisions,
val from: Instant? = null
val processorName: String,
val decision: ResetDecisions,
val from: Instant? = null
)

data class SegmentOverview(
val segments: List<SegmentDetails>
val segments: List<SegmentDetails>
)

data class SegmentDetails(

val segment: Int,
val mergeableSegment: Int,
val mask: Int,
val segment: Int,
val mergeableSegment: Int,
val mask: Int,
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package io.axoniq.console.framework.api.metrics

enum class HandlerType {
Origin,
EventProcessor,
Aggregate,
Message,
enum class HandlerType(val shortIdentifier: String) {
Origin("o"),
EventProcessor("ep"),
Aggregate("ag"),
Message("m"),
}
Loading