Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the client compatible with AF 4.9. #27

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package io.axoniq.console.framework.starter

import io.axoniq.console.framework.AxoniqConsoleConfigurerModule
import io.axoniq.console.framework.messaging.AxoniqConsoleSpanFactory
import io.axoniq.console.framework.messaging.SpanMatcherPredicateMap
import io.axoniq.console.framework.messaging.getSpanMatcherPredicateMap
import org.axonframework.config.ConfigurerModule
import org.axonframework.tracing.MultiSpanFactory
import org.axonframework.tracing.NoOpSpanFactory
import org.axonframework.tracing.SpanFactory
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.BeanPostProcessor
import org.springframework.boot.autoconfigure.AutoConfiguration
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.ApplicationContext
Expand Down Expand Up @@ -73,14 +76,22 @@ class AxoniqConsoleAutoConfiguration {
.build()
}

@Bean
@ConditionalOnMissingBean(SpanMatcherPredicateMap::class)
fun spanMatcherPredicateMap(): SpanMatcherPredicateMap {
return getSpanMatcherPredicateMap()
}

@Bean
@ConditionalOnProperty("axoniq.console.credentials", matchIfMissing = false)
fun axoniqConsoleSpanFactoryPostProcessor(): BeanPostProcessor = object : BeanPostProcessor {
fun axoniqConsoleSpanFactoryPostProcessor(
spanMatcherPredicateMap: SpanMatcherPredicateMap
): BeanPostProcessor = object : BeanPostProcessor {
override fun postProcessAfterInitialization(bean: Any, beanName: String): Any {
if (bean !is SpanFactory || bean is AxoniqConsoleSpanFactory) {
return bean
}
val spanFactory = AxoniqConsoleSpanFactory()
val spanFactory = AxoniqConsoleSpanFactory(spanMatcherPredicateMap)
if (bean is NoOpSpanFactory) {
return spanFactory
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2022-2023. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.console.framework;

import io.axoniq.console.framework.client.AxoniqConsoleRSocketClient;
Expand All @@ -6,12 +22,18 @@
import io.axoniq.console.framework.client.SetupPayloadCreator;
import io.axoniq.console.framework.client.strategy.CborEncodingStrategy;
import io.axoniq.console.framework.client.strategy.RSocketPayloadEncodingStrategy;
import io.axoniq.console.framework.eventprocessor.*;
import io.axoniq.console.framework.eventprocessor.DeadLetterManager;
import io.axoniq.console.framework.eventprocessor.EventProcessorManager;
import io.axoniq.console.framework.eventprocessor.ProcessorReportCreator;
import io.axoniq.console.framework.eventprocessor.RSocketDlqResponder;
import io.axoniq.console.framework.eventprocessor.RSocketProcessorResponder;
import io.axoniq.console.framework.eventprocessor.metrics.AxoniqConsoleProcessorInterceptor;
import io.axoniq.console.framework.eventprocessor.metrics.ProcessorMetricsRegistry;
import io.axoniq.console.framework.messaging.AxoniqConsoleDispatchInterceptor;
import io.axoniq.console.framework.messaging.AxoniqConsoleSpanFactory;
import io.axoniq.console.framework.messaging.HandlerMetricsRegistry;
import io.axoniq.console.framework.messaging.SpanMatcher;
import io.axoniq.console.framework.messaging.SpanMatcherPredicateMap;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.Configuration;
Expand All @@ -27,6 +49,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static io.axoniq.console.framework.messaging.SpanMatcherKt.getSpanMatcherPredicateMap;

/**
* Applies the configuration necessary for AxonIQ Console to the {@link Configurer} of Axon Framework.
Expand All @@ -44,6 +69,7 @@ public class AxoniqConsoleConfigurerModule implements ConfigurerModule {
private final ScheduledExecutorService reportingTaskExecutor;
private final ExecutorService managementTaskExecutor;
private final boolean configureSpanFactory;
private final SpanMatcherPredicateMap spanMatcherPredicateMap;

/**
* Creates the {@link AxoniqConsoleConfigurerModule} with the given {@code builder}.
Expand All @@ -62,6 +88,7 @@ protected AxoniqConsoleConfigurerModule(Builder builder) {
this.reportingTaskExecutor = builder.reportingTaskExecutor;
this.managementTaskExecutor = builder.managementTaskExecutor;
this.configureSpanFactory = !builder.disableSpanFactoryInConfiguration;
this.spanMatcherPredicateMap = builder.spanMatcherPredicateMap;
}

/**
Expand Down Expand Up @@ -160,7 +187,7 @@ public void configureModule(@NotNull Configurer configurer) {
));

if (configureSpanFactory) {
configurer.registerComponent(SpanFactory.class, c -> new AxoniqConsoleSpanFactory());
configurer.registerComponent(SpanFactory.class, c -> new AxoniqConsoleSpanFactory(spanMatcherPredicateMap));
}

configurer.onInitialize(c -> {
Expand Down Expand Up @@ -206,6 +233,7 @@ public static class Builder {
private AxoniqConsoleDlqMode dlqMode = AxoniqConsoleDlqMode.FULL;
private Long initialDelay = 0L;
private boolean disableSpanFactoryInConfiguration = false;
private final SpanMatcherPredicateMap spanMatcherPredicateMap = getSpanMatcherPredicateMap();

private ScheduledExecutorService reportingTaskExecutor;
private Integer reportingThreadPoolSize = 2;
Expand Down Expand Up @@ -349,6 +377,21 @@ public Builder disableSpanFactoryInConfiguration() {
return this;
}

/**
* Overwrites a span predicate. It might be necessary to set these when the naming of the spans is customized.
* See {@link SpanMatcher} for the defaults, based on the Axon Framework version.
*
* @param spanMatcher the type os span to change.
* @param predicate the function to determine is a predicate with a certain name, matches the type.
* @return The builder for fluent interfacing
*/
public Builder changeSpanPredicate(SpanMatcher spanMatcher, Predicate<String> predicate) {
BuilderUtils.assertNonNull(spanMatcher, "Span matcher to update spanMatcherPredicateMap must be non-null");
BuilderUtils.assertNonNull(predicate, "Predicate to update spanMatcherPredicateMap must be non-null");
spanMatcherPredicateMap.put(spanMatcher, predicate);
return this;
}

/**
* Whether to use a secure connection using SSL/TLS or not. Defaults to {@code true}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import org.axonframework.messaging.MetaData
import org.axonframework.messaging.deadletter.DeadLetter
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue
import org.axonframework.serialization.Serializer
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import io.axoniq.console.framework.api.DeadLetter as ApiDeadLetter

private const val LETTER_PAYLOAD_SIZE_LIMIT = 1024
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package io.axoniq.console.framework.eventprocessor

import io.axoniq.console.framework.api.*
import io.axoniq.console.framework.api.ProcessorSegmentId
import io.axoniq.console.framework.api.ProcessorStatusReport
import io.axoniq.console.framework.api.ResetDecision
import io.axoniq.console.framework.api.SegmentOverview
import io.axoniq.console.framework.client.RSocketHandlerRegistrar
import org.axonframework.lifecycle.Lifecycle
import org.axonframework.lifecycle.Phase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.axoniq.console.framework.messaging

import io.axoniq.console.framework.api.ComponentPayload
import io.axoniq.console.framework.api.AxoniqConsoleMessageOrigin
import io.axoniq.console.framework.api.ComponentPayload
import io.axoniq.console.framework.api.metrics.DispatcherStatisticIdentifier
import io.axoniq.console.framework.api.metrics.HandlerStatisticsMetricIdentifier
import io.axoniq.console.framework.api.metrics.HandlerType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.function.Supplier


class AxoniqConsoleSpanFactory : SpanFactory {
class AxoniqConsoleSpanFactory(private val spanMatcherPredicateMap: SpanMatcherPredicateMap) : SpanFactory {

companion object {
private val logger = LoggerFactory.getLogger(this::class.java)
Expand Down Expand Up @@ -64,7 +64,7 @@ class AxoniqConsoleSpanFactory : SpanFactory {
private val metrics: MutableMap<Metric, Long> = mutableMapOf()

fun registerHandler(handlerMetricIdentifier: HandlerStatisticsMetricIdentifier?, time: Long) {
if(handlerMetricIdentifier == null) {
if (handlerMetricIdentifier == null) {
return
}
this.handlerMetricIdentifier = handlerMetricIdentifier
Expand Down Expand Up @@ -110,15 +110,15 @@ class AxoniqConsoleSpanFactory : SpanFactory {
logger.trace("Reporting span for message id $messageId = $handlerMetricIdentifier")
val success = handlerSuccessful && transactionSuccessful
HandlerMetricsRegistry.getInstance()?.registerMessageHandled(
handler = handlerMetricIdentifier!!,
success = success,
duration = end - timeStarted!!,
metrics = metrics
handler = handlerMetricIdentifier!!,
success = success,
duration = end - timeStarted!!,
metrics = metrics
)
if (success) {
dispatchedMessages.forEach {
HandlerMetricsRegistry.getInstance()?.registerMessageDispatchedDuringHandling(
DispatcherStatisticIdentifier(handlerMetricIdentifier, it)
DispatcherStatisticIdentifier(handlerMetricIdentifier, it)
)
}
}
Expand All @@ -138,35 +138,35 @@ class AxoniqConsoleSpanFactory : SpanFactory {
}

override fun createHandlerSpan(
operationNameSupplier: Supplier<String>,
parentMessage: Message<*>,
isChildTrace: Boolean,
vararg linkedParents: Message<*>?
operationNameSupplier: Supplier<String>,
parentMessage: Message<*>,
isChildTrace: Boolean,
vararg linkedParents: Message<*>?
): Span {
val name = operationNameSupplier.get()
if (name == "QueryProcessingTask" || name == "AxonServerCommandBus.handle" || name == "DeadlineJob.execute") {
if (spanMatcherPredicateMap[SpanMatcher.HANDLER]!!.test(name)) {
return startIfNotActive(parentMessage)
}
return NOOP_SPAN
}

override fun createDispatchSpan(
operationNameSupplier: Supplier<String>,
parentMessage: Message<*>?,
vararg linkedSiblings: Message<*>?
operationNameSupplier: Supplier<String>,
parentMessage: Message<*>?,
vararg linkedSiblings: Message<*>?
): Span {
return NOOP_SPAN
}

override fun createInternalSpan(operationNameSupplier: Supplier<String>): Span {
val name = operationNameSupplier.get()
if (name == "LockingRepository.obtainLock") {
if (spanMatcherPredicateMap[SpanMatcher.OBTAIN_LOCK]!!.test(name)) {
return TimeRecordingSpan(PreconfiguredMetric.AGGREGATE_LOCK_TIME)
}
if (name.contains(".load ")) {
if (spanMatcherPredicateMap[SpanMatcher.LOAD]!!.test(name)) {
return TimeRecordingSpan(PreconfiguredMetric.AGGREGATE_LOAD_TIME)
}
if (name.endsWith(".commit")) {
if (spanMatcherPredicateMap[SpanMatcher.COMMIT]!!.test(name)) {
return TimeRecordingSpan(PreconfiguredMetric.EVENT_COMMIT_TIME)
}

Expand All @@ -175,12 +175,15 @@ class AxoniqConsoleSpanFactory : SpanFactory {

override fun createInternalSpan(operationNameSupplier: Supplier<String>, message: Message<*>): Span {
val name = operationNameSupplier.get()
if (name.endsWith("Bus.handle")
|| name == "SimpleQueryBus.query"
|| name.startsWith("SimpleQueryBus.scatterGather")
|| name.startsWith("PooledStreamingEventProcessor")
|| name.startsWith("TrackingEventProcessor")
) {
if (spanMatcherPredicateMap[SpanMatcher.MESSAGE_START]!!.test(name)) {
return startIfNotActive(message)
}
return NOOP_SPAN
}

override fun createChildHandlerSpan(operationNameSupplier: Supplier<String>, message: Message<*>, vararg linkedParents: Message<*>?): Span {
val name = operationNameSupplier.get()
if (spanMatcherPredicateMap[SpanMatcher.MESSAGE_START]!!.test(name)) {
return startIfNotActive(message)
}
return NOOP_SPAN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class HandlerMetricsRegistry(
private val handlers: MutableMap<HandlerStatisticsMetricIdentifier, HandlerRegistryStatistics> = ConcurrentHashMap()
private val aggregates: MutableMap<AggregateStatisticIdentifier, AggregateRegistryStatistics> = ConcurrentHashMap()

private val noHanlerIdentifier = HandlerStatisticsMetricIdentifier(HandlerType.Origin, "application", MessageIdentifier("Dispatcher", componentName))
private val noHandlerIdentifier = HandlerStatisticsMetricIdentifier(HandlerType.Origin, "application", MessageIdentifier("Dispatcher", componentName))

override fun registerLifecycleHandlers(lifecycle: Lifecycle.LifecycleRegistry) {
lifecycle.onStart(Phase.INSTRUCTION_COMPONENTS, this::start)
Expand Down Expand Up @@ -173,7 +173,7 @@ class HandlerMetricsRegistry(
fun registerMessageDispatchedWithoutHandling(
message: MessageIdentifier,
) {
dispatches.computeIfAbsentWithRetry(DispatcherStatisticIdentifier(noHanlerIdentifier, message)) { _ ->
dispatches.computeIfAbsentWithRetry(DispatcherStatisticIdentifier(noHandlerIdentifier, message)) { _ ->
RollingCountMeasure()
}.increment()
}
Expand Down
Loading