diff --git a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueBase.cs b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueBase.cs
index 490ebb1..9636951 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueBase.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueBase.cs
@@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
namespace Orleans.Indexing
{
@@ -16,10 +17,10 @@ namespace Orleans.Indexing
/// the following information:
/// - workflowID: grainID + a sequence number
/// - memberUpdates: the updated values of indexed fields
- ///
+ ///
/// Ordinarily, these workflowRecords are for grains that are active on 's silo. (This may not be true for
/// short periods when a grain migrates to another silo or after the silo recovers from failure).
- ///
+ ///
/// + The grain Q has a dictionary updatesOnWait is an in-memory dictionary that maps each grain G to the
/// workflowRecords for G that are waiting for be updated.
///
@@ -28,65 +29,65 @@ internal class IndexWorkflowQueueBase : IIndexWorkflowQueue
//the persistent state of IndexWorkflowQueue, including:
// - doubly linked list of workflowRecordds
// - the identity of the IndexWorkflowQueue GrainService
- protected IndexWorkflowQueueState queueState;
+ protected GrainState queueState;
//the tail of workflowRecords doubly linked list
- internal IndexWorkflowRecordNode _workflowRecordsTail;
+ internal IndexWorkflowRecordNode workflowRecordsTail;
//the grain storage for the index workflow queue
- private volatile IGrainStorage StorageProvider;
+ volatile IGrainStorage StorageProvider;
- private int _queueSeqNum;
- private Type _grainInterfaceType;
- private string _grainTypeName;
+ int _queueSeqNum;
+ Type _grainInterfaceType;
+ string _grainTypeName;
- private bool HasAnyTotalIndex => GetHasAnyTotalIndex();
- private bool? __hasAnyTotalIndex = null;
+ bool HasAnyTotalIndex => GetHasAnyTotalIndex();
+ bool? __hasAnyTotalIndex = null;
- private bool _isDefinedAsFaultTolerantGrain;
- private bool IsFaultTolerant => _isDefinedAsFaultTolerantGrain && HasAnyTotalIndex;
+ bool _isDefinedAsFaultTolerantGrain;
+ bool IsFaultTolerant => _isDefinedAsFaultTolerantGrain && HasAnyTotalIndex;
- private IIndexWorkflowQueueHandler __handler;
- private IIndexWorkflowQueueHandler Handler => InitWorkflowQueueHandler();
+ IIndexWorkflowQueueHandler __handler;
+ IIndexWorkflowQueueHandler Handler => InitWorkflowQueueHandler();
- private int _isHandlerWorkerIdle;
+ int _isHandlerWorkerIdle;
///
/// This lock is used to queue all the writes to the storage and do them in a single batch, i.e., group commit
- ///
+ ///
/// Works hand-in-hand with pendingWriteRequests and writeRequestIdGen.
///
- private AsyncLock _writeLock;
+ AsyncLock _writeLock;
///
/// Creates a unique ID for each write request to the storage.
- ///
+ ///
/// The values generated by this ID generator are used in pendingWriteRequests
///
- private int _writeRequestIdGen;
+ int _writeRequestIdGen;
///
/// All the write requests that are waiting behind write_lock are accumulated
/// in this data structure, and all of them will be done at once.
///
- private HashSet _pendingWriteRequests;
+ HashSet _pendingWriteRequests;
public const int BATCH_SIZE = int.MaxValue;
- private SiloAddress _silo;
- private SiloIndexManager SiloIndexManager;
- private Lazy _lazyParent;
- private GrainReference _recoveryGrainReference;
+ SiloAddress _silo;
+ SiloIndexManager SiloIndexManager;
+ Lazy _lazyParent;
+ GrainReference _recoveryGrainReference;
internal IndexWorkflowQueueBase(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSequenceNumber, SiloAddress silo,
bool isDefinedAsFaultTolerantGrain, Func parentFunc, GrainReference recoveryGrainReference = null)
{
- queueState = new IndexWorkflowQueueState();
+ queueState = new GrainState(new IndexWorkflowQueueEntry());
_grainInterfaceType = grainInterfaceType;
_queueSeqNum = queueSequenceNumber;
_grainTypeName = "Orleans.Indexing.IndexWorkflowQueue-" + IndexUtils.GetFullTypeName(_grainInterfaceType);
- _workflowRecordsTail = null;
+ this.workflowRecordsTail = null;
__handler = null;
_isHandlerWorkerIdle = 1;
@@ -102,13 +103,12 @@ internal IndexWorkflowQueueBase(SiloIndexManager siloIndexManager, Type grainInt
_lazyParent = new Lazy(parentFunc, true);
}
- private IIndexWorkflowQueueHandler InitWorkflowQueueHandler()
- => __handler = _lazyParent.Value.IsGrainService
- ? SiloIndexManager.GetGrainService(
- IndexWorkflowQueueHandlerBase.CreateIndexWorkflowQueueHandlerGrainReference(SiloIndexManager, _grainInterfaceType, _queueSeqNum, _silo))
- : SiloIndexManager.GrainFactory.GetGrain(CreateIndexWorkflowQueuePrimaryKey(_grainInterfaceType, _queueSeqNum));
+ IIndexWorkflowQueueHandler InitWorkflowQueueHandler() =>
+ __handler = _lazyParent.Value.GrainId.IsSystemTarget()
+ ? SiloIndexManager.GetGrainService(IndexWorkflowQueueHandlerBase.CreateIndexWorkflowQueueHandlerGrainReference(SiloIndexManager, _grainInterfaceType, _queueSeqNum, _silo))
+ : SiloIndexManager.GrainFactory.GetGrain(CreateIndexWorkflowQueuePrimaryKey(_grainInterfaceType, _queueSeqNum));
- private async Task EnsureStorage()
+ async Task EnsureStorage()
{
if (this.StorageProvider == null)
{
@@ -117,20 +117,21 @@ private async Task EnsureStorage()
if (this.StorageProvider == null) // Make sure another thread didn't get it
{
var readGrainReference = this._recoveryGrainReference ?? _lazyParent.Value;
- this.StorageProvider = typeof(IndexWorkflowQueueGrainService).GetGrainStorage(this.SiloIndexManager.ServiceProvider);
- await this.StorageProvider.ReadStateAsync(_grainTypeName, readGrainReference, this.queueState);
+ //this.StorageProvider = typeof(IndexWorkflowQueueGrainService).GetGrainStorage(this.SiloIndexManager.ServiceProvider);
+ this.StorageProvider = this.SiloIndexManager.ServiceProvider.GetRequiredService();
+ await this.StorageProvider.ReadStateAsync(_grainTypeName, readGrainReference.GrainId, this.queueState);
}
}
}
}
-
+
public async Task AddAllToQueue(Immutable> workflowRecords)
{
await this.EnsureStorage();
// Note: this can be called with an empty enumeration, to just "wake up" the thread in FT recovery.
- List newWorkflows = workflowRecords.Value;
- foreach (IndexWorkflowRecord newWorkflow in newWorkflows)
+ var newWorkflows = workflowRecords.Value;
+ foreach (var newWorkflow in newWorkflows)
{
AddToQueueNonPersistent(newWorkflow);
}
@@ -142,91 +143,93 @@ public async Task AddAllToQueue(Immutable> workflowRec
public async Task AddToQueue(Immutable workflow)
{
await this.EnsureStorage();
-
AddToQueueNonPersistent(workflow.Value);
-
InitiateWorkerThread();
await (IsFaultTolerant ? PersistState() : Task.CompletedTask);
}
- private void AddToQueueNonPersistent(IndexWorkflowRecord newWorkflow)
+ void AddToQueueNonPersistent(IndexWorkflowRecord newWorkflow)
{
var newWorkflowNode = new IndexWorkflowRecordNode(newWorkflow);
- if (_workflowRecordsTail == null) //if the list is empty
+ if (this.workflowRecordsTail == null) //if the list is empty
{
- _workflowRecordsTail = newWorkflowNode;
+ this.workflowRecordsTail = newWorkflowNode;
queueState.State.WorkflowRecordsHead = newWorkflowNode;
}
else // otherwise append to the end of the list
{
- _workflowRecordsTail.Append(newWorkflowNode, ref _workflowRecordsTail);
+ this.workflowRecordsTail.Append(newWorkflowNode, ref this.workflowRecordsTail);
}
}
public async Task RemoveAllFromQueue(Immutable> workflowRecords)
{
- await this.EnsureStorage();
-
- List newWorkflows = workflowRecords.Value;
- foreach (IndexWorkflowRecord newWorkflow in newWorkflows)
+ await EnsureStorage();
+ var newWorkflows = workflowRecords.Value;
+ foreach (var newWorkflow in newWorkflows)
{
RemoveFromQueueNonPersistent(newWorkflow);
}
await (IsFaultTolerant ? PersistState() : Task.CompletedTask);
}
- private void RemoveFromQueueNonPersistent(IndexWorkflowRecord newWorkflow)
+ void RemoveFromQueueNonPersistent(IndexWorkflowRecord newWorkflow)
{
for (var current = queueState.State.WorkflowRecordsHead; current != null; current = current.Next)
{
if (newWorkflow.Equals(current.WorkflowRecord))
{
- current.Remove(ref queueState.State.WorkflowRecordsHead, ref _workflowRecordsTail);
+ current.Remove(ref queueState.State.WorkflowRecordsHead, ref this.workflowRecordsTail);
return;
}
}
}
- private void InitiateWorkerThread()
+ void InitiateWorkerThread()
{
- if (this.SiloIndexManager.InjectableCode.ShouldRunQueueThread(() => Interlocked.Exchange(ref _isHandlerWorkerIdle, 0) == 1))
+ if (SiloIndexManager.InjectableCode.ShouldRunQueueThread(() => Interlocked.Exchange(ref _isHandlerWorkerIdle, 0) == 1))
{
- IndexWorkflowRecordNode punctuatedHead = AddPunctuationAt(BATCH_SIZE);
+ var punctuatedHead = AddPunctuationAt(BATCH_SIZE);
Handler.HandleWorkflowsUntilPunctuation(punctuatedHead.AsImmutable()).Ignore();
}
}
- private IndexWorkflowRecordNode AddPunctuationAt(int batchSize)
+ IndexWorkflowRecordNode AddPunctuationAt(int batchSize)
{
- if (_workflowRecordsTail == null) throw new WorkflowIndexException("Adding a punctuation to an empty workflow queue is not possible.");
+ if (this.workflowRecordsTail is null)
+ throw new WorkflowIndexException("Adding a punctuation to an empty workflow queue is not possible.");
- var punctuationHead = queueState.State.WorkflowRecordsHead;
- if (punctuationHead.IsPunctuation) throw new WorkflowIndexException("The element at the head of workflow queue cannot be a punctuation.");
+ var punctuationHead = this.queueState.State.WorkflowRecordsHead;
+ if (punctuationHead.IsPunctuation)
+ throw new WorkflowIndexException("The element at the head of workflow queue cannot be a punctuation.");
if (batchSize == int.MaxValue)
{
- var punctuation = _workflowRecordsTail.AppendPunctuation(ref _workflowRecordsTail);
+ var punctuation = this.workflowRecordsTail.AppendPunctuation(ref this.workflowRecordsTail);
return punctuationHead;
}
+
var punctuationLoc = punctuationHead;
for (int i = 1; i < batchSize && punctuationLoc.Next != null; ++i)
{
punctuationLoc = punctuationLoc.Next;
}
- punctuationLoc.AppendPunctuation(ref _workflowRecordsTail);
+
+ punctuationLoc.AppendPunctuation(ref this.workflowRecordsTail);
+
return punctuationHead;
}
- private List RemoveFromQueueUntilPunctuation(IndexWorkflowRecordNode from)
+ List RemoveFromQueueUntilPunctuation(IndexWorkflowRecordNode from)
{
- List workflowRecords = new List();
+ var workflowRecords = new List();
if (from != null && !from.IsPunctuation)
{
workflowRecords.Add(from.WorkflowRecord);
}
- IndexWorkflowRecordNode tmp = from?.Next;
+ var tmp = from?.Next;
while (tmp != null && !tmp.IsPunctuation)
{
workflowRecords.Add(tmp.WorkflowRecord);
@@ -236,20 +239,20 @@ private List RemoveFromQueueUntilPunctuation(IndexWorkflowR
if (tmp == null)
{
- from.Remove(ref queueState.State.WorkflowRecordsHead, ref _workflowRecordsTail);
+ from.Remove(ref queueState.State.WorkflowRecordsHead, ref this.workflowRecordsTail);
}
else
{
from.Next = tmp;
tmp.Prev = from;
- from.Remove(ref queueState.State.WorkflowRecordsHead, ref _workflowRecordsTail);
- tmp.Remove(ref queueState.State.WorkflowRecordsHead, ref _workflowRecordsTail);
+ from.Remove(ref queueState.State.WorkflowRecordsHead, ref this.workflowRecordsTail);
+ tmp.Remove(ref queueState.State.WorkflowRecordsHead, ref this.workflowRecordsTail);
}
return workflowRecords;
}
- private async Task PersistState()
+ async Task PersistState()
{
//create a write-request ID, which is used for group commit
int writeRequestId = ++_writeRequestIdGen;
@@ -271,7 +274,7 @@ private async Task PersistState()
try
{
this.queueState.ETag = StorageProviderUtils.ANY_ETAG;
- await StorageProvider.WriteStateAsync(_grainTypeName, _lazyParent.Value, this.queueState);
+ await StorageProvider.WriteStateAsync(_grainTypeName, _lazyParent.Value.GrainId, this.queueState);
}
finally
{
@@ -284,11 +287,11 @@ private async Task PersistState()
}
}
- private static Immutable EmptyIndexWorkflowRecordNode = new Immutable(null);
+ static Immutable EmptyIndexWorkflowRecordNode = new Immutable(null);
public Task> GiveMoreWorkflowsOrSetAsIdle()
{
- List removedWorkflows = RemoveFromQueueUntilPunctuation(queueState.State.WorkflowRecordsHead);
+ var removedWorkflows = RemoveFromQueueUntilPunctuation(queueState.State.WorkflowRecordsHead);
if (IsFaultTolerant)
{
//The task of removing the workflow record IDs from the grain runs in parallel with persisting the state. At this point, there
@@ -302,7 +305,7 @@ public Task> GiveMoreWorkflowsOrSetAsIdle()
).Ignore();
}
- if (_workflowRecordsTail == null)
+ if (this.workflowRecordsTail == null)
{
_isHandlerWorkerIdle = 1;
return Task.FromResult(EmptyIndexWorkflowRecordNode);
@@ -314,7 +317,7 @@ public Task> GiveMoreWorkflowsOrSetAsIdle()
}
}
- private bool GetHasAnyTotalIndex()
+ bool GetHasAnyTotalIndex()
{
if (!__hasAnyTotalIndex.HasValue)
{
@@ -341,23 +344,34 @@ public async Task>> GetRemainingWorkflowsIn(
public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
=> throw new NotSupportedException();
-#region STATIC HELPER FUNCTIONS
public static GrainReference CreateIndexWorkflowQueueGrainReference(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress siloAddress)
=> CreateGrainServiceGrainReference(siloIndexManager, grainInterfaceType, queueSeqNum, siloAddress);
public static string CreateIndexWorkflowQueuePrimaryKey(Type grainInterfaceType, int queueSeqNum)
=> $"{IndexUtils.GetFullTypeName(grainInterfaceType)}-{queueSeqNum}";
- private static GrainReference CreateGrainServiceGrainReference(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress siloAddress)
- => siloIndexManager.MakeGrainServiceGrainReference(IndexingConstants.INDEX_WORKFLOW_QUEUE_GRAIN_SERVICE_TYPE_CODE,
- CreateIndexWorkflowQueuePrimaryKey(grainInterfaceType, queueSeqNum), siloAddress);
+ static GrainReference CreateGrainServiceGrainReference(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress siloAddress)
+ => siloIndexManager.MakeGrainServiceGrainReference(
+ typeData: IndexingConstants.INDEX_WORKFLOW_QUEUE_GRAIN_SERVICE_TYPE_CODE,
+ systemGrainId: CreateIndexWorkflowQueuePrimaryKey(grainInterfaceType, queueSeqNum),
+ siloAddress: siloAddress);
public static IIndexWorkflowQueue GetIndexWorkflowQueueFromGrainHashCode(SiloIndexManager siloIndexManager, Type grainInterfaceType, int grainHashCode, SiloAddress siloAddress)
{
- int queueSeqNum = StorageProviderUtils.PositiveHash(grainHashCode, siloIndexManager.NumWorkflowQueuesPerInterface);
+ var queueSeqNum = StorageProviderUtils.PositiveHash(grainHashCode, siloIndexManager.NumWorkflowQueuesPerInterface);
var grainReference = CreateGrainServiceGrainReference(siloIndexManager, grainInterfaceType, queueSeqNum, siloAddress);
return siloIndexManager.GetGrainService(grainReference);
}
-#endregion STATIC HELPER FUNCTIONS
+ }
+
+ internal class StorageProviderUtils
+ {
+ public const string ANY_ETAG = "*";
+
+ public static int PositiveHash(int hash, int hashRange)
+ {
+ var positiveHash = ((hash % hashRange) + hashRange) % hashRange;
+ return positiveHash;
+ }
}
}
diff --git a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueGrainService.cs b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueGrainService.cs
index 7d6bc08..0045f1d 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueGrainService.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueGrainService.cs
@@ -15,10 +15,10 @@ namespace Orleans.Indexing
/// the following information:
/// - workflowID: grainID + a sequence number
/// - memberUpdates: the updated values of indexed fields
- ///
+ ///
/// Ordinarily, these workflowRecords are for grains that are active on 's silo. (This may not be true for
/// short periods when a grain migrates to another silo or after the silo recovers from failure).
- ///
+ ///
/// + The grain Q has a dictionary updatesOnWait is an in-memory dictionary that maps each grain G to the
/// workflowRecords for G that are waiting for be updated.
///
@@ -26,14 +26,14 @@ namespace Orleans.Indexing
[Reentrant]
internal class IndexWorkflowQueueGrainService : GrainService, IIndexWorkflowQueue
{
- private IndexWorkflowQueueBase _base;
+ IndexWorkflowQueueBase _base;
internal IndexWorkflowQueueGrainService(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSequenceNumber, bool isDefinedAsFaultTolerantGrain)
- : base(IndexWorkflowQueueBase.CreateIndexWorkflowQueueGrainReference(siloIndexManager, grainInterfaceType, queueSequenceNumber, siloIndexManager.SiloAddress).GrainIdentity,
+ : base(IndexWorkflowQueueBase.CreateIndexWorkflowQueueGrainReference(siloIndexManager, grainInterfaceType, queueSequenceNumber, siloIndexManager.SiloAddress).GrainId,
siloIndexManager.Silo, siloIndexManager.LoggerFactory)
{
_base = new IndexWorkflowQueueBase(siloIndexManager, grainInterfaceType, queueSequenceNumber, siloIndexManager.SiloAddress, isDefinedAsFaultTolerantGrain,
- () => base.GetGrainReference()); // lazy is needed because the runtime isn't attached until Registered
+ () => base.GrainReference); // lazy is needed because the runtime isn't attached until Registered
}
public Task AddAllToQueue(Immutable> workflowRecords)
diff --git a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueHandlerBase.cs b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueHandlerBase.cs
index b6c6577..c9fbb95 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueHandlerBase.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueHandlerBase.cs
@@ -9,27 +9,26 @@ namespace Orleans.Indexing
{
internal class IndexWorkflowQueueHandlerBase : IIndexWorkflowQueueHandler
{
- private IIndexWorkflowQueue __workflowQueue;
- private IIndexWorkflowQueue WorkflowQueue => __workflowQueue ?? InitIndexWorkflowQueue();
+ IIndexWorkflowQueue __workflowQueue;
+ IIndexWorkflowQueue WorkflowQueue => __workflowQueue ?? InitIndexWorkflowQueue();
- private int _queueSeqNum;
- private Type _grainInterfaceType;
+ int _queueSeqNum;
+ Type _grainInterfaceType;
- private bool _isDefinedAsFaultTolerantGrain;
- private bool _hasAnyTotalIndex;
- private bool HasAnyTotalIndex { get { EnsureGrainIndexes(); return _hasAnyTotalIndex; } }
- private bool IsFaultTolerant => _isDefinedAsFaultTolerantGrain && HasAnyTotalIndex;
+ bool _isDefinedAsFaultTolerantGrain;
+ bool _hasAnyTotalIndex;
+ bool HasAnyTotalIndex { get { EnsureGrainIndexes(); return _hasAnyTotalIndex; } }
+ bool IsFaultTolerant => _isDefinedAsFaultTolerantGrain && HasAnyTotalIndex;
- private NamedIndexMap __grainIndexes;
+ NamedIndexMap __grainIndexes;
- private NamedIndexMap GrainIndexes => EnsureGrainIndexes();
+ NamedIndexMap GrainIndexes => EnsureGrainIndexes();
- private SiloAddress _silo;
- private SiloIndexManager _siloIndexManager;
- private Lazy _lazyParent;
+ SiloAddress _silo;
+ SiloIndexManager _siloIndexManager;
+ Lazy _lazyParent;
- internal IndexWorkflowQueueHandlerBase(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress silo,
- bool isDefinedAsFaultTolerantGrain, Func parentFunc)
+ internal IndexWorkflowQueueHandlerBase(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress silo, bool isDefinedAsFaultTolerantGrain, Func parentFunc)
{
_grainInterfaceType = grainInterfaceType;
_queueSeqNum = queueSeqNum;
@@ -46,43 +45,42 @@ public async Task HandleWorkflowsUntilPunctuation(Immutable FtRemoveFromActiveWorkflowsInGrainsTasks(Dictionary> grainsToActiveWorkflows)
+ IEnumerable FtRemoveFromActiveWorkflowsInGrainsTasks(Dictionary> grainsToActiveWorkflows)
=> grainsToActiveWorkflows.Select(kvp => kvp.Key.RemoveFromActiveWorkflowIds(kvp.Value));
- private IEnumerable> PrepareIndexUpdateTasks(Dictionary>> updatesToIndexes)
+ IEnumerable> PrepareIndexUpdateTasks(Dictionary>> updatesToIndexes)
=> updatesToIndexes.Select(updt => (indexInfo: this.GrainIndexes[updt.Key], updatesToIndex: updt.Value))
.Where(pair => pair.updatesToIndex.Count > 0)
.Select(pair => pair.indexInfo.IndexInterface.ApplyIndexUpdateBatch(this._siloIndexManager, pair.updatesToIndex.AsImmutable(),
pair.indexInfo.MetaData.IsUniqueIndex, pair.indexInfo.MetaData, _silo));
- private Dictionary>> PopulateUpdatesToIndexes(
- IndexWorkflowRecordNode currentWorkflow, Dictionary> grainsToActiveWorkflows)
+ Dictionary>> PopulateUpdatesToIndexes(IndexWorkflowRecordNode currentWorkflow, Dictionary> grainsToActiveWorkflows)
{
var updatesToIndexes = new Dictionary>>();
- bool faultTolerant = IsFaultTolerant;
+ var faultTolerant = IsFaultTolerant;
for (; !currentWorkflow.IsPunctuation; currentWorkflow = currentWorkflow.Next)
{
- IndexWorkflowRecord workflowRec = currentWorkflow.WorkflowRecord;
- IIndexableGrain g = workflowRec.Grain;
- bool existsInActiveWorkflows = faultTolerant && grainsToActiveWorkflows.TryGetValue(g, out HashSet activeWorkflowRecs)
- && activeWorkflowRecs.Contains(workflowRec.WorkflowId);
+ var workflowRec = currentWorkflow.WorkflowRecord;
+ var g = workflowRec.Grain;
+ var existsInActiveWorkflows = faultTolerant && grainsToActiveWorkflows.TryGetValue(g, out var activeWorkflowRecs) && activeWorkflowRecs.Contains(workflowRec.WorkflowId);
foreach (var (indexName, updt) in currentWorkflow.WorkflowRecord.MemberUpdates.Where(kvp => kvp.Value.OperationType != IndexOperationType.None))
{
@@ -104,10 +102,10 @@ private Dictionary>> P
return updatesToIndexes;
}
- private static HashSet emptyHashset = new HashSet();
- private static Dictionary> emptyDictionary = new Dictionary>();
+ static readonly HashSet emptyHashset = new();
+ static readonly Dictionary> emptyDictionary = new();
- private async Task>> FtGetActiveWorkflowSetsFromGrains(IndexWorkflowRecordNode currentWorkflow)
+ async Task>> GetActiveWorkflowSetsFromGrains(IndexWorkflowRecordNode currentWorkflow)
{
var activeWorkflowSetTasksByGrain = new Dictionary>>>();
var currentWorkflowIds = new HashSet();
@@ -116,10 +114,9 @@ private async Task>> FtGetActiveWorkfl
{
var record = currentWorkflow.WorkflowRecord;
currentWorkflowIds.Add(record.WorkflowId);
- IIndexableGrain g = record.Grain;
- if (!activeWorkflowSetTasksByGrain.ContainsKey(g) && record.MemberUpdates.Where(ups => ups.Value.OperationType != IndexOperationType.None).Any())
+ if (!activeWorkflowSetTasksByGrain.ContainsKey(record.Grain) && record.MemberUpdates.Any(ups => ups.Value.OperationType != IndexOperationType.None))
{
- activeWorkflowSetTasksByGrain[g] = g.AsReference(this._siloIndexManager, this._grainInterfaceType).GetActiveWorkflowIdsSet();
+ activeWorkflowSetTasksByGrain[record.Grain] = record.Grain.AsReference(this._siloIndexManager, this._grainInterfaceType).GetActiveWorkflowIdsSet();
}
}
@@ -135,7 +132,7 @@ private async Task>> FtGetActiveWorkfl
return new Dictionary>();
}
- private NamedIndexMap EnsureGrainIndexes()
+ NamedIndexMap EnsureGrainIndexes()
{
if (__grainIndexes == null)
{
@@ -146,14 +143,15 @@ private NamedIndexMap EnsureGrainIndexes()
}
// TODO clean up some of the duplicated id-generation code.
- private IIndexWorkflowQueue InitIndexWorkflowQueue()
- => __workflowQueue = _lazyParent.Value.IsGrainService
+ IIndexWorkflowQueue InitIndexWorkflowQueue()
+ => __workflowQueue = _lazyParent.Value.GrainId.IsSystemTarget()
? _siloIndexManager.GetGrainService(IndexWorkflowQueueBase.CreateIndexWorkflowQueueGrainReference(_siloIndexManager, _grainInterfaceType, _queueSeqNum, _silo))
: _siloIndexManager.GrainFactory.GetGrain(IndexWorkflowQueueBase.CreateIndexWorkflowQueuePrimaryKey(_grainInterfaceType, _queueSeqNum));
public static GrainReference CreateIndexWorkflowQueueHandlerGrainReference(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress siloAddress)
=> siloIndexManager.MakeGrainServiceGrainReference(IndexingConstants.INDEX_WORKFLOW_QUEUE_HANDLER_GRAIN_SERVICE_TYPE_CODE,
- IndexWorkflowQueueBase.CreateIndexWorkflowQueuePrimaryKey(grainInterfaceType, queueSeqNum), siloAddress);
+ IndexWorkflowQueueBase.CreateIndexWorkflowQueuePrimaryKey(grainInterfaceType, queueSeqNum),
+ siloAddress);
public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
=> throw new NotSupportedException();
diff --git a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueHandlerGrainService.cs b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueHandlerGrainService.cs
index 0fd5449..90bc056 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueHandlerGrainService.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueHandlerGrainService.cs
@@ -8,14 +8,12 @@ namespace Orleans.Indexing
[Reentrant]
internal class IndexWorkflowQueueHandlerGrainService : GrainService, IIndexWorkflowQueueHandler
{
- private IIndexWorkflowQueueHandler _base;
+ IIndexWorkflowQueueHandler _base;
internal IndexWorkflowQueueHandlerGrainService(SiloIndexManager sim, Type grainInterfaceType, int queueSeqNum, bool isDefinedAsFaultTolerantGrain)
- : base(IndexWorkflowQueueHandlerBase.CreateIndexWorkflowQueueHandlerGrainReference(sim, grainInterfaceType, queueSeqNum, sim.SiloAddress).GrainIdentity,
- sim.Silo, sim.LoggerFactory)
+ : base(IndexWorkflowQueueHandlerBase.CreateIndexWorkflowQueueHandlerGrainReference(sim, grainInterfaceType, queueSeqNum, sim.SiloAddress).GrainId, sim.Silo, sim.LoggerFactory)
{
- _base = new IndexWorkflowQueueHandlerBase(sim, grainInterfaceType, queueSeqNum, sim.SiloAddress, isDefinedAsFaultTolerantGrain,
- () => base.GetGrainReference()); // lazy is needed because the runtime isn't attached until Registered
+ _base = new IndexWorkflowQueueHandlerBase(sim, grainInterfaceType, queueSeqNum, sim.SiloAddress, isDefinedAsFaultTolerantGrain, () => base.GrainReference); // lazy is needed because the runtime isn't attached until Registered
}
public Task HandleWorkflowsUntilPunctuation(Immutable workflowRecordsHead)
diff --git a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueState.cs b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueState.cs
index 3a27966..5e8a3e6 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueState.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueState.cs
@@ -3,27 +3,16 @@
namespace Orleans.Indexing
{
- ///
- /// The persistent unit for storing the information for an
- ///
- /// This requires GrainState instead of using StateStorageBridge, due to having to set the ETag for upsert.
- [Serializable]
- internal class IndexWorkflowQueueState : GrainState
- {
- public IndexWorkflowQueueState() : base(new IndexWorkflowQueueEntry())
- {
- }
- }
-
///
/// All the information stored for a single
///
[Serializable]
+ [GenerateSerializer]
internal class IndexWorkflowQueueEntry
{
- // Updates that must be propagated to indexes.
+ ///
+ /// The sequence of updates to be applied to indices.
+ ///
internal IndexWorkflowRecordNode WorkflowRecordsHead;
-
- public IndexWorkflowQueueEntry() => this.WorkflowRecordsHead = null;
}
}
diff --git a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowRecord.cs b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowRecord.cs
index afcf3fa..737d6c4 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowRecord.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowRecord.cs
@@ -4,6 +4,7 @@
namespace Orleans.Indexing
{
[Serializable]
+ [GenerateSerializer]
internal class IndexWorkflowRecord
{
///
diff --git a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowRecordNode.cs b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowRecordNode.cs
index 0675e8f..9b1b510 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowRecordNode.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowRecordNode.cs
@@ -6,10 +6,11 @@ namespace Orleans.Indexing
{
///
/// A node in the linked list of workflowRecords.
- ///
+ ///
/// This linked list makes the traversal more efficient.
///
[Serializable]
+ [GenerateSerializer]
internal class IndexWorkflowRecordNode
{
internal IndexWorkflowRecord WorkflowRecord;
diff --git a/src/Orleans.Indexing/Core/FaultTolerance/ReincarnatedIndexWorkflowQueue.cs b/src/Orleans.Indexing/Core/FaultTolerance/ReincarnatedIndexWorkflowQueue.cs
index 640d9ea..a378fa5 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/ReincarnatedIndexWorkflowQueue.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/ReincarnatedIndexWorkflowQueue.cs
@@ -2,7 +2,9 @@
using Orleans.Runtime;
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
namespace Orleans.Indexing
{
@@ -15,10 +17,10 @@ internal class ReincarnatedIndexWorkflowQueue : Grain, IIndexWorkflowQueue
internal SiloIndexManager SiloIndexManager => IndexManager.GetSiloIndexManager(ref __siloIndexManager, base.ServiceProvider);
private SiloIndexManager __siloIndexManager;
- public override Task OnActivateAsync()
+ public override Task OnActivateAsync(CancellationToken ct)
{
DelayDeactivation(ACTIVE_FOR_A_DAY);
- return base.OnActivateAsync();
+ return base.OnActivateAsync(ct);
}
public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
@@ -37,7 +39,7 @@ public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
int queueSequenceNumber = int.Parse(parts[1]);
_base = new IndexWorkflowQueueBase(this.SiloIndexManager, grainInterfaceType, queueSequenceNumber,
- oldParentGrainServiceRef.GrainServiceSiloAddress,
+ this.ServiceProvider.GetRequiredService().SiloAddress,
isDefinedAsFaultTolerantGrain: true /*otherwise it shouldn't have reached here!*/,
parentFunc: () => this.AsWeaklyTypedReference(), recoveryGrainReference:oldParentGrainServiceRef);
}
diff --git a/src/Orleans.Indexing/Core/FaultTolerance/ReincarnatedIndexWorkflowQueueHandler.cs b/src/Orleans.Indexing/Core/FaultTolerance/ReincarnatedIndexWorkflowQueueHandler.cs
index 6c76a9b..b0dba44 100644
--- a/src/Orleans.Indexing/Core/FaultTolerance/ReincarnatedIndexWorkflowQueueHandler.cs
+++ b/src/Orleans.Indexing/Core/FaultTolerance/ReincarnatedIndexWorkflowQueueHandler.cs
@@ -1,26 +1,29 @@
using Orleans.Concurrency;
using System;
+using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
+using Orleans.Runtime;
namespace Orleans.Indexing
{
[Reentrant]
internal class ReincarnatedIndexWorkflowQueueHandler : Grain, IIndexWorkflowQueueHandler
{
- private IIndexWorkflowQueueHandler _base;
+ IIndexWorkflowQueueHandler _base;
internal SiloIndexManager SiloIndexManager => IndexManager.GetSiloIndexManager(ref __siloIndexManager, base.ServiceProvider);
- private SiloIndexManager __siloIndexManager;
+ SiloIndexManager __siloIndexManager;
- public override Task OnActivateAsync()
+ public override Task OnActivateAsync(CancellationToken ct)
{
DelayDeactivation(ReincarnatedIndexWorkflowQueue.ACTIVE_FOR_A_DAY);
- return base.OnActivateAsync();
+ return base.OnActivateAsync(ct);
}
public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
{
- if (_base == null)
+ if (_base is null)
{
var oldParentGrainServiceRef = oldParentGrainService.AsWeaklyTypedReference();
var parts = oldParentGrainServiceRef.GetPrimaryKeyString().Split('-');
@@ -30,11 +33,13 @@ public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
" The primary key is '" + oldParentGrainServiceRef.GetPrimaryKeyString() + "'");
}
- Type grainInterfaceType = this.SiloIndexManager.CachedTypeResolver.ResolveType(parts[0]);
- int queueSequenceNumber = int.Parse(parts[1]);
+ var grainInterfaceType = this.SiloIndexManager.CachedTypeResolver.ResolveType(parts[0]);
+ var queueSequenceNumber = int.Parse(parts[1]);
+
+ var runtime = ServiceProvider.GetRequiredService();
_base = new IndexWorkflowQueueHandlerBase(this.SiloIndexManager, grainInterfaceType, queueSequenceNumber,
- oldParentGrainServiceRef.GrainServiceSiloAddress,
+ runtime.SiloAddress,
isDefinedAsFaultTolerantGrain: true /*otherwise it shouldn't have reached here!*/,
() => this.AsWeaklyTypedReference());
}
diff --git a/src/Orleans.Indexing/Core/GrainIndexes.cs b/src/Orleans.Indexing/Core/GrainIndexes.cs
index ba579d6..fcf0a14 100644
--- a/src/Orleans.Indexing/Core/GrainIndexes.cs
+++ b/src/Orleans.Indexing/Core/GrainIndexes.cs
@@ -14,16 +14,15 @@ internal class GrainIndexes : IEnumerable>
/// An immutable cached version of IndexInfo (containing IIndexUpdateGenerator) instances for the current indexes on the grain,
/// keyed by interface.
///
- private IDictionary interfaceToIndexMap = new Dictionary();
+ IDictionary interfaceToIndexMap = new Dictionary();
+
internal InterfaceIndexes this[Type interfaceType] => this.interfaceToIndexMap[interfaceType];
internal bool ContainsInterface(Type interfaceType) => this.interfaceToIndexMap.ContainsKey(interfaceType);
internal IReadOnlyDictionary PropertyNullValues { get; }
- private IndexRegistry indexRegistry;
- private GrainIndexes(IndexRegistry registry, IEnumerable indexedInterfaceTypes, IReadOnlyDictionary propertyNullValues)
+ GrainIndexes(IndexRegistry registry, IEnumerable indexedInterfaceTypes, IReadOnlyDictionary propertyNullValues)
{
- this.indexRegistry = registry;
this.PropertyNullValues = propertyNullValues;
this.interfaceToIndexMap = indexedInterfaceTypes.ToDictionary(itf => itf, itf => new InterfaceIndexes(registry[itf]));
}
@@ -43,29 +42,32 @@ internal static bool CreateInstance(IndexRegistry registry, Type grainType, out
internal void MapStateToProperties(object state)
{
+ var grainStateType = state.GetType();
+ this.interfaceToIndexMap.ForEach(kvp => createOrUpdatePropertiesFromState(kvp.Value));
+ return;
+
void createOrUpdatePropertiesFromState(InterfaceIndexes indexes)
{
- var tProperties = indexes.PropertiesType;
- var tGrainState = state.GetType();
+ indexes.Properties = indexes.PropertiesType.IsAssignableFrom(grainStateType) ? state : mapStateToProperties();
+ return;
+
object mapStateToProperties()
{
// Copy named property values from this.State to indexes.Properties. The set of property names will not change.
// Note: TProperties is specified on IIndexableGrain with a "where TProperties: new()" constraint.
- var properties = indexes.Properties ?? Activator.CreateInstance(tProperties);
- tProperties.GetProperties(BindingFlags.Public | BindingFlags.Instance).ForEach(p => p.SetValue(properties, tGrainState.GetProperty(p.Name).GetValue(state)));
+ var properties = indexes.Properties ?? Activator.CreateInstance(indexes.PropertiesType);
+ indexes.PropertiesType
+ .GetProperties(BindingFlags.Public | BindingFlags.Instance)
+ .ForEach(p => p.SetValue(properties, grainStateType.GetProperty(p.Name).GetValue(state)));
return properties;
}
-
- indexes.Properties = tProperties.IsAssignableFrom(tGrainState) ? state : mapStateToProperties();
}
-
- this.interfaceToIndexMap.ForEach(kvp => createOrUpdatePropertiesFromState(kvp.Value));
}
///
/// This method checks the list of cached indexes, and if any index does not have a before-image, it will create
/// one for it. As before-images are stored as an immutable field, a new map is created in this process.
- ///
+ ///
/// This method is called on activation of the grain, and when the UpdateIndexes method detects an inconsistency
/// between the indexes in the index handler and the cached indexes of the current grain.
///
@@ -78,14 +80,14 @@ void addMissingBeforeImages(InterfaceIndexes indexes)
var oldBefImgs = indexes.BeforeImages.Value;
object getImage(string indexName, IIndexUpdateGenerator upGen)
- => !force && oldBefImgs.ContainsKey(indexName) ? oldBefImgs[indexName] : upGen.ExtractIndexImage(indexes.Properties);
+ => !force && oldBefImgs.TryGetValue(indexName, out var value) ? value : upGen.ExtractIndexImage(indexes.Properties);
indexes.BeforeImages = (indexes.NamedIndexes
.ToDictionary(kvp => kvp.Key, kvp => getImage(kvp.Key, kvp.Value.UpdateGenerator)) as IDictionary)
.AsImmutable();
}
- this.MapStateToProperties(state);
+ MapStateToProperties(state);
this.interfaceToIndexMap.ForEach(kvp => addMissingBeforeImages(kvp.Value));
}
@@ -99,7 +101,7 @@ internal void UpdateBeforeImages(InterfaceToUpdatesMap interfaceToUpdatesMap)
void updateBeforeImages(InterfaceIndexes indexes, IReadOnlyDictionary updates)
{
IDictionary befImgs = new Dictionary(indexes.BeforeImages.Value);
- foreach ((var indexName, var opType) in updates.Select(u => (u.Key, u.Value.OperationType)))
+ foreach (var (indexName, opType) in updates.Select(u => (u.Key, u.Value.OperationType)))
{
if (opType == IndexOperationType.Update || opType == IndexOperationType.Insert)
{
diff --git a/src/Orleans.Indexing/Core/IndexFactory.cs b/src/Orleans.Indexing/Core/IndexFactory.cs
index 36b1fa9..aa81441 100644
--- a/src/Orleans.Indexing/Core/IndexFactory.cs
+++ b/src/Orleans.Indexing/Core/IndexFactory.cs
@@ -39,8 +39,7 @@ public IndexFactory(IndexManager im, IGrainFactory gf)
/// the filter expression of the query
/// the observer object to be called on every grain found for the query
/// the result of the query
- public Task GetActiveGrains(Expression> filterExpr,
- IAsyncBatchObserver queryResultObserver) where TIGrain : IIndexableGrain
+ public Task GetActiveGrains(Expression> filterExpr, IAsyncBatchObserver queryResultObserver) where TIGrain : IIndexableGrain
=> this.GetActiveGrains().Where(filterExpr).ObserveResults(queryResultObserver);
///
@@ -53,8 +52,7 @@ public Task GetActiveGrains(Expressionthe filter expression of the query
/// the observer object to be called on every grain found for the query
/// the result of the query
- public Task GetActiveGrains(IStreamProvider streamProvider,
- Expression> filterExpr, IAsyncBatchObserver queryResultObserver) where TIGrain : IIndexableGrain
+ public Task GetActiveGrains(IStreamProvider streamProvider, Expression> filterExpr, IAsyncBatchObserver queryResultObserver) where TIGrain : IIndexableGrain
=> this.GetActiveGrains(streamProvider).Where(filterExpr).ObserveResults(queryResultObserver);
///
@@ -64,7 +62,7 @@ public Task GetActiveGrains(IStreamProvider streamProvider
/// the property type to query over
/// the query to lookup all active grains of a given type
public IOrleansQueryable GetActiveGrains() where TIGrain : IIndexableGrain
- => this.GetActiveGrains(this.indexManager.ServiceProvider.GetRequiredServiceByName(IndexingConstants.INDEXING_STREAM_PROVIDER_NAME));
+ => this.GetActiveGrains(this.indexManager.ServiceProvider.GetRequiredKeyedService(IndexingConstants.INDEXING_STREAM_PROVIDER_NAME));
///
/// This method queries the active grains for the given grain interface.
@@ -159,7 +157,7 @@ internal static void ValidateIndexType(Type idxType, PropertyInfo indexedPropert
internal static void RegisterIndexWorkflowQueueGrainServices(IServiceCollection services, Type grainInterfaceType, IndexingOptions indexingOptions, bool isFaultTolerant)
{
- for (int i = 0; i < indexingOptions.NumWorkflowQueuesPerInterface; ++i)
+ for (var i = 0; i < indexingOptions.NumWorkflowQueuesPerInterface; ++i)
{
var seq = i; // Captured by the lambda
services.AddGrainService(sp => new IndexWorkflowQueueGrainService(sp.GetRequiredService(), grainInterfaceType, seq, isFaultTolerant));
@@ -171,8 +169,7 @@ internal static void RegisterIndexWorkflowQueueGrainServices(IServiceCollection
#region private functions
- private static IIndexUpdateGenerator CreateIndexUpdateGenFromProperty(PropertyInfo indexedProperty)
- => new IndexUpdateGenerator(indexedProperty);
+ static IIndexUpdateGenerator CreateIndexUpdateGenFromProperty(PropertyInfo indexedProperty) => new IndexUpdateGenerator(indexedProperty);
#endregion private functions
}
diff --git a/src/Orleans.Indexing/Core/IndexMetaData.cs b/src/Orleans.Indexing/Core/IndexMetaData.cs
index a1edaf9..e101f59 100644
--- a/src/Orleans.Indexing/Core/IndexMetaData.cs
+++ b/src/Orleans.Indexing/Core/IndexMetaData.cs
@@ -6,6 +6,7 @@ namespace Orleans.Indexing
/// The meta data that is stored beside the index
///
[Serializable]
+ [GenerateSerializer]
public class IndexMetaData
{
private Type _indexType;
diff --git a/src/Orleans.Indexing/Core/IndexRegistry.cs b/src/Orleans.Indexing/Core/IndexRegistry.cs
index 138e00f..fd08fdb 100644
--- a/src/Orleans.Indexing/Core/IndexRegistry.cs
+++ b/src/Orleans.Indexing/Core/IndexRegistry.cs
@@ -5,12 +5,10 @@ namespace Orleans.Indexing
{
internal class IndexRegistry
{
- private IDictionary IndexesByInterfaceType { get; set; } = new Dictionary();
-
- private IDictionary IndexedInterfacesByGrainType = new Dictionary();
- private IDictionary> IndexedInterfaceToGrainTypes = new Dictionary>();
-
- private IDictionary> GrainsToPropertyNullValues = new Dictionary>();
+ IDictionary IndexesByInterfaceType { get; set; } = new Dictionary();
+ IDictionary IndexedInterfacesByGrainType = new Dictionary();
+ IDictionary> IndexedInterfaceToGrainTypes = new Dictionary>();
+ IDictionary> GrainsToPropertyNullValues = new Dictionary>();
internal static IReadOnlyDictionary EmptyPropertyNullValues { get; } = new Dictionary();
@@ -26,7 +24,7 @@ internal bool TryGetValue(Type interfaceType, out NamedIndexMap interfaceIndexes
internal bool ContainsKey(Type interfaceType) => this.IndexesByInterfaceType.ContainsKey(interfaceType);
internal void SetGrainIndexes(Type grainClassType, Type[] indexedInterfaces, IReadOnlyDictionary nullValuesDictionary)
- {
+ {
this.IndexedInterfacesByGrainType[grainClassType] = indexedInterfaces;
foreach (var indexedInterface in indexedInterfaces) {
this.IndexedInterfaceToGrainTypes.GetOrAdd(indexedInterface, new List()).Add(grainClassType);
diff --git a/src/Orleans.Indexing/Core/IndexUpdateGenerator.cs b/src/Orleans.Indexing/Core/IndexUpdateGenerator.cs
index 380994e..6a4dd0f 100644
--- a/src/Orleans.Indexing/Core/IndexUpdateGenerator.cs
+++ b/src/Orleans.Indexing/Core/IndexUpdateGenerator.cs
@@ -4,9 +4,10 @@
namespace Orleans.Indexing
{
///
- ///
+ ///
///
[Serializable]
+ [GenerateSerializer]
internal class IndexUpdateGenerator : IIndexUpdateGenerator
{
PropertyInfo prop;
@@ -20,7 +21,7 @@ public IndexUpdateGenerator(PropertyInfo prop)
public IMemberUpdate CreateMemberUpdate(object gProps, object befImg)
{
- object aftImg = gProps == null ? null : ExtractIndexImage(gProps);
+ var aftImg = gProps == null ? null : ExtractIndexImage(gProps);
return new MemberUpdate(befImg, aftImg);
}
diff --git a/src/Orleans.Indexing/Core/InterfaceIndexes.cs b/src/Orleans.Indexing/Core/InterfaceIndexes.cs
index 4b2a8f3..2e66615 100644
--- a/src/Orleans.Indexing/Core/InterfaceIndexes.cs
+++ b/src/Orleans.Indexing/Core/InterfaceIndexes.cs
@@ -7,8 +7,19 @@ namespace Orleans.Indexing
{
internal class InterfaceIndexes
{
+ ///
+ /// The indexes defined on the indexable object.
+ ///
internal NamedIndexMap NamedIndexes { get; }
+
+ ///
+ /// The indexed properties object.
+ ///
internal object Properties { get; set; }
+
+ ///
+ /// The type of the indexed properties object.
+ ///
internal Type PropertiesType => this.NamedIndexes.PropertiesClassType;
///
diff --git a/src/Orleans.Indexing/Core/Interfaces/IIndexFactory.cs b/src/Orleans.Indexing/Core/Interfaces/IIndexFactory.cs
index 2078ccb..b90ff85 100644
--- a/src/Orleans.Indexing/Core/Interfaces/IIndexFactory.cs
+++ b/src/Orleans.Indexing/Core/Interfaces/IIndexFactory.cs
@@ -16,8 +16,7 @@ public interface IIndexFactory
/// the filter expression of the query
/// the observer object to be called on every grain found for the query
/// the result of the query
- Task GetActiveGrains(Expression> filterExpr,
- IAsyncBatchObserver queryResultObserver) where TIGrain : IIndexableGrain;
+ Task GetActiveGrains(Expression> filterExpr, IAsyncBatchObserver queryResultObserver) where TIGrain : IIndexableGrain;
///
/// This method queries the active grains for the given grain interface and the filter expression. The filter
@@ -29,8 +28,7 @@ Task GetActiveGrains(Expression> f
/// the filter expression of the query
/// the observer object to be called on every grain found for the query
/// the result of the query
- Task GetActiveGrains(IStreamProvider streamProvider,
- Expression> filterExpr, IAsyncBatchObserver queryResultObserver) where TIGrain : IIndexableGrain;
+ Task GetActiveGrains(IStreamProvider streamProvider, Expression> filterExpr, IAsyncBatchObserver queryResultObserver) where TIGrain : IIndexableGrain;
///
/// This method queries the active grains for the given grain interface.
diff --git a/src/Orleans.Indexing/Core/Interfaces/IIndexUpdateGenerator.cs b/src/Orleans.Indexing/Core/Interfaces/IIndexUpdateGenerator.cs
index e6f9d1b..f0d560e 100644
--- a/src/Orleans.Indexing/Core/Interfaces/IIndexUpdateGenerator.cs
+++ b/src/Orleans.Indexing/Core/Interfaces/IIndexUpdateGenerator.cs
@@ -1,7 +1,7 @@
namespace Orleans.Indexing
{
///
- /// This interface specifies a method that each index should define for extracting the part of the grain state it is interested in.
+ /// This interface specifies a method that each index should define for extracting the part of the grain state it is interested in.
/// The interface also specifies a method creating an update object after an update happens on the indexed grain
///
public interface IIndexUpdateGenerator
@@ -14,7 +14,7 @@ public interface IIndexUpdateGenerator
object ExtractIndexImage(object indexedGrainProperties);
///
- /// Creates an update object after receiving the current state of the grain and an earlier image of the grain
+ /// Creates an update object after receiving the current state of the grain and an earlier image of the grain
///
/// the properties of the grain from which we want to extract some state to be indexed
/// the before-image of the indexedGrain, which was captured earlier via a call to ExtractIndexImage(indexedGrain)
diff --git a/src/Orleans.Indexing/Core/Interfaces/IIndexableGrain.cs b/src/Orleans.Indexing/Core/Interfaces/IIndexableGrain.cs
index 05ad048..5fdf352 100644
--- a/src/Orleans.Indexing/Core/Interfaces/IIndexableGrain.cs
+++ b/src/Orleans.Indexing/Core/Interfaces/IIndexableGrain.cs
@@ -25,6 +25,7 @@ namespace Orleans.Indexing
public interface IIndexableGrain : IGrain
{
#region Workflow Fault-Tolerant required methods // TODO: Default implementation in C# 8.0
+
///
/// This method returns the set of active workflow IDs for a Total Index
///
@@ -34,6 +35,7 @@ public interface IIndexableGrain : IGrain
/// This method removes a workflow ID from the list of active workflow IDs for a Total Index
///
Task RemoveFromActiveWorkflowIds(HashSet removedWorkflowId);
+
#endregion Workflow Fault-Tolerant required methods
}
}
diff --git a/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdate.cs b/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdate.cs
index 8721f88..555b8ed 100644
--- a/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdate.cs
+++ b/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdate.cs
@@ -7,14 +7,15 @@ namespace Orleans.Indexing
///
/// MemberUpdate is a generic implementation of IMemberUpdate that relies on a copy of beforeImage and afterImage, without
/// keeping any semantic information about the actual change that happened.
- ///
+ ///
/// This class assumes that befImg and aftImg passed to it won't be altered later on, so they are immutable.
///
[Serializable]
+ [GenerateSerializer]
internal class MemberUpdate : IMemberUpdate
{
- private object _befImg;
- private object _aftImg;
+ object _befImg;
+ object _aftImg;
public IndexUpdateMode UpdateMode => IndexUpdateMode.NonTentative;
@@ -35,7 +36,7 @@ public MemberUpdate(object befImg, object aftImg) : this(befImg, aftImg, GetOper
{
}
- private static IndexOperationType GetOperationType(object befImg, object aftImg)
+ static IndexOperationType GetOperationType(object befImg, object aftImg)
{
if (befImg == null)
{
diff --git a/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateOverriddenMode.cs b/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateOverriddenMode.cs
index 01565e7..bea8533 100644
--- a/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateOverriddenMode.cs
+++ b/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateOverriddenMode.cs
@@ -6,6 +6,7 @@ namespace Orleans.Indexing
/// This class is a wrapper around another IMemberUpdate which adds an update mode.
///
[Serializable]
+ [GenerateSerializer]
internal class MemberUpdateOverriddenMode : IMemberUpdate
{
private IMemberUpdate _update;
diff --git a/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateOverriddenOperation.cs b/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateOverriddenOperation.cs
index 42c4209..ee36e3f 100644
--- a/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateOverriddenOperation.cs
+++ b/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateOverriddenOperation.cs
@@ -7,6 +7,7 @@ namespace Orleans.Indexing
/// the actual operation in the original update
///
[Serializable]
+ [GenerateSerializer]
internal class MemberUpdateOverriddenOperation : IMemberUpdate
{
private IMemberUpdate _update;
diff --git a/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateReverseTentative.cs b/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateReverseTentative.cs
index adcfda3..a7d68dc 100644
--- a/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateReverseTentative.cs
+++ b/src/Orleans.Indexing/Core/MemberUpdateImpl/MemberUpdateReverseTentative.cs
@@ -8,6 +8,7 @@ namespace Orleans.Indexing
/// This class is a wrapper around another IMemberUpdate which reverses its operation
///
[Serializable]
+ [GenerateSerializer]
internal class MemberUpdateReverseTentative : IMemberUpdate
{
private IMemberUpdate _update;
diff --git a/src/Orleans.Indexing/Core/NamedIndexMap.cs b/src/Orleans.Indexing/Core/NamedIndexMap.cs
index 9f9aab6..d287965 100644
--- a/src/Orleans.Indexing/Core/NamedIndexMap.cs
+++ b/src/Orleans.Indexing/Core/NamedIndexMap.cs
@@ -7,7 +7,7 @@ namespace Orleans.Indexing
{
internal class NamedIndexMap : IEnumerable>
{
- private IDictionary IndexesByName { get; set; } = new Dictionary();
+ IDictionary IndexesByName { get; set; } = new Dictionary();
internal Type PropertiesClassType { get; }
diff --git a/src/Orleans.Indexing/Core/Utils/ApplicationPartsIndexableGrainLoader.cs b/src/Orleans.Indexing/Core/Utils/ApplicationPartsIndexableGrainLoader.cs
index 4a0dfb7..df51219 100644
--- a/src/Orleans.Indexing/Core/Utils/ApplicationPartsIndexableGrainLoader.cs
+++ b/src/Orleans.Indexing/Core/Utils/ApplicationPartsIndexableGrainLoader.cs
@@ -4,8 +4,9 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
-using Orleans.ApplicationParts;
+//using Orleans.ApplicationParts;
using Orleans.Hosting;
using Orleans.Runtime;
@@ -13,15 +14,15 @@ namespace Orleans.Indexing
{
internal class ApplicationPartsIndexableGrainLoader
{
- private readonly IndexManager indexManager;
- private readonly ILogger logger;
+ readonly IndexManager indexManager;
+ readonly ILogger logger;
- private static readonly Type indexAttrType = typeof(IndexAttribute);
- private static readonly PropertyInfo indexTypeProperty = typeof(IndexAttribute).GetProperty(nameof(IndexAttribute.IndexType));
- private static readonly PropertyInfo isEagerProperty = typeof(IndexAttribute).GetProperty(nameof(IndexAttribute.IsEager));
- private static readonly PropertyInfo isUniqueProperty = typeof(IndexAttribute).GetProperty(nameof(IndexAttribute.IsUnique));
- private static readonly PropertyInfo maxEntriesPerBucketProperty = typeof(IndexAttribute).GetProperty(nameof(IndexAttribute.MaxEntriesPerBucket));
- private static readonly PropertyInfo transactionalVariantProperty = typeof(TransactionalIndexVariantAttribute).GetProperty(nameof(TransactionalIndexVariantAttribute.TransactionalIndexType));
+ static readonly Type indexAttrType = typeof(IndexAttribute);
+ static readonly PropertyInfo indexTypeProperty = typeof(IndexAttribute).GetProperty(nameof(IndexAttribute.IndexType));
+ static readonly PropertyInfo isEagerProperty = typeof(IndexAttribute).GetProperty(nameof(IndexAttribute.IsEager));
+ static readonly PropertyInfo isUniqueProperty = typeof(IndexAttribute).GetProperty(nameof(IndexAttribute.IsUnique));
+ static readonly PropertyInfo maxEntriesPerBucketProperty = typeof(IndexAttribute).GetProperty(nameof(IndexAttribute.MaxEntriesPerBucket));
+ static readonly PropertyInfo transactionalVariantProperty = typeof(TransactionalIndexVariantAttribute).GetProperty(nameof(TransactionalIndexVariantAttribute.TransactionalIndexType));
internal ApplicationPartsIndexableGrainLoader(IndexManager indexManager)
{
@@ -29,10 +30,13 @@ internal ApplicationPartsIndexableGrainLoader(IndexManager indexManager)
this.logger = this.indexManager.LoggerFactory.CreateLoggerWithFullCategoryName();
}
- private static Type[] GetIndexedConcreteGrainClasses(IApplicationPartManager applicationPartManager, ILogger logger = null)
- => applicationPartManager.ApplicationParts.OfType()
- .SelectMany(part => part.Assembly.GetIndexedGrainClasses())
- .ToArray();
+ // private static Type[] GetIndexedConcreteGrainClasses(IApplicationPartManager applicationPartManager, ILogger logger = null)
+ // => applicationPartManager.ApplicationParts.OfType()
+ // .SelectMany(part => part.Assembly.GetIndexedGrainClasses())
+ // .ToArray();
+
+ static Type[] GetIndexedConcreteGrainClasses(HostBuilderContext context, ILogger logger = null)
+ => throw new NotImplementedException();
///
/// This method crawls the assemblies and looks for the index definitions (determined by extending the
@@ -40,7 +44,7 @@ private static Type[] GetIndexedConcreteGrainClasses(IApplicationPartManager app
///
/// An index registry for the silo.
internal IndexRegistry CreateIndexRegistry()
- => GetIndexRegistry(this, GetIndexedConcreteGrainClasses(this.indexManager.ApplicationPartManager, this.logger));
+ => GetIndexRegistry(this, GetIndexedConcreteGrainClasses(default, this.logger));
///
/// This method crawls the assemblies and looks for the index definitions (determined by extending the
@@ -55,7 +59,7 @@ internal static void RegisterGrainServices(HostBuilderContext context, IServiceC
var indexedClasses = new HashSet();
var indexedInterfaces = new HashSet();
- foreach (var grainClassType in GetIndexedConcreteGrainClasses(context.GetApplicationPartManager()))
+ foreach (var grainClassType in GetIndexedConcreteGrainClasses(context))
{
var consistencyScheme = grainClassType.GetConsistencyScheme();
if (consistencyScheme == ConsistencyScheme.Transactional)
@@ -83,7 +87,7 @@ internal static void RegisterGrainServices(HostBuilderContext context, IServiceC
{
// Queues are per-interface, not per-index.
IndexFactory.RegisterIndexWorkflowQueueGrainServices(services, grainInterfaceType, indexingOptions,
- consistencyScheme == ConsistencyScheme.FaultTolerantWorkflow);
+ isFaultTolerant: consistencyScheme == ConsistencyScheme.FaultTolerantWorkflow);
}
createQueues = false;
@@ -117,7 +121,7 @@ internal static IndexRegistry GetIndexRegistry(ApplicationPartsIndexableGrainLoa
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static void GetIndexesForASingleGrainType(ApplicationPartsIndexableGrainLoader loader, IndexRegistry registry, Type grainClassType)
+ static void GetIndexesForASingleGrainType(ApplicationPartsIndexableGrainLoader loader, IndexRegistry registry, Type grainClassType)
{
// First see if any indexed interfaces on this grain were already encountered on another grain (unless we're
// in validation mode, which doesn't create the indexes).
@@ -189,7 +193,7 @@ bool isInDict(string propName)
foreach (var indexableBaseInterface in indexableBaseInterfaces)
{
- // ... and its generic argument is a class (TProperties)...
+ // ... and its generic argument is a class (TProperties)...
var propertiesClassType = indexableBaseInterface.GetGenericArguments()[0];
if (propertiesClassType.GetTypeInfo().IsClass)
{
@@ -204,7 +208,7 @@ bool isInDict(string propName)
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static bool? CreateIndexesForASingleInterface(ApplicationPartsIndexableGrainLoader loader, IndexRegistry registry,
+ static bool? CreateIndexesForASingleInterface(ApplicationPartsIndexableGrainLoader loader, IndexRegistry registry,
Type propertiesClassType, Type grainInterfaceType, Type grainClassType,
ConsistencyScheme consistencyScheme, bool? grainIndexesAreEager)
{
@@ -261,11 +265,10 @@ bool isInDict(string propName)
return grainIndexesAreEager;
}
- private void CreateIndex(Type propertiesArg, Type grainInterfaceType, NamedIndexMap indexesOnGrain, PropertyInfo property,
- string indexName, Type indexType, bool isEager, bool isUnique, int maxEntriesPerBucket)
+ void CreateIndex(Type propertiesArg, Type grainInterfaceType, NamedIndexMap indexesOnGrain, PropertyInfo property, string indexName, Type indexType, bool isEager, bool isUnique, int maxEntriesPerBucket)
{
indexesOnGrain[indexName] = this.indexManager.IndexFactory.CreateIndex(indexType, indexName, isUnique, isEager, maxEntriesPerBucket, property);
- this.logger.Info($"Index created: Interface = {grainInterfaceType.Name}, property = {propertiesArg.Name}, index = {indexName}");
+ this.logger.Info(IndexingErrorCode.Indexing, $"Index created: Interface = {grainInterfaceType.Name}, property = {propertiesArg.Name}, index = {indexName}");
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -273,7 +276,7 @@ private static void ValidateSingleIndex(IndexAttribute indexAttr, Type grainInte
PropertyInfo propInfo, bool? grainIndexesAreEager, ConsistencyScheme consistencyScheme, bool isEager, bool isUnique)
{
var indexType = (Type)indexTypeProperty.GetValue(indexAttr);
- var isTotalIndex = indexType.IsTotalIndex();
+ //var isTotalIndex = indexType.IsTotalIndex();
var isPerSiloIndex = indexType.IsPartitionedPerSiloIndex();
var isFaultTolerantWorkflow = consistencyScheme == ConsistencyScheme.FaultTolerantWorkflow;
var isTransactional = consistencyScheme == ConsistencyScheme.Transactional;
diff --git a/src/Orleans.Indexing/Core/Utils/AsyncLock.cs b/src/Orleans.Indexing/Core/Utils/AsyncLock.cs
index 63e749d..16e8f6c 100644
--- a/src/Orleans.Indexing/Core/Utils/AsyncLock.cs
+++ b/src/Orleans.Indexing/Core/Utils/AsyncLock.cs
@@ -62,7 +62,7 @@ public Task LockAsync()
: waitTask.ContinueWith(_ => releaser, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
- private class LockReleaser : IDisposable
+ class LockReleaser : IDisposable
{
private AsyncLock target;
diff --git a/src/Orleans.Indexing/Core/Utils/IndexUtils.cs b/src/Orleans.Indexing/Core/Utils/IndexUtils.cs
index 0b96674..77dc6c0 100644
--- a/src/Orleans.Indexing/Core/Utils/IndexUtils.cs
+++ b/src/Orleans.Indexing/Core/Utils/IndexUtils.cs
@@ -236,7 +236,7 @@ internal static void ShallowCopyFrom(this object dest, object src)
internal static IGrainStorage GetGrainStorage(IServiceProvider services, string storageName)
{
var storageProvider = !string.IsNullOrEmpty(storageName)
- ? services.GetServiceByName(storageName)
+ ? services.GetKeyedService(storageName)
: services.GetService();
string failedProviderName() => string.IsNullOrEmpty(storageName) ? "default storage provider" : $"storage provider with the name {storageName}";
return storageProvider ?? throw new IndexConfigurationException($"No {failedProviderName()} was found while attempting to create index state storage.");
@@ -251,7 +251,7 @@ internal static bool RequireIndexInterfaceType(this Type indexType)
internal static bool IsPartitionedPerSiloIndex(this Type indexType)
=> indexType.RequireIndexInterfaceType() && typeof(IActiveHashIndexPartitionedPerSilo).IsAssignableFrom(indexType);
- internal static bool IsTotalIndex(this Type indexType)
+ static bool IsTotalIndex(this Type indexType)
=> indexType.RequireIndexInterfaceType() && typeof(ITotalIndex).IsAssignableFrom(indexType);
internal static bool IsTransactionalIndex(this Type indexType)
diff --git a/src/Orleans.Indexing/Exceptions/IndexConfigurationException.cs b/src/Orleans.Indexing/Exceptions/IndexConfigurationException.cs
index f028734..124b999 100644
--- a/src/Orleans.Indexing/Exceptions/IndexConfigurationException.cs
+++ b/src/Orleans.Indexing/Exceptions/IndexConfigurationException.cs
@@ -7,12 +7,14 @@ namespace Orleans.Indexing
/// This exception is thrown when an indexing configuration exception is encountered.
///
[Serializable]
+ [GenerateSerializer]
public class IndexConfigurationException : IndexException
{
public IndexConfigurationException(string message) : base(message)
{
}
+ [Obsolete]
protected IndexConfigurationException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
diff --git a/src/Orleans.Indexing/Exceptions/IndexException.cs b/src/Orleans.Indexing/Exceptions/IndexException.cs
index a98f08f..53fb25a 100644
--- a/src/Orleans.Indexing/Exceptions/IndexException.cs
+++ b/src/Orleans.Indexing/Exceptions/IndexException.cs
@@ -8,12 +8,14 @@ namespace Orleans.Indexing
/// This exception is thrown when a general indexing exception is encountered, or as a base for more specific subclasses.
///
[Serializable]
+ [GenerateSerializer]
public class IndexException : OrleansException
{
public IndexException(string message) : base(message)
{
}
+ [Obsolete]
protected IndexException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
diff --git a/src/Orleans.Indexing/Exceptions/IndexOperationException.cs b/src/Orleans.Indexing/Exceptions/IndexOperationException.cs
index a0accfd..7cf3849 100644
--- a/src/Orleans.Indexing/Exceptions/IndexOperationException.cs
+++ b/src/Orleans.Indexing/Exceptions/IndexOperationException.cs
@@ -7,12 +7,14 @@ namespace Orleans.Indexing
/// This exception is thrown when an indexing operation exception is encountered.
///
[Serializable]
+ [GenerateSerializer]
public class IndexOperationException : IndexException
{
public IndexOperationException(string message) : base(message)
{
}
+ [Obsolete]
protected IndexOperationException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
diff --git a/src/Orleans.Indexing/Exceptions/UniquenessConstraintViolatedException.cs b/src/Orleans.Indexing/Exceptions/UniquenessConstraintViolatedException.cs
index ee1cfca..dae86a0 100644
--- a/src/Orleans.Indexing/Exceptions/UniquenessConstraintViolatedException.cs
+++ b/src/Orleans.Indexing/Exceptions/UniquenessConstraintViolatedException.cs
@@ -7,12 +7,14 @@ namespace Orleans.Indexing
/// This exception is thrown when a uniqueness constraint defined on an index is violated.
///
[Serializable]
+ [GenerateSerializer]
public class UniquenessConstraintViolatedException : IndexException
{
public UniquenessConstraintViolatedException(string message) : base(message)
{
}
+ [Obsolete]
protected UniquenessConstraintViolatedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
diff --git a/src/Orleans.Indexing/Exceptions/WorkflowIndexException.cs b/src/Orleans.Indexing/Exceptions/WorkflowIndexException.cs
index 0f751cc..e3aa8ec 100644
--- a/src/Orleans.Indexing/Exceptions/WorkflowIndexException.cs
+++ b/src/Orleans.Indexing/Exceptions/WorkflowIndexException.cs
@@ -7,12 +7,14 @@ namespace Orleans.Indexing
/// This exception is thrown when a workflow indexing exception is encountered.
///
[Serializable]
+ [GenerateSerializer]
public class WorkflowIndexException : IndexException
{
public WorkflowIndexException(string message) : base(message)
{
}
+ [Obsolete]
protected WorkflowIndexException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
diff --git a/src/Orleans.Indexing/Extensions/GrainExtensions.cs b/src/Orleans.Indexing/Extensions/GrainExtensions.cs
index 07b0a35..becb26c 100644
--- a/src/Orleans.Indexing/Extensions/GrainExtensions.cs
+++ b/src/Orleans.Indexing/Extensions/GrainExtensions.cs
@@ -22,7 +22,7 @@ internal static TGrainInterface AsReference(this IAddressable g
///
/// Converts this grain to the grain interface identified by grainInterfaceType.
- ///
+ ///
/// Finally, it casts it to the type provided as TGrainInterface. The caller should make sure that grainInterfaceType extends TGrainInterface.
///
/// output grain interface type, which grainInterfaceType extends it
@@ -33,10 +33,10 @@ internal static TGrainInterface AsReference(this IAddressable g
///
internal static TGrainInterface AsReference(this IAddressable grain, SiloIndexManager siloIndexManager, Type grainInterfaceType) where TGrainInterface: IGrain
=> (grain != null)
- ? (TGrainInterface)siloIndexManager.GrainReferenceRuntime.Convert(grain.AsWeaklyTypedReference(), grainInterfaceType)
+ ? (TGrainInterface)siloIndexManager.GrainReferenceRuntime.Cast(grain.AsWeaklyTypedReference(), grainInterfaceType)
: throw new ArgumentNullException("grain", "Cannot pass null as an argument to AsReference");
- private const string WRONG_GRAIN_ERROR_MSG = "Passing a half baked grain as an argument. It is possible that you instantiated a grain class explicitly, as a regular object and not via Orleans runtime or via proper test mocking";
+ const string WRONG_GRAIN_ERROR_MSG = "Passing a half baked grain as an argument. It is possible that you instantiated a grain class explicitly, as a regular object and not via Orleans runtime or via proper test mocking";
internal static GrainReference AsWeaklyTypedReference(this IAddressable grain)
{
@@ -52,7 +52,7 @@ internal static GrainReference AsWeaklyTypedReference(this IAddressable grain)
}
return grain is GrainService grainService
- ? grainService.GetGrainReference()
+ ? grainService.GrainReference
: throw new ArgumentException(string.Format("AsWeaklyTypedReference has been called on an unexpected type: {0}.", grain.GetType().FullName), "grain");
}
diff --git a/src/Orleans.Indexing/Extensions/IndexExtensions.cs b/src/Orleans.Indexing/Extensions/IndexExtensions.cs
index b973152..baca60b 100644
--- a/src/Orleans.Indexing/Extensions/IndexExtensions.cs
+++ b/src/Orleans.Indexing/Extensions/IndexExtensions.cs
@@ -18,11 +18,11 @@ public static Task ApplyIndexUpdateBatch(this IIndexInterface index, SiloI
{
if (index is IActiveHashIndexPartitionedPerSilo)
{
- var bucketInCurrentSilo = siloIndexManager.GetGrainService(
- GetAHashIndexPartitionedPerSiloGrainReference(siloIndexManager,
- IndexUtils.GetIndexNameFromIndexGrain((IAddressable)index), index.GetType().GetGenericArguments()[1],
- siloAddress
- ));
+ var grainReference = GetActiveHashIndexPartitionedPerSiloGrainReference(
+ siloIndexManager,
+ IndexUtils.GetIndexNameFromIndexGrain((IAddressable)index), index.GetType().GetGenericArguments()[1],
+ siloAddress);
+ var bucketInCurrentSilo = siloIndexManager.GetGrainService(grainReference);
return bucketInCurrentSilo.DirectApplyIndexUpdateBatch(iUpdates, isUniqueIndex, idxMetaData/*, siloAddress*/);
}
return index.DirectApplyIndexUpdateBatch(iUpdates, isUniqueIndex, idxMetaData, siloAddress);
@@ -38,19 +38,21 @@ internal static Task ApplyIndexUpdate(this IIndexInterface index, SiloInde
{
if (index is IActiveHashIndexPartitionedPerSilo)
{
- var bucketInCurrentSilo = siloIndexManager.GetGrainService(
- GetAHashIndexPartitionedPerSiloGrainReference(siloIndexManager,
- IndexUtils.GetIndexNameFromIndexGrain((IAddressable)index), index.GetType().GetGenericArguments()[1],
- siloAddress
- ));
+ var grainReference = GetActiveHashIndexPartitionedPerSiloGrainReference(
+ siloIndexManager,
+ IndexUtils.GetIndexNameFromIndexGrain((IAddressable)index), index.GetType().GetGenericArguments()[1],
+ siloAddress);
+ var bucketInCurrentSilo = siloIndexManager.GetGrainService(grainReference);
return bucketInCurrentSilo.DirectApplyIndexUpdate(updatedGrain, update, idxMetaData.IsUniqueIndex, idxMetaData/*, siloAddress*/);
}
return index.DirectApplyIndexUpdate(updatedGrain, update, idxMetaData.IsUniqueIndex, idxMetaData, siloAddress);
}
- private static GrainReference GetAHashIndexPartitionedPerSiloGrainReference(SiloIndexManager siloIndexManager, string indexName, Type grainInterfaceType, SiloAddress siloAddress)
- => siloIndexManager.MakeGrainServiceGrainReference(IndexingConstants.HASH_INDEX_PARTITIONED_PER_SILO_BUCKET_GRAIN_SERVICE_TYPE_CODE,
- IndexUtils.GetIndexGrainPrimaryKey(grainInterfaceType, indexName), siloAddress);
+ static GrainReference GetActiveHashIndexPartitionedPerSiloGrainReference(SiloIndexManager siloIndexManager, string indexName, Type grainInterfaceType, SiloAddress siloAddress) =>
+ siloIndexManager.MakeGrainServiceGrainReference(
+ typeData: IndexingConstants.HASH_INDEX_PARTITIONED_PER_SILO_BUCKET_GRAIN_SERVICE_TYPE_CODE,
+ systemGrainId: IndexUtils.GetIndexGrainPrimaryKey(grainInterfaceType, indexName),
+ siloAddress: siloAddress);
}
}
diff --git a/src/Orleans.Indexing/Facet/Implementations/FaultTolerantIndexedGrainStateWrapper.cs b/src/Orleans.Indexing/Facet/Implementations/FaultTolerantIndexedGrainStateWrapper.cs
index 2beb47e..4f44de9 100644
--- a/src/Orleans.Indexing/Facet/Implementations/FaultTolerantIndexedGrainStateWrapper.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/FaultTolerantIndexedGrainStateWrapper.cs
@@ -8,6 +8,7 @@ namespace Orleans.Indexing.Facet
///
/// the type of user state
[Serializable]
+ [GenerateSerializer]
public class FaultTolerantIndexedGrainStateWrapper : IndexedGrainStateWrapper
where TGrainState : new()
{
diff --git a/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedState.cs b/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedState.cs
index bd602ff..c961033 100644
--- a/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedState.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedState.cs
@@ -13,22 +13,22 @@ internal class FaultTolerantWorkflowIndexedState : NonFaultTolerant
ILifecycleParticipant
where TGrainState : class, new()
{
- private readonly IGrainFactory _grainFactory; // TODO: standardize leading _ or not; and don't do this._
+ readonly IGrainFactory grainFactory; // TODO: standardize leading _ or not; and don't do this._
public FaultTolerantWorkflowIndexedState(
IServiceProvider sp,
IIndexedStateConfiguration config,
- IGrainActivationContext context,
+ IGrainContext context,
IGrainFactory grainFactory
) : base(sp, config, context)
{
- this._grainFactory = grainFactory;
+ this.grainFactory = grainFactory;
base.getWorkflowIdFunc = () => this.GenerateUniqueWorkflowId();
}
- private bool _hasAnyTotalIndex;
+ bool _hasAnyTotalIndex;
- private FaultTolerantIndexedGrainStateWrapper ftWrappedState => base.nonTransactionalState.State;
+ FaultTolerantIndexedGrainStateWrapper ftWrappedState => base.nonTransactionalState.State;
internal override IDictionary WorkflowQueues
{
@@ -36,7 +36,7 @@ internal override IDictionary WorkflowQueues
set => this.ftWrappedState.WorkflowQueues = value;
}
- private HashSet ActiveWorkflowsSet
+ HashSet ActiveWorkflowsSet
{
get => this.ftWrappedState.ActiveWorkflowsSet;
set => this.ftWrappedState.ActiveWorkflowsSet = value;
@@ -61,10 +61,9 @@ internal async override Task OnActivateAsync(CancellationToken ct)
// There are some remaining active workflows so they should be handled first.
this.PruneWorkflowQueuesForMissingInterfaceTypes();
await this.HandleRemainingWorkflows()
- .ContinueWith(t => Task.WhenAll(this.PruneActiveWorkflowsSetFromAlreadyHandledWorkflows(t.Result),
- base.FinishActivateAsync()));
+ .ContinueWith(t => Task.WhenAll(this.PruneActiveWorkflowsSetFromAlreadyHandledWorkflows(t.Result), base.FinishActivateAsync()), cancellationToken: ct);
}
- this._hasAnyTotalIndex = base._grainIndexes.HasAnyTotalIndex;
+ this._hasAnyTotalIndex = base.grainIndexes.HasAnyTotalIndex;
}
///
@@ -75,15 +74,17 @@ await this.HandleRemainingWorkflows()
/// a flag to determine whether only unique indexes were updated
/// determine the number of updated unique indexes
/// whether the state should be written to storage if no constraint is violated
- private protected override async Task ApplyIndexUpdates(InterfaceToUpdatesMap interfaceToUpdatesMap,
- bool updateIndexesEagerly, bool onlyUniqueIndexesWereUpdated,
- int numberOfUniqueIndexesUpdated, bool writeStateIfConstraintsAreNotViolated)
+ protected override async Task ApplyIndexUpdates(
+ InterfaceToUpdatesMap interfaceToUpdatesMap,
+ bool updateIndexesEagerly,
+ bool onlyUniqueIndexesWereUpdated,
+ int numberOfUniqueIndexesUpdated,
+ bool writeStateIfConstraintsAreNotViolated)
{
if (interfaceToUpdatesMap.IsEmpty || !this._hasAnyTotalIndex)
{
// Drop down to non-fault-tolerant
- await base.ApplyIndexUpdates(interfaceToUpdatesMap, updateIndexesEagerly, onlyUniqueIndexesWereUpdated,
- numberOfUniqueIndexesUpdated, writeStateIfConstraintsAreNotViolated);
+ await base.ApplyIndexUpdates(interfaceToUpdatesMap, updateIndexesEagerly, onlyUniqueIndexesWereUpdated, numberOfUniqueIndexesUpdated, writeStateIfConstraintsAreNotViolated);
return;
}
@@ -117,7 +118,8 @@ await base.ApplyIndexUpdates(interfaceToUpdatesMap, updateIndexesEagerly, onlyUn
{
// There is no constraint violation, so add the workflow ID to the list of active (committed/in-flight) workflows.
// Note that there is no race condition allowing the lazy update to sneak in before we add these, because grain access
- // is single-threaded unless the method is marked as interleaved; this method is called from this.WriteStateAsync, which
+ // is single-threaded unless the method is marked as interleaved;
+ // this method is called from this.WriteStateAsync, which
// is not marked as interleaved, so the queue handler call to this.GetActiveWorkflowIdsSet blocks until this method exits.
this.AddWorkflowIdsToActiveWorkflows(interfaceToUpdatesMap.Select(kvp => interfaceToUpdatesMap.WorkflowIds[kvp.Key]).ToArray());
await this.WriteStateAsync();
@@ -128,10 +130,10 @@ await base.ApplyIndexUpdates(interfaceToUpdatesMap, updateIndexesEagerly, onlyUn
}
///
- /// Handles the remaining workflows of the grain
+ /// Handles the remaining workflows of the grain
///
/// the actual list of workflow record IDs that were available in the queue(s)
- private Task> HandleRemainingWorkflows()
+ Task> HandleRemainingWorkflows()
{
// A copy of WorkflowQueues is required, because we want to iterate over it and add/remove elements from/to it.
var copyOfWorkflowQueues = new Dictionary(this.WorkflowQueues);
@@ -145,7 +147,7 @@ private Task> HandleRemainingWorkflows()
/// the grain interface type being indexed
/// the previous workflow queue responsible for handling the updates
/// the actual list of workflow record IDs that were available in this queue
- private async Task> HandleRemainingWorkflows(Type grainInterfaceType, IIndexWorkflowQueue oldWorkflowQ)
+ async Task> HandleRemainingWorkflows(Type grainInterfaceType, IIndexWorkflowQueue oldWorkflowQ)
{
// Keeps the reference to the reincarnated workflow queue, if the original workflow queue (GrainService) did not respond.
IIndexWorkflowQueue reincarnatedOldWorkflowQ = null;
@@ -156,6 +158,7 @@ private async Task> HandleRemainingWorkflows(Type grainInterfa
// First, we remove the workflow queue associated with grainInterfaceType (i.e., oldWorkflowQ) so that another call to get the
// workflow queue for grainInterfaceType gets the new workflow queue responsible for grainInterfaceType (otherwise oldWorkflowQ is returned).
this.WorkflowQueues.Remove(grainInterfaceType);
+
var newWorkflowQ = this.GetWorkflowQueue(grainInterfaceType);
// If the same workflow queue is responsible we just check what workflow records are still in process
@@ -174,8 +177,7 @@ private async Task> HandleRemainingWorkflows(Type grainInterfa
try
{
// Get the list of remaining workflow records from oldWorkflowQ.
- remainingWorkflows = await this.SiloIndexManager.InjectableCode
- .GetRemainingWorkflowsIn(() => oldWorkflowQ.GetRemainingWorkflowsIn(this.ActiveWorkflowsSet));
+ remainingWorkflows = await this.SiloIndexManager.InjectableCode.GetRemainingWorkflowsIn(() => oldWorkflowQ.GetRemainingWorkflowsIn(this.ActiveWorkflowsSet));
}
catch
{
@@ -202,11 +204,11 @@ private async Task> HandleRemainingWorkflows(Type grainInterfa
return Enumerable.Empty();
}
- private async Task GetReincarnatedWorkflowQueue(IIndexWorkflowQueue workflowQ)
+ async Task GetReincarnatedWorkflowQueue(IIndexWorkflowQueue workflowQ)
{
var primaryKey = workflowQ.GetPrimaryKeyString();
- var reincarnatedQ = this._grainFactory.GetGrain(primaryKey);
- var reincarnatedQHandler = this._grainFactory.GetGrain(primaryKey);
+ var reincarnatedQ = this.grainFactory.GetGrain(primaryKey);
+ var reincarnatedQHandler = this.grainFactory.GetGrain(primaryKey);
// This is called during OnActivateAsync(), so workflowQ's may be on a different silo than the
// current grain activation.
@@ -214,7 +216,7 @@ private async Task GetReincarnatedWorkflowQueue(IIndexWorkf
return reincarnatedQ;
}
- private Task PruneActiveWorkflowsSetFromAlreadyHandledWorkflows(IEnumerable workflowsInProgress)
+ Task PruneActiveWorkflowsSetFromAlreadyHandledWorkflows(IEnumerable workflowsInProgress)
{
var initialSize = this.ActiveWorkflowsSet.Count;
this.ActiveWorkflowsSet.Clear();
@@ -222,11 +224,11 @@ private Task PruneActiveWorkflowsSetFromAlreadyHandledWorkflows(IEnumerable base._grainIndexes.ContainsInterface(kvp.Key)).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
+ this.WorkflowQueues = oldQueues.Where(kvp => base.grainIndexes.ContainsInterface(kvp.Key)).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
}
public override Task>> GetActiveWorkflowIdsSet()
@@ -257,7 +259,7 @@ public override Task RemoveFromActiveWorkflowIds(HashSet removedWorkflowId
/// Adds a workflow ID to the list of active workflows for this fault-tolerant indexable grain
///
/// the workflow IDs to be added
- private void AddWorkflowIdsToActiveWorkflows(Guid[] workflowIds)
+ void AddWorkflowIdsToActiveWorkflows(Guid[] workflowIds)
{
if (this.ActiveWorkflowsSet == null)
{
@@ -269,7 +271,7 @@ private void AddWorkflowIdsToActiveWorkflows(Guid[] workflowIds)
///
/// Generates a unique Guid that does not exist in the list of active workflows.
- ///
+ ///
/// Actually, there is a very unlikely possibility that we end up with a duplicate workflow ID in the following scenario:
/// 1- IndexableGrain G is updated and assigned workflow ID = A
/// 2- workflow record with ID = A is added to the index workflow queue
@@ -277,11 +279,11 @@ private void AddWorkflowIdsToActiveWorkflows(Guid[] workflowIds)
/// 4- G is re-activated and reads it state from storage (which does not include A in its active workflow list)
/// 5- G gets updated and a new workflow with ID = A is generated for it.
/// This ID is assumed to be unique, while it actually is not unique and already exists in the workflow queue.
- ///
+ ///
/// The only way to avoid it is using a centralized unique workflow ID generator, which can be added if necessary.
///
/// a new unique workflow ID
- private Guid GenerateUniqueWorkflowId()
+ Guid GenerateUniqueWorkflowId()
{
var workflowId = Guid.NewGuid();
while (this.ActiveWorkflowsSet != null && this.ActiveWorkflowsSet.Contains(workflowId))
diff --git a/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedStateAttribute.cs b/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedStateAttribute.cs
index 2ab36e9..7414e54 100644
--- a/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedStateAttribute.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedStateAttribute.cs
@@ -13,9 +13,6 @@ public interface IFaultTolerantWorkflowIndexedStateAttribute
/// Base class for the IIndexedState facet that is implemented by fault-tolerant workflow-based indexing.
///
[AttributeUsage(AttributeTargets.Parameter)]
- public class FaultTolerantWorkflowIndexedStateAttribute : IndexedStateAttribute, IFacetMetadata, IFaultTolerantWorkflowIndexedStateAttribute, IIndexedStateConfiguration
- {
- public FaultTolerantWorkflowIndexedStateAttribute(string stateName, string storageName = null)
- : base(stateName, storageName) { }
- }
+ public class FaultTolerantWorkflowIndexedStateAttribute(string stateName, string storageName = null) :
+ IndexedStateAttribute(stateName, storageName), IFacetMetadata, IFaultTolerantWorkflowIndexedStateAttribute, IIndexedStateConfiguration;
}
diff --git a/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedStateAttributeMapper.cs b/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedStateAttributeMapper.cs
index 8c9ddde..93f5ca9 100644
--- a/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedStateAttributeMapper.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/FaultTolerantWorkflowIndexedStateAttributeMapper.cs
@@ -6,9 +6,9 @@ namespace Orleans.Indexing.Facet
internal class FaultTolerantWorkflowIndexedStateAttributeMapper : IndexedStateAttributeMapperBase,
IAttributeToFactoryMapper
{
- private static readonly MethodInfo CreateMethod = typeof(IIndexedStateFactory).GetMethod(nameof(IIndexedStateFactory.CreateFaultTolerantWorkflowIndexedState));
+ static readonly MethodInfo CreateMethod = typeof(IIndexedStateFactory).GetMethod(nameof(IIndexedStateFactory.CreateFaultTolerantWorkflowIndexedState));
- public Factory GetFactory(ParameterInfo parameter, FaultTolerantWorkflowIndexedStateAttribute attribute)
+ public Factory GetFactory(ParameterInfo parameter, FaultTolerantWorkflowIndexedStateAttribute attribute)
=> base.GetFactory(CreateMethod, parameter, attribute);
}
}
diff --git a/src/Orleans.Indexing/Facet/Implementations/IndexedGrainStateWrapper.cs b/src/Orleans.Indexing/Facet/Implementations/IndexedGrainStateWrapper.cs
index 6d815a6..4e0956c 100644
--- a/src/Orleans.Indexing/Facet/Implementations/IndexedGrainStateWrapper.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/IndexedGrainStateWrapper.cs
@@ -8,6 +8,7 @@ namespace Orleans.Indexing.Facet
///
/// the type of user state
[Serializable]
+ [GenerateSerializer]
public class IndexedGrainStateWrapper
where TGrainState: new()
{
diff --git a/src/Orleans.Indexing/Facet/Implementations/IndexedStateBase.cs b/src/Orleans.Indexing/Facet/Implementations/IndexedStateBase.cs
index 0d43257..d503249 100644
--- a/src/Orleans.Indexing/Facet/Implementations/IndexedStateBase.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/IndexedStateBase.cs
@@ -9,35 +9,26 @@
namespace Orleans.Indexing.Facet
{
- abstract class IndexedStateBase : IIndexedState where TGrainState : class, new()
+ abstract class IndexedStateBase(IServiceProvider sp, IIndexedStateConfiguration config, IGrainContext context)
+ : IIndexedState where TGrainState : class, new()
{
- private protected readonly IServiceProvider ServiceProvider;
- private protected readonly IIndexedStateConfiguration IndexedStateConfig;
- private protected readonly IGrainActivationContext grainActivationContext;
-
- private protected Grain grain;
- private protected IIndexableGrain iIndexableGrain;
-
- private protected Func getWorkflowIdFunc;
-
- private protected GrainIndexes _grainIndexes;
- private protected bool _hasAnyUniqueIndex;
-
- public IndexedStateBase(IServiceProvider sp, IIndexedStateConfiguration config, IGrainActivationContext context)
- {
- this.ServiceProvider = sp;
- this.IndexedStateConfig = config;
- this.grainActivationContext = context;
- }
+ readonly IServiceProvider ServiceProvider = sp;
+ protected readonly IIndexedStateConfiguration IndexedStateConfig = config;
+ protected readonly IGrainContext grainActivationContext = context;
+ protected Grain grain;
+ protected IIndexableGrain iIndexableGrain;
+ protected Func getWorkflowIdFunc;
+ protected GrainIndexes grainIndexes;
+ bool _hasAnyUniqueIndex;
// IndexManager (and therefore logger) cannot be set in ctor because Grain activation has not yet set base.Runtime.
- internal SiloIndexManager SiloIndexManager => IndexManager.GetSiloIndexManager(ref this.__siloIndexManager, this.ServiceProvider);
- private SiloIndexManager __siloIndexManager;
+ internal SiloIndexManager SiloIndexManager => IndexManager.GetSiloIndexManager(ref this.siloIndexManager, this.ServiceProvider);
+ SiloIndexManager siloIndexManager;
- private protected ILogger Logger => this.__logger ?? (this.__logger = this.SiloIndexManager.LoggerFactory.CreateLoggerWithFullCategoryName(this.GetType()));
- private ILogger __logger;
+ protected ILogger Logger => this.__logger ??= this.SiloIndexManager.LoggerFactory.CreateLoggerWithFullCategoryName(this.GetType());
+ ILogger __logger;
- private protected SiloAddress BaseSiloAddress => this.SiloIndexManager.SiloAddress;
+ protected SiloAddress BaseSiloAddress => this.SiloIndexManager.SiloAddress;
#region public API
@@ -49,13 +40,13 @@ public IndexedStateBase(IServiceProvider sp, IIndexedStateConfiguration config,
#region Lifecycle management
- public void Participate(IGrainLifecycle lifecycle)
+ protected void Participate(IGrainLifecycle lifecycle)
{
- lifecycle.Subscribe(GrainLifecycleStage.SetupState, _ => OnSetupStateAsync());
- lifecycle.Subscribe(GrainLifecycleStage.Activate, ct => OnActivateAsync(ct), ct => OnDeactivateAsync(ct));
+ lifecycle.Subscribe(GrainLifecycleStage.SetupState, onStart: _ => OnSetupStateAsync());
+ lifecycle.Subscribe(GrainLifecycleStage.Activate, onStart: ct => OnActivateAsync(ct), onStop: ct => OnDeactivateAsync(ct));
}
- private protected Task OnSetupStateAsync() => this.Initialize(this.grainActivationContext.GrainInstance);
+ Task OnSetupStateAsync() => this.Initialize(this.grain);
internal abstract Task OnActivateAsync(CancellationToken ct);
@@ -63,25 +54,25 @@ public void Participate(IGrainLifecycle lifecycle)
#endregion Lifecycle management
- private Task Initialize(Grain grain)
+ Task Initialize(Grain grain)
{
- if (this.grain == null) // If not already called
+ if (this.grain != null) // If not already called
{
- this.grain = grain;
- this.iIndexableGrain = this.grain.AsReference(this.SiloIndexManager);
+ return Task.CompletedTask;
+ }
- if (!GrainIndexes.CreateInstance(this.SiloIndexManager.IndexRegistry, this.grain.GetType(), out this._grainIndexes)
- || !this._grainIndexes.HasAnyIndexes)
- {
- throw new InvalidOperationException("IndexedState should not be used for a Grain class with no indexes");
- }
- this._hasAnyUniqueIndex = this._grainIndexes.HasAnyUniqueIndex;
+ this.grain = grain;
+ this.iIndexableGrain = this.grain.AsReference(this.SiloIndexManager);
+ if (!GrainIndexes.CreateInstance(this.SiloIndexManager.IndexRegistry, this.grain.GetType(), out this.grainIndexes) || !this.grainIndexes.HasAnyIndexes)
+ {
+ throw new InvalidOperationException("IndexedState should not be used for a Grain class with no indexes");
}
+ this._hasAnyUniqueIndex = this.grainIndexes.HasAnyUniqueIndex;
return Task.CompletedTask;
}
///
- /// After some changes were made to the grain, and the grain is in a consistent state, this method is called to update the
+ /// After some changes were made to the grain, and the grain is in a consistent state, this method is called to update the
/// indexes defined on this grain type.
///
///
@@ -96,18 +87,16 @@ private Task Initialize(Grain grain)
/// Determines whether this method is called upon activation, deactivation, or still-active state of this grain
/// whether only active indexes should be updated
/// whether to write back the state to the storage if no constraint is violated
- private protected async Task UpdateIndexes(IndexUpdateReason updateReason, bool onlyUpdateActiveIndexes, bool writeStateIfConstraintsAreNotViolated)
+ protected async Task UpdateIndexes(IndexUpdateReason updateReason, bool onlyUpdateActiveIndexes, bool writeStateIfConstraintsAreNotViolated)
{
// A flag to determine whether only unique indexes were updated
var onlyUniqueIndexesWereUpdated = this._hasAnyUniqueIndex;
// Gather the dictionary of indexes to their corresponding updates, grouped by interface
- var interfaceToUpdatesMap = this.GenerateMemberUpdates(updateReason, onlyUpdateActiveIndexes,
- out var updateIndexesEagerly, ref onlyUniqueIndexesWereUpdated, out var numberOfUniqueIndexesUpdated);
+ var interfaceToUpdatesMap = this.GenerateMemberUpdates(updateReason, onlyUpdateActiveIndexes, out var updateIndexesEagerly, ref onlyUniqueIndexesWereUpdated, out var numberOfUniqueIndexesUpdated);
// Apply the updates to the indexes defined on this grain
- await this.ApplyIndexUpdates(interfaceToUpdatesMap, updateIndexesEagerly,
- onlyUniqueIndexesWereUpdated, numberOfUniqueIndexesUpdated, writeStateIfConstraintsAreNotViolated);
+ await this.ApplyIndexUpdates(interfaceToUpdatesMap, updateIndexesEagerly, onlyUniqueIndexesWereUpdated, numberOfUniqueIndexesUpdated, writeStateIfConstraintsAreNotViolated);
return interfaceToUpdatesMap;
}
@@ -120,30 +109,35 @@ await this.ApplyIndexUpdates(interfaceToUpdatesMap, updateIndexesEagerly,
/// determine the number of updated unique indexes
/// whether writing back
/// the state to the storage should be done if no constraint is violated
- private protected abstract Task ApplyIndexUpdates(InterfaceToUpdatesMap interfaceToUpdatesMap,
+ protected abstract Task ApplyIndexUpdates(InterfaceToUpdatesMap interfaceToUpdatesMap,
bool updateIndexesEagerly, bool onlyUniqueIndexesWereUpdated,
int numberOfUniqueIndexesUpdated, bool writeStateIfConstraintsAreNotViolated);
- private InterfaceToUpdatesMap GenerateMemberUpdates(IndexUpdateReason updateReason,
- bool onlyUpdateActiveIndexes, out bool updateIndexesEagerly,
- ref bool onlyUniqueIndexesWereUpdated, out int numberOfUniqueIndexesUpdated)
+ InterfaceToUpdatesMap GenerateMemberUpdates(IndexUpdateReason updateReason, bool onlyUpdateActiveIndexes, out bool updateIndexesEagerly, ref bool onlyUniqueIndexesWereUpdated, out int numberOfUniqueIndexesUpdated)
{
(string prevIndexName, var prevIndexIsEager) = (null, false);
-
- var numUniqueIndexes = 0; // Local vars due to restrictions on local functions accessing ref/out params
+ var numUniqueIndexes = 0;
var onlyUniqueIndexes = true;
+ var interfaceToUpdatesMap = new InterfaceToUpdatesMap(
+ updateReason,
+ this.getWorkflowIdFunc,
+ this.grainIndexes.Select(kvp => (kvp.Key, generateNamedMemberUpdates(kvp.Key, indexes: kvp.Value)))
+ );
+ updateIndexesEagerly = prevIndexName != null && prevIndexIsEager;
+ numberOfUniqueIndexesUpdated = numUniqueIndexes;
+ onlyUniqueIndexesWereUpdated = onlyUniqueIndexes;
+ return interfaceToUpdatesMap;
IEnumerable<(string indexName, IMemberUpdate mu)> generateNamedMemberUpdates(Type interfaceType, InterfaceIndexes indexes)
{
var befImgs = indexes.BeforeImages.Value;
- foreach ((var indexName, var indexInfo) in indexes.NamedIndexes
- .Where(kvp => !onlyUpdateActiveIndexes || !kvp.Value.IndexInterface.IsTotalIndex())
- .Select(kvp => (kvp.Key, kvp.Value)))
+ foreach (var (indexName, indexInfo) in indexes.NamedIndexes
+ .Where(kvp => !onlyUpdateActiveIndexes || !kvp.Value.IndexInterface.IsTotalIndex())
+ .Select(kvp => (kvp.Key, kvp.Value)))
{
var mu = updateReason == IndexUpdateReason.OnActivate
- ? indexInfo.UpdateGenerator.CreateMemberUpdate(befImgs[indexName])
- : indexInfo.UpdateGenerator.CreateMemberUpdate(
- updateReason == IndexUpdateReason.OnDeactivate ? null : indexes.Properties, befImgs[indexName]);
+ ? indexInfo.UpdateGenerator.CreateMemberUpdate(befImgs[indexName])
+ : indexInfo.UpdateGenerator.CreateMemberUpdate(updateReason == IndexUpdateReason.OnDeactivate ? null : indexes.Properties, befImgs[indexName]);
if (mu.OperationType != IndexOperationType.None)
{
if (prevIndexName != null && prevIndexIsEager != indexInfo.MetaData.IsEager)
@@ -168,17 +162,11 @@ private InterfaceToUpdatesMap GenerateMemberUpdates(IndexUpdateReason updateReas
}
}
}
-
- var interfaceToUpdatesMap = new InterfaceToUpdatesMap(updateReason, this.getWorkflowIdFunc,
- this._grainIndexes.Select(kvp => (kvp.Key, generateNamedMemberUpdates(kvp.Key, kvp.Value))));
- updateIndexesEagerly = prevIndexName != null ? prevIndexIsEager : false;
- numberOfUniqueIndexesUpdated = numUniqueIndexes;
- onlyUniqueIndexesWereUpdated = onlyUniqueIndexes;
- return interfaceToUpdatesMap;
}
// IIndexableGrain methods; these are overridden only by FaultTolerantWorkflowIndexedState. TODO move to FT only
public virtual Task>> GetActiveWorkflowIdsSet() => throw new NotImplementedException("GetActiveWorkflowIdsSet");
+
public virtual Task RemoveFromActiveWorkflowIds(HashSet removedWorkflowIds) => throw new NotImplementedException("RemoveFromActiveWorkflowIds");
}
}
diff --git a/src/Orleans.Indexing/Facet/Implementations/InterfaceToUpdatesMap.cs b/src/Orleans.Indexing/Facet/Implementations/InterfaceToUpdatesMap.cs
index 25daad3..5753ab6 100644
--- a/src/Orleans.Indexing/Facet/Implementations/InterfaceToUpdatesMap.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/InterfaceToUpdatesMap.cs
@@ -7,15 +7,15 @@ namespace Orleans.Indexing.Facet
{
internal class InterfaceToUpdatesMap: IEnumerable>>
{
- private readonly IReadOnlyDictionary> updatesByInterface;
+ readonly IReadOnlyDictionary> updatesByInterface;
+
internal IReadOnlyDictionary WorkflowIds { get; }
internal IReadOnlyDictionary this[Type interfaceType] => this.updatesByInterface[interfaceType];
internal IndexUpdateReason UpdateReason { get; }
- internal InterfaceToUpdatesMap(IndexUpdateReason updateReason, Func getWorkflowIdFunc,
- IEnumerable<(Type interfaceType, IEnumerable<(string indexName, IMemberUpdate mu)> namedUpdates)> updateEnumerator)
+ internal InterfaceToUpdatesMap(IndexUpdateReason updateReason, Func getWorkflowIdFunc, IEnumerable<(Type interfaceType, IEnumerable<(string indexName, IMemberUpdate mu)> namedUpdates)> updateEnumerator)
{
this.UpdateReason = updateReason;
this.updatesByInterface = updateEnumerator.Select(x => (itf: x.interfaceType, dict: x.namedUpdates.ToDictionary(upd => upd.indexName, upd => upd.mu)))
diff --git a/src/Orleans.Indexing/Facet/Implementations/NonFaultTolerantWorkflowIndexedState.cs b/src/Orleans.Indexing/Facet/Implementations/NonFaultTolerantWorkflowIndexedState.cs
index 38d09da..a7efbd6 100644
--- a/src/Orleans.Indexing/Facet/Implementations/NonFaultTolerantWorkflowIndexedState.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/NonFaultTolerantWorkflowIndexedState.cs
@@ -15,10 +15,10 @@ internal class NonFaultTolerantWorkflowIndexedState
where TGrainState : class, new()
where TWrappedState: IndexedGrainStateWrapper, new()
{
- public NonFaultTolerantWorkflowIndexedState(
+ protected NonFaultTolerantWorkflowIndexedState(
IServiceProvider sp,
IIndexedStateConfiguration config,
- IGrainActivationContext context
+ IGrainContext context
) : base(sp, config, context)
{
base.getWorkflowIdFunc = () => Guid.NewGuid();
@@ -28,8 +28,8 @@ IGrainActivationContext context
internal override async Task OnActivateAsync(CancellationToken ct)
{
- Debug.Assert(!(this is FaultTolerantWorkflowIndexedState)); // Ensure this is overridden
- base.Logger.Trace($"Activating indexable grain of type {grain.GetType().Name} in silo {this.SiloIndexManager.SiloAddress}.");
+ Debug.Assert(this is not FaultTolerantWorkflowIndexedState); // Ensure this is overridden
+ base.Logger.Trace(IndexingErrorCode.Indexing, $"Activating indexable grain of type {grain.GetType().Name} in silo {this.SiloIndexManager.SiloAddress}.");
await base.InitializeState();
await base.FinishActivateAsync();
}
@@ -42,7 +42,7 @@ internal override async Task OnActivateAsync(CancellationToken ct)
/// a flag to determine whether only unique indexes were updated
/// determine the number of updated unique indexes
/// whether the state should be written to storage if no constraint is violated
- private protected override async Task ApplyIndexUpdates(InterfaceToUpdatesMap interfaceToUpdatesMap,
+ protected override async Task ApplyIndexUpdates(InterfaceToUpdatesMap interfaceToUpdatesMap,
bool updateIndexesEagerly,
bool onlyUniqueIndexesWereUpdated,
int numberOfUniqueIndexesUpdated,
@@ -71,7 +71,7 @@ private protected override async Task ApplyIndexUpdates(InterfaceToUpdatesMap in
// so they are not visible to readers before making sure that all uniqueness constraints are satisfied.
await this.ApplyIndexUpdatesEagerly(interfaceToUpdatesMap, UpdateIndexType.Unique, updateEagerUniqueIndexesTentatively);
}
- catch (UniquenessConstraintViolatedException ex)
+ catch (UniquenessConstraintViolatedException)
{
// If any uniqueness constraint is violated and we have more than one unique index defined, then all tentative
// updates must be undone, then the exception is thrown back to the user code.
@@ -79,7 +79,8 @@ private protected override async Task ApplyIndexUpdates(InterfaceToUpdatesMap in
{
await this.UndoTentativeChangesToUniqueIndexesEagerly(interfaceToUpdatesMap);
}
- throw ex;
+ //throw ex;
+ throw;
}
}
@@ -111,18 +112,17 @@ await Task.WhenAll(new[]
this.UpdateBeforeImages(interfaceToUpdatesMap);
}
- private Task UndoTentativeChangesToUniqueIndexesEagerly(InterfaceToUpdatesMap interfaceToUpdatesMap)
+ Task UndoTentativeChangesToUniqueIndexesEagerly(InterfaceToUpdatesMap interfaceToUpdatesMap)
=> Task.WhenAll(interfaceToUpdatesMap.Select(kvp => base.ApplyIndexUpdatesEagerly(kvp.Key, MemberUpdateReverseTentative.Reverse(kvp.Value),
UpdateIndexType.Unique, isTentative: false)));
///
/// Lazily Applies updates to the indexes defined on this grain
- ///
+ ///
/// The lazy update involves adding a workflow record to the corresponding IIndexWorkflowQueue for this grain.
///
/// the dictionary of updates for each index by interface
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private void ApplyIndexUpdatesLazilyWithoutWait(InterfaceToUpdatesMap updatesByInterface)
- => base.ApplyIndexUpdatesLazily(updatesByInterface).Ignore();
+ void ApplyIndexUpdatesLazilyWithoutWait(InterfaceToUpdatesMap updatesByInterface) => base.ApplyIndexUpdatesLazily(updatesByInterface).Ignore();
}
}
diff --git a/src/Orleans.Indexing/Facet/Implementations/NonFaultTolerantWorkflowIndexedStateAttributeMapper.cs b/src/Orleans.Indexing/Facet/Implementations/NonFaultTolerantWorkflowIndexedStateAttributeMapper.cs
index 559512f..aeed201 100644
--- a/src/Orleans.Indexing/Facet/Implementations/NonFaultTolerantWorkflowIndexedStateAttributeMapper.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/NonFaultTolerantWorkflowIndexedStateAttributeMapper.cs
@@ -8,7 +8,7 @@ internal class NonFaultTolerantWorkflowIndexedStateAttributeMapper : IndexedStat
{
private static readonly MethodInfo CreateMethod = typeof(IIndexedStateFactory).GetMethod(nameof(IIndexedStateFactory.CreateNonFaultTolerantWorkflowIndexedState));
- public Factory GetFactory(ParameterInfo parameter, NonFaultTolerantWorkflowIndexedStateAttribute attribute)
+ public Factory GetFactory(ParameterInfo parameter, NonFaultTolerantWorkflowIndexedStateAttribute attribute)
=> base.GetFactory(CreateMethod, parameter, attribute);
}
}
diff --git a/src/Orleans.Indexing/Facet/Implementations/NonTransactionalState.cs b/src/Orleans.Indexing/Facet/Implementations/NonTransactionalState.cs
index 308c88b..2d9f76d 100644
--- a/src/Orleans.Indexing/Facet/Implementations/NonTransactionalState.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/NonTransactionalState.cs
@@ -7,7 +7,7 @@ namespace Orleans.Indexing.Facet
internal class NonTransactionalState : ITransactionalState
where TGrainState : class, new()
{
- private readonly IStorage storage;
+ readonly IStorage storage;
private NonTransactionalState(IStorage storage) // private; use Create()
=> this.storage = storage;
diff --git a/src/Orleans.Indexing/Facet/Implementations/TransactionalIndexedState.cs b/src/Orleans.Indexing/Facet/Implementations/TransactionalIndexedState.cs
index bef2d10..5a5fd97 100644
--- a/src/Orleans.Indexing/Facet/Implementations/TransactionalIndexedState.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/TransactionalIndexedState.cs
@@ -20,7 +20,7 @@ class TransactionalIndexedState : IndexedStateBase,
public TransactionalIndexedState(
IServiceProvider sp,
IIndexedStateConfiguration config,
- IGrainActivationContext context,
+ IGrainContext context,
ITransactionalStateFactory transactionalStateFactory
) : base(sp, config, context)
{
@@ -72,7 +72,7 @@ public async override Task PerformUpdate(Func PerformUpdate(Func wrappedState, bool forUpdate)
{
// State initialization is deferred as we must be in a transaction context to access it.
- wrappedState.EnsureNullValues(base._grainIndexes.PropertyNullValues);
+ wrappedState.EnsureNullValues(base.grainIndexes.PropertyNullValues);
if (forUpdate)
{
// Apply the deferred BeforeImage update.
- _grainIndexes.UpdateBeforeImages(wrappedState.UserState, force:true);
+ this.grainIndexes.UpdateBeforeImages(wrappedState.UserState, force:true);
}
}
@@ -104,7 +104,7 @@ void EnsureStateInitialized(IndexedGrainStateWrapper wrappedState,
/// determine the number of updated unique indexes; unused for transactional indexes
/// whether the state should be written to storage if no constraint is violated;
/// must always be true for transactional indexes
- private protected override async Task ApplyIndexUpdates(InterfaceToUpdatesMap interfaceToUpdatesMap,
+ protected override async Task ApplyIndexUpdates(InterfaceToUpdatesMap interfaceToUpdatesMap,
bool updateIndexesEagerly,
bool onlyUniqueIndexesWereUpdated,
int numberOfUniqueIndexesUpdated,
@@ -118,7 +118,7 @@ private protected override async Task ApplyIndexUpdates(InterfaceToUpdatesMap in
Debug.Assert(updateIndexesEagerly, "Transactional indexes cannot be configured to be lazy; this misconfiguration should have been caught in ValidateSingleIndex.");
IEnumerable getIndexUpdateTasks(Type grainInterfaceType, IReadOnlyDictionary updates)
{
- var indexInterfaces = this._grainIndexes[grainInterfaceType];
+ var indexInterfaces = this.grainIndexes[grainInterfaceType];
foreach (var (indexName, mu) in updates.Where(kvp => kvp.Value.OperationType != IndexOperationType.None).OrderBy(kvp => kvp.Key))
{
var indexInfo = indexInterfaces.NamedIndexes[indexName];
diff --git a/src/Orleans.Indexing/Facet/Implementations/TransactionalIndexedStateAttributeMapper.cs b/src/Orleans.Indexing/Facet/Implementations/TransactionalIndexedStateAttributeMapper.cs
index 5ba92db..b06825f 100644
--- a/src/Orleans.Indexing/Facet/Implementations/TransactionalIndexedStateAttributeMapper.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/TransactionalIndexedStateAttributeMapper.cs
@@ -8,7 +8,7 @@ class TransactionalIndexedStateAttributeMapper : IndexedStateAttributeMapperBase
{
private static readonly MethodInfo CreateMethod = typeof(IIndexedStateFactory).GetMethod(nameof(IIndexedStateFactory.CreateTransactionalIndexedState));
- public Factory GetFactory(ParameterInfo parameter, TransactionalIndexedStateAttribute attribute)
+ public Factory GetFactory(ParameterInfo parameter, TransactionalIndexedStateAttribute attribute)
=> base.GetFactory(CreateMethod, parameter, attribute);
}
}
diff --git a/src/Orleans.Indexing/Facet/Implementations/WorkflowIndexedStateBase.cs b/src/Orleans.Indexing/Facet/Implementations/WorkflowIndexedStateBase.cs
index 18c9800..84453b6 100644
--- a/src/Orleans.Indexing/Facet/Implementations/WorkflowIndexedStateBase.cs
+++ b/src/Orleans.Indexing/Facet/Implementations/WorkflowIndexedStateBase.cs
@@ -10,23 +10,17 @@
namespace Orleans.Indexing.Facet
{
- internal abstract class WorkflowIndexedStateBase : IndexedStateBase
- where TGrainState : class, new()
- where TWrappedState: IndexedGrainStateWrapper, new()
+ internal abstract class WorkflowIndexedStateBase(IServiceProvider sp, IIndexedStateConfiguration config, IGrainContext context)
+ : IndexedStateBase(sp, config, context) where TGrainState : class, new() where TWrappedState : IndexedGrainStateWrapper, new()
{
- private protected NonTransactionalState nonTransactionalState;
-
- public WorkflowIndexedStateBase(IServiceProvider sp, IIndexedStateConfiguration config, IGrainActivationContext context)
- : base(sp, config, context)
- {
- }
+ protected NonTransactionalState nonTransactionalState;
// A cache for the workflow queues, one for each grain interface type that the current IndexableGrain implements
internal virtual IDictionary WorkflowQueues { get; set; }
internal override Task OnDeactivateAsync(CancellationToken ct)
{
- base.Logger.Trace($"Deactivating indexable grain of type {base.grain.GetType().Name} in silo {this.SiloIndexManager.SiloAddress}.");
+ base.Logger.Trace(IndexingErrorCode.Indexing, $"Deactivating indexable grain of type {base.grain.GetType().Name} in silo {this.SiloIndexManager.SiloAddress}.");
return this.RemoveFromActiveIndexes();
}
@@ -40,26 +34,26 @@ public async override Task PerformUpdate(Func updateFunction(wrappedState.UserState));
- this._grainIndexes.MapStateToProperties(this.nonTransactionalState.State.UserState);
+ this.grainIndexes.MapStateToProperties(this.nonTransactionalState.State.UserState);
await base.UpdateIndexes(IndexUpdateReason.WriteState, onlyUpdateActiveIndexes: false, writeStateIfConstraintsAreNotViolated: true);
return result;
}
#endregion public API
- private protected Task WriteStateAsync() => this.nonTransactionalState.PerformUpdate();
+ protected Task WriteStateAsync() => this.nonTransactionalState.PerformUpdate();
- private protected async Task InitializeState()
+ protected async Task InitializeState()
{
var storage = base.SiloIndexManager.GetStorageBridge(base.grain, base.IndexedStateConfig.StorageName);
this.nonTransactionalState = await NonTransactionalState.CreateAsync(storage);
await this.PerformRead();
- this.nonTransactionalState.State.EnsureNullValues(base._grainIndexes.PropertyNullValues);
- base._grainIndexes.AddMissingBeforeImages(this.nonTransactionalState.State.UserState);
+ this.nonTransactionalState.State.EnsureNullValues(base.grainIndexes.PropertyNullValues);
+ base.grainIndexes.AddMissingBeforeImages(this.nonTransactionalState.State.UserState);
}
- private protected Task FinishActivateAsync()
+ protected Task FinishActivateAsync()
{
Debug.Assert(this.grain != null, "Initialize() not called");
return this.InsertIntoActiveIndexes();
@@ -68,10 +62,10 @@ private protected Task FinishActivateAsync()
///
/// Inserts the current grain to the active indexes only if it already has a persisted state
///
- protected Task InsertIntoActiveIndexes()
+ Task InsertIntoActiveIndexes()
{
// Check if it contains anything to be indexed
- return this._grainIndexes.HasIndexImages
+ return this.grainIndexes.HasIndexImages
? this.UpdateIndexes(IndexUpdateReason.OnActivate, onlyUpdateActiveIndexes: true, writeStateIfConstraintsAreNotViolated: false)
: Task.CompletedTask;
}
@@ -82,7 +76,7 @@ protected Task InsertIntoActiveIndexes()
protected Task RemoveFromActiveIndexes()
{
// Check if it has anything indexed
- return this._grainIndexes.HasIndexImages
+ return this.grainIndexes.HasIndexImages
? this.UpdateIndexes(IndexUpdateReason.OnDeactivate, onlyUpdateActiveIndexes: true, writeStateIfConstraintsAreNotViolated: false)
: Task.CompletedTask;
}
@@ -95,8 +89,7 @@ protected Task RemoveFromActiveIndexes()
/// indicates whether updates to indexes should be tentatively done. That is, the update
/// won't be visible to readers, but prevents writers from overwriting them and violating constraints
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private protected Task ApplyIndexUpdatesEagerly(InterfaceToUpdatesMap interfaceToUpdatesMap,
- UpdateIndexType updateIndexTypes, bool isTentative = false)
+ protected Task ApplyIndexUpdatesEagerly(InterfaceToUpdatesMap interfaceToUpdatesMap, UpdateIndexType updateIndexTypes, bool isTentative = false)
=> Task.WhenAll(interfaceToUpdatesMap.Select(kvp => this.ApplyIndexUpdatesEagerly(kvp.Key, kvp.Value, updateIndexTypes, isTentative)));
///
@@ -108,10 +101,10 @@ private protected Task ApplyIndexUpdatesEagerly(InterfaceToUpdatesMap interfaceT
/// indicates whether updates to indexes should be tentatively done. That is, the update
/// won't be visible to readers, but prevents writers from overwriting them and violating constraints
///
- private protected Task ApplyIndexUpdatesEagerly(Type grainInterfaceType, IReadOnlyDictionary updates,
- UpdateIndexType updateIndexTypes, bool isTentative)
+ protected Task ApplyIndexUpdatesEagerly(Type grainInterfaceType, IReadOnlyDictionary updates, UpdateIndexType updateIndexTypes, bool isTentative)
{
- var indexInterfaces = this._grainIndexes[grainInterfaceType];
+ var indexInterfaces = this.grainIndexes[grainInterfaceType];
+
IEnumerable> getUpdateTasks()
{
foreach (var (indexName, mu) in updates.Where(kvp => kvp.Value.OperationType != IndexOperationType.None))
@@ -121,8 +114,7 @@ IEnumerable> getUpdateTasks()
{
// If the caller asks for the update to be tentative, then it will be wrapped inside a MemberUpdateTentative
var updateToIndex = isTentative ? new MemberUpdateOverriddenMode(mu, IndexUpdateMode.Tentative) : mu;
- yield return indexInfo.IndexInterface.ApplyIndexUpdate(this.SiloIndexManager,
- this.iIndexableGrain, updateToIndex.AsImmutable(), indexInfo.MetaData, this.BaseSiloAddress);
+ yield return indexInfo.IndexInterface.ApplyIndexUpdate(this.SiloIndexManager, this.iIndexableGrain, updateToIndex.AsImmutable(), indexInfo.MetaData, this.BaseSiloAddress);
}
}
}
@@ -133,17 +125,19 @@ IEnumerable> getUpdateTasks()
///
/// Lazily applies updates to the indexes defined on this grain
- ///
+ ///
/// The lazy update involves adding a workflow record to the corresponding IIndexWorkflowQueue for this grain.
///
/// the dictionary of updates for each index by interface
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private protected Task ApplyIndexUpdatesLazily(InterfaceToUpdatesMap interfaceToUpdatesMap)
- => Task.WhenAll(interfaceToUpdatesMap.Select(kvp => this.GetWorkflowQueue(kvp.Key).AddToQueue(new IndexWorkflowRecord(interfaceToUpdatesMap.WorkflowIds[kvp.Key],
- base.iIndexableGrain, kvp.Value).AsImmutable())));
+ protected Task ApplyIndexUpdatesLazily(InterfaceToUpdatesMap interfaceToUpdatesMap) =>
+ Task.WhenAll(
+ interfaceToUpdatesMap.Select(kvp =>
+ this.GetWorkflowQueue(kvp.Key)
+ .AddToQueue(new IndexWorkflowRecord(interfaceToUpdatesMap.WorkflowIds[kvp.Key], base.iIndexableGrain, kvp.Value).AsImmutable())));
- private protected void UpdateBeforeImages(InterfaceToUpdatesMap interfaceToUpdatesMap)
- => this._grainIndexes.UpdateBeforeImages(interfaceToUpdatesMap);
+ protected void UpdateBeforeImages(InterfaceToUpdatesMap interfaceToUpdatesMap)
+ => this.grainIndexes.UpdateBeforeImages(interfaceToUpdatesMap);
///
/// Find the corresponding workflow queue for a given grain interface type that the current IndexableGrain implements
@@ -157,9 +151,9 @@ internal IIndexWorkflowQueue GetWorkflowQueue(Type grainInterfaceType)
this.WorkflowQueues = new Dictionary();
}
- return this.WorkflowQueues.GetOrAdd(grainInterfaceType,
- () => IndexWorkflowQueueBase.GetIndexWorkflowQueueFromGrainHashCode(this.SiloIndexManager, grainInterfaceType,
- this.grain.AsReference(this.SiloIndexManager, grainInterfaceType).GetHashCode(), this.BaseSiloAddress));
+ return this.WorkflowQueues.GetOrAdd(
+ grainInterfaceType,
+ () => IndexWorkflowQueueBase.GetIndexWorkflowQueueFromGrainHashCode(this.SiloIndexManager, grainInterfaceType, this.grain.AsReference(this.SiloIndexManager, grainInterfaceType).GetHashCode(), this.BaseSiloAddress));
}
}
}
diff --git a/src/Orleans.Indexing/Facet/Infrastructure/IndexedStateAttributeMapperBase.cs b/src/Orleans.Indexing/Facet/Infrastructure/IndexedStateAttributeMapperBase.cs
index 03b24f1..a0c6257 100644
--- a/src/Orleans.Indexing/Facet/Infrastructure/IndexedStateAttributeMapperBase.cs
+++ b/src/Orleans.Indexing/Facet/Infrastructure/IndexedStateAttributeMapperBase.cs
@@ -6,7 +6,7 @@ namespace Orleans.Indexing.Facet
{
abstract class IndexedStateAttributeMapperBase
{
- public Factory GetFactory(MethodInfo creator, ParameterInfo parameter, IIndexedStateConfiguration indexingConfig)
+ public Factory GetFactory(MethodInfo creator, ParameterInfo parameter, IIndexedStateConfiguration indexingConfig)
{
// Use generic type args to specialize the generic method and create the factory lambda.
var genericCreate = creator.MakeGenericMethod(parameter.ParameterType.GetGenericArguments());
@@ -14,7 +14,7 @@ public Factory GetFactory(MethodInfo creator, P
return context => this.Create(context, genericCreate, args);
}
- private object Create(IGrainActivationContext context, MethodInfo genericCreate, object[] args)
+ private object Create(IGrainContext context, MethodInfo genericCreate, object[] args)
{
var factory = context.ActivationServices.GetRequiredService();
return genericCreate.Invoke(factory, args);
diff --git a/src/Orleans.Indexing/Facet/Infrastructure/IndexedStateFactory.cs b/src/Orleans.Indexing/Facet/Infrastructure/IndexedStateFactory.cs
index d0454c0..49ac165 100644
--- a/src/Orleans.Indexing/Facet/Infrastructure/IndexedStateFactory.cs
+++ b/src/Orleans.Indexing/Facet/Infrastructure/IndexedStateFactory.cs
@@ -1,13 +1,14 @@
using Microsoft.Extensions.DependencyInjection;
using Orleans.Runtime;
+using Orleans.Serialization.TypeSystem;
namespace Orleans.Indexing.Facet
{
public class IndexedStateFactory : IIndexedStateFactory
{
- private readonly IGrainActivationContext activationContext;
+ private readonly IGrainContext activationContext;
- public IndexedStateFactory(IGrainActivationContext activationContext, ITypeResolver typeResolver, IGrainFactory grainFactory)
+ public IndexedStateFactory(IGrainContext activationContext, TypeResolver typeResolver, IGrainFactory grainFactory)
=> this.activationContext = activationContext;
public INonFaultTolerantWorkflowIndexedState CreateNonFaultTolerantWorkflowIndexedState(IIndexedStateConfiguration config)
@@ -22,7 +23,7 @@ public ITransactionalIndexedState CreateTransactionalIndexedState this.CreateIndexedState>(config);
- private TWrappedIndexedStateImplementation CreateIndexedState(IIndexedStateConfiguration config)
+ TWrappedIndexedStateImplementation CreateIndexedState(IIndexedStateConfiguration config)
where TWrappedIndexedStateImplementation : ILifecycleParticipant
{
var indexedState = ActivatorUtilities.CreateInstance(this.activationContext.ActivationServices, config);
diff --git a/src/Orleans.Indexing/Hosting/ClientBuilderExtensions.cs b/src/Orleans.Indexing/Hosting/ClientBuilderExtensions.cs
index a01dd06..403297d 100644
--- a/src/Orleans.Indexing/Hosting/ClientBuilderExtensions.cs
+++ b/src/Orleans.Indexing/Hosting/ClientBuilderExtensions.cs
@@ -19,11 +19,11 @@ public static IClientBuilder UseIndexing(this IClientBuilder builder, Action
public static IClientBuilder UseIndexing(this IClientBuilder builder, Action> configureAction = null)
{
- return builder
- .ConfigureDefaults()
- .AddSimpleMessageStreamProvider(IndexingConstants.INDEXING_STREAM_PROVIDER_NAME)
- .ConfigureServices(services => services.UseIndexing(configureAction))
- .ConfigureApplicationParts(parts => parts.AddFrameworkPart(typeof(SiloBuilderExtensions).Assembly));
+ return builder;
+ //.ConfigureDefaults()
+ //.AddSimpleMessageStreamProvider(IndexingConstants.INDEXING_STREAM_PROVIDER_NAME)
+ //.ConfigureServices(services => services.UseIndexing(configureAction))
+ //.ConfigureApplicationParts(parts => parts.AddFrameworkPart(typeof(SiloBuilderExtensions).Assembly));
}
///
diff --git a/src/Orleans.Indexing/Hosting/IndexManager.cs b/src/Orleans.Indexing/Hosting/IndexManager.cs
index f6dd0b6..125e9c7 100644
--- a/src/Orleans.Indexing/Hosting/IndexManager.cs
+++ b/src/Orleans.Indexing/Hosting/IndexManager.cs
@@ -1,10 +1,11 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
-using Orleans.ApplicationParts;
+//using Orleans.ApplicationParts;
using Orleans.Runtime;
using System;
using System.Threading;
using System.Threading.Tasks;
+using Orleans.Serialization.TypeSystem;
namespace Orleans.Indexing
{
@@ -13,9 +14,7 @@ namespace Orleans.Indexing
///
internal class IndexManager : ILifecycleParticipant
{
- internal IApplicationPartManager ApplicationPartManager;
-
- internal ITypeResolver CachedTypeResolver { get; }
+ internal TypeResolver CachedTypeResolver { get; }
internal IndexRegistry IndexRegistry { get; private set; }
@@ -27,15 +26,16 @@ internal class IndexManager : ILifecycleParticipant
// Note: For similar reasons as SiloIndexManager.__silo, __indexFactory relies on 'this' to have returned from its ctor.
internal IndexFactory IndexFactory => this.__indexFactory ?? (__indexFactory = this.ServiceProvider.GetRequiredService());
- private IndexFactory __indexFactory;
+ IndexFactory __indexFactory;
internal ILoggerFactory LoggerFactory { get; }
- public IndexManager(IServiceProvider sp, IGrainFactory gf, IApplicationPartManager apm, ILoggerFactory lf, ITypeResolver typeResolver)
+ public IndexManager(IServiceProvider sp, IGrainFactory gf, ILoggerFactory lf, TypeResolver typeResolver)
{
+
+
this.ServiceProvider = sp;
this.GrainFactory = gf;
- this.ApplicationPartManager = apm;
this.LoggerFactory = lf;
this.CachedTypeResolver = typeResolver;
@@ -44,7 +44,7 @@ public IndexManager(IServiceProvider sp, IGrainFactory gf, IApplicationPartManag
public void Participate(IClusterClientLifecycle lifecycle)
{
- if (!(this is SiloIndexManager))
+ if (this is not SiloIndexManager)
{
lifecycle.Subscribe(this.GetType().FullName, ServiceLifecycleStage.ApplicationServices, ct => this.OnStartAsync(ct), ct => this.OnStopAsync(ct));
}
diff --git a/src/Orleans.Indexing/Hosting/SiloBuilderExtensions.cs b/src/Orleans.Indexing/Hosting/SiloBuilderExtensions.cs
index bb74858..1bff5f3 100644
--- a/src/Orleans.Indexing/Hosting/SiloBuilderExtensions.cs
+++ b/src/Orleans.Indexing/Hosting/SiloBuilderExtensions.cs
@@ -14,21 +14,21 @@ public static class SiloBuilderExtensions
///
/// Configure silo to use indexing using a configure action.
///
- public static ISiloHostBuilder UseIndexing(this ISiloHostBuilder builder, Action configureOptions = null)
+ public static ISiloBuilder UseIndexing(this ISiloBuilder builder, Action configureOptions = null)
{
// This is necessary to get the configured NumWorkflowQueuesPerInterface for IndexFactory.RegisterIndexWorkflowQueueGrainServices.
var indexingOptions = new IndexingOptions();
configureOptions?.Invoke(indexingOptions);
return builder
- .ConfigureDefaults()
- .AddSimpleMessageStreamProvider(IndexingConstants.INDEXING_STREAM_PROVIDER_NAME)
- .AddMemoryGrainStorage(IndexingConstants.INDEXING_WORKFLOWQUEUE_STORAGE_PROVIDER_NAME)
- .AddMemoryGrainStorage(IndexingConstants.INDEXING_STORAGE_PROVIDER_NAME)
- .AddMemoryGrainStorage(IndexingConstants.MEMORY_STORAGE_PROVIDER_NAME)
- .ConfigureApplicationParts(parts => parts.AddFrameworkPart(typeof(SiloBuilderExtensions).Assembly))
+ //.ConfigureDefaults()
+ //.AddSimpleMessageStreamProvider(IndexingConstants.INDEXING_STREAM_PROVIDER_NAME)
+ //.AddMemoryGrainStorage(IndexingConstants.INDEXING_WORKFLOWQUEUE_STORAGE_PROVIDER_NAME)
+ //.AddMemoryGrainStorage(IndexingConstants.INDEXING_STORAGE_PROVIDER_NAME)
+ //.AddMemoryGrainStorage(IndexingConstants.MEMORY_STORAGE_PROVIDER_NAME)
+ //.ConfigureApplicationParts(parts => parts.AddFrameworkPart(typeof(SiloBuilderExtensions).Assembly))
.ConfigureServices(services => services.UseIndexing(indexingOptions))
- .ConfigureServices((context, services) => ApplicationPartsIndexableGrainLoader.RegisterGrainServices(context, services, indexingOptions))
+ //.ConfigureServices((context, services) => ApplicationPartsIndexableGrainLoader.RegisterGrainServices(context, services, indexingOptions))
.UseTransactions();
}
@@ -41,8 +41,10 @@ private static IServiceCollection UseIndexing(this IServiceCollection services,
services.AddSingleton()
.AddFromExisting();
+
services.AddSingleton()
.AddFromExisting, SiloIndexManager>();
+
services.AddFromExisting();
// Facet Factory and Mappers
@@ -57,7 +59,7 @@ private static IServiceCollection UseIndexing(this IServiceCollection services,
}
internal static void AddGrainService(this IServiceCollection services, Func creationFunc)
- => services.AddSingleton(sp => creationFunc(sp));
+ => services.AddSingleton(creationFunc);
///
/// Registers an existing registration of as a provider of service type .
diff --git a/src/Orleans.Indexing/Hosting/SiloIndexManager.cs b/src/Orleans.Indexing/Hosting/SiloIndexManager.cs
index 813c5d3..8071de2 100644
--- a/src/Orleans.Indexing/Hosting/SiloIndexManager.cs
+++ b/src/Orleans.Indexing/Hosting/SiloIndexManager.cs
@@ -1,6 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
-using Orleans.ApplicationParts;
+//using Orleans.ApplicationParts;
using Orleans.Core;
using Orleans.Indexing.TestInjection;
using Orleans.Runtime;
@@ -8,9 +8,17 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
+using Orleans.GrainReferences;
+using Orleans.Runtime.Services;
+using Orleans.Serialization.TypeSystem;
namespace Orleans.Indexing
{
+ internal class IndexingGrainServiceClient(IServiceProvider sp) : GrainServiceClient(sp) where TGrainService : IGrainService
+ {
+ public TGrainService GetGrainServicePublic(SiloAddress destination) => base.GetGrainService(destination);
+ }
+
///
/// This class is instantiated internally only in the Silo.
///
@@ -20,24 +28,29 @@ class SiloIndexManager : IndexManager, ILifecycleParticipant
// Note: this.Silo must not be called until the Silo ctor has returned to the ServiceProvider which then
// sets the Singleton; if called during the Silo ctor, the Singleton is not found so another Silo is
- // constructed. Thus we cannot have the Silo on the IndexManager ctor params or retrieve it during
+ // constructed. Thus, we cannot have the Silo on the IndexManager ctor params or retrieve it during
// IndexManager ctor, because ISiloLifecycle participants are constructed during the Silo ctor.
internal Silo Silo => _silo ?? (_silo = this.ServiceProvider.GetRequiredService());
- private Silo _silo;
+ Silo _silo;
internal IInjectableCode InjectableCode { get; }
internal IGrainReferenceRuntime GrainReferenceRuntime { get; }
internal IGrainServiceFactory GrainServiceFactory { get; }
-
- public SiloIndexManager(IServiceProvider sp, IGrainFactory gf, IApplicationPartManager apm, ILoggerFactory lf, ITypeResolver tr)
- : base(sp, gf, apm, lf, tr)
+ internal GrainServiceClient GrainServiceClient { get; }
+
+ internal IGrainReferenceActivator GrainReferenceActivator =>
+ base.ServiceProvider.GetRequiredService();
+
+ public SiloIndexManager(IServiceProvider sp, IGrainFactory gf, ILoggerFactory lf, TypeResolver tr)
+ : base(sp, gf, lf, tr)
{
this.InjectableCode = this.ServiceProvider.GetService() ?? new ProductionInjectableCode();
this.GrainReferenceRuntime = this.ServiceProvider.GetRequiredService();
this.GrainServiceFactory = this.ServiceProvider.GetRequiredService();
+ this.GrainServiceClient = sp.GetRequiredService>();
}
public void Participate(ISiloLifecycle lifecycle)
@@ -48,13 +61,16 @@ public void Participate(ISiloLifecycle lifecycle)
internal Task> GetSiloHosts(bool onlyActive = false)
=> this.GrainFactory.GetGrain(0).GetHosts(onlyActive);
- public GrainReference MakeGrainServiceGrainReference(int typeData, string systemGrainId, SiloAddress siloAddress)
- => GrainServiceFactory.MakeGrainServiceReference(typeData, systemGrainId, siloAddress);
+ public GrainReference MakeGrainServiceGrainReference(int typeData, string systemGrainId, SiloAddress siloAddress) =>
+ GrainReferenceActivator.CreateReference(SystemTargetGrainId.CreateGrainServiceGrainId(typeData, systemGrainId, siloAddress));
+
internal T GetGrainService(GrainReference grainReference) where T : IGrainService
- => GrainServiceFactory.CastToGrainServiceReference(grainReference);
+ => this.GrainServiceFactory.CastToGrainServiceReference(grainReference);
internal IStorage GetStorageBridge(Grain grain, string storageName) where TGrainState : class, new()
- => new StateStorageBridge(grain.GetType().FullName, grain.GrainReference, IndexUtils.GetGrainStorage(this.ServiceProvider, storageName), this.LoggerFactory);
+ => new StateStorageBridge(grain.GetType().FullName!, grain.GrainContext, IndexUtils.GetGrainStorage(this.ServiceProvider, storageName));
+
+
}
}
diff --git a/src/Orleans.Indexing/Indexes/ActiveIndexes/ActiveHashIndexPartitionedPerSiloBucketImplGrainService.cs b/src/Orleans.Indexing/Indexes/ActiveIndexes/ActiveHashIndexPartitionedPerSiloBucketImplGrainService.cs
index a9440b8..5f906f3 100644
--- a/src/Orleans.Indexing/Indexes/ActiveIndexes/ActiveHashIndexPartitionedPerSiloBucketImplGrainService.cs
+++ b/src/Orleans.Indexing/Indexes/ActiveIndexes/ActiveHashIndexPartitionedPerSiloBucketImplGrainService.cs
@@ -16,20 +16,20 @@ namespace Orleans.Indexing
{
///
/// A simple implementation of a single-grain in-memory hash-index.
- ///
+ ///
/// TODO: Generic GrainServices are not supported yet, and that's why the implementation is non-generic.
+ /// Per comments for , we cannot use generics here.
///
- //Per comments for IActiveHashIndexPartitionedPerSiloBucket, we cannot use generics here.
- //type of hash-index key
- //type of grain that is being indexed
+ ///// type of hash-index key
+ ///// type of grain that is being indexed
[StorageProvider(ProviderName = IndexingConstants.MEMORY_STORAGE_PROVIDER_NAME)]
[Reentrant]
internal class ActiveHashIndexPartitionedPerSiloBucketImplGrainService/**/ : GrainService, IActiveHashIndexPartitionedPerSiloBucket/* where V : IIndexableGrain*/
{
- private HashIndexBucketState state;
- private readonly SiloIndexManager siloIndexManager;
- private readonly ILogger logger;
- private readonly string _parentIndexName;
+ HashIndexBucketState state;
+ readonly SiloIndexManager siloIndexManager;
+ readonly ILogger logger;
+ readonly string _parentIndexName;
public ActiveHashIndexPartitionedPerSiloBucketImplGrainService(SiloIndexManager siloIndexManager, Type grainInterfaceType, string parentIndexName)
: base(GetGrainIdentity(siloIndexManager, grainInterfaceType, parentIndexName), siloIndexManager.Silo, siloIndexManager.LoggerFactory)
@@ -45,10 +45,10 @@ public ActiveHashIndexPartitionedPerSiloBucketImplGrainService(SiloIndexManager
this.logger = siloIndexManager.LoggerFactory.CreateLoggerWithFullCategoryName();
}
- private static IGrainIdentity GetGrainIdentity(SiloIndexManager siloIndexManager, Type grainInterfaceType, string indexName)
- => GetGrainReference(siloIndexManager, grainInterfaceType, indexName).GrainIdentity;
+ static GrainId GetGrainIdentity(SiloIndexManager siloIndexManager, Type grainInterfaceType, string indexName)
+ => GetGrainReference(siloIndexManager, grainInterfaceType, indexName).GrainId;
- internal static GrainReference GetGrainReference(SiloIndexManager siloIndexManager, Type grainInterfaceType, string indexName, SiloAddress siloAddress = null)
+ static GrainReference GetGrainReference(SiloIndexManager siloIndexManager, Type grainInterfaceType, string indexName, SiloAddress siloAddress = null)
=> siloIndexManager.MakeGrainServiceGrainReference(IndexingConstants.HASH_INDEX_PARTITIONED_PER_SILO_BUCKET_GRAIN_SERVICE_TYPE_CODE,
IndexUtils.GetIndexGrainPrimaryKey(grainInterfaceType, indexName),
siloAddress ?? siloIndexManager.SiloAddress);
@@ -59,20 +59,20 @@ internal static void RegisterGrainService(IServiceCollection services, Type grai
public async Task DirectApplyIndexUpdateBatch(Immutable>> iUpdates, bool isUnique, IndexMetaData idxMetaData, SiloAddress siloAddress = null)
{
- logger.Trace($"ParentIndex {_parentIndexName}: Started calling DirectApplyIndexUpdateBatch with the following parameters: isUnique = {isUnique}," +
- $" siloAddress = {siloAddress}, iUpdates = {MemberUpdate.UpdatesToString(iUpdates.Value)}", isUnique, siloAddress);
+ logger.Trace(IndexingErrorCode.Indexing, $"ParentIndex {_parentIndexName}: Started calling DirectApplyIndexUpdateBatch with the following parameters: isUnique = {isUnique}," +
+ $" siloAddress = {siloAddress}, iUpdates = {MemberUpdate.UpdatesToString(iUpdates.Value)}", isUnique, siloAddress);
await Task.WhenAll(iUpdates.Value.Select(kvp => DirectApplyIndexUpdates(kvp.Key, kvp.Value, isUnique, idxMetaData, siloAddress)));
- logger.Trace($"Finished calling DirectApplyIndexUpdateBatch with the following parameters: isUnique = {isUnique}, siloAddress = {siloAddress}," +
- $" iUpdates = {MemberUpdate.UpdatesToString(iUpdates.Value)}");
+ logger.Trace(IndexingErrorCode.Indexing, $"Finished calling DirectApplyIndexUpdateBatch with the following parameters: isUnique = {isUnique}, siloAddress = {siloAddress}," +
+ $" iUpdates = {MemberUpdate.UpdatesToString(iUpdates.Value)}");
return true;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private async Task DirectApplyIndexUpdates(IIndexableGrain g, IList updates, bool isUniqueIndex, IndexMetaData idxMetaData, SiloAddress siloAddress)
+ async Task DirectApplyIndexUpdates(IIndexableGrain g, IList updates, bool isUniqueIndex, IndexMetaData idxMetaData, SiloAddress siloAddress)
{
- foreach (IMemberUpdate updt in updates)
+ foreach (var updt in updates)
{
await DirectApplyIndexUpdate(g, updt, isUniqueIndex, idxMetaData, siloAddress);
}
@@ -82,12 +82,12 @@ public Task DirectApplyIndexUpdate(V updatedGrain, Immutable this.DirectApplyIndexUpdate(updatedGrain, iUpdate.Value, isUniqueIndex, idxMetaData, siloAddress);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private Task DirectApplyIndexUpdate(V updatedGrain, IMemberUpdate updt, bool isUniqueIndex, IndexMetaData idxMetaData, SiloAddress siloAddress)
+ Task DirectApplyIndexUpdate(V updatedGrain, IMemberUpdate updt, bool isUniqueIndex, IndexMetaData idxMetaData, SiloAddress siloAddress)
// Updates the index bucket synchronously (note that no other thread can run concurrently before we reach an await operation,
// when execution is yielded back to the Orleans scheduler, so no concurrency control mechanism (e.g., locking) is required).
=> Task.FromResult(HashIndexBucketUtils.UpdateBucketState(updatedGrain, updt, state, isUniqueIndex, idxMetaData));
- private Exception LogException(string message, IndexingErrorCode errorCode)
+ Exception LogException(string message, IndexingErrorCode errorCode)
{
var e = new Exception(message);
this.logger.Error(errorCode, message, e);
@@ -142,7 +142,7 @@ public Task LookupUniqueAsync(K key)
IndexingErrorCode.IndexingIndexIsNotReadyYet_GrainServiceBucket5);
}
- public Task Dispose()
+ public new Task Dispose()
{
state.IndexStatus = IndexStatus.Disposed;
state.IndexMap.Clear();
diff --git a/src/Orleans.Indexing/Indexes/ActiveIndexes/ActiveHashIndexPartitionedPerSiloImpl.cs b/src/Orleans.Indexing/Indexes/ActiveIndexes/ActiveHashIndexPartitionedPerSiloImpl.cs
index aa21d86..ea2d5d5 100644
--- a/src/Orleans.Indexing/Indexes/ActiveIndexes/ActiveHashIndexPartitionedPerSiloImpl.cs
+++ b/src/Orleans.Indexing/Indexes/ActiveIndexes/ActiveHashIndexPartitionedPerSiloImpl.cs
@@ -1,137 +1,140 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Logging;
-using Orleans.Concurrency;
-using Orleans.Runtime;
-
-namespace Orleans.Indexing
-{
- ///
- /// A simple implementation of a single-grain in-memory hash-index
- ///
- /// type of hash-index key
- /// type of grain that is being indexed
- [Reentrant]
- //[StatelessWorker]
- //TODO: because of a bug in OrleansStreams, this grain cannot be StatelessWorker. It should be fixed later. TODO which bug?
- //TODO: basically, this class does not even need to be a grain, but it's not possible to call a GrainService from a non-grain
- public class ActiveHashIndexPartitionedPerSiloImpl : Grain, IActiveHashIndexPartitionedPerSilo where V : class, IIndexableGrain
- {
- private IndexStatus _status;
-
- // IndexManager (and therefore logger) cannot be set in ctor because Grain activation has not yet set base.Runtime.
- internal SiloIndexManager SiloIndexManager => IndexManager.GetSiloIndexManager(ref __siloIndexManager, base.ServiceProvider);
- private SiloIndexManager __siloIndexManager;
-
- private ILogger Logger => __logger ?? (__logger = this.SiloIndexManager.LoggerFactory.CreateLoggerWithFullCategoryName