Skip to content

Commit

Permalink
Merge pull request #74 from kirides/issues/73
Browse files Browse the repository at this point in the history
- #73 - implement a few of the missing features
  • Loading branch information
felixclase authored Apr 29, 2024
2 parents 1850239 + 2057af6 commit a51f280
Show file tree
Hide file tree
Showing 19 changed files with 618 additions and 240 deletions.
3 changes: 3 additions & 0 deletions src/main/Hangfire.Storage.SQLite/Assembly.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Hangfire.Storage.SQLite.Test")]
62 changes: 33 additions & 29 deletions src/main/Hangfire.Storage.SQLite/HangfireDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,41 +50,45 @@ public void Init(SQLiteStorageOptions storageOptions)
{
StorageOptions = storageOptions;

TryFewTimesDueToConcurrency(() => InitializePragmas(storageOptions));
TryFewTimesDueToConcurrency(() => Database.CreateTable<AggregatedCounter>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<Counter>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<HangfireJob>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<HangfireList>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<Hash>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<JobParameter>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<JobQueue>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<HangfireServer>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<Set>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<State>());
TryFewTimesDueToConcurrency(() => Database.CreateTable<DistributedLock>());

void TryFewTimesDueToConcurrency(Action action, int times = 10)
TryFewTimesDueToConcurrency(state => state.Item1.InitializePragmas(state.storageOptions),
(this, storageOptions));
}

public void Migrate()
{
TryFewTimesDueToConcurrency(db => db.CreateTable<AggregatedCounter>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<Counter>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<HangfireJob>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<HangfireList>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<Hash>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<JobParameter>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<JobQueue>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<HangfireServer>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<Set>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<State>(), Database);
TryFewTimesDueToConcurrency(db => db.CreateTable<DistributedLock>(), Database);
}

static void TryFewTimesDueToConcurrency<TState>(Action<TState> action, TState state, int times = 10)
{
var current = 0;
while (current < times)
{
var current = 0;
while (current < times)
try
{
action(state);
return;
}
catch (SQLiteException e) when (e.Result == SQLite3.Result.Locked)
{
try
{
action();
return;
}
catch (SQLiteException e) when (e.Result == SQLite3.Result.Locked)
{
// This can happen if too many connections are opened
// at the same time, trying to create tables
Thread.Sleep(10);
}
current++;
// This can happen if too many connections are opened
// at the same time, trying to create tables
Thread.Sleep(10);
}
current++;
}
}


private void InitializePragmas(SQLiteStorageOptions storageOptions)
{
try
Expand Down
55 changes: 54 additions & 1 deletion src/main/Hangfire.Storage.SQLite/HangfireSQLiteConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,28 @@ public override long GetSetCount(string key)
.SetRepository
.Count(_ => _.Key == key);
}

public override long GetSetCount(IEnumerable<string> keys, int limit)
{
if (keys == null)
{
throw new ArgumentNullException(nameof(keys));
}

var count = DbContext
.SetRepository
.Where(_ => keys.Contains(_.Key))
.Take(limit)
.Count();
return Math.Min(count, limit);
}

public override bool GetSetContains(string key, string value)
{
return DbContext
.SetRepository
.Any(x => x.Key == key && x.Value == value);
}

public override string GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore)
{
Expand All @@ -257,6 +279,32 @@ public override string GetFirstByLowestScoreFromSet(string key, double fromScore
.FirstOrDefault();
}

public override List<string> GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore, int count)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}

if (toScore < fromScore)
{
throw new ArgumentException("The 'toScore' value must be higher or equal to the 'fromScore' value.");
}

var fromScoreDec = fromScore.ToInt64();
var toScoreDec = toScore.ToInt64();

return DbContext
.SetRepository
.Where(_ => _.Key == key &&
_.Score >= fromScoreDec &&
_.Score <= toScoreDec)
.OrderBy(_ => _.Score)
.Select(_ => _.Value)
.Take(count)
.ToList();
}

public override JobData GetJobData(string jobId)
{
if (jobId == null)
Expand Down Expand Up @@ -434,7 +482,7 @@ public override void SetJobParameter(string id, string name, string value)

public override void SetRangeInHash(string key, IEnumerable<KeyValuePair<string, string>> keyValuePairs)
{
using (var transaction = new SQLiteWriteOnlyTransaction(DbContext, _queueProviders))
using (var transaction = CreateWriteTransaction())
{
transaction.SetRangeInHash(key, keyValuePairs);
transaction.Commit();
Expand Down Expand Up @@ -561,6 +609,11 @@ public override TimeSpan GetListTtl(string key)
return result != DateTime.MinValue ? result - DateTime.UtcNow : TimeSpan.FromSeconds(-1);
}

public override DateTime GetUtcDateTime()
{
return DateTime.UtcNow;
}

public override List<string> GetRangeFromList(string key, int startingFrom, int endingAt)
{
if (key == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public void Add(IPersistentJobQueueProvider provider, IEnumerable<string> queues
/// <returns></returns>
public IPersistentJobQueueProvider GetProvider(string queue)
{
return _providersByQueue.ContainsKey(queue)
? _providersByQueue[queue]
return _providersByQueue.TryGetValue(queue, out var value)
? value
: _defaultProvider;
}

Expand Down
10 changes: 9 additions & 1 deletion src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void Dispose()

_completed = true;
_heartbeatTimer?.Dispose();
_heartbeatTimer = null;
Release();
}

Expand Down Expand Up @@ -184,7 +185,14 @@ private void StartHeartBeat()
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. The resource is not locked or is locked by another owner.", _resource);
// if we no longer have a lock, stop the heartbeat immediately
_heartbeatTimer?.Dispose();
try
{
_heartbeatTimer?.Dispose();
}
catch (ObjectDisposedException)
{
// well, already disposed?
}
return;
}
}
Expand Down
29 changes: 15 additions & 14 deletions src/main/Hangfire.Storage.SQLite/SQLiteStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ public class SQLiteStorage : JobStorage, IDisposable

private readonly SQLiteStorageOptions _storageOptions;

private readonly Dictionary<string, bool> _features = new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
private static readonly Dictionary<string, bool> _features = new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
{
{ "Storage.ExtendedApi", false },
{ "Job.Queue", true },
{ "Connection.GetUtcDateTime", false },
{ "Connection.BatchedGetFirstByLowestScoreFromSet", false },
{ "Connection.GetSetContains", true },
{ "Connection.GetSetCount.Limited", false },
{ "BatchedGetFirstByLowestScoreFromSet", false },
{ "Transaction.AcquireDistributedLock", true },
{ "Transaction.CreateJob", true },
{ "Transaction.SetJobParameter", true },
{ "TransactionalAcknowledge:InMemoryFetchedJob", false },
{ "Monitoring.DeletedStateGraphs", false },
{ "Monitoring.AwaitingJobs", false }
{ JobStorageFeatures.ExtendedApi, true },
{ JobStorageFeatures.JobQueueProperty, true },
{ JobStorageFeatures.Connection.GetUtcDateTime, true },
{ JobStorageFeatures.Connection.BatchedGetFirstByLowest, true },
{ "BatchedGetFirstByLowestScoreFromSet", true }, // ^-- legacy name?
{ JobStorageFeatures.Connection.GetSetContains, true },
{ JobStorageFeatures.Connection.LimitedGetSetCount, true },
{ JobStorageFeatures.Transaction.AcquireDistributedLock, true },
{ JobStorageFeatures.Transaction.CreateJob, false }, // NOTE: implement SQLiteWriteOnlyTransaction.CreateJob(...)
{ JobStorageFeatures.Transaction.SetJobParameter, false }, // NOTE: implement SQLiteWriteOnlyTransaction.SetJobParameter(...)
{ JobStorageFeatures.Transaction.RemoveFromQueue(typeof(SQLiteFetchedJob)), false }, // NOTE: implement SQLiteWriteOnlyTransaction.RemoveFromQueue(...)
{ JobStorageFeatures.Monitoring.DeletedStateGraphs, false },
{ JobStorageFeatures.Monitoring.AwaitingJobs, false }
};

private ConcurrentQueue<PooledHangfireDbContext> _dbContextPool = new ConcurrentQueue<PooledHangfireDbContext>();
Expand Down Expand Up @@ -77,6 +77,7 @@ public SQLiteStorage(SQLiteDbConnectionFactory dbConnectionFactory, SQLiteStorag

using (var dbContext = CreateAndOpenConnection())
{
dbContext.Migrate();
_databasePath = dbContext.Database.DatabasePath;
// Use this to initialize the database as soon as possible
// in case of error, the user will immediately get an exception at startup
Expand Down
56 changes: 47 additions & 9 deletions src/main/Hangfire.Storage.SQLite/SQLiteWriteOnlyTransaction.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using Hangfire.Logging;
using Hangfire.States;
using Hangfire.Storage.SQLite.Entities;
using Newtonsoft.Json;
Expand All @@ -17,8 +16,10 @@ public class SQLiteWriteOnlyTransaction : JobStorageTransaction

private readonly PersistentJobQueueProviderCollection _queueProviders;

private static object _lockObject = new object();

internal readonly List<IDisposable> _acquiredLocks = new List<IDisposable>();

private static readonly ILog Logger = LogProvider.For<SQLiteWriteOnlyTransaction>();

/// <summary>
/// </summary>
/// <param name="connection"></param>
Expand All @@ -36,6 +37,15 @@ private void QueueCommand(Action<HangfireDbContext> action)
_commandQueue.Enqueue(action);
}

public override void AcquireDistributedLock(string resource, TimeSpan timeout)
{
var acquiredLock = SQLiteDistributedLock.Acquire(resource, timeout, _dbContext, _dbContext.StorageOptions);
lock (_acquiredLocks)
{
_acquiredLocks.Add(acquiredLock);
}
}

public override void AddJobState(string jobId, IState state)
{
QueueCommand(_ =>
Expand Down Expand Up @@ -113,16 +123,44 @@ public override void AddToSet(string key, string value, double score)

public override void Commit()
{
Retry.Twice((attempts) => {
lock (_lockObject)
{
try
{
Retry.Twice((attempts) => {
_commandQueue.ToList().ForEach(_ =>
{
_.Invoke(_dbContext);
});
});
}
finally
{
ReleaseAcquiredLocks();
}
}

private void ReleaseAcquiredLocks()
{
lock (_acquiredLocks)
{
foreach (var acquiredLock in _acquiredLocks)
{
try
{
acquiredLock.Dispose();
}
catch (Exception ex)
{
Logger.WarnException("Failed to release a distributed lock", ex);
}
}
});
_acquiredLocks.Clear();
}
}

public override void Dispose()
{
ReleaseAcquiredLocks();
base.Dispose();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
using Hangfire.Storage.SQLite.Entities;
using Hangfire.Storage.SQLite.Test.Utils;
using System;
using System.Threading;
using Xunit;

namespace Hangfire.Storage.SQLite.Test
{
public class CountersAggregatorFacts
public class CountersAggregatorFacts : SqliteInMemoryTestBase
{
[Fact]
public void CountersAggregatorExecutesProperly()
{
var storage = ConnectionUtils.CreateStorage();
using (var connection = (HangfireSQLiteConnection)storage.GetConnection())
using (var connection = (HangfireSQLiteConnection)Storage.GetConnection())
{
// Arrange
connection.DbContext.Database.Insert(new Counter
Expand All @@ -23,7 +21,7 @@ public void CountersAggregatorExecutesProperly()
ExpireAt = DateTime.UtcNow.AddHours(1)
});

var aggregator = new CountersAggregator(storage, TimeSpan.Zero);
var aggregator = new CountersAggregator(Storage, TimeSpan.Zero);
var cts = new CancellationTokenSource();
cts.Cancel();

Expand Down
Loading

0 comments on commit a51f280

Please sign in to comment.