Skip to content

Commit

Permalink
Add multiple work item consumers (#4049)
Browse files Browse the repository at this point in the history
Co-authored-by: dkurepa <[email protected]>
  • Loading branch information
oleksandr-didyk and dkurepa authored Oct 15, 2024
1 parent c36f058 commit df35cbe
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"WorkItemQueueName": "pcs-workitems",
"QueuePollTimeout": "00:00:05",
"MaxWorkItemRetries": 3,
"QueueMessageInvisibilityTime": "00:05:00"
"QueueMessageInvisibilityTime": "00:15:00"
},
"Maestro": {
"Uri": "http://localhost:8088/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
},
"AllowedHosts": "*",
"WorkItemQueueName": "pcs-workitems",
"WorkItemConsumerCount": 5,
"WorkItemConsumerOptions": {
"QueuePollTimeout": "00:01:00",
"MaxWorkItemRetries": 3,
"QueueMessageInvisibilityTime": "00:01:00"
"QueueMessageInvisibilityTime": "00:15:00"
},
"EntraAuthentication": {
"Instance": "https://login.microsoftonline.com/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace ProductConstructionService.WorkItems;
public static class WorkItemConfiguration
{
public const string WorkItemQueueNameConfigurationKey = "WorkItemQueueName";
public const string WorkItemConsumerCountConfigurationKey = "WorkItemConsumerCount";
public const string ReplicaNameKey = "CONTAINER_APP_REPLICA_NAME";
public const int PollingRateSeconds = 10;

Expand All @@ -40,7 +41,19 @@ public static void AddWorkItemQueues(this IHostApplicationBuilder builder, Defau
builder.Configuration.GetRequiredValue(WorkItemQueueNameConfigurationKey);
builder.Services.Configure<WorkItemConsumerOptions>(
builder.Configuration.GetSection(WorkItemConsumerOptions.ConfigurationKey));
builder.Services.AddHostedService<WorkItemConsumer>();

var consumerCount = int.Parse(
builder.Configuration.GetRequiredValue(WorkItemConsumerCountConfigurationKey));

for (int i = 0; i < consumerCount; i++)
{
var consumerId = $"WorkItemConsumer_{i}";

// https://github.com/dotnet/runtime/issues/38751
builder.Services.AddSingleton<IHostedService, WorkItemConsumer>(
p => ActivatorUtilities.CreateInstance<WorkItemConsumer>(p, consumerId));
}

builder.Services.AddTransient<IReminderManagerFactory, ReminderManagerFactory>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
namespace ProductConstructionService.WorkItems;

internal class WorkItemConsumer(
string consumerId,
ILogger<WorkItemConsumer> logger,
IOptions<WorkItemConsumerOptions> options,
WorkItemScopeManager scopeManager,
QueueServiceClient queueServiceClient,
IMetricRecorder metricRecorder)
: BackgroundService
{
private readonly string _consumerId = consumerId;
private readonly ILogger<WorkItemConsumer> _logger = logger;
private readonly IOptions<WorkItemConsumerOptions> _options = options;
private readonly WorkItemScopeManager _scopeManager = scopeManager;
Expand All @@ -31,7 +33,8 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
await Task.Yield();

QueueClient queueClient = queueServiceClient.GetQueueClient(_options.Value.WorkItemQueueName);
_logger.LogInformation("Starting to process PCS queue {queueName}", _options.Value.WorkItemQueueName);
_logger.LogInformation("Consumer {consumerId} starting to process PCS queue {queueName}", _consumerId, _options.Value.WorkItemQueueName);

while (!cancellationToken.IsCancellationRequested)
{
await using (WorkItemScope workItemScope = await _scopeManager.BeginWorkItemScopeWhenReadyAsync())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public async Task ReturnWhenWorkingAsync(int pollingRateSeconds)
do
{
status = await _cache.GetAsync();
} while (_autoResetEvent.WaitIfTrue(() => status == Stopped, pollingRateSeconds));
} while (_autoResetEvent.WaitIfTrue(() => status != Working, pollingRateSeconds));
}

public async Task SetStoppedIfStoppingAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class WorkItemScopeManager
private readonly WorkItemProcessorState _state;
private readonly int _pollingRateSeconds;

private int _activeWorkItems = 0;

public WorkItemScopeManager(
IServiceProvider serviceProvider,
WorkItemProcessorState state,
Expand All @@ -32,6 +34,8 @@ public async Task<WorkItemScope> BeginWorkItemScopeWhenReadyAsync()
await _state.ReturnWhenWorkingAsync(_pollingRateSeconds);

var scope = _serviceProvider.CreateScope();
Interlocked.Increment(ref _activeWorkItems);

return new WorkItemScope(
scope.ServiceProvider.GetRequiredService<IOptions<WorkItemProcessorRegistrations>>(),
WorkItemFinishedAsync,
Expand All @@ -41,7 +45,10 @@ public async Task<WorkItemScope> BeginWorkItemScopeWhenReadyAsync()

private async Task WorkItemFinishedAsync()
{
await _state.SetStoppedIfStoppingAsync();
if (Interlocked.Decrement(ref _activeWorkItems) == 0)
{
await _state.SetStoppedIfStoppingAsync();
}
}

public async Task InitializationFinished()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public async Task WorkItemsProcessorStatusNormalFlow()

TaskCompletionSource workItemCompletion1 = new();
TaskCompletionSource workItemCompletion2 = new();
Thread t = new(() =>
Thread t = new(async () =>
{
using (_scopeManager.BeginWorkItemScopeWhenReadyAsync()) { }
await using (await _scopeManager.BeginWorkItemScopeWhenReadyAsync()) { }
workItemCompletion1.SetResult();
});
t.Start();
Expand Down

0 comments on commit df35cbe

Please sign in to comment.