Skip to content

Commit

Permalink
Added instrumentation via System.Diagnostics.Activity (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
mburumaxwell authored Jun 6, 2024
1 parent d443882 commit ccd70d1
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 8 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ services.AddSingleton<Medallion.Threading.IDistributedLockProvider>(provider =>
});
```

### Instrumentation and observability

This core library (`Tingle.PeriodicTasks`) is instrumented using `System.Diagnostics.Activity` and `System.Diagnostics.ActivitySource`.
This makes it easy to use with OpenTelemetry by listening to the `Tingle.PeriodicTasks` activity source.

```cs
services.AddOpenTelemetry().WithTracing().AddSource("Tingle.PeriodicTasks");
```

### Management via endpoints in AspNetCore

You can choose to manage the periodic tasks in using endpoints in AspNetCore. Update your application setup as follows.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System.Diagnostics;
using System.Reflection;

namespace Tingle.PeriodicTasks.Diagnostics;

///
public static class PeriodicTasksActivitySource
{
private static readonly AssemblyName AssemblyName = typeof(PeriodicTasksActivitySource).Assembly.GetName();
private static readonly string ActivitySourceName = AssemblyName.Name!;
private static readonly Version Version = AssemblyName.Version!;
private static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version.ToString());

/// <summary>
/// Creates a new activity if there are active listeners for it, using the specified
/// name, activity kind, and parent Id.
/// </summary>
/// <param name="name"></param>
/// <param name="kind"></param>
/// <param name="parentId"></param>
/// <returns></returns>
public static Activity? StartActivity(string name, ActivityKind kind = ActivityKind.Internal, string? parentId = null)
{
return parentId is not null
? ActivitySource.StartActivity(name: name, kind: kind, parentId: parentId)
: ActivitySource.StartActivity(name: name, kind: kind);
}
}

/// <summary>
/// Names for activities generated by PeriodicTasks
/// </summary>
public static class ActivityNames
{
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
public const string Execute = "Tingle.PeriodicTasks.Execute";
public const string ExecuteAttempt = "Tingle.PeriodicTasks.ExecuteAttempt";
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
}

/// <summary>
/// Names for tags added to activities generated by PeriodicTasks
/// </summary>
public static class ActivityTagNames
{
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
public const string PeriodicTaskType = "periodictask.type";
public const string PeriodicTaskName = "periodictask.name";
public const string PeriodicTaskSchedule = "periodictask.schedule";
public const string PeriodicTaskTimezone = "periodictask.timezone";
public const string PeriodicTaskDeadline = "periodictask.deadline";
public const string PeriodicTaskAttemptNumber = "periodictask.attempt_number";
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
}
57 changes: 57 additions & 0 deletions src/Tingle.PeriodicTasks/Extensions/ActivityExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
namespace System.Diagnostics;

internal static class ActivityExtensions
{
// Copied from https://github.com/dotnet/runtime/pull/102905/files
#if !NET9_0_OR_GREATER
public static Activity AddException(this Activity activity, Exception exception, TagList tags = default, DateTimeOffset timestamp = default)
{
ArgumentNullException.ThrowIfNull(activity);
ArgumentNullException.ThrowIfNull(exception);

TagList exceptionTags = tags;

const string ExceptionEventName = "exception";
const string ExceptionMessageTag = "exception.message";
const string ExceptionStackTraceTag = "exception.stacktrace";
const string ExceptionTypeTag = "exception.type";

bool hasMessage = false;
bool hasStackTrace = false;
bool hasType = false;

for (int i = 0; i < exceptionTags.Count; i++)
{
if (exceptionTags[i].Key == ExceptionMessageTag)
{
hasMessage = true;
}
else if (exceptionTags[i].Key == ExceptionStackTraceTag)
{
hasStackTrace = true;
}
else if (exceptionTags[i].Key == ExceptionTypeTag)
{
hasType = true;
}
}

if (!hasMessage)
{
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionMessageTag, exception.Message));
}

if (!hasStackTrace)
{
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionStackTraceTag, exception.ToString()));
}

if (!hasType)
{
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionTypeTag, exception.GetType().ToString()));
}

return activity.AddEvent(new ActivityEvent(ExceptionEventName, timestamp));
}
#endif
}
64 changes: 58 additions & 6 deletions src/Tingle.PeriodicTasks/Internal/PeriodicTaskRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Tingle.PeriodicTasks.Diagnostics;

namespace Tingle.PeriodicTasks.Internal;

Expand Down Expand Up @@ -65,13 +67,26 @@ public async Task RunAsync(string name, CancellationToken cancellationToken = de
{
var options = optionsMonitor.Get(name);

// Instrumentation
using var activity = PeriodicTasksActivitySource.StartActivity(ActivityNames.Execute, ActivityKind.Consumer);
if (activity is not null)
{
activity.DisplayName = $"Periodic Task: {name}";
activity.AddTag(ActivityTagNames.PeriodicTaskName, name);
activity.AddTag(ActivityTagNames.PeriodicTaskType, typeof(TTask).FullName);
activity.AddTag(ActivityTagNames.PeriodicTaskSchedule, options.Schedule!.ToString());
activity.AddTag(ActivityTagNames.PeriodicTaskTimezone, TimeZoneInfo.FindSystemTimeZoneById(options.Timezone!).Id);
activity.AddTag(ActivityTagNames.PeriodicTaskDeadline, options.Deadline!.ToString());
}

// create linked CancellationTokenSource and attach deadline if not null
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(options.Deadline!.Value);

// execute the task
var id = idGenerator.Generate(name, options.ExecutionIdFormat!.Value);
var t = ExecuteInnerAsync(executionId: id,
var t = ExecuteInnerAsync(activity: activity,
executionId: id,
name: name,
options: options,
throwOnError: throwOnError,
Expand All @@ -97,7 +112,7 @@ public async Task RunAsync(string name, CancellationToken cancellationToken = de
return null;
}

internal async Task<PeriodicTaskExecutionAttempt?> ExecuteInnerAsync(string executionId, string name, PeriodicTaskOptions options, bool throwOnError, CancellationToken cancellationToken = default)
internal async Task<PeriodicTaskExecutionAttempt?> ExecuteInnerAsync(Activity? activity, string executionId, string name, PeriodicTaskOptions options, bool throwOnError, CancellationToken cancellationToken = default)
{
var start = DateTimeOffset.UtcNow;
var lockName = options.LockName!;
Expand Down Expand Up @@ -126,24 +141,37 @@ public async Task RunAsync(string name, CancellationToken cancellationToken = de
{
var task = ActivatorUtilities.GetServiceOrCreateInstance<TTask>(provider);

var context = new PeriodicTaskExecutionContext(name, executionId) { TaskType = typeof(TTask), };
var context = new PeriodicTaskExecutionContext(name, executionId, typeof(TTask));

// Invoke handler method, with resilience pipeline if specified
var resiliencePipeline = options.ResiliencePipeline;
if (resiliencePipeline != null)
if (resiliencePipeline is not null)
{
var contextData = new Dictionary<string, object> { ["context"] = context, };
await resiliencePipeline.ExecuteAsync(async (ctx, ct) => await task.ExecuteAsync(context, cts.Token).ConfigureAwait(false), contextData, cancellationToken).ConfigureAwait(false);
var attemptNumber = 0;
await resiliencePipeline.ExecuteAsync(
async (ctx, ct) =>
{
attemptNumber++;
await ExecuteTrackedAsync(task, context, attemptNumber, cts.Token).ConfigureAwait(false);
},
contextData,
cancellationToken
).ConfigureAwait(false);
}
else
{
await task.ExecuteAsync(context, cts.Token).ConfigureAwait(false);
await ExecuteTrackedAsync(task, context, 1, cts.Token).ConfigureAwait(false);
}
}
catch (Exception ex)
{
Interlocked.Exchange(ref caught, ex);
logger.ExceptionInPeriodicTask(ex, executionId);

// record the exception in the activity and set the status to error
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
}

var end = DateTimeOffset.UtcNow;
Expand Down Expand Up @@ -203,4 +231,28 @@ public async Task RunAsync(string name, CancellationToken cancellationToken = de

return attempt;
}

private async Task ExecuteTrackedAsync(IPeriodicTask task, PeriodicTaskExecutionContext context, int attemptNumber, CancellationToken cancellationToken)
{
using var activity = PeriodicTasksActivitySource.StartActivity(ActivityNames.ExecuteAttempt, ActivityKind.Consumer);
if (activity is not null)
{
activity.DisplayName = $"Periodic Task: {context.Name} (Attempt: {attemptNumber})";
activity.AddTag(ActivityTagNames.PeriodicTaskName, context.Name);
activity.AddTag(ActivityTagNames.PeriodicTaskType, typeof(TTask).FullName);
activity.AddTag(ActivityTagNames.PeriodicTaskAttemptNumber, attemptNumber);
}

try
{
await task.ExecuteAsync(context, cancellationToken).ConfigureAwait(false);
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
throw;
}
}
}
6 changes: 4 additions & 2 deletions src/Tingle.PeriodicTasks/PeriodicTaskExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ public class PeriodicTaskExecutionContext
/// This value is different for tasks of the same type but multiple registrations.
/// </param>
/// <param name="executionId">Unique identifier of the execution.</param>
/// <param name="taskType">Periodic task type.</param>
/// <exception cref="ArgumentNullException"></exception>
public PeriodicTaskExecutionContext(string name, string executionId)
public PeriodicTaskExecutionContext(string name, string executionId, Type taskType)
{
Name = name ?? throw new ArgumentNullException(nameof(name));
ExecutionId = executionId ?? throw new ArgumentNullException(nameof(executionId));
TaskType = taskType ?? throw new ArgumentNullException(nameof(taskType));
}

/// <summary>
Expand All @@ -30,5 +32,5 @@ public PeriodicTaskExecutionContext(string name, string executionId)
public string ExecutionId { get; }

/// <summary>Periodic task type.</summary>
public Type? TaskType { get; init; }
public Type TaskType { get; }
}

0 comments on commit ccd70d1

Please sign in to comment.