From efe6b758c77449b3f72a3ec437bdce6356bb50ea Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Wed, 20 Nov 2024 16:35:41 +0000 Subject: [PATCH] Refactor OpenTelemetryData to Action (#143) Closes #142 --- .../Components/VirtualizedCluster.cs | 4 +- .../OpenTelemetry/OpenTelemetry.cs | 19 ++------- .../OpenTelemetry/OpenTelemetryData.cs | 23 ----------- src/Elastic.Transport/DistributedTransport.cs | 36 ++++++++--------- src/Elastic.Transport/ITransport.cs | 11 ++--- .../ITransportHttpMethodExtensions.cs | 40 +++++++++---------- .../MetaData/DefaultMetaHeaderProvider.cs | 2 +- .../OpenTelemetryTests.cs | 19 +++------ 8 files changed, 54 insertions(+), 100 deletions(-) delete mode 100644 src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetryData.cs diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs index 6d8d5ff..4b1dfc2 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs @@ -32,7 +32,7 @@ internal VirtualizedCluster(TransportConfigurationDescriptor settings) _syncCall = (t, r) => t.Request( path: RootPath, postData: PostData.Serializable(new { }), - openTelemetryData: default, + null, localConfiguration: r?.Invoke(new RequestConfigurationDescriptor()) ); _asyncCall = async (t, r) => @@ -41,7 +41,7 @@ internal VirtualizedCluster(TransportConfigurationDescriptor settings) ( path: RootPath, postData: PostData.Serializable(new { }), - openTelemetryData: default, + null, localConfiguration: r?.Invoke(new RequestConfigurationDescriptor()), CancellationToken.None ).ConfigureAwait(false); diff --git a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs index 2306001..ba70e04 100644 --- a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs +++ b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs @@ -5,7 +5,9 @@ using System; using System.Diagnostics; +#pragma warning disable IDE0130 // Namespace does not match folder structure namespace Elastic.Transport.Diagnostics; +#pragma warning restore IDE0130 // Namespace does not match folder structure /// /// Activity information for OpenTelemetry instrumentation. @@ -37,7 +39,7 @@ public static class OpenTelemetry internal static bool CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested => ElasticTransportActivitySource.HasListeners() && ((Activity.Current?.Source.Name.Equals(ElasticTransportActivitySourceName, StringComparison.Ordinal) ?? false) && (Activity.Current?.IsAllDataRequested ?? false)); - internal static void SetCommonAttributes(Activity? activity, OpenTelemetryData openTelemetryData, ITransportConfiguration settings) + internal static void SetCommonAttributes(Activity? activity, ITransportConfiguration settings) { if (activity is null) return; @@ -51,20 +53,7 @@ internal static void SetCommonAttributes(Activity? activity, OpenTelemetryData o } var productSchemaVersion = string.Empty; - if (openTelemetryData.SpanAttributes is not null) - { - foreach (var attribute in openTelemetryData.SpanAttributes) - { - activity?.SetTag(attribute.Key, attribute.Value); - - if (attribute.Key.Equals(OpenTelemetryAttributes.DbElasticsearchSchemaUrl, StringComparison.Ordinal)) - { - if (attribute.Value is string schemaVersion) - productSchemaVersion = schemaVersion; - } - } - } - + // We add the client schema version only when it differs from the product schema version if (!productSchemaVersion.Equals(OpenTelemetrySchemaVersion, StringComparison.Ordinal)) activity?.SetTag(OpenTelemetryAttributes.ElasticTransportSchemaVersion, OpenTelemetrySchemaVersion); diff --git a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetryData.cs b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetryData.cs deleted file mode 100644 index aac594c..0000000 --- a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetryData.cs +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System.Collections.Generic; - -namespace Elastic.Transport.Diagnostics; - -/// -/// Allows consumers to pass specific values for OpenTelemetry instrumentation for each request. -/// -public readonly struct OpenTelemetryData -{ - /// - /// The name to use for spans relating to a request. - /// - public string? SpanName { get; init; } - - /// - /// Additional span attributes for transport spans relating to a request. - /// - public Dictionary? SpanAttributes { get; init; } -} diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index 9c1088c..dbe50ee 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -20,15 +20,13 @@ namespace Elastic.Transport; /// -public sealed class DistributedTransport : DistributedTransport +/// +/// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on +/// different nodes +/// +/// The configuration to use for this transport +public sealed class DistributedTransport(ITransportConfiguration configuration) : DistributedTransport(configuration) { - /// - /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on - /// different nodes - /// - /// The configuration to use for this transport - public DistributedTransport(ITransportConfiguration configuration) - : base(configuration) { } } /// @@ -65,28 +63,28 @@ public DistributedTransport(TConfiguration configuration) public TResponse Request( in EndpointPath path, PostData? data, - in OpenTelemetryData openTelemetryData, + Action? configureActivity, IRequestConfiguration? localConfiguration ) where TResponse : TransportResponse, new() => - RequestCoreAsync(isAsync: false, path, data, openTelemetryData, localConfiguration) + RequestCoreAsync(isAsync: false, path, data, configureActivity, localConfiguration) .EnsureCompleted(); /// public Task RequestAsync( in EndpointPath path, PostData? data, - in OpenTelemetryData openTelemetryData, + Action? configureActivity, IRequestConfiguration? localConfiguration, CancellationToken cancellationToken = default ) where TResponse : TransportResponse, new() => - RequestCoreAsync(isAsync: true, path, data, openTelemetryData, localConfiguration, cancellationToken) + RequestCoreAsync(isAsync: true, path, data, configureActivity, localConfiguration, cancellationToken) .AsTask(); private async ValueTask RequestCoreAsync( bool isAsync, EndpointPath path, PostData? data, - OpenTelemetryData openTelemetryData, + Action? configureActivity, IRequestConfiguration? localConfiguration, CancellationToken cancellationToken = default ) where TResponse : TransportResponse, new() @@ -94,7 +92,7 @@ private async ValueTask RequestCoreAsync( Activity activity = null; if (OpenTelemetry.ElasticTransportActivitySource.HasListeners()) - activity = OpenTelemetry.ElasticTransportActivitySource.StartActivity(openTelemetryData.SpanName ?? path.Method.GetStringValue(), + activity = OpenTelemetry.ElasticTransportActivitySource.StartActivity(path.Method.GetStringValue(), ActivityKind.Client); try @@ -127,7 +125,7 @@ private async ValueTask RequestCoreAsync( if (activity is { IsAllDataRequested: true }) { if (activity.IsAllDataRequested) - OpenTelemetry.SetCommonAttributes(activity, openTelemetryData, Configuration); + OpenTelemetry.SetCommonAttributes(activity, Configuration); if (Configuration.Authentication is BasicAuthentication basicAuthentication) activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username); @@ -136,11 +134,6 @@ private async ValueTask RequestCoreAsync( activity.SetTag(OpenTelemetryAttributes.ElasticTransportProductVersion, Configuration.ProductRegistration.ProductAssemblyVersion); activity.SetTag(OpenTelemetryAttributes.ElasticTransportVersion, ReflectionVersionInfo.TransportVersion); activity.SetTag(SemanticConventions.UserAgentOriginal, Configuration.UserAgent.ToString()); - - if (openTelemetryData.SpanAttributes is not null) - foreach (var attribute in openTelemetryData.SpanAttributes) - activity.SetTag(attribute.Key, attribute.Value); - activity.SetTag(SemanticConventions.HttpRequestMethod, endpoint.Method.GetStringValue()); } @@ -268,6 +261,9 @@ private async ValueTask RequestCoreAsync( activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode); activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes); + if (configureActivity is not null && activity is not null) + configureActivity.Invoke(activity); + return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response); } finally diff --git a/src/Elastic.Transport/ITransport.cs b/src/Elastic.Transport/ITransport.cs index 2b8bd15..7a4757a 100644 --- a/src/Elastic.Transport/ITransport.cs +++ b/src/Elastic.Transport/ITransport.cs @@ -2,9 +2,10 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System.Diagnostics; +using System; using System.Threading; using System.Threading.Tasks; -using Elastic.Transport.Diagnostics; namespace Elastic.Transport; @@ -20,7 +21,7 @@ public interface ITransport /// The type to deserialize the response body into. /// The path of the request. /// The data to be included as the body of the HTTP request. - /// Data to be used to control the OpenTelemetry instrumentation. + /// An optional used to configure the . /// Per request configuration /// Allows callers to override completely how `TResponse` should be deserialized to a `TResponse` that implements instance. /// Expert setting only @@ -28,7 +29,7 @@ public interface ITransport public TResponse Request( in EndpointPath path, PostData? postData, - in OpenTelemetryData openTelemetryData, + Action? configureActivity, IRequestConfiguration? localConfiguration ) where TResponse : TransportResponse, new(); @@ -40,7 +41,7 @@ public TResponse Request( /// The path of the request. /// The data to be included as the body of the HTTP request. /// The cancellation token to use. - /// Data to be used to control the OpenTelemetry instrumentation. + /// An optional used to configure the . /// Per request configuration /// Allows callers to override completely how `TResponse` should be deserialized to a `TResponse` that implements instance. /// Expert setting only @@ -48,7 +49,7 @@ public TResponse Request( public Task RequestAsync( in EndpointPath path, PostData? postData, - in OpenTelemetryData openTelemetryData, + Action? configureActivity, IRequestConfiguration? localConfiguration, CancellationToken cancellationToken = default ) diff --git a/src/Elastic.Transport/ITransportHttpMethodExtensions.cs b/src/Elastic.Transport/ITransportHttpMethodExtensions.cs index ee4e656..c39f403 100644 --- a/src/Elastic.Transport/ITransportHttpMethodExtensions.cs +++ b/src/Elastic.Transport/ITransportHttpMethodExtensions.cs @@ -19,98 +19,98 @@ private static EndpointPath ToEndpointPath(HttpMethod method, string path, Reque /// Perform a GET request public static TResponse Get(this ITransport transport, string path, RequestParameters parameters) where TResponse : TransportResponse, new() => - transport.Request(ToEndpointPath(GET, path, parameters, transport.Configuration), postData: null, openTelemetryData: default, null); + transport.Request(ToEndpointPath(GET, path, parameters, transport.Configuration), postData: null, null, null); /// Perform a GET request public static Task GetAsync(this ITransport transport, string path, RequestParameters parameters, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() => - transport.RequestAsync(ToEndpointPath(GET, path, parameters, transport.Configuration), postData: null, openTelemetryData: default, null, cancellationToken); + transport.RequestAsync(ToEndpointPath(GET, path, parameters, transport.Configuration), postData: null, null, null, cancellationToken); /// Perform a GET request public static TResponse Get(this ITransport transport, string pathAndQuery) where TResponse : TransportResponse, new() => - transport.Request(new EndpointPath(GET, pathAndQuery), postData: null, openTelemetryData: default, null); + transport.Request(new EndpointPath(GET, pathAndQuery), postData: null, null, null); /// Perform a GET request public static Task GetAsync(this ITransport transport, string pathAndQuery, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() => - transport.RequestAsync(new EndpointPath(GET, pathAndQuery), postData: null, openTelemetryData: default, null, cancellationToken); + transport.RequestAsync(new EndpointPath(GET, pathAndQuery), postData: null, null, null, cancellationToken); /// Perform a HEAD request public static VoidResponse Head(this ITransport transport, string path, RequestParameters parameters) - => transport.Request(ToEndpointPath(HEAD, path, parameters, transport.Configuration), postData: null, openTelemetryData: default, null); + => transport.Request(ToEndpointPath(HEAD, path, parameters, transport.Configuration), postData: null, null, null); /// Perform a HEAD request public static Task HeadAsync(this ITransport transport, string path, RequestParameters parameters, CancellationToken cancellationToken = default) - => transport.RequestAsync(ToEndpointPath(HEAD, path, parameters, transport.Configuration), postData: null, openTelemetryData: default, null, cancellationToken); + => transport.RequestAsync(ToEndpointPath(HEAD, path, parameters, transport.Configuration), postData: null, null, null, cancellationToken); /// Perform a HEAD request public static VoidResponse Head(this ITransport transport, string pathAndQuery) - => transport.Request(new EndpointPath(HEAD, pathAndQuery), postData: null, openTelemetryData: default, null); + => transport.Request(new EndpointPath(HEAD, pathAndQuery), postData: null, null, null); /// Perform a HEAD request public static Task HeadAsync(this ITransport transport, string pathAndQuery, CancellationToken cancellationToken = default) - => transport.RequestAsync(new EndpointPath(HEAD, pathAndQuery), postData: null, openTelemetryData: default, null, cancellationToken); + => transport.RequestAsync(new EndpointPath(HEAD, pathAndQuery), postData: null, null, null, cancellationToken); /// Perform a POST request public static TResponse Post(this ITransport transport, string path, PostData data, RequestParameters parameters) where TResponse : TransportResponse, new() => - transport.Request(ToEndpointPath(POST, path, parameters, transport.Configuration), data, openTelemetryData: default, null); + transport.Request(ToEndpointPath(POST, path, parameters, transport.Configuration), data, null, null); /// Perform a POST request public static Task PostAsync(this ITransport transport, string path, PostData data, RequestParameters parameters, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() => - transport.RequestAsync(ToEndpointPath(POST, path, parameters, transport.Configuration), data, openTelemetryData: default, null, cancellationToken); + transport.RequestAsync(ToEndpointPath(POST, path, parameters, transport.Configuration), data, null, null, cancellationToken); /// Perform a POST request public static TResponse Post(this ITransport transport, string pathAndQuery, PostData data) where TResponse : TransportResponse, new() => - transport.Request(new EndpointPath(POST, pathAndQuery), data, openTelemetryData: default, null); + transport.Request(new EndpointPath(POST, pathAndQuery), data, null, null); /// Perform a POST request public static Task PostAsync(this ITransport transport, string pathAndQuery, PostData data, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() => - transport.RequestAsync(new EndpointPath(POST, pathAndQuery), data, openTelemetryData: default, null, cancellationToken); + transport.RequestAsync(new EndpointPath(POST, pathAndQuery), data, null, null, cancellationToken); /// Perform a PUT request public static TResponse Put(this ITransport transport, string path, PostData data, RequestParameters parameters) where TResponse : TransportResponse, new() => - transport.Request(ToEndpointPath(PUT, path, parameters, transport.Configuration), data, openTelemetryData: default, null); + transport.Request(ToEndpointPath(PUT, path, parameters, transport.Configuration), data, null, null); /// Perform a PUT request public static Task PutAsync(this ITransport transport, string path, PostData data, RequestParameters parameters, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() => - transport.RequestAsync(ToEndpointPath(PUT, path, parameters, transport.Configuration), data, openTelemetryData: default, null, cancellationToken); + transport.RequestAsync(ToEndpointPath(PUT, path, parameters, transport.Configuration), data, null, null, cancellationToken); /// Perform a PUT request public static TResponse Put(this ITransport transport, string pathAndQuery, PostData data) where TResponse : TransportResponse, new() => - transport.Request(new EndpointPath(PUT, pathAndQuery), data, openTelemetryData: default, null); + transport.Request(new EndpointPath(PUT, pathAndQuery), data, null, null); /// Perform a PUT request public static Task PutAsync(this ITransport transport, string pathAndQuery, PostData data, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() => - transport.RequestAsync(new EndpointPath(PUT, pathAndQuery), data, openTelemetryData: default, null, cancellationToken); + transport.RequestAsync(new EndpointPath(PUT, pathAndQuery), data, null, null, cancellationToken); /// Perform a DELETE request public static TResponse Delete(this ITransport transport, string path, RequestParameters parameters, PostData? data = null) where TResponse : TransportResponse, new() => - transport.Request(ToEndpointPath(DELETE, path, parameters, transport.Configuration), data, openTelemetryData: default, null); + transport.Request(ToEndpointPath(DELETE, path, parameters, transport.Configuration), data, null, null); /// Perform a DELETE request public static Task DeleteAsync(this ITransport transport, string path, RequestParameters parameters, PostData? data = null, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() => - transport.RequestAsync(ToEndpointPath(DELETE, path, parameters, transport.Configuration), data, openTelemetryData: default, null, cancellationToken); + transport.RequestAsync(ToEndpointPath(DELETE, path, parameters, transport.Configuration), data, null, null, cancellationToken); /// Perform a DELETE request public static TResponse Delete(this ITransport transport, string pathAndQuery, PostData? data = null) where TResponse : TransportResponse, new() => - transport.Request(new EndpointPath(DELETE, pathAndQuery), data, openTelemetryData: default, null); + transport.Request(new EndpointPath(DELETE, pathAndQuery), data, null, null); /// Perform a DELETE request public static Task DeleteAsync(this ITransport transport, string pathAndQuery, PostData? data = null, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() => - transport.RequestAsync(new EndpointPath(DELETE, pathAndQuery), data, openTelemetryData: default, null, cancellationToken); + transport.RequestAsync(new EndpointPath(DELETE, pathAndQuery), data, null, null, cancellationToken); } diff --git a/src/Elastic.Transport/Requests/MetaData/DefaultMetaHeaderProvider.cs b/src/Elastic.Transport/Requests/MetaData/DefaultMetaHeaderProvider.cs index fa3de26..1c71e68 100644 --- a/src/Elastic.Transport/Requests/MetaData/DefaultMetaHeaderProvider.cs +++ b/src/Elastic.Transport/Requests/MetaData/DefaultMetaHeaderProvider.cs @@ -72,7 +72,7 @@ public DefaultMetaHeaderProducer(VersionInfo versionInfo, string serviceIdentifi : _syncMetaDataHeader.ToString(); // TODO - Cache values against key to avoid allocating a string each time - if (boundConfiguration.RequestMetaData.Items.TryGetValue(RequestMetaData.HelperKey, out var helperSuffix)) + if (boundConfiguration.RequestMetaData is not null && boundConfiguration.RequestMetaData.Items.TryGetValue(RequestMetaData.HelperKey, out var helperSuffix)) headerValue = $"{headerValue},h={helperSuffix}"; return headerValue; diff --git a/tests/Elastic.Transport.Tests/OpenTelemetryTests.cs b/tests/Elastic.Transport.Tests/OpenTelemetryTests.cs index 637e468..315849b 100644 --- a/tests/Elastic.Transport.Tests/OpenTelemetryTests.cs +++ b/tests/Elastic.Transport.Tests/OpenTelemetryTests.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information using System; -using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Reflection; @@ -92,7 +91,7 @@ public async Task PreferSpanNameFromOpenTelemetryData() { const string spanName = "Overridden span name"; - await TestCoreAsync(Assertions, new OpenTelemetryData { SpanName = spanName }); + await TestCoreAsync(Assertions, static a => a.DisplayName = spanName); static void Assertions(Activity activity) { @@ -106,15 +105,7 @@ public async Task IncludeAttributesFromOpenTelemetryData() const string attributeName = "test.attribute"; const string attributeValue = "test-value"; - var otel = new OpenTelemetryData - { - SpanAttributes = new Dictionary - { - [attributeName] = attributeValue - } - }; - - await TestCoreAsync(Assertions, otel); + await TestCoreAsync(Assertions, static a => a.AddTag(attributeName, attributeValue)); static void Assertions(Activity activity) { @@ -122,9 +113,9 @@ static void Assertions(Activity activity) } } - private Task TestCoreAsync(Action assertion) => TestCoreAsync(assertion, default); + private static Task TestCoreAsync(Action assertion) => TestCoreAsync(assertion, default); - private async Task TestCoreAsync(Action assertions, OpenTelemetryData openTelemetryData, ITransport transport = null) + private static async Task TestCoreAsync(Action assertions, Action configureActivity, ITransport transport = null) { var mre = new ManualResetEvent(false); @@ -149,7 +140,7 @@ private async Task TestCoreAsync(Action assertions, OpenTelemetryData transport ??= new DistributedTransport(InMemoryConnectionFactory.Create()); - _ = await transport.RequestAsync(new EndpointPath(HttpMethod.GET, "/"), null, openTelemetryData, null, default); + _ = await transport.RequestAsync(new EndpointPath(HttpMethod.GET, "/"), null, configureActivity, null, default); mre.WaitOne(TimeSpan.FromSeconds(1)).Should().BeTrue(); }