Skip to content

Commit

Permalink
Adds support to inject telemetry instances to plugins (#13636)
Browse files Browse the repository at this point in the history
* Adds support to inject telemetry instances to plugins

Signed-off-by: Gagan Juneja <[email protected]>

* Adds test

Signed-off-by: Gagan Juneja <[email protected]>

* incorporate pr comments

Signed-off-by: Gagan Juneja <[email protected]>

---------

Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored May 17, 2024
1 parent 4700be3 commit 6ba6f59
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650))
- Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481))
- Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636))

### Deprecated

Expand Down
38 changes: 38 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.SecureSettingsFactory;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor;
Expand Down Expand Up @@ -274,6 +275,7 @@
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -620,6 +622,18 @@ protected Node(
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
if (telemetrySettings.isTracingFeatureEnabled() || telemetrySettings.isMetricsFeatureEnabled()) {
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
List<TelemetryPlugin> telemetryPluginsImplementingTelemetryAware = telemetryPlugins.stream()
.filter(a -> TelemetryAwarePlugin.class.isAssignableFrom(a.getClass()))
.collect(toList());
if (telemetryPluginsImplementingTelemetryAware.isEmpty() == false) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"Telemetry plugins %s should not implement TelemetryAwarePlugin interface",
telemetryPluginsImplementingTelemetryAware
)
);
}
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
if (telemetrySettings.isTracingFeatureEnabled()) {
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext());
Expand Down Expand Up @@ -909,6 +923,30 @@ protected Node(
)
.collect(Collectors.toList());

Collection<Object> telemetryAwarePluginComponents = pluginsService.filterPlugins(TelemetryAwarePlugin.class)
.stream()
.flatMap(
p -> p.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
clusterModule.getIndexNameExpressionResolver(),
repositoriesServiceReference::get,
tracer,
metricsRegistry
).stream()
)
.collect(Collectors.toList());

// Add the telemetryAwarePlugin components to the existing pluginComponents collection.
pluginComponents.addAll(telemetryAwarePluginComponents);

// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
new SearchRequestOperationsCompositeListenerFactory(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugins;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.NamedWriteable;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

/**
* Plugin that provides the telemetry registries to build component with telemetry and also provide a way to
* pass telemetry registries to the implementing plugins for adding instrumentation in the code.
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface TelemetryAwarePlugin {

/**
* Returns components added by this plugin.
* <p>
* Any components returned that implement {@link LifecycleComponent} will have their lifecycle managed.
* Note: To aid in the migration away from guice, all objects returned as components will be bound in guice
* to themselves.
*
* @param client A client to make requests to the system
* @param clusterService A service to allow watching and updating cluster state
* @param threadPool A service to allow retrieving an executor to run an async action
* @param resourceWatcherService A service to watch for changes to node local files
* @param scriptService A service to allow running scripts on the local node
* @param xContentRegistry the registry for extensible xContent parsing
* @param environment the environment for path and setting configurations
* @param nodeEnvironment the node environment used coordinate access to the data paths
* @param namedWriteableRegistry the registry for {@link NamedWriteable} object parsing
* @param indexNameExpressionResolver A service that resolves expression to index and alias names
* @param repositoriesServiceSupplier A supplier for the service that manages snapshot repositories; will return null when this method
* is called, but will return the repositories service once the node is initialized.
* @param tracer the tracer to add tracing instrumentation.
* @param metricsRegistry the registry for metrics instrumentation.
*/
default Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier,
Tracer tracer,
MetricsRegistry metricsRegistry
) {
return Collections.emptyList();
}
}
94 changes: 94 additions & 0 deletions server/src/test/java/org/opensearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,23 @@
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.bootstrap.BootstrapCheck;
import org.opensearch.bootstrap.BootstrapContext;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.BoundTransportAddress;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexService;
Expand All @@ -56,22 +62,35 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.plugins.CircuitBreakerPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.MockHttpTransport;
import org.opensearch.test.NodeRoles;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
Expand Down Expand Up @@ -404,6 +423,81 @@ public void testCreateWithFileCache() throws Exception {
}
}

public void testTelemetryAwarePlugins() throws IOException {
Settings.Builder settings = baseSettings();
List<Class<? extends Plugin>> plugins = basePlugins();
plugins.add(MockTelemetryAwarePlugin.class);
try (Node node = new MockNode(settings.build(), plugins)) {
MockTelemetryAwareComponent mockTelemetryAwareComponent = node.injector().getInstance(MockTelemetryAwareComponent.class);
assertNotNull(mockTelemetryAwareComponent.getTracer());
assertNotNull(mockTelemetryAwareComponent.getMetricsRegistry());
TelemetryAwarePlugin telemetryAwarePlugin = node.getPluginsService().filterPlugins(TelemetryAwarePlugin.class).get(0);
assertTrue(telemetryAwarePlugin instanceof MockTelemetryAwarePlugin);
}
}

public void testTelemetryPluginShouldNOTImplementTelemetryAwarePlugin() throws IOException {
Settings.Builder settings = baseSettings();
List<Class<? extends Plugin>> plugins = basePlugins();
plugins.add(MockTelemetryPlugin.class);
FeatureFlagSetter.set(FeatureFlags.TELEMETRY);
settings.put(TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING.getKey(), true);
assertThrows(IllegalStateException.class, () -> new MockNode(settings.build(), plugins));
}

private static class MockTelemetryAwareComponent {
private final Tracer tracer;
private final MetricsRegistry metricsRegistry;

public MockTelemetryAwareComponent(Tracer tracer, MetricsRegistry metricsRegistry) {
this.tracer = tracer;
this.metricsRegistry = metricsRegistry;
}

public Tracer getTracer() {
return tracer;
}

public MetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}
}

public static class MockTelemetryAwarePlugin extends Plugin implements TelemetryAwarePlugin {
@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier,
Tracer tracer,
MetricsRegistry metricsRegistry
) {
return List.of(new MockTelemetryAwareComponent(tracer, metricsRegistry));
}

}

public static class MockTelemetryPlugin extends Plugin implements TelemetryPlugin, TelemetryAwarePlugin {

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
return Optional.empty();
}

@Override
public String getName() {
return null;
}
}

public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin {

private SetOnce<CircuitBreaker> myCircuitBreaker = new SetOnce<>();
Expand Down

0 comments on commit 6ba6f59

Please sign in to comment.