diff --git a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs index 6a1dda0a..edb46f9d 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs @@ -71,7 +71,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // log warning when trying to publish scheduled message if (scheduled != null) { - Logger.LogWarning("Amazon Kinesis does not support delay or scheduled publish"); + Logger.SchedulingNotSupported(); } using var scope = CreateScope(); @@ -81,7 +81,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) cancellationToken: cancellationToken); // prepare the record - var streamName = registration.EventName; + var streamName = registration.EventName!; var request = new PutRecordRequest { Data = body.ToMemoryStream(), @@ -90,7 +90,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) }; // send the event - Logger.LogInformation("Sending {Id} to '{StreamName}'. Scheduled: {Scheduled}", @event.Id, streamName, scheduled); + Logger.SendingToStream(eventId: @event.Id, streamName: streamName, scheduled: scheduled); var response = await kinesisClient.PutRecordAsync(request, cancellationToken); response.EnsureSuccess(); @@ -107,7 +107,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // log warning when trying to publish scheduled message if (scheduled != null) { - Logger.LogWarning("Amazon Kinesis does not support delay or scheduled publish"); + Logger.SchedulingNotSupported(); } using var scope = CreateScope(); @@ -130,7 +130,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) } // prepare the request - var streamName = registration.EventName; + var streamName = registration.EventName!; var request = new PutRecordsRequest { StreamName = streamName, @@ -138,11 +138,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) }; // send the events - Logger.LogInformation("Sending {EventsCount} messages to '{StreamName}'. Scheduled: {Scheduled}. Events:\r\n- {Ids}", - events.Count, - streamName, - scheduled, - string.Join("\r\n- ", events.Select(e => e.Id))); + Logger.SendingEventsToStream(events, streamName, scheduled); var response = await kinesisClient.PutRecordsAsync(request, cancellationToken); response.EnsureSuccess(); diff --git a/src/Tingle.EventBus.Transports.Amazon.Kinesis/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Amazon.Kinesis/ILoggerExtensions.cs new file mode 100644 index 00000000..e0511d86 --- /dev/null +++ b/src/Tingle.EventBus.Transports.Amazon.Kinesis/ILoggerExtensions.cs @@ -0,0 +1,34 @@ +using Tingle.EventBus; + +namespace Microsoft.Extensions.Logging; + +/// +/// Extensions on for the EventBus +/// +internal static partial class ILoggerExtensions +{ + [LoggerMessage(100, LogLevel.Information, "Sending {EventId} to '{StreamName}'. Scheduled: {Scheduled}.")] + public static partial void SendingToStream(this ILogger logger, string? eventId, string streamName, DateTimeOffset? scheduled); + + [LoggerMessage(101, LogLevel.Information, "Sending {EventsCount} messages to '{StreamName}'. Scheduled: {Scheduled}. Events:\r\n- {EventIds}")] + private static partial void SendingEventsToStream(this ILogger logger, int eventsCount, string streamName, DateTimeOffset? scheduled, string eventIds); + + public static void SendingEventsToStream(this ILogger logger, IList eventIds, string streamName, DateTimeOffset? scheduled) + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingEventsToStream(eventsCount: eventIds.Count, + streamName: streamName, + scheduled: scheduled, + eventIds: string.Join("\r\n- ", eventIds)); + } + + public static void SendingEventsToStream(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) + where T : class + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingEventsToStream(events.Select(e => e.Id).ToList(), entityPath, scheduled); + } + + [LoggerMessage(102, LogLevel.Warning, "Amazon Kinesis does not support delay or scheduled publish.")] + public static partial void SchedulingNotSupported(this ILogger logger); +} diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs index c2317bc9..54e6ae47 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs @@ -103,7 +103,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // log warning when trying to publish scheduled message if (scheduled != null) { - Logger.LogWarning("Amazon SNS does not support delay or scheduled publish"); + Logger.SchedulingNotSupported(); } using var scope = CreateScope(); @@ -120,7 +120,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) .SetAttribute(MetadataNames.RequestId, @event.RequestId) .SetAttribute(MetadataNames.InitiatorId, @event.InitiatorId) .SetAttribute(MetadataNames.ActivityId, Activity.Current?.Id); - Logger.LogInformation("Sending {Id} to '{TopicArn}'. Scheduled: {Scheduled}", @event.Id, topicArn, scheduled); + Logger.SendingToTopic(eventId: @event.Id, topicArn: topicArn, scheduled: scheduled); var response = await snsClient.PublishAsync(request: request, cancellationToken: cancellationToken); response.EnsureSuccess(); @@ -135,12 +135,12 @@ public override async Task StopAsync(CancellationToken cancellationToken) CancellationToken cancellationToken = default) { // log warning when doing batch - Logger.LogWarning("Amazon SNS does not support batching. The events will be looped through one by one"); + Logger.BatchingNotSupported(); // log warning when trying to publish scheduled message if (scheduled != null) { - Logger.LogWarning("Amazon SNS does not support delay or scheduled publish"); + Logger.SchedulingNotSupported(); } using var scope = CreateScope(); @@ -162,7 +162,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) .SetAttribute(MetadataNames.RequestId, @event.RequestId) .SetAttribute(MetadataNames.InitiatorId, @event.InitiatorId) .SetAttribute(MetadataNames.ActivityId, Activity.Current?.Id); - Logger.LogInformation("Sending {Id} to '{TopicArn}'. Scheduled: {Scheduled}", @event.Id, topicArn, scheduled); + Logger.SendingToTopic(eventId: @event.Id, topicArn: topicArn, scheduled: scheduled); var response = await snsClient.PublishAsync(request: request, cancellationToken: cancellationToken); response.EnsureSuccess(); @@ -310,12 +310,12 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration if (messages.Count == 0) { var delay = TransportOptions.EmptyResultsDelay; - Logger.LogTrace("No messages on '{QueueUrl}', delaying check for {Delay}", queueUrl, delay); + Logger.NoMessages(queueUrl: queueUrl, delay: delay); await Task.Delay(delay, cancellationToken); } else { - Logger.LogDebug("Received {MessageCount} messages on '{QueueUrl}'", messages.Count, queueUrl); + Logger.ReceivedMessages(messagesCount: messages.Count, queueUrl: queueUrl); using var scope = CreateScope(); // shared foreach (var message in messages) { @@ -369,7 +369,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); activity?.AddTag(ActivityTagNames.MessagingUrl, queueUrl); - Logger.LogDebug("Processing '{MessageId}' from '{QueueUrl}'", messageId, queueUrl); + Logger.ProcessingMessage(messageId: messageId, queueUrl: queueUrl); message.TryGetAttribute("Content-Type", out var contentType_str); var contentType = contentType_str == null ? null : new ContentType(contentType_str); @@ -381,10 +381,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r identifier: messageId, cancellationToken: cancellationToken); - Logger.LogInformation("Received message: '{MessageId}' containing Event '{Id}' from '{QueueUrl}'", - messageId, - context.Id, - queueUrl); + Logger.ReceivedMessage(messageId: messageId, eventId: context.Id, queueUrl: queueUrl); var (successful, _) = await ConsumeAsync(ecr: ecr, @event: context, @@ -405,7 +402,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r } // whether or not successful, always delete the message from the current queue - Logger.LogTrace("Deleting '{MessageId}' on '{QueueUrl}'", messageId, queueUrl); + Logger.DeletingMessage(messageId: messageId, queueUrl: queueUrl); await sqsClient.DeleteMessageAsync(queueUrl: queueUrl, receiptHandle: message.ReceiptHandle, cancellationToken: cancellationToken); diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/ILoggerExtensions.cs new file mode 100644 index 00000000..3ddafade --- /dev/null +++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/ILoggerExtensions.cs @@ -0,0 +1,31 @@ +namespace Microsoft.Extensions.Logging; + +/// +/// Extensions on for the EventBus +/// +internal static partial class ILoggerExtensions +{ + [LoggerMessage(100, LogLevel.Information, "Sending {EventId} to '{TopicArn}'. Scheduled: {Scheduled}.")] + public static partial void SendingToTopic(this ILogger logger, string? eventId, string topicArn, DateTimeOffset? scheduled); + + [LoggerMessage(101, LogLevel.Information, "Received message: '{MessageId}' containing Event '{EventId}' from '{QueueUrl}'")] + public static partial void ReceivedMessage(this ILogger logger, string messageId, string? eventId, string queueUrl); + + [LoggerMessage(102, LogLevel.Warning, "Amazon SNS does not support delay or scheduled publish.")] + public static partial void SchedulingNotSupported(this ILogger logger); + + [LoggerMessage(103, LogLevel.Warning, "Amazon SNS does not support batching. The events will be looped through one by one.")] + public static partial void BatchingNotSupported(this ILogger logger); + + [LoggerMessage(104, LogLevel.Trace, "No messages on '{QueueUrl}', delaying check for {Delay}.")] + public static partial void NoMessages(this ILogger logger, string queueUrl, TimeSpan delay); + + [LoggerMessage(105, LogLevel.Debug, "Received {MessagesCount} messages on '{QueueUrl}'")] + public static partial void ReceivedMessages(this ILogger logger, int messagesCount, string queueUrl); + + [LoggerMessage(106, LogLevel.Debug, "Processing '{MessageId}' from '{QueueUrl}'")] + public static partial void ProcessingMessage(this ILogger logger, string messageId, string queueUrl); + + [LoggerMessage(107, LogLevel.Trace, "Deleting '{MessageId}' from '{QueueUrl}'")] + public static partial void DeletingMessage(this ILogger logger, string messageId, string queueUrl); +} diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs index 1e02cd82..82201425 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs @@ -86,18 +86,18 @@ public override async Task StopAsync(CancellationToken cancellationToken) var clients = processorsCache.Select(kvp => (key: kvp.Key, proc: kvp.Value)).ToList(); foreach (var (key, proc) in clients) { - Logger.LogDebug("Stopping client: {Processor}", key); + Logger.StoppingProcessor(processor: key); try { await proc.StopProcessingAsync(cancellationToken); processorsCache.Remove(key); - Logger.LogDebug("Stopped processor for {Processor}", key); + Logger.StoppedProcessor(processor: key); } catch (Exception exception) { - Logger.LogWarning(exception, "Stop processor faulted for {Processor}", key); + Logger.StopProcessorFaulted(processor: key, ex: exception); } } } @@ -111,13 +111,13 @@ public override async Task StopAsync(CancellationToken cancellationToken) // log warning when trying to publish scheduled event if (scheduled != null) { - Logger.LogWarning("Azure EventHubs does not support delay or scheduled publish"); + Logger.SchedulingNotSupported(); } // log warning when trying to publish expiring event if (@event.Expires != null) { - Logger.LogWarning("Azure EventHubs does not support expiring events"); + Logger.ExpiryNotSupported(); } using var scope = CreateScope(); @@ -138,10 +138,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // get the producer and send the event accordingly var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); - Logger.LogInformation("Sending {Id} to '{EventHubName}'. Scheduled: {Scheduled}", - @event.Id, - producer.EventHubName, - scheduled); + Logger.SendingEvent(eventId: @event.Id, eventHubName: producer.EventHubName, scheduled: scheduled); await producer.SendAsync(new[] { data }, cancellationToken); // return the sequence number @@ -157,13 +154,13 @@ public override async Task StopAsync(CancellationToken cancellationToken) // log warning when trying to publish scheduled events if (scheduled != null) { - Logger.LogWarning("Azure EventHubs does not support delay or scheduled publish"); + Logger.SchedulingNotSupported(); } // log warning when trying to publish expiring events if (events.Any(e => e.Expires != null)) { - Logger.LogWarning("Azure EventHubs does not support expiring events"); + Logger.ExpiryNotSupported(); } using var scope = CreateScope(); @@ -189,11 +186,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // get the producer and send the events accordingly var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); - Logger.LogInformation("Sending {EventsCount} events to '{EventHubName}'. Scheduled: {Scheduled}. Events:\r\n- {Ids}", - events.Count, - producer.EventHubName, - scheduled, - string.Join("\r\n- ", events.Select(e => e.Id))); + Logger.SendingEvents(events: events, eventHubName: producer.EventHubName, scheduled: scheduled); await producer.SendAsync(datas, cancellationToken); // return the sequence numbers @@ -350,14 +343,14 @@ private async Task OnEventReceivedAsync(EventRegistration reg { if (!args.HasEvent) { + // TODO: throw exception instead Logger.LogWarning($"'{nameof(OnEventReceivedAsync)}' was invoked but the arguments do not have an event."); return; } - Logger.LogDebug("Processor received event on EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId:{PartitionId}", - processor.EventHubName, - processor.ConsumerGroup, - args.Partition.PartitionId); + Logger.ProcessorReceivedEvent(eventHubName: processor.EventHubName, + consumerGroup: processor.ConsumerGroup, + partitionId: args.Partition.PartitionId); var data = args.Data; var cancellationToken = args.CancellationToken; @@ -388,12 +381,11 @@ private async Task OnEventReceivedAsync(EventRegistration reg activity?.AddTag(ActivityTagNames.MessagingSystem, Name); activity?.AddTag(ActivityTagNames.MessagingDestination, processor.EventHubName); - Logger.LogDebug("Processing '{EventId}|{PartitionKey}|{SequenceNumber}' from '{EventHubName}/{ConsumerGroup}'", - eventId, - data.PartitionKey, - data.SequenceNumber, - processor.EventHubName, - processor.ConsumerGroup); + Logger.ProcessingEvent(eventId: eventId, + partitionKey: data.PartitionKey, + sequenceNumber: data.SequenceNumber, + eventHubName: processor.EventHubName, + consumerGroup: processor.ConsumerGroup); using var scope = CreateScope(); var contentType = contentType_str is string ctts ? new ContentType(ctts) : null; var context = await DeserializeAsync(scope: scope, @@ -402,13 +394,11 @@ private async Task OnEventReceivedAsync(EventRegistration reg registration: reg, identifier: data.SequenceNumber.ToString(), cancellationToken: cancellationToken); - Logger.LogInformation("Received event: '{EventId}|{PartitionKey}|{SequenceNumber}' containing Event '{Id}' from '{EventHubName}/{ConsumerGroup}'", - eventId, - data.PartitionKey, - data.SequenceNumber, - context.Id, - processor.EventHubName, - processor.ConsumerGroup); + Logger.ReceivedEvent(eventId: context.Id, + partitionKey: data.PartitionKey, + sequenceNumber: data.SequenceNumber, + eventHubName: processor.EventHubName, + consumerGroup: processor.ConsumerGroup); // set the extras context.SetConsumerGroup(processor.ConsumerGroup) @@ -433,44 +423,40 @@ private async Task OnEventReceivedAsync(EventRegistration reg */ if (ShouldCheckpoint(successful, ecr.UnhandledErrorBehaviour)) { - Logger.LogDebug("Checkpointing {Partition} of '{EventHubName}/{ConsumerGroup}', at {SequenceNumber}. Event: '{Id}'.", - args.Partition, - processor.EventHubName, - processor.ConsumerGroup, - data.SequenceNumber, - eventId); + Logger.Checkpointing(partition: args.Partition, + eventHubName: processor.EventHubName, + consumerGroup: processor.ConsumerGroup, + sequenceNumber: data.SequenceNumber, + eventId: eventId); await args.UpdateCheckpointAsync(args.CancellationToken); } } private Task OnPartitionClosingAsync(EventProcessorClient processor, PartitionClosingEventArgs args) { - Logger.LogInformation("Closing processor for EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId:{PartitionId} (Reason:{Reason})", - processor.EventHubName, - processor.ConsumerGroup, - args.PartitionId, - args.Reason); + Logger.ClosingProcessor(eventHubName: processor.EventHubName, + consumerGroup: processor.ConsumerGroup, + partitionId: args.PartitionId, + reason: args.Reason); return Task.CompletedTask; } private Task OnPartitionInitializingAsync(EventProcessorClient processor, PartitionInitializingEventArgs args) { - Logger.LogInformation("Opening processor for PartitionId:{PartitionId}, EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, DefaultStartingPosition:{DefaultStartingPosition}", - args.PartitionId, - processor.EventHubName, - processor.ConsumerGroup, - args.DefaultStartingPosition); + Logger.OpeningProcessor(eventHubName: args.PartitionId, + consumerGroup: processor.EventHubName, + partitionId: processor.ConsumerGroup, + position: args.DefaultStartingPosition); return Task.CompletedTask; } private Task OnProcessErrorAsync(EventProcessorClient processor, ProcessErrorEventArgs args) { - Logger.LogError(args.Exception, - "Event processing faulted. Operation:{Operation}, EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId: {PartitionId}", - args.Operation, - processor.EventHubName, - processor.ConsumerGroup, - args.PartitionId); + Logger.ProcessingError(operation: args.Operation, + eventHubName: processor.EventHubName, + consumerGroup: processor.ConsumerGroup, + partitionId: args.PartitionId, + ex: args.Exception); return Task.CompletedTask; } diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs new file mode 100644 index 00000000..2baf5196 --- /dev/null +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs @@ -0,0 +1,71 @@ +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Processor; +using Tingle.EventBus; + +namespace Microsoft.Extensions.Logging; + +/// +/// Extensions on for the EventBus +/// +internal static partial class ILoggerExtensions +{ + [LoggerMessage(100, LogLevel.Information, "Opening processor for EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId:{PartitionId}, DefaultStartingPosition:{Position}")] + public static partial void OpeningProcessor(this ILogger logger, string eventHubName, string consumerGroup, string partitionId, EventPosition position); + + [LoggerMessage(101, LogLevel.Information, "Closing processor for EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId:{PartitionId} (Reason:{Reason})")] + public static partial void ClosingProcessor(this ILogger logger, string eventHubName, string consumerGroup, string partitionId, ProcessingStoppedReason reason); + + [LoggerMessage(102, LogLevel.Error, "Event processing faulted. Operation:{Operation}, EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId: {PartitionId}")] + public static partial void ProcessingError(this ILogger logger, string operation, string eventHubName, string consumerGroup, string partitionId, Exception ex); + + [LoggerMessage(103, LogLevel.Debug, "Stopping processor: {Processor}.")] + public static partial void StoppingProcessor(this ILogger logger, string processor); + + [LoggerMessage(104, LogLevel.Debug, "Stopped processor for {Processor}.")] + public static partial void StoppedProcessor(this ILogger logger, string processor); + + [LoggerMessage(105, LogLevel.Warning, "Stop processor faulted for {Processor}.")] + public static partial void StopProcessorFaulted(this ILogger logger, string processor, Exception ex); + + + [LoggerMessage(200, LogLevel.Warning, "Azure EventHubs does not support delay or scheduled publish.")] + public static partial void SchedulingNotSupported(this ILogger logger); + + [LoggerMessage(201, LogLevel.Warning, "Azure EventHubs does not support expiring events.")] + public static partial void ExpiryNotSupported(this ILogger logger); + + [LoggerMessage(202, LogLevel.Information, "Sending {EventId} to '{EventHubName}'. Scheduled: {Scheduled}")] + public static partial void SendingEvent(this ILogger logger, string? eventId, string eventHubName, DateTimeOffset? scheduled); + + [LoggerMessage(203, LogLevel.Information, "Sending {EventsCount} events to '{EventHubName}'. Scheduled: {Scheduled}. Events:\r\n- {EventIds}")] + private static partial void SendingEvents(this ILogger logger, int eventsCount, string eventHubName, DateTimeOffset? scheduled, string eventIds); + + public static void SendingEvents(this ILogger logger, IList eventIds, string eventHubName, DateTimeOffset? scheduled) + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingEvents(eventsCount: eventIds.Count, + eventHubName: eventHubName, + scheduled: scheduled, + eventIds: string.Join("\r\n- ", eventIds)); + } + + public static void SendingEvents(this ILogger logger, IList> events, string eventHubName, DateTimeOffset? scheduled = null) + where T : class + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingEvents(events.Select(e => e.Id).ToList(), eventHubName, scheduled); + } + + [LoggerMessage(204, LogLevel.Debug, "Checkpointing {Partition} of '{EventHubName}/{ConsumerGroup}', at {SequenceNumber}. Event: '{EventId}'.")] + public static partial void Checkpointing(this ILogger logger, PartitionContext partition, string eventHubName, string consumerGroup, long sequenceNumber, object? eventId); + + + [LoggerMessage(300, LogLevel.Debug, "Processor received event on EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId:{PartitionId}")] + public static partial void ProcessorReceivedEvent(this ILogger logger, string eventHubName, string consumerGroup, string partitionId); + + [LoggerMessage(301, LogLevel.Debug, "Processing '{EventId} in {PartitionKey}|{SequenceNumber}' from '{EventHubName}/{ConsumerGroup}'")] + public static partial void ProcessingEvent(this ILogger logger, object? eventId, string partitionKey, long sequenceNumber, string eventHubName, string consumerGroup); + + [LoggerMessage(302, LogLevel.Information, "Received event: '{EventId} in {PartitionKey}|{SequenceNumber}' from '{EventHubName}/{ConsumerGroup}'")] + public static partial void ReceivedEvent(this ILogger logger, object? eventId, string partitionKey, long sequenceNumber, string eventHubName, string consumerGroup); +} diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs index ffd4b463..253cdc40 100644 --- a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs @@ -99,7 +99,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // get the queue client and send the message var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); - Logger.LogInformation("Sending {Id} to '{QueueName}'. Scheduled: {Scheduled}", @event.Id, queueClient.Name, scheduled); + Logger.SendingMessage(eventId: @event.Id, queueName: queueClient.Name, scheduled: scheduled); var response = await queueClient.SendMessageAsync(messageText: body.ToString(), visibilityTimeout: visibilityTimeout, timeToLive: ttl, @@ -118,7 +118,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) CancellationToken cancellationToken = default) { // log warning when doing batch - Logger.LogWarning("Azure Queue Storage does not support batching. The events will be looped through one by one"); + Logger.BatchingNotSupported(); using var scope = CreateScope(); @@ -138,7 +138,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var ttl = @event.Expires - DateTimeOffset.UtcNow; // send the message - Logger.LogInformation("Sending {Id} to '{QueueName}'. Scheduled: {Scheduled}", @event.Id, queueClient.Name, scheduled); + Logger.SendingMessage(eventId: @event.Id, queueName: queueClient.Name, scheduled: scheduled); var response = await queueClient.SendMessageAsync(messageText: body.ToString(), visibilityTimeout: visibilityTimeout, timeToLive: ttl, @@ -166,13 +166,14 @@ public override async Task CancelAsync(string id, { var messageId = sid.MessageId; var popReceipt = sid.PopReceipt; - Logger.LogInformation("Cancelling '{MessageId}|{PopReceipt}' on '{QueueName}'", messageId, popReceipt, queueClient.Name); + Logger.CancelingMessage(messageId: messageId, popReceipt: popReceipt, queueName: queueClient.Name); await queueClient.DeleteMessageAsync(messageId: messageId, popReceipt: popReceipt, cancellationToken: cancellationToken); } else { + // TODO: throw exception instead Logger.LogWarning("The provided id '{Id}' does not match the expected format.", id); } } @@ -188,7 +189,7 @@ public override async Task CancelAsync(IList ids, } // log warning when doing batch - Logger.LogWarning("Azure Queue Storage does not support batching. The events will be canceled one by one"); + Logger.BatchingNotSupported(); var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); @@ -198,13 +199,14 @@ public override async Task CancelAsync(IList ids, { var messageId = sid.MessageId; var popReceipt = sid.PopReceipt; - Logger.LogInformation("Cancelling '{MessageId}|{PopReceipt}' on '{QueueName}'", messageId, popReceipt, queueClient.Name); + Logger.CancelingMessage(messageId: messageId, popReceipt: popReceipt, queueName: queueClient.Name); await queueClient.DeleteMessageAsync(messageId: messageId, popReceipt: popReceipt, cancellationToken: cancellationToken); } else { + // TODO: throw exception instead Logger.LogWarning("The provided id '{Id}' does not match the expected format.", id); } } @@ -218,7 +220,7 @@ private async Task GetQueueClientAsync(EventRegistration reg, bool { if (!queueClientsCache.TryGetValue((reg.EventType, deadletter), out var queueClient)) { - var name = reg.EventName; + var name = reg.EventName!; if (deadletter) name += TransportOptions.DeadLetterSuffix; // create the queue client options @@ -241,12 +243,12 @@ private async Task GetQueueClientAsync(EventRegistration reg, bool if (TransportOptions.EnableEntityCreation) { // ensure queue is created if it does not exist - Logger.LogInformation("Ensuring queue '{QueueName}' exists", name); + Logger.EnsuringQueue(queueName: name); await queueClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken); } else { - Logger.LogTrace("Entity creation is diabled. Queue creation skipped"); + Logger.EntityCreationDisabled(); } queueClientsCache[(reg.EventType, deadletter)] = queueClient; @@ -279,12 +281,12 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration if (messages.Length == 0) { var delay = TransportOptions.EmptyResultsDelay; - Logger.LogTrace("No messages on '{QueueName}', delaying check for {Delay}", queueClient.Name, delay); + Logger.NoMessages(queueName: queueClient.Name, delay: delay); await Task.Delay(delay, cancellationToken); } else { - Logger.LogDebug("Received {MessageCount} messages on '{QueueName}'", messages.Length, queueClient.Name); + Logger.ReceivedMessages(messagesCount: messages.Length, queueName: queueClient.Name); using var scope = CreateScope(); // shared foreach (var message in messages) { @@ -330,7 +332,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r activity?.AddTag(ActivityTagNames.MessagingDestination, queueClient.Name); activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); - Logger.LogDebug("Processing '{MessageId}' from '{QueueName}'", messageId, queueClient.Name); + Logger.ProcessingMessage(messageId: messageId, queueName: queueClient.Name); var context = await DeserializeAsync(scope: scope, body: message.Body, contentType: null, @@ -338,10 +340,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r identifier: (AzureQueueStorageSchedulingId)message, cancellationToken: cancellationToken); - Logger.LogInformation("Received message: '{MessageId}' containing Event '{Id}' from '{QueueName}'", - messageId, - context.Id, - queueClient.Name); + Logger.ReceivedMessage(messageId: messageId, eventId: context.Id, queueName: queueClient.Name); // if the event contains the parent activity id, set it if (context.Headers.TryGetValue(HeaderNames.ActivityId, out var parentActivityId)) @@ -362,7 +361,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r } // whether or not successful, always delete the message from the current queue - Logger.LogTrace("Deleting '{MessageId}' on '{QueueName}'", messageId, queueClient.Name); + Logger.DeletingMessage(messageId: messageId, queueName: queueClient.Name); await queueClient.DeleteMessageAsync(messageId: messageId, popReceipt: message.PopReceipt, cancellationToken: cancellationToken); diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/ILoggerExtensions.cs new file mode 100644 index 00000000..1ed13562 --- /dev/null +++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/ILoggerExtensions.cs @@ -0,0 +1,39 @@ +namespace Microsoft.Extensions.Logging; + +/// +/// Extensions on for the EventBus +/// +internal static partial class ILoggerExtensions +{ + [LoggerMessage(100, LogLevel.Information, "Ensuring queue '{QueueName}' exists.")] + public static partial void EnsuringQueue(this ILogger logger, string queueName); + + [LoggerMessage(101, LogLevel.Trace, "Entity creation is disabled. Queue creation skipped.")] + public static partial void EntityCreationDisabled(this ILogger logger); + + + [LoggerMessage(200, LogLevel.Warning, "Azure Queue Storage does not support batching. The events will be looped through one by one.")] + public static partial void BatchingNotSupported(this ILogger logger); + + [LoggerMessage(201, LogLevel.Information, "Sending {EventId} to '{QueueName}'. Scheduled: {Scheduled}.")] + public static partial void SendingMessage(this ILogger logger, string? eventId, string queueName, DateTimeOffset? scheduled); + + [LoggerMessage(202, LogLevel.Information, "Cancelling '{MessageId}|{PopReceipt}' on '{QueueName}'.")] + public static partial void CancelingMessage(this ILogger logger, string messageId, string popReceipt, string queueName); + + + [LoggerMessage(300, LogLevel.Trace, "No messages on '{QueueName}', delaying check for {Delay}.")] + public static partial void NoMessages(this ILogger logger, string queueName, TimeSpan delay); + + [LoggerMessage(301, LogLevel.Debug, "Received {MessagesCount} messages on '{QueueName}'")] + public static partial void ReceivedMessages(this ILogger logger, int messagesCount, string queueName); + + [LoggerMessage(302, LogLevel.Debug, "Processing '{MessageId}' from '{QueueName}'")] + public static partial void ProcessingMessage(this ILogger logger, string messageId, string queueName); + + [LoggerMessage(303, LogLevel.Information, "Received message: '{MessageId}' containing Event '{EventId}' from '{QueueName}'")] + public static partial void ReceivedMessage(this ILogger logger, string messageId, string? eventId, string queueName); + + [LoggerMessage(304, LogLevel.Trace, "Deleting '{MessageId}' on '{QueueName}'")] + public static partial void DeletingMessage(this ILogger logger, string messageId, string queueName); +} diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs index 1ab6ff1c..f903bff8 100644 --- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs @@ -81,7 +81,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) }; // start processing - Logger.LogInformation("Starting processing on {EntityPath}", processor.EntityPath); + Logger.StartingProcessing(entityPath: processor.EntityPath); await processor.StartProcessingAsync(cancellationToken: cancellationToken); } } @@ -95,18 +95,18 @@ public override async Task StopAsync(CancellationToken cancellationToken) var clients = processorsCache.Select(kvp => (key: kvp.Key, proc: kvp.Value)).ToList(); foreach (var (key, proc) in clients) { - Logger.LogDebug("Stopping client: {Processor}", key); + Logger.StoppingProcessor(processor: key); try { await proc.StopProcessingAsync(cancellationToken); processorsCache.Remove(key); - Logger.LogDebug("Stopped processor for {Processor}", key); + Logger.StoppedProcessor(processor: key); } catch (Exception exception) { - Logger.LogWarning(exception, "Stop processor faulted for {Processor}", key); + Logger.StopProcessorFaulted(processor: key, ex: exception); } } } @@ -155,10 +155,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // Get the sender and send the message accordingly var sender = await GetSenderAsync(registration, cancellationToken); - Logger.LogInformation("Sending {Id} to '{EntityPath}'. Scheduled: {Scheduled}", - @event.Id, - sender.EntityPath, - scheduled); + Logger.SendingMessage(eventId: @event.Id, entityPath: sender.EntityPath, scheduled: scheduled); if (scheduled != null) { var seqNum = await sender.ScheduleMessageAsync(message: message, @@ -223,11 +220,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // Get the sender and send the messages accordingly var sender = await GetSenderAsync(registration, cancellationToken); - Logger.LogInformation("Sending {EventsCount} messages to '{EntityPath}'. Scheduled: {Scheduled}. Events:\r\n- {Ids}", - events.Count, - sender.EntityPath, - scheduled, - string.Join("\r\n- ", events.Select(e => e.Id))); + Logger.SendingMessages(events: events, entityPath: sender.EntityPath, scheduled: scheduled); if (scheduled != null) { var seqNums = await sender.ScheduleMessagesAsync(messages: messages, @@ -259,7 +252,7 @@ public override async Task CancelAsync(string id, // get the sender and cancel the message accordingly var sender = await GetSenderAsync(registration, cancellationToken); - Logger.LogInformation("Canceling scheduled message: {SequenceNumber} on {EntityPath}", seqNum, sender.EntityPath); + Logger.CancelingMessage(sequenceNumber: seqNum, entityPath: sender.EntityPath); await sender.CancelScheduledMessageAsync(sequenceNumber: seqNum, cancellationToken: cancellationToken); } @@ -284,10 +277,7 @@ public override async Task CancelAsync(IList ids, // get the sender and cancel the messages accordingly var sender = await GetSenderAsync(registration, cancellationToken); - Logger.LogInformation("Canceling {EventsCount} scheduled messages on {EntityPath}:\r\n- {SequenceNumbers}", - ids.Count, - sender.EntityPath, - string.Join("\r\n- ", seqNums)); + Logger.CancelingMessages(sequenceNumbers: seqNums, entityPath: sender.EntityPath); await sender.CancelScheduledMessagesAsync(sequenceNumbers: seqNums, cancellationToken: cancellationToken); } @@ -305,13 +295,13 @@ private async Task GetSenderAsync(EventRegistration reg, Cance if (await ShouldUseQueueAsync(reg, cancellationToken)) { // Ensure Queue is created - Logger.LogDebug("Creating sender for queue '{QueueName}'", name); + Logger.CreatingQueueSender(queueName: name); await CreateQueueIfNotExistsAsync(reg: reg, name: name, cancellationToken: cancellationToken); } else { // Ensure topic is created - Logger.LogDebug("Creating sender for topic '{TopicName}'", name); + Logger.CreatingTopicSender(topicName: name); await CreateTopicIfNotExistsAsync(reg: reg, name: name, cancellationToken: cancellationToken); } @@ -368,7 +358,7 @@ private async Task GetProcessorAsync(EventRegistration reg, await CreateQueueIfNotExistsAsync(reg: reg, name: topicName, cancellationToken: cancellationToken); // Create the processor for the Queue - Logger.LogDebug("Creating processor for queue '{QueueName}'", topicName); + Logger.CreatingQueueProcessor(queueName: topicName); processor = serviceBusClient.CreateProcessor(queueName: topicName, options: sbpo); } else @@ -383,9 +373,7 @@ await CreateSubscriptionIfNotExistsAsync(ecr: ecr, cancellationToken: cancellationToken); // Create the processor for the Subscription - Logger.LogDebug("Creating processor for topic '{TopicName}' and subscription '{Subscription}'", - topicName, - subscriptionName); + Logger.CreatingSubscriptionProcessor(topicName: topicName, subscriptionName: subscriptionName); processor = serviceBusClient.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: sbpo); @@ -407,15 +395,15 @@ private async Task CreateQueueIfNotExistsAsync(EventRegistration reg, string nam // if entity creation is not enabled, just return if (!TransportOptions.EnableEntityCreation) { - Logger.LogTrace("Entity creation is disabled. Queue creation skipped"); + Logger.QueueEntityCreationDisabled(); return; } // If the queue does not exist, create it - Logger.LogDebug("Checking if queue '{QueueName}' exists", name); + Logger.CheckingQueueExistence(queueName: name); if (!await managementClient.QueueExistsAsync(name: name, cancellationToken: cancellationToken)) { - Logger.LogTrace("Queue '{QueueName}' does not exist, preparing creation.", name); + Logger.CreatingQueuePreparation(queueName: name); var options = new CreateQueueOptions(name: name) { // set the defaults for a queue here @@ -438,7 +426,7 @@ private async Task CreateQueueIfNotExistsAsync(EventRegistration reg, string nam // Allow for the defaults to be overridden TransportOptions.SetupQueueOptions?.Invoke(reg, options); - Logger.LogInformation("Creating queue '{QueueName}'", name); + Logger.CreatingQueue(queueName: name); _ = await managementClient.CreateQueueAsync(options: options, cancellationToken: cancellationToken); } } @@ -448,15 +436,15 @@ private async Task CreateTopicIfNotExistsAsync(EventRegistration reg, string nam // if entity creation is not enabled, just return if (!TransportOptions.EnableEntityCreation) { - Logger.LogTrace("Entity creation is disabled. Topic creation skipped"); + Logger.TopicEntityCreationDisabled(); return; } // If the topic does not exist, create it - Logger.LogDebug("Checking if topic '{TopicName}' exists", name); + Logger.CheckingTopicExistence(topicName: name); if (!await managementClient.TopicExistsAsync(name: name, cancellationToken: cancellationToken)) { - Logger.LogTrace("Topic '{TopicName}' does not exist, preparing creation.", name); + Logger.CreatingTopicPreparation(topicName: name); var options = new CreateTopicOptions(name: name) { Status = EntityStatus.Active, @@ -470,7 +458,7 @@ private async Task CreateTopicIfNotExistsAsync(EventRegistration reg, string nam // Allow for the defaults to be overridden TransportOptions.SetupTopicOptions?.Invoke(reg, options); - Logger.LogInformation("Creating topic '{TopicName}'", name); + Logger.CreatingTopic(topicName: name); _ = await managementClient.CreateTopicAsync(options: options, cancellationToken: cancellationToken); } } @@ -480,19 +468,15 @@ private async Task CreateSubscriptionIfNotExistsAsync(EventConsumerRegistration // if entity creation is not enabled, just return if (!TransportOptions.EnableEntityCreation) { - Logger.LogTrace("Entity creation is disabled. Subscription creation skipped"); + Logger.SubscriptionEntityCreationDisabled(); return; } // If the subscription does not exist, create it - Logger.LogDebug("Checking if subscription '{SubscriptionName}' under topic '{TopicName}' exists", - subscriptionName, - topicName); + Logger.CheckingSubscriptionExistence(subscriptionName: subscriptionName, topicName: topicName); if (!await managementClient.SubscriptionExistsAsync(topicName, subscriptionName, cancellationToken)) { - Logger.LogTrace("Subscription '{SubscriptionName}' under topic '{TopicName}' does not exist, preparing creation.", - subscriptionName, - topicName); + Logger.CreatingSubscriptionPreparation(subscriptionName: subscriptionName, topicName: topicName); var options = new CreateSubscriptionOptions(topicName: topicName, subscriptionName: subscriptionName) { Status = EntityStatus.Active, @@ -506,9 +490,7 @@ private async Task CreateSubscriptionIfNotExistsAsync(EventConsumerRegistration // Allow for the defaults to be overridden TransportOptions.SetupSubscriptionOptions?.Invoke(ecr, options); - Logger.LogInformation("Creating subscription '{SubscriptionName}' under topic '{TopicName}'", - subscriptionName, - topicName); + Logger.CreatingSubscription(subscriptionName: subscriptionName, topicName: topicName); await managementClient.CreateSubscriptionAsync(options: options, cancellationToken: cancellationToken); } } @@ -545,7 +527,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r activity?.AddTag(ActivityTagNames.MessagingDestination, destination); // name of the queue/subscription activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); // the spec does not know subscription so we can only use queue for both - Logger.LogDebug("Processing '{MessageId}' from '{EntityPath}'", messageId, entityPath); + Logger.ProcessingMessage(messageId: messageId, entityPath: entityPath); using var scope = CreateScope(); var contentType = new ContentType(message.ContentType); var context = await DeserializeAsync(scope: scope, @@ -555,10 +537,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r identifier: message.SequenceNumber.ToString(), cancellationToken: cancellationToken); - Logger.LogInformation("Received message: '{SequenceNumber}' containing Event '{Id}' from '{EntityPath}'", - message.SequenceNumber, - context.Id, - entityPath); + Logger.ReceivedMessage(sequenceNumber: message.SequenceNumber, eventId: context.Id, entityPath: entityPath); // set the extras context.SetServiceBusReceivedMessage(message); @@ -570,11 +549,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r // Decide the action to execute then execute var action = DecideAction(successful, ecr.UnhandledErrorBehaviour, processor.AutoCompleteMessages); - Logger.LogDebug("Post Consume action: {Action} for message: {MessageId} from '{EntityPath}' containing Event: {EventId}.", - action, - messageId, - entityPath, - context.Id); + Logger.PostConsumeAction(action: action, messageId: messageId, entityPath: entityPath, eventId: context.Id); if (action == PostConsumeAction.Complete) { @@ -599,11 +574,10 @@ private async Task OnMessageReceivedAsync(EventRegistration r private Task OnMessageFaultedAsync(ProcessErrorEventArgs args) { - Logger.LogError(args.Exception, - "Message receiving faulted. Namespace:{FullyQualifiedNamespace}, Entity Path: {EntityPath}, Source: {ErrorSource}", - args.FullyQualifiedNamespace, - args.EntityPath, - args.ErrorSource); + Logger.MessageReceivingFaulted(fullyQualifiedNamespace: args.FullyQualifiedNamespace, + entityPath: args.EntityPath, + errorSource: args.ErrorSource, + ex: args.Exception); return Task.CompletedTask; } diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/ILoggerExtensions.cs new file mode 100644 index 00000000..a7cf58a1 --- /dev/null +++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/ILoggerExtensions.cs @@ -0,0 +1,122 @@ +using Azure.Messaging.ServiceBus; +using Tingle.EventBus; +using Tingle.EventBus.Transports.Azure.ServiceBus; + +namespace Microsoft.Extensions.Logging; + +/// +/// Extensions on for the EventBus +/// +internal static partial class ILoggerExtensions +{ + [LoggerMessage(100, LogLevel.Information, "Starting processing on {EntityPath}.")] + public static partial void StartingProcessing(this ILogger logger, string entityPath); + + [LoggerMessage(101, LogLevel.Debug, "Stopping client: {Processor}.")] + public static partial void StoppingProcessor(this ILogger logger, string processor); + + [LoggerMessage(102, LogLevel.Debug, "Stopped processor for {Processor}.")] + public static partial void StoppedProcessor(this ILogger logger, string processor); + + [LoggerMessage(103, LogLevel.Warning, "Stop processor faulted for {Processor}.")] + public static partial void StopProcessorFaulted(this ILogger logger, string processor, Exception ex); + + [LoggerMessage(104, LogLevel.Debug, "Creating processor for queue '{QueueName}'.")] + public static partial void CreatingQueueProcessor(this ILogger logger, string queueName); + + [LoggerMessage(105, LogLevel.Debug, "Creating processor for topic '{TopicName}' and subscription '{SubscriptionName}'.")] + public static partial void CreatingSubscriptionProcessor(this ILogger logger, string topicName, string subscriptionName); + + [LoggerMessage(106, LogLevel.Debug, "Creating sender for queue '{QueueName}'.")] + public static partial void CreatingQueueSender(this ILogger logger, string queueName); + + [LoggerMessage(107, LogLevel.Debug, "Creating sender for topic '{TopicName}'.")] + public static partial void CreatingTopicSender(this ILogger logger, string topicName); + + [LoggerMessage(108, LogLevel.Trace, "Entity creation is disabled. Queue creation skipped.")] + public static partial void QueueEntityCreationDisabled(this ILogger logger); + + [LoggerMessage(109, LogLevel.Debug, "Checking if queue '{QueueName}' exists.")] + public static partial void CheckingQueueExistence(this ILogger logger, string queueName); + + [LoggerMessage(110, LogLevel.Trace, "Queue '{QueueName}' does not exist, preparing creation.")] + public static partial void CreatingQueuePreparation(this ILogger logger, string queueName); + + [LoggerMessage(111, LogLevel.Information, "Creating queue '{QueueName}'")] + public static partial void CreatingQueue(this ILogger logger, string queueName); + + [LoggerMessage(112, LogLevel.Debug, "Checking if topic '{TopicName}' exists.")] + public static partial void CheckingTopicExistence(this ILogger logger, string topicName); + + [LoggerMessage(113, LogLevel.Trace, "Entity creation is disabled. Topic creation skipped.")] + public static partial void TopicEntityCreationDisabled(this ILogger logger); + + [LoggerMessage(114, LogLevel.Trace, "Topic '{TopicName}' does not exist, preparing creation.")] + public static partial void CreatingTopicPreparation(this ILogger logger, string topicName); + + [LoggerMessage(115, LogLevel.Information, "Creating topic '{TopicName}'")] + public static partial void CreatingTopic(this ILogger logger, string topicName); + + [LoggerMessage(116, LogLevel.Trace, "Entity creation is disabled. Subscription creation skipped.")] + public static partial void SubscriptionEntityCreationDisabled(this ILogger logger); + + [LoggerMessage(117, LogLevel.Debug, "Checking if subscription '{SubscriptionName}' under topic '{TopicName}' exists")] + public static partial void CheckingSubscriptionExistence(this ILogger logger, string subscriptionName, string topicName); + + [LoggerMessage(118, LogLevel.Trace, "Subscription '{SubscriptionName}' under topic '{TopicName}' does not exist, preparing creation.")] + public static partial void CreatingSubscriptionPreparation(this ILogger logger, string subscriptionName, string topicName); + + [LoggerMessage(119, LogLevel.Information, "Creating subscription '{SubscriptionName}' under topic '{TopicName}'")] + public static partial void CreatingSubscription(this ILogger logger, string subscriptionName, string topicName); + + + + [LoggerMessage(201, LogLevel.Information, "Sending {EventId} to '{EntityPath}'. Scheduled: {Scheduled}")] + public static partial void SendingMessage(this ILogger logger, string? eventId, string entityPath, DateTimeOffset? scheduled); + + [LoggerMessage(202, LogLevel.Information, "Sending {EventsCount} messages to '{EntityPath}'. Scheduled: {Scheduled}. Events:\r\n- {EventIds}")] + private static partial void SendingMessages(this ILogger logger, int eventsCount, string entityPath, DateTimeOffset? scheduled, string eventIds); + + public static void SendingMessages(this ILogger logger, IList eventIds, string entityPath, DateTimeOffset? scheduled) + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingMessages(eventsCount: eventIds.Count, + entityPath: entityPath, + scheduled: scheduled, + eventIds: string.Join("\r\n- ", eventIds)); + } + + public static void SendingMessages(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) + where T : class + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingMessages(events.Select(e => e.Id).ToList(), entityPath, scheduled); + } + + [LoggerMessage(203, LogLevel.Information, "Canceling scheduled message: {SequenceNumber} on {EntityPath}")] + public static partial void CancelingMessage(this ILogger logger, long sequenceNumber, string entityPath); + + [LoggerMessage(204, LogLevel.Information, "Canceling {messagesCount} scheduled messages on {EntityPath}:\r\n- {SequenceNumbers}")] + private static partial void CancelingMessages(this ILogger logger, int messagesCount, string entityPath, string sequenceNumbers); + + public static void CancelingMessages(this ILogger logger, IList sequenceNumbers, string entityPath) + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.CancelingMessages(messagesCount: sequenceNumbers.Count, + entityPath: entityPath, + sequenceNumbers: string.Join("\r\n- ", sequenceNumbers)); + } + + + [LoggerMessage(300, LogLevel.Information, "Received message: '{SequenceNumber}' containing Event '{EventId}' from '{EntityPath}'")] + public static partial void ReceivedMessage(this ILogger logger, long sequenceNumber, string? eventId, string entityPath); + + [LoggerMessage(301, LogLevel.Debug, "Processing '{MessageId}' from '{EntityPath}'")] + public static partial void ProcessingMessage(this ILogger logger, string? messageId, string entityPath); + + [LoggerMessage(302, LogLevel.Debug, "Post Consume action: {Action} for message: {MessageId} from '{EntityPath}' containing Event: {EventId}.")] + public static partial void PostConsumeAction(this ILogger logger, PostConsumeAction? action, string? messageId, string entityPath, string? eventId); + + [LoggerMessage(303, LogLevel.Debug, "Message receiving faulted. Namespace: {FullyQualifiedNamespace}, Entity Path: {EntityPath}, Source: {ErrorSource}")] + public static partial void MessageReceivingFaulted(this ILogger logger, string fullyQualifiedNamespace, string entityPath, ServiceBusErrorSource errorSource, Exception ex); +} diff --git a/src/Tingle.EventBus.Transports.InMemory/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.InMemory/ILoggerExtensions.cs new file mode 100644 index 00000000..c96bdb5a --- /dev/null +++ b/src/Tingle.EventBus.Transports.InMemory/ILoggerExtensions.cs @@ -0,0 +1,73 @@ +using Tingle.EventBus; + +namespace Microsoft.Extensions.Logging; + +/// +/// Extensions on for the EventBus +/// +internal static partial class ILoggerExtensions +{ + [LoggerMessage(100, LogLevel.Information, "Starting processing on {EntityPath}")] + public static partial void StartingProcessing(this ILogger logger, string entityPath); + + [LoggerMessage(101, LogLevel.Debug, "Stopping client: {Processor}")] + public static partial void StoppingProcessor(this ILogger logger, string processor); + + [LoggerMessage(102, LogLevel.Debug, "Stopped processor for {Processor}")] + public static partial void StoppedProcessor(this ILogger logger, string processor); + + [LoggerMessage(103, LogLevel.Warning, "Stop processor faulted for {Processor}")] + public static partial void StopProcessorFaulted(this ILogger logger, string processor, Exception ex); + + [LoggerMessage(104, LogLevel.Debug, "Creating processor for queue '{QueueName}'")] + public static partial void CreatingQueueProcessor(this ILogger logger, string queueName); + + [LoggerMessage(105, LogLevel.Debug, "Creating processor for topic '{TopicName}' and subscription '{SubscriptionName}'")] + public static partial void CreatingSubscriptionProcessor(this ILogger logger, string topicName, string subscriptionName); + + [LoggerMessage(200, LogLevel.Warning, "InMemory EventBus uses a short-lived timer that is not persisted for scheduled publish.")] + public static partial void SchedulingShortLived(this ILogger logger); + + [LoggerMessage(201, LogLevel.Information, "Sending {EventId} to '{EntityPath}'. Scheduled: {Scheduled}")] + public static partial void SendingMessage(this ILogger logger, string? eventId, string entityPath, DateTimeOffset? scheduled); + + [LoggerMessage(202, LogLevel.Information, "Sending {EventsCount} messages to '{EntityPath}'. Scheduled: {Scheduled}. Events:\r\n- {EventIds}")] + private static partial void SendingMessages(this ILogger logger, int eventsCount, string entityPath, DateTimeOffset? scheduled, string eventIds); + + public static void SendingMessages(this ILogger logger, IList eventIds, string entityPath, DateTimeOffset? scheduled) + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingMessages(eventsCount: eventIds.Count, + entityPath: entityPath, + scheduled: scheduled, + eventIds: string.Join("\r\n- ", eventIds)); + } + + public static void SendingMessages(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) + where T : class + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingMessages(events.Select(e => e.Id).ToList(), entityPath, scheduled); + } + + [LoggerMessage(203, LogLevel.Information, "Canceling scheduled message: {SequenceNumber} on {EntityPath}")] + public static partial void CancelingMessage(this ILogger logger, long sequenceNumber, string entityPath); + + [LoggerMessage(204, LogLevel.Information, "Canceling {messagesCount} scheduled messages on {EntityPath}:\r\n- {SequenceNumbers}")] + private static partial void CancelingMessages(this ILogger logger, int messagesCount, string entityPath, string sequenceNumbers); + + public static void CancelingMessages(this ILogger logger, IList sequenceNumbers, string entityPath) + { + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.CancelingMessages(messagesCount: sequenceNumbers.Count, + entityPath: entityPath, + sequenceNumbers: string.Join("\r\n- ", sequenceNumbers)); + } + + + [LoggerMessage(300, LogLevel.Information, "Received message: '{SequenceNumber}' containing Event '{EventId}' from '{EntityPath}'")] + public static partial void ReceivedMessage(this ILogger logger, long sequenceNumber, string? eventId, string entityPath); + + [LoggerMessage(301, LogLevel.Debug, "Processing '{MessageId}' from '{EntityPath}'")] + public static partial void ProcessingMessage(this ILogger logger, string? messageId, string entityPath); +} diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs index e18a3a5f..01487d69 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs @@ -82,7 +82,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) }; // start processing - Logger.LogInformation("Starting processing on {EntityPath}", processor.EntityPath); + Logger.StartingProcessing(entityPath: processor.EntityPath); await processor.StartProcessingAsync(cancellationToken: cancellationToken); } } @@ -96,18 +96,18 @@ public override async Task StopAsync(CancellationToken cancellationToken) var clients = processorsCache.Select(kvp => (key: kvp.Key, proc: kvp.Value)).ToList(); foreach (var (key, proc) in clients) { - Logger.LogDebug("Stopping client: {Processor}", key); + Logger.StoppingProcessor(processor: key); try { await proc.StopProcessingAsync(cancellationToken); processorsCache.Remove(key); - Logger.LogDebug("Stopped processor for {Processor}", key); + Logger.StoppedProcessor(processor: key); } catch (Exception exception) { - Logger.LogWarning(exception, "Stop processor faulted for {Processor}", key); + Logger.StopProcessorFaulted(processor: key, ex: exception); } } } @@ -121,7 +121,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // log warning when trying to publish scheduled message if (scheduled != null) { - Logger.LogWarning("InMemory EventBus uses a short-lived timer that is not persisted for scheduled publish"); + Logger.SchedulingShortLived(); } using var scope = CreateScope(); @@ -153,10 +153,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // Get the queue and send the message accordingly var sender = await GetSenderAsync(registration, cancellationToken); - Logger.LogInformation("Sending {Id} to '{EntityPath}'. Scheduled: {Scheduled}", - @event.Id, - sender.EntityPath, - scheduled); + Logger.SendingMessage(eventId: @event.Id, entityPath: sender.EntityPath, scheduled: scheduled); if (scheduled != null) { var seqNum = await sender.ScheduleMessageAsync(message: message, cancellationToken: cancellationToken); @@ -178,7 +175,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // log warning when trying to publish scheduled message if (scheduled != null) { - Logger.LogWarning("InMemory EventBus uses a short-lived timer that is not persisted for scheduled publish"); + Logger.SchedulingShortLived(); } using var scope = CreateScope(); @@ -217,11 +214,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // Get the queue and send the message accordingly var sender = await GetSenderAsync(registration, cancellationToken); - Logger.LogInformation("Sending {EventsCount} messages to '{EntityPath}'. Scheduled: {Scheduled}. Events:\r\n- {Ids}", - events.Count, - sender.EntityPath, - scheduled, - string.Join("\r\n- ", events.Select(e => e.Id))); + Logger.SendingMessages(events: events, entityPath: sender.EntityPath, scheduled: scheduled); if (scheduled != null) { var seqNums = await sender.ScheduleMessagesAsync(messages: messages, cancellationToken: cancellationToken); @@ -251,7 +244,7 @@ public override async Task CancelAsync(string id, // get the entity and cancel the message accordingly var sender = await GetSenderAsync(registration, cancellationToken); - Logger.LogInformation("Canceling scheduled message: {SequenceNumber} on {EntityPath}", seqNum, sender.EntityPath); + Logger.CancelingMessage(sequenceNumber: seqNum, entityPath: sender.EntityPath); await sender.CancelScheduledMessageAsync(sequenceNumber: seqNum, cancellationToken: cancellationToken); } @@ -273,10 +266,7 @@ public override async Task CancelAsync(IList ids, // get the entity and cancel the message accordingly var sender = await GetSenderAsync(registration, cancellationToken); - Logger.LogInformation("Canceling {EventsCount} scheduled messages on {EntityPath}:\r\n- {SequenceNumbers}", - ids.Count, - sender.EntityPath, - string.Join("\r\n- ", seqNums)); + Logger.CancelingMessages(sequenceNumbers: seqNums, entityPath: sender.EntityPath); await sender.CancelScheduledMessagesAsync(sequenceNumbers: seqNums, cancellationToken: cancellationToken); } @@ -321,15 +311,13 @@ private async Task GetProcessorAsync(EventRegistration reg, E if (reg.EntityKind == EntityKind.Queue) { // Create the processor for the Queue - Logger.LogDebug("Creating processor for queue '{QueueName}'", topicName); + Logger.CreatingQueueProcessor(queueName: topicName); processor = inMemoryClient.CreateProcessor(queueName: topicName, options: inpo); } else { // Create the processor for the Subscription - Logger.LogDebug("Creating processor for topic '{TopicName}' and subscription '{Subscription}'", - topicName, - subscriptionName); + Logger.CreatingSubscriptionProcessor(topicName: topicName, subscriptionName: subscriptionName); processor = inMemoryClient.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo); @@ -377,7 +365,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r activity?.AddTag(ActivityTagNames.MessagingDestination, destination); // name of the queue/subscription activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); // the spec does not know subscription so we can only use queue for both - Logger.LogDebug("Processing '{MessageId}' from '{EntityPath}'", messageId, entityPath); + Logger.ProcessingMessage(messageId: messageId, entityPath: entityPath); using var scope = CreateScope(); var contentType = message.ContentType is not null ? new ContentType(message.ContentType) : null; var context = await DeserializeAsync(scope: scope, @@ -387,10 +375,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r identifier: message.SequenceNumber.ToString(), cancellationToken: cancellationToken); - Logger.LogInformation("Received message: '{SequenceNumber}' containing Event '{Id}' from '{EntityPath}'", - message.SequenceNumber, - context.Id, - entityPath); + Logger.ReceivedMessage(sequenceNumber: message.SequenceNumber, eventId: context.Id, entityPath: entityPath); // set the extras context.SetInMemoryReceivedMessage(message); diff --git a/src/Tingle.EventBus.Transports.Kafka/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Kafka/ILoggerExtensions.cs new file mode 100644 index 00000000..f59710e3 --- /dev/null +++ b/src/Tingle.EventBus.Transports.Kafka/ILoggerExtensions.cs @@ -0,0 +1,27 @@ +using Confluent.Kafka; + +namespace Microsoft.Extensions.Logging; + +/// +/// Extensions on for the EventBus +/// +internal static partial class ILoggerExtensions +{ + [LoggerMessage(100, LogLevel.Warning, "Kafka does not support delay or scheduled publish.")] + public static partial void SchedulingNotSupported(this ILogger logger); + + [LoggerMessage(101, LogLevel.Warning, "Kafka does not support batching. The events will be looped through one by one.")] + public static partial void BatchingNotSupported(this ILogger logger); + + [LoggerMessage(102, LogLevel.Information, "Consumer recevied data at {Offset}")] + public static partial void ConsumerReceivedData(this ILogger logger, TopicPartitionOffset offset); + + [LoggerMessage(103, LogLevel.Trace, "Reached end of topic {Topic}, Partition:{Partition}, Offset:{Offset}.")] + public static partial void EndOfTopic(this ILogger logger, string topic, Partition partition, Offset offset); + + [LoggerMessage(104, LogLevel.Debug, "Processing '{MessageKey}")] + public static partial void ProcessingMessage(this ILogger logger, string messageKey); + + [LoggerMessage(105, LogLevel.Information, "Received event: '{MessageKey}' containing Event '{EventId}'")] + public static partial void ReceivedEvent(this ILogger logger, string messageKey, string? eventId); +} diff --git a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs index 188d30fb..f929fc8f 100644 --- a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs +++ b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs @@ -115,7 +115,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // log warning when trying to publish scheduled message if (scheduled != null) { - Logger.LogWarning("Kafka does not support delay or scheduled publish"); + Logger.SchedulingNotSupported(); } using var scope = CreateScope(); @@ -150,12 +150,12 @@ public override async Task StopAsync(CancellationToken cancellationToken) CancellationToken cancellationToken = default) { // log warning when doing batch - Logger.LogWarning("Kafka does not support batching. The events will be looped through one by one"); + Logger.BatchingNotSupported(); // log warning when trying to publish scheduled message if (scheduled != null) { - Logger.LogWarning("Kafka does not support delay or scheduled publish"); + Logger.SchedulingNotSupported(); } using var scope = CreateScope(); @@ -220,14 +220,11 @@ private async Task ProcessAsync(CancellationToken cancellationToken) var result = consumer.Consume(cancellationToken); if (result.IsPartitionEOF) { - Logger.LogTrace("Reached end of topic {Topic}, Partition:{Partition}, Offset:{Offset}.", - result.Topic, - result.Partition, - result.Offset); + Logger.EndOfTopic(result.Topic, result.Partition, result.Offset); continue; } - Logger.LogDebug("Received message at {TopicPartitionOffset}", result.TopicPartitionOffset); + Logger.ConsumerReceivedData(result.TopicPartitionOffset); // get the registration for topic var topic = result.Topic; @@ -285,7 +282,7 @@ private async Task OnEventReceivedAsync(EventRegistration reg activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); - Logger.LogDebug("Processing '{MessageKey}", messageKey); + Logger.ProcessingMessage(messageKey); using var scope = CreateScope(); var contentType = contentType_str == null ? null : new ContentType(contentType_str); var context = await DeserializeAsync(scope: scope, @@ -294,9 +291,7 @@ private async Task OnEventReceivedAsync(EventRegistration reg registration: reg, identifier: result.Offset.ToString(), cancellationToken: cancellationToken); - Logger.LogInformation("Received event: '{MessageKey}' containing Event '{Id}'", - messageKey, - context.Id); + Logger.ReceivedEvent(messageKey, context.Id); var (successful, _) = await ConsumeAsync(ecr: ecr, @event: context, scope: scope, diff --git a/src/Tingle.EventBus/EventBus.cs b/src/Tingle.EventBus/EventBus.cs index eb87594d..62a90506 100644 --- a/src/Tingle.EventBus/EventBus.cs +++ b/src/Tingle.EventBus/EventBus.cs @@ -90,7 +90,7 @@ public EventBus(IReadinessProvider readinessProvider, activity?.AddTag(ActivityTagNames.MessagingConversationId, @event.CorrelationId); // Publish on the transport - logger.SendingEvent(@event, transport.Name, scheduled); + logger.SendingEvent(eventId: @event.Id, transportName: transport.Name, scheduled: scheduled); return await transport.PublishAsync(@event: @event, registration: reg, scheduled: scheduled, diff --git a/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs b/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs index f234bc73..bf62877d 100644 --- a/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs +++ b/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs @@ -1,200 +1,114 @@ using Tingle.EventBus; using Tingle.EventBus.Configuration; -using Tingle.EventBus.Serialization; namespace Microsoft.Extensions.Logging; /// /// Extensions on for the EventBus /// -internal static class ILoggerExtensions +internal static partial class ILoggerExtensions { #region Bus (100 series) - private static readonly Action _delayedBusStartup - = LoggerMessage.Define( - eventId: new EventId(101, nameof(DelayedBusStartup)), - logLevel: LogLevel.Information, - formatString: "Delaying bus startup for '{StartupDelay}'."); - - private static readonly Action _delayedBusStartupError - = LoggerMessage.Define( - eventId: new EventId(102, nameof(DelayedBusStartupError)), - logLevel: LogLevel.Error, - formatString: "Starting bus delayed error."); - - private static readonly Action _startupReadinessCheck - = LoggerMessage.Define( - eventId: new EventId(103, nameof(StartupReadinessCheck)), - logLevel: LogLevel.Information, - formatString: "Performing readiness check before starting bus."); - - private static readonly Action _startupReadinessCheckFailed - = LoggerMessage.Define( - eventId: new EventId(104, nameof(StartupReadinessCheckFailed)), - logLevel: LogLevel.Error, - formatString: "Startup readiness check failed or timedout."); - - private static readonly Action _startingBus - = LoggerMessage.Define( - eventId: new EventId(105, nameof(StartingBus)), - logLevel: LogLevel.Debug, - formatString: "Starting bus with {TransportsCount} transports."); - - private static readonly Action _stoppingBus - = LoggerMessage.Define( - eventId: new EventId(106, nameof(StoppingBus)), - logLevel: LogLevel.Debug, - formatString: "Stopping bus."); - - public static void DelayedBusStartup(this ILogger logger, TimeSpan delay) => _delayedBusStartup(logger, delay, null); - public static void DelayedBusStartupError(this ILogger logger, Exception ex) => _delayedBusStartupError(logger, ex); - public static void StartupReadinessCheck(this ILogger logger) => _startupReadinessCheck(logger, null); - public static void StartupReadinessCheckFailed(this ILogger logger, Exception ex) => _startupReadinessCheckFailed(logger, ex); - public static void StartingBus(this ILogger logger, int count) => _startingBus(logger, count, null); - public static void StoppingBus(this ILogger logger) => _stoppingBus(logger, null); + [LoggerMessage(101, LogLevel.Information, "Delaying bus startup for '{StartupDelay}'.")] + public static partial void DelayedBusStartup(this ILogger logger, TimeSpan startupDelay); - #endregion + [LoggerMessage(102, LogLevel.Error, "Starting bus delayed error.")] + public static partial void DelayedBusStartupError(this ILogger logger, Exception ex); - #region Transports (200 series) + [LoggerMessage(103, LogLevel.Information, "Performing readiness check before starting bus.")] + public static partial void StartupReadinessCheck(this ILogger logger); - private static readonly Action _startingTransport - = LoggerMessage.Define( - eventId: new EventId(201, nameof(StartingTransport)), - logLevel: LogLevel.Debug, - formatString: "Starting transport. Consumers: {ConsumersCount}, EmptyResultsDelay: '{EmptyResultsDelay}'"); + [LoggerMessage(104, LogLevel.Error, "Startup readiness check failed or timedout.")] + public static partial void StartupReadinessCheckFailed(this ILogger logger, Exception ex); - private static readonly Action _stoppingTransport - = LoggerMessage.Define( - eventId: new EventId(202, nameof(StoppingTransport)), - logLevel: LogLevel.Debug, - formatString: "Stopping transport."); + [LoggerMessage(105, LogLevel.Debug, "Starting bus with {TransportsCount} transports.")] + public static partial void StartingBus(this ILogger logger, int transportsCount); - public static void StartingTransport(this ILogger logger, int count, TimeSpan emptyResultsDelay) - => _startingTransport(logger, count, emptyResultsDelay, null); + [LoggerMessage(106, LogLevel.Debug, "Stopping bus.")] + public static partial void StoppingBus(this ILogger logger); - public static void StoppingTransport(this ILogger logger) => _stoppingTransport(logger, null); + #endregion + + #region Transports (200 series) + + [LoggerMessage(201, LogLevel.Debug, "Starting transport. Consumers: {ConsumersCount}, EmptyResultsDelay: '{EmptyResultsDelay}'")] + public static partial void StartingTransport(this ILogger logger, int consumersCount, TimeSpan emptyResultsDelay); + + [LoggerMessage(202, LogLevel.Debug, "Stopping transport.")] + public static partial void StoppingTransport(this ILogger logger); #endregion #region Events (300 series) - private static readonly Action _sendingEvent - = LoggerMessage.Define( - eventId: new EventId(301, nameof(SendingEvent)), - logLevel: LogLevel.Information, - formatString: "Sending event '{EventId}' using '{TransportName}' transport"); - - private static readonly Action _sendingEventWithScheduled - = LoggerMessage.Define( - eventId: new EventId(301, nameof(SendingEvent)), - logLevel: LogLevel.Information, - formatString: "Sending event '{EventId}' using '{TransportName}' transport. Scheduled: {Scheduled:o}, Delay: {ReadableDelay} ({RetryDelay})"); - - private static readonly Action, Exception?> _sendingEvents - = LoggerMessage.Define>( - eventId: new EventId(302, nameof(SendingEvents)), - logLevel: LogLevel.Information, - formatString: "Sending {EventsCount} events using '{TransportName}' transport.\r\nEvents: {EventIds}"); - - private static readonly Action, Exception?> _sendingEventsWithScheduled - = LoggerMessage.Define>( - eventId: new EventId(302, nameof(SendingEvents)), - logLevel: LogLevel.Information, - formatString: "Sending {EventsCount} events using '{TransportName}' transport. Scheduled: {Scheduled:o}, Delay: {ReadableDelay} ({RetryDelay}).\r\nEvents: {EventIds}"); - - private static readonly Action _cancelingEvent - = LoggerMessage.Define( - eventId: new EventId(303, nameof(CancelingEvent)), - logLevel: LogLevel.Information, - formatString: "Canceling event '{EventId}' on '{TransportName}' transport"); - - private static readonly Action, Exception?> _cancelingEvents - = LoggerMessage.Define>( - eventId: new EventId(303, nameof(CancelingEvents)), - logLevel: LogLevel.Information, - formatString: "Canceling {EventsCount} events on '{TransportName}' transport.\r\nEvents: {EventIds}"); - - private static readonly Action _consumeFailedTransportHandling - = LoggerMessage.Define( - eventId: new EventId(304, nameof(ConsumeFailed)), - logLevel: LogLevel.Error, - formatString: "Event processing failed. Transport specific handling in play. (EventId:{EventId})"); - - private static readonly Action _consumeFailedDeadletter - = LoggerMessage.Define( - eventId: new EventId(304, nameof(ConsumeFailed)), - logLevel: LogLevel.Error, - formatString: "Event processing failed. Moving to deadletter. (EventId:{EventId})"); - - private static readonly Action _consumeFailedDiscard - = LoggerMessage.Define( - eventId: new EventId(304, nameof(ConsumeFailed)), - logLevel: LogLevel.Error, - formatString: "Event processing failed. Discarding event. (EventId:{EventId})"); - - public static void SendingEvent(this ILogger logger, string? eventId, string transportName, DateTimeOffset? scheduled = null) + [LoggerMessage(301, LogLevel.Information, "Sending event '{EventId}' using '{TransportName}' transport.")] + private static partial void SendingEvent(this ILogger logger, string? eventId, string transportName); + + [LoggerMessage(302, LogLevel.Information, "Sending event '{EventId}' using '{TransportName}' transport. Scheduled: {Scheduled:o}, Delay: {ReadableDelay} ({RetryDelay})")] + private static partial void SendingScheduledEvent(this ILogger logger, string? eventId, string transportName, DateTimeOffset scheduled, string readableDelay, TimeSpan retryDelay); + + public static void SendingEvent(this ILogger logger, string? eventId, string transportName, DateTimeOffset? scheduled) { + if (!logger.IsEnabled(LogLevel.Information)) return; if (scheduled == null) { - _sendingEvent(logger, eventId, transportName, null); + logger.SendingEvent(eventId, transportName); } else { var (readableDelay, delay) = GetDelay(scheduled.Value); - _sendingEventWithScheduled(logger, eventId, transportName, scheduled.Value, readableDelay, delay, null); + logger.SendingScheduledEvent(eventId, transportName, scheduled.Value, readableDelay, delay); } } - public static void SendingEvent(this ILogger logger, EventContext @event, string transportName, DateTimeOffset? scheduled = null) - { - SendingEvent(logger, @event.Id, transportName, scheduled); - } + [LoggerMessage(303, LogLevel.Information, "Sending {EventsCount} events using '{TransportName}' transport.\r\nEvents: {EventIds}")] + private static partial void SendingEvents(this ILogger logger, int eventsCount, string transportName, IList eventIds); + + [LoggerMessage(304, LogLevel.Information, "Sending {EventsCount} events using '{TransportName}' transport. Scheduled: {Scheduled:o}, Delay: {ReadableDelay} ({RetryDelay}).\r\nEvents: {EventIds}")] + private static partial void SendingScheduledEvents(this ILogger logger, int eventsCount, string transportName, DateTimeOffset scheduled, string readableDelay, TimeSpan retryDelay, IList eventIds); public static void SendingEvents(this ILogger logger, IList eventIds, string transportName, DateTimeOffset? scheduled = null) { + if (!logger.IsEnabled(LogLevel.Information)) return; if (scheduled == null) { - _sendingEvents(logger, eventIds.Count, transportName, eventIds, null); + logger.SendingEvents(eventsCount: eventIds.Count, transportName: transportName, eventIds: eventIds); } else { var (readableDelay, delay) = GetDelay(scheduled.Value); - _sendingEventsWithScheduled(logger, eventIds.Count, transportName, scheduled.Value, readableDelay, delay, eventIds, null); + logger.SendingScheduledEvents(eventIds.Count, transportName, scheduled.Value, readableDelay, delay, eventIds); } } - public static void SendingEvents(this ILogger logger, IList events, string transportName, DateTimeOffset? scheduled = null) - { - SendingEvents(logger, events.Select(e => e.Id).ToList(), transportName, scheduled); - } - public static void SendingEvents(this ILogger logger, IList> events, string transportName, DateTimeOffset? scheduled = null) where T : class { - SendingEvents(logger, events.Select(e => e.Id).ToList(), transportName, scheduled); + if (!logger.IsEnabled(LogLevel.Information)) return; + logger.SendingEvents(events.Select(e => e.Id).ToList(), transportName, scheduled); } - public static void CancelingEvent(this ILogger logger, string eventId, string transportName) - { - _cancelingEvent(logger, eventId, transportName, null); - } + [LoggerMessage(305, LogLevel.Information, "Canceling event '{EventId}' on '{TransportName}' transport")] + public static partial void CancelingEvent(this ILogger logger, string eventId, string transportName); - public static void CancelingEvents(this ILogger logger, IList eventIds, string transportName) - { - _cancelingEvents(logger, eventIds.Count, transportName, eventIds, null); - } + [LoggerMessage(306, LogLevel.Information, "Canceling {EventsCount} events on '{TransportName}' transport.\r\nEvents: {EventIds}")] + public static partial void CancelingEvents(this ILogger logger, int eventsCount, IList eventIds, string transportName); + public static void CancelingEvents(this ILogger logger, IList eventIds, string transportName) => logger.CancelingEvents(eventIds.Count, eventIds, transportName); + + [LoggerMessage(307, LogLevel.Error, "Event processing failed. {Action} (EventId: {EventId})")] + public static partial void ConsumeFailed(this ILogger logger, string action, string? eventId, Exception ex); - public static void ConsumeFailed(this ILogger logger, string? eventId, UnhandledConsumerErrorBehaviour? behaviour, Exception ex) + public static void ConsumeFailed(this ILogger logger, UnhandledConsumerErrorBehaviour? behaviour, string? eventId, Exception ex) { - Action action = behaviour switch + var action = behaviour switch { - UnhandledConsumerErrorBehaviour.Deadletter => _consumeFailedDeadletter, - UnhandledConsumerErrorBehaviour.Discard => _consumeFailedDiscard, - _ => _consumeFailedTransportHandling, + UnhandledConsumerErrorBehaviour.Deadletter => "Moving to deadletter.", + UnhandledConsumerErrorBehaviour.Discard => "Discarding event.", + _ => "Transport specific handling in play.", }; - action?.Invoke(logger, eventId, ex); + logger.ConsumeFailed(action, eventId, ex); } private static (string readableDelay, TimeSpan delay) GetDelay(DateTimeOffset scheduled) @@ -207,67 +121,24 @@ private static (string readableDelay, TimeSpan delay) GetDelay(DateTimeOffset sc #region Readiness (400 series) - private static readonly Action _readinessCheck - = LoggerMessage.Define( - eventId: new EventId(401, nameof(ReadinessCheck)), - logLevel: LogLevel.Information, - formatString: "Performing readiness check. Timeout: '{ReadinessTimeout}'."); - - private static readonly Action _readinessCheckTimedout - = LoggerMessage.Define( - eventId: new EventId(402, nameof(StartupReadinessCheckFailed)), - logLevel: LogLevel.Error, - formatString: "Startup readiness check failed or timedout after '{ReadinessTimeout}'."); - - private static readonly Action _readinessCheckDisabled - = LoggerMessage.Define( - eventId: new EventId(401, nameof(ReadinessCheckDisabled)), - logLevel: LogLevel.Debug, - formatString: "Readiness check is disabled. Assumes ready by default."); - - public static void ReadinessCheck(this ILogger logger, TimeSpan timeout) => _readinessCheck(logger, timeout, null); + [LoggerMessage(401, LogLevel.Information, "Performing readiness check. Timeout: '{ReadinessTimeout}'.")] + public static partial void ReadinessCheck(this ILogger logger, TimeSpan readinessTimeout); - public static void ReadinessCheckTimedout(this ILogger logger, TimeSpan timeout) => _readinessCheckTimedout(logger, timeout, null); + [LoggerMessage(402, LogLevel.Error, "Startup readiness check failed or timedout after '{ReadinessTimeout}'.")] + public static partial void ReadinessCheckTimedout(this ILogger logger, TimeSpan readinessTimeout); - public static void ReadinessCheckDisabled(this ILogger logger) => _readinessCheckDisabled(logger, null); + [LoggerMessage(403, LogLevel.Debug, "Readiness check is disabled. Assumes ready by default.")] + public static partial void ReadinessCheckDisabled(this ILogger logger); #endregion #region Serialization (500 series) - private static readonly Action _deserializationResultedInNull - = LoggerMessage.Define( - eventId: new EventId(501, nameof(DeserializationResultedInNull)), - logLevel: LogLevel.Warning, - formatString: "Deserialization resulted in a null which should not happen. Identifier: '{Identifier}', Type: '{EventType}'."); - - private static readonly Action _deserializedEventShouldNotBeNull - = LoggerMessage.Define( - eventId: new EventId(502, nameof(DeserializedEventShouldNotBeNull)), - logLevel: LogLevel.Warning, - formatString: "Deserialized event should not have a null event. Identifier: '{Identifier}', EventId: '{EventId}', Type: '{EventType}'."); - - public static void DeserializationResultedInNull(this ILogger logger, string? id, string eventType) - { - _deserializationResultedInNull(logger, id, eventType, null); - } + [LoggerMessage(501, LogLevel.Warning, "Deserialization resulted in a null which should not happen. Identifier: '{Identifier}', Type: '{EventType}'.")] + public static partial void DeserializationResultedInNull(this ILogger logger, string? identifier, string? eventType); - public static void DeserializationResultedInNull(this ILogger logger, DeserializationContext context) - { - logger.DeserializationResultedInNull(id: context.Identifier, eventType: context.Registration.EventType.FullName!); - } - - public static void DeserializedEventShouldNotBeNull(this ILogger logger, string? id, string? eventId, string eventType) - { - _deserializedEventShouldNotBeNull(logger, id, eventId, eventType, null); - } - - public static void DeserializedEventShouldNotBeNull(this ILogger logger, DeserializationContext context, string? eventId) - { - logger.DeserializedEventShouldNotBeNull(id: context.Identifier, - eventId: eventId, - eventType: context.Registration.EventType.FullName!); - } + [LoggerMessage(502, LogLevel.Warning, "Deserialized event should not have a null event. Identifier: '{Identifier}', EventId: '{EventId}', Type: '{EventType}'.")] + public static partial void DeserializedEventShouldNotBeNull(this ILogger logger, string? identifier, string? eventId, string? eventType); #endregion } diff --git a/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs b/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs index de377c3f..b3171f55 100644 --- a/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs +++ b/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs @@ -62,13 +62,15 @@ protected AbstractEventSerializer(IOptionsMonitor optionsAccess var envelope = await DeserializeToEnvelopeAsync(stream: stream, contentType: contentType, cancellationToken: cancellationToken); if (envelope is null) { - Logger.DeserializationResultedInNull(context); + Logger.DeserializationResultedInNull(identifier: context.Identifier, eventType: context.Registration.EventType.FullName); return null; } if (envelope.Event is null) { - Logger.DeserializedEventShouldNotBeNull(context, eventId: envelope.Id); + Logger.DeserializedEventShouldNotBeNull(identifier: context.Identifier, + eventId: envelope.Id, + eventType: context.Registration.EventType.FullName); return null; } diff --git a/src/Tingle.EventBus/Transports/EventBusTransportBase.cs b/src/Tingle.EventBus/Transports/EventBusTransportBase.cs index 034bb45f..ce4b7bcd 100644 --- a/src/Tingle.EventBus/Transports/EventBusTransportBase.cs +++ b/src/Tingle.EventBus/Transports/EventBusTransportBase.cs @@ -256,7 +256,7 @@ protected async Task ConsumeAsync(EventCo } catch (Exception ex) { - Logger.ConsumeFailed(@event.Id, ecr.UnhandledErrorBehaviour, ex); + Logger.ConsumeFailed(ecr.UnhandledErrorBehaviour, @event.Id, ex); return new EventConsumeResult(successful: false, exception: ex); } }