Skip to content

Commit

Permalink
Merge pull request #40 from PDmatrix/feature/39
Browse files Browse the repository at this point in the history
Add job throttle
  • Loading branch information
ZOXEXIVO authored Nov 29, 2021
2 parents e46daa4 + 1e596a0 commit d58a7cc
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 3 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ Horarium guarantees that a job would run **exactly once**

Every Horarium instance consults MongoDB about new jobs to run every 100ms (default), thus creating some load on the DB server. This interval can be changed in ```HorariumSettings```

If you want to decrease load, you can use job throttling that will automatically increase interval if there are no jobs available after certain attempts. To enable this feature, pass `JobThrottleSettings` to `HorariumSettings` with property `UseJobThrottle` set to `true`.

```csharp
var settings = new HorariumSettings
{
JobThrottleSettings = new JobThrottleSettings
{
UseJobThrottle = true
}
};
```

For more information about configuration, see `JobThrottleSettings`

## Using Horarium with SimpleInjector

To use Horarium with SimpleInjector one should implement its own `IJobFactory`, using `Container` from `SimpleInjector`. For example:
Expand Down
117 changes: 117 additions & 0 deletions src/Horarium.Test/RunnerJobTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,123 @@ public async Task Start_WontRecoverBeforeIntervalTimeout_AfterFailedDB()
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Once);
}

[Fact]
public async Task Start_ExecutionWithDelay_WithThrottle()
{
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();

var settings = new HorariumSettings
{
IntervalStartJob = TimeSpan.FromSeconds(1),
JobThrottleSettings = new JobThrottleSettings
{
UseJobThrottle = true,
IntervalMultiplier = 1,
JobRetrievalAttempts = 1
}
};

var runnerJobs = new RunnerJobs(jobRepositoryMock.Object,
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

// Act
runnerJobs.Start();
await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500));
jobRepositoryMock.Invocations.Clear();

await Task.Delay(settings.IntervalStartJob + settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier));

// Assert
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Once);
}

[Fact]
public async Task Start_ExecutionWithDelay_IncreaseInterval()
{
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();

jobRepositoryMock.Setup(x => x.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()))
.ReturnsAsync(() => null);

var settings = new HorariumSettings
{
IntervalStartJob = TimeSpan.FromSeconds(1),
JobThrottleSettings = new JobThrottleSettings
{
UseJobThrottle = true,
IntervalMultiplier = 1,
JobRetrievalAttempts = 1,
}
};

var runnerJobs = new RunnerJobs(jobRepositoryMock.Object,
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

// Act
runnerJobs.Start();
await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500));
jobRepositoryMock.Invocations.Clear();

var interval = settings.IntervalStartJob +
settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier);
await Task.Delay(interval);
interval += settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier);
await Task.Delay(interval);
interval += settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier);
await Task.Delay(interval);

// Assert
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Exactly(3));
}

[Fact]
public async Task Start_ExecutionWithDelay_MaxInterval()
{
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();

jobRepositoryMock.Setup(x => x.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()))
.ReturnsAsync(() => null);

var settings = new HorariumSettings
{
IntervalStartJob = TimeSpan.FromSeconds(1),
JobThrottleSettings = new JobThrottleSettings
{
UseJobThrottle = true,
IntervalMultiplier = 1,
JobRetrievalAttempts = 1,
MaxJobThrottleInterval = TimeSpan.FromSeconds(1)
}
};

var runnerJobs = new RunnerJobs(jobRepositoryMock.Object,
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

// Act
runnerJobs.Start();
await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500));
jobRepositoryMock.Invocations.Clear();

await Task.Delay(TimeSpan.FromSeconds(5));
// Assert
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Exactly(5));
}

[Fact]
public async Task Start_NextJobStarted_AddsJobTaskToUncompletedTasks()
{
Expand Down
49 changes: 46 additions & 3 deletions src/Horarium/Handlers/RunnerJobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class RunnerJobs : IRunnerJobs
private readonly IExecutorJob _executorJob;
private Task _runnerTask;
private readonly IUncompletedTaskList _uncompletedTaskList;

private readonly TimeSpan _defaultJobThrottleInterval = TimeSpan.FromMilliseconds(100);

private CancellationToken _cancellationToken;
private readonly CancellationTokenSource _cancelTokenSource = new CancellationTokenSource();
Expand Down Expand Up @@ -100,11 +102,29 @@ private async Task<JobMetadata> GetReadyJob()

private async Task StartRunnerInternal(CancellationToken cancellationToken)
{
var jobWaitTime = _settings.IntervalStartJob;

while (true)
{
var isJobRan = await TryRunJob(cancellationToken, jobWaitTime);
if (!_settings.JobThrottleSettings.UseJobThrottle)
{
jobWaitTime = _settings.IntervalStartJob;
continue;
}

jobWaitTime = !isJobRan ? GetNextIntervalStartJob(jobWaitTime) : _settings.IntervalStartJob;
}
}

private async Task<bool> TryRunJob(CancellationToken cancellationToken, TimeSpan waitTime)
{
for (var i = 0; i < _settings.JobThrottleSettings.JobRetrievalAttempts; i++)
{
var job = await GetReadyJob();
var isJobReady = job != null;

if (job != null)
if (isJobReady)
{
_horariumLogger.Debug("Try to Run jobMetadata...");

Expand All @@ -117,11 +137,34 @@ private async Task StartRunnerInternal(CancellationToken cancellationToken)
throw new TaskCanceledException();
}

if (!_settings.IntervalStartJob.Equals(TimeSpan.Zero))
if (isJobReady)
{
await Task.Delay(_settings.IntervalStartJob, cancellationToken);
return true;
}

if (!waitTime.Equals(TimeSpan.Zero))
{
await Task.Delay(waitTime, cancellationToken);
}
}

return false;
}

private TimeSpan GetNextIntervalStartJob(TimeSpan currentInterval)
{
if (currentInterval.Equals(TimeSpan.Zero))
{
return _defaultJobThrottleInterval;
}

var nextInterval =
currentInterval +
TimeSpan.FromTicks((long) (currentInterval.Ticks * _settings.JobThrottleSettings.IntervalMultiplier));

var maxInterval = _settings.JobThrottleSettings.MaxJobThrottleInterval;

return nextInterval > maxInterval ? maxInterval : nextInterval;
}
}
}
2 changes: 2 additions & 0 deletions src/Horarium/HorariumSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class HorariumSettings

public TimeSpan ObsoleteExecutingJob { get; set; } = TimeSpan.FromMinutes(5);

public JobThrottleSettings JobThrottleSettings { get; set; } = new JobThrottleSettings();

public IJobScopeFactory JobScopeFactory { get; set; } = new DefaultJobScopeFactory();

public IHorariumLogger Logger { get; set; } = new EmptyLogger();
Expand Down
28 changes: 28 additions & 0 deletions src/Horarium/JobThrottleSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;

namespace Horarium
{
public class JobThrottleSettings
{
/// <summary>
/// When `true`, IntervalStartJob will automatically increase if there is no jobs available
/// </summary>
public bool UseJobThrottle { get; set; }

/// <summary>
/// After all attempts are exhausted, waiting interval is increased by formula:
/// <c>currentInterval + (currentInterval * intervalMultiplier)</c>
/// </summary>
public int JobRetrievalAttempts { get; set; } = 10;

/// <summary>
/// Multiplier to get the next waiting interval
/// </summary>
public double IntervalMultiplier { get; set; } = 0.25;

/// <summary>
/// Maximum waiting interval
/// </summary>
public TimeSpan MaxJobThrottleInterval { get; set; } = TimeSpan.FromSeconds(30);
}
}

0 comments on commit d58a7cc

Please sign in to comment.