Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orleans 8 upgrade wip #5

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 90 additions & 76 deletions src/Orleans.Indexing/Core/FaultTolerance/IndexWorkflowQueueBase.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ namespace Orleans.Indexing
/// the following information:
/// - workflowID: grainID + a sequence number
/// - memberUpdates: the updated values of indexed fields
///
///
/// Ordinarily, these workflowRecords are for grains that are active on <see cref="IndexWorkflowQueueGrainService"/>'s silo. (This may not be true for
/// short periods when a grain migrates to another silo or after the silo recovers from failure).
///
///
/// + The <see cref="IndexWorkflowQueueGrainService"/> grain Q has a dictionary updatesOnWait is an in-memory dictionary that maps each grain G to the
/// workflowRecords for G that are waiting for be updated.
/// </summary>
[StorageProvider(ProviderName = IndexingConstants.INDEXING_WORKFLOWQUEUE_STORAGE_PROVIDER_NAME)]
[Reentrant]
internal class IndexWorkflowQueueGrainService : GrainService, IIndexWorkflowQueue
{
private IndexWorkflowQueueBase _base;
IndexWorkflowQueueBase _base;

internal IndexWorkflowQueueGrainService(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSequenceNumber, bool isDefinedAsFaultTolerantGrain)
: base(IndexWorkflowQueueBase.CreateIndexWorkflowQueueGrainReference(siloIndexManager, grainInterfaceType, queueSequenceNumber, siloIndexManager.SiloAddress).GrainIdentity,
: base(IndexWorkflowQueueBase.CreateIndexWorkflowQueueGrainReference(siloIndexManager, grainInterfaceType, queueSequenceNumber, siloIndexManager.SiloAddress).GrainId,
siloIndexManager.Silo, siloIndexManager.LoggerFactory)
{
_base = new IndexWorkflowQueueBase(siloIndexManager, grainInterfaceType, queueSequenceNumber, siloIndexManager.SiloAddress, isDefinedAsFaultTolerantGrain,
() => base.GetGrainReference()); // lazy is needed because the runtime isn't attached until Registered
() => base.GrainReference); // lazy is needed because the runtime isn't attached until Registered
}

public Task AddAllToQueue(Immutable<List<IndexWorkflowRecord>> workflowRecords)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,26 @@ namespace Orleans.Indexing
{
internal class IndexWorkflowQueueHandlerBase : IIndexWorkflowQueueHandler
{
private IIndexWorkflowQueue __workflowQueue;
private IIndexWorkflowQueue WorkflowQueue => __workflowQueue ?? InitIndexWorkflowQueue();
IIndexWorkflowQueue __workflowQueue;
IIndexWorkflowQueue WorkflowQueue => __workflowQueue ?? InitIndexWorkflowQueue();

private int _queueSeqNum;
private Type _grainInterfaceType;
int _queueSeqNum;
Type _grainInterfaceType;

private bool _isDefinedAsFaultTolerantGrain;
private bool _hasAnyTotalIndex;
private bool HasAnyTotalIndex { get { EnsureGrainIndexes(); return _hasAnyTotalIndex; } }
private bool IsFaultTolerant => _isDefinedAsFaultTolerantGrain && HasAnyTotalIndex;
bool _isDefinedAsFaultTolerantGrain;
bool _hasAnyTotalIndex;
bool HasAnyTotalIndex { get { EnsureGrainIndexes(); return _hasAnyTotalIndex; } }
bool IsFaultTolerant => _isDefinedAsFaultTolerantGrain && HasAnyTotalIndex;

private NamedIndexMap __grainIndexes;
NamedIndexMap __grainIndexes;

private NamedIndexMap GrainIndexes => EnsureGrainIndexes();
NamedIndexMap GrainIndexes => EnsureGrainIndexes();

private SiloAddress _silo;
private SiloIndexManager _siloIndexManager;
private Lazy<GrainReference> _lazyParent;
SiloAddress _silo;
SiloIndexManager _siloIndexManager;
Lazy<GrainReference> _lazyParent;

internal IndexWorkflowQueueHandlerBase(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress silo,
bool isDefinedAsFaultTolerantGrain, Func<GrainReference> parentFunc)
internal IndexWorkflowQueueHandlerBase(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress silo, bool isDefinedAsFaultTolerantGrain, Func<GrainReference> parentFunc)
{
_grainInterfaceType = grainInterfaceType;
_queueSeqNum = queueSeqNum;
Expand All @@ -46,43 +45,42 @@ public async Task HandleWorkflowsUntilPunctuation(Immutable<IndexWorkflowRecordN
{
try
{
for (var workflowNode = workflowRecords.Value; workflowNode != null; workflowNode = (await this.WorkflowQueue.GiveMoreWorkflowsOrSetAsIdle()).Value)
for (var workflowNode = workflowRecords.Value; workflowNode != null; workflowNode = (await WorkflowQueue.GiveMoreWorkflowsOrSetAsIdle()).Value)
{
var grainsToActiveWorkflows = IsFaultTolerant ? await this.FtGetActiveWorkflowSetsFromGrains(workflowNode) : emptyDictionary;
var grainsToActiveWorkflows = IsFaultTolerant ? await GetActiveWorkflowSetsFromGrains(workflowNode) : emptyDictionary;
var updatesToIndexes = this.PopulateUpdatesToIndexes(workflowNode, grainsToActiveWorkflows);
await Task.WhenAll(PrepareIndexUpdateTasks(updatesToIndexes));
if (this.IsFaultTolerant)
if (IsFaultTolerant)
{
Task.WhenAll(this.FtRemoveFromActiveWorkflowsInGrainsTasks(grainsToActiveWorkflows)).Ignore();
Task.WhenAll(FtRemoveFromActiveWorkflowsInGrainsTasks(grainsToActiveWorkflows)).Ignore();
}
}
}
catch (Exception e)
catch (Exception)
{
throw e; // TODO empty handler; add logic or remove
//throw e; // TODO empty handler; add logic or remove
throw;
}
}

private IEnumerable<Task> FtRemoveFromActiveWorkflowsInGrainsTasks(Dictionary<IIndexableGrain, HashSet<Guid>> grainsToActiveWorkflows)
IEnumerable<Task> FtRemoveFromActiveWorkflowsInGrainsTasks(Dictionary<IIndexableGrain, HashSet<Guid>> grainsToActiveWorkflows)
=> grainsToActiveWorkflows.Select(kvp => kvp.Key.RemoveFromActiveWorkflowIds(kvp.Value));

private IEnumerable<Task<bool>> PrepareIndexUpdateTasks(Dictionary<string, IDictionary<IIndexableGrain, IList<IMemberUpdate>>> updatesToIndexes)
IEnumerable<Task<bool>> PrepareIndexUpdateTasks(Dictionary<string, IDictionary<IIndexableGrain, IList<IMemberUpdate>>> updatesToIndexes)
=> updatesToIndexes.Select(updt => (indexInfo: this.GrainIndexes[updt.Key], updatesToIndex: updt.Value))
.Where(pair => pair.updatesToIndex.Count > 0)
.Select(pair => pair.indexInfo.IndexInterface.ApplyIndexUpdateBatch(this._siloIndexManager, pair.updatesToIndex.AsImmutable(),
pair.indexInfo.MetaData.IsUniqueIndex, pair.indexInfo.MetaData, _silo));

private Dictionary<string, IDictionary<IIndexableGrain, IList<IMemberUpdate>>> PopulateUpdatesToIndexes(
IndexWorkflowRecordNode currentWorkflow, Dictionary<IIndexableGrain, HashSet<Guid>> grainsToActiveWorkflows)
Dictionary<string, IDictionary<IIndexableGrain, IList<IMemberUpdate>>> PopulateUpdatesToIndexes(IndexWorkflowRecordNode currentWorkflow, Dictionary<IIndexableGrain, HashSet<Guid>> grainsToActiveWorkflows)
{
var updatesToIndexes = new Dictionary<string, IDictionary<IIndexableGrain, IList<IMemberUpdate>>>();
bool faultTolerant = IsFaultTolerant;
var faultTolerant = IsFaultTolerant;
for (; !currentWorkflow.IsPunctuation; currentWorkflow = currentWorkflow.Next)
{
IndexWorkflowRecord workflowRec = currentWorkflow.WorkflowRecord;
IIndexableGrain g = workflowRec.Grain;
bool existsInActiveWorkflows = faultTolerant && grainsToActiveWorkflows.TryGetValue(g, out HashSet<Guid> activeWorkflowRecs)
&& activeWorkflowRecs.Contains(workflowRec.WorkflowId);
var workflowRec = currentWorkflow.WorkflowRecord;
var g = workflowRec.Grain;
var existsInActiveWorkflows = faultTolerant && grainsToActiveWorkflows.TryGetValue(g, out var activeWorkflowRecs) && activeWorkflowRecs.Contains(workflowRec.WorkflowId);

foreach (var (indexName, updt) in currentWorkflow.WorkflowRecord.MemberUpdates.Where(kvp => kvp.Value.OperationType != IndexOperationType.None))
{
Expand All @@ -104,10 +102,10 @@ private Dictionary<string, IDictionary<IIndexableGrain, IList<IMemberUpdate>>> P
return updatesToIndexes;
}

private static HashSet<Guid> emptyHashset = new HashSet<Guid>();
private static Dictionary<IIndexableGrain, HashSet<Guid>> emptyDictionary = new Dictionary<IIndexableGrain, HashSet<Guid>>();
static readonly HashSet<Guid> emptyHashset = new();
static readonly Dictionary<IIndexableGrain, HashSet<Guid>> emptyDictionary = new();

private async Task<Dictionary<IIndexableGrain, HashSet<Guid>>> FtGetActiveWorkflowSetsFromGrains(IndexWorkflowRecordNode currentWorkflow)
async Task<Dictionary<IIndexableGrain, HashSet<Guid>>> GetActiveWorkflowSetsFromGrains(IndexWorkflowRecordNode currentWorkflow)
{
var activeWorkflowSetTasksByGrain = new Dictionary<IIndexableGrain, Task<Immutable<HashSet<Guid>>>>();
var currentWorkflowIds = new HashSet<Guid>();
Expand All @@ -116,10 +114,9 @@ private async Task<Dictionary<IIndexableGrain, HashSet<Guid>>> FtGetActiveWorkfl
{
var record = currentWorkflow.WorkflowRecord;
currentWorkflowIds.Add(record.WorkflowId);
IIndexableGrain g = record.Grain;
if (!activeWorkflowSetTasksByGrain.ContainsKey(g) && record.MemberUpdates.Where(ups => ups.Value.OperationType != IndexOperationType.None).Any())
if (!activeWorkflowSetTasksByGrain.ContainsKey(record.Grain) && record.MemberUpdates.Any(ups => ups.Value.OperationType != IndexOperationType.None))
{
activeWorkflowSetTasksByGrain[g] = g.AsReference<IIndexableGrain>(this._siloIndexManager, this._grainInterfaceType).GetActiveWorkflowIdsSet();
activeWorkflowSetTasksByGrain[record.Grain] = record.Grain.AsReference<IIndexableGrain>(this._siloIndexManager, this._grainInterfaceType).GetActiveWorkflowIdsSet();
}
}

Expand All @@ -135,7 +132,7 @@ private async Task<Dictionary<IIndexableGrain, HashSet<Guid>>> FtGetActiveWorkfl
return new Dictionary<IIndexableGrain, HashSet<Guid>>();
}

private NamedIndexMap EnsureGrainIndexes()
NamedIndexMap EnsureGrainIndexes()
{
if (__grainIndexes == null)
{
Expand All @@ -146,14 +143,15 @@ private NamedIndexMap EnsureGrainIndexes()
}

// TODO clean up some of the duplicated id-generation code.
private IIndexWorkflowQueue InitIndexWorkflowQueue()
=> __workflowQueue = _lazyParent.Value.IsGrainService
IIndexWorkflowQueue InitIndexWorkflowQueue()
=> __workflowQueue = _lazyParent.Value.GrainId.IsSystemTarget()
? _siloIndexManager.GetGrainService<IIndexWorkflowQueue>(IndexWorkflowQueueBase.CreateIndexWorkflowQueueGrainReference(_siloIndexManager, _grainInterfaceType, _queueSeqNum, _silo))
: _siloIndexManager.GrainFactory.GetGrain<IIndexWorkflowQueue>(IndexWorkflowQueueBase.CreateIndexWorkflowQueuePrimaryKey(_grainInterfaceType, _queueSeqNum));

public static GrainReference CreateIndexWorkflowQueueHandlerGrainReference(SiloIndexManager siloIndexManager, Type grainInterfaceType, int queueSeqNum, SiloAddress siloAddress)
=> siloIndexManager.MakeGrainServiceGrainReference(IndexingConstants.INDEX_WORKFLOW_QUEUE_HANDLER_GRAIN_SERVICE_TYPE_CODE,
IndexWorkflowQueueBase.CreateIndexWorkflowQueuePrimaryKey(grainInterfaceType, queueSeqNum), siloAddress);
IndexWorkflowQueueBase.CreateIndexWorkflowQueuePrimaryKey(grainInterfaceType, queueSeqNum),
siloAddress);

public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
=> throw new NotSupportedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ namespace Orleans.Indexing
[Reentrant]
internal class IndexWorkflowQueueHandlerGrainService : GrainService, IIndexWorkflowQueueHandler
{
private IIndexWorkflowQueueHandler _base;
IIndexWorkflowQueueHandler _base;

internal IndexWorkflowQueueHandlerGrainService(SiloIndexManager sim, Type grainInterfaceType, int queueSeqNum, bool isDefinedAsFaultTolerantGrain)
: base(IndexWorkflowQueueHandlerBase.CreateIndexWorkflowQueueHandlerGrainReference(sim, grainInterfaceType, queueSeqNum, sim.SiloAddress).GrainIdentity,
sim.Silo, sim.LoggerFactory)
: base(IndexWorkflowQueueHandlerBase.CreateIndexWorkflowQueueHandlerGrainReference(sim, grainInterfaceType, queueSeqNum, sim.SiloAddress).GrainId, sim.Silo, sim.LoggerFactory)
{
_base = new IndexWorkflowQueueHandlerBase(sim, grainInterfaceType, queueSeqNum, sim.SiloAddress, isDefinedAsFaultTolerantGrain,
() => base.GetGrainReference()); // lazy is needed because the runtime isn't attached until Registered
_base = new IndexWorkflowQueueHandlerBase(sim, grainInterfaceType, queueSeqNum, sim.SiloAddress, isDefinedAsFaultTolerantGrain, () => base.GrainReference); // lazy is needed because the runtime isn't attached until Registered
}

public Task HandleWorkflowsUntilPunctuation(Immutable<IndexWorkflowRecordNode> workflowRecordsHead)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,16 @@

namespace Orleans.Indexing
{
/// <summary>
/// The persistent unit for storing the information for an <see cref="IndexWorkflowQueueGrainService"/>
/// </summary>
/// <remarks>This requires GrainState instead of using StateStorageBridge, due to having to set the ETag for upsert.</remarks>
[Serializable]
internal class IndexWorkflowQueueState : GrainState<IndexWorkflowQueueEntry>
{
public IndexWorkflowQueueState() : base(new IndexWorkflowQueueEntry())
{
}
}

/// <summary>
/// All the information stored for a single <see cref="IndexWorkflowQueueGrainService"/>
/// </summary>
[Serializable]
[GenerateSerializer]
internal class IndexWorkflowQueueEntry
{
// Updates that must be propagated to indexes.
/// <summary>
/// The sequence of updates to be applied to indices.
/// </summary>
internal IndexWorkflowRecordNode WorkflowRecordsHead;

public IndexWorkflowQueueEntry() => this.WorkflowRecordsHead = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Orleans.Indexing
{
[Serializable]
[GenerateSerializer]
internal class IndexWorkflowRecord
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ namespace Orleans.Indexing
{
/// <summary>
/// A node in the linked list of workflowRecords.
///
///
/// This linked list makes the traversal more efficient.
/// </summary>
[Serializable]
[GenerateSerializer]
internal class IndexWorkflowRecordNode
{
internal IndexWorkflowRecord WorkflowRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
using Orleans.Runtime;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace Orleans.Indexing
{
Expand All @@ -15,10 +17,10 @@ internal class ReincarnatedIndexWorkflowQueue : Grain, IIndexWorkflowQueue
internal SiloIndexManager SiloIndexManager => IndexManager.GetSiloIndexManager(ref __siloIndexManager, base.ServiceProvider);
private SiloIndexManager __siloIndexManager;

public override Task OnActivateAsync()
public override Task OnActivateAsync(CancellationToken ct)
{
DelayDeactivation(ACTIVE_FOR_A_DAY);
return base.OnActivateAsync();
return base.OnActivateAsync(ct);
}

public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
Expand All @@ -37,7 +39,7 @@ public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
int queueSequenceNumber = int.Parse(parts[1]);

_base = new IndexWorkflowQueueBase(this.SiloIndexManager, grainInterfaceType, queueSequenceNumber,
oldParentGrainServiceRef.GrainServiceSiloAddress,
this.ServiceProvider.GetRequiredService<IGrainRuntime>().SiloAddress,
isDefinedAsFaultTolerantGrain: true /*otherwise it shouldn't have reached here!*/,
parentFunc: () => this.AsWeaklyTypedReference(), recoveryGrainReference:oldParentGrainServiceRef);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
using Orleans.Concurrency;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Runtime;

namespace Orleans.Indexing
{
[Reentrant]
internal class ReincarnatedIndexWorkflowQueueHandler : Grain, IIndexWorkflowQueueHandler
{
private IIndexWorkflowQueueHandler _base;
IIndexWorkflowQueueHandler _base;

internal SiloIndexManager SiloIndexManager => IndexManager.GetSiloIndexManager(ref __siloIndexManager, base.ServiceProvider);
private SiloIndexManager __siloIndexManager;
SiloIndexManager __siloIndexManager;

public override Task OnActivateAsync()
public override Task OnActivateAsync(CancellationToken ct)
{
DelayDeactivation(ReincarnatedIndexWorkflowQueue.ACTIVE_FOR_A_DAY);
return base.OnActivateAsync();
return base.OnActivateAsync(ct);
}

public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
{
if (_base == null)
if (_base is null)
{
var oldParentGrainServiceRef = oldParentGrainService.AsWeaklyTypedReference();
var parts = oldParentGrainServiceRef.GetPrimaryKeyString().Split('-');
Expand All @@ -30,11 +33,13 @@ public Task Initialize(IIndexWorkflowQueue oldParentGrainService)
" The primary key is '" + oldParentGrainServiceRef.GetPrimaryKeyString() + "'");
}

Type grainInterfaceType = this.SiloIndexManager.CachedTypeResolver.ResolveType(parts[0]);
int queueSequenceNumber = int.Parse(parts[1]);
var grainInterfaceType = this.SiloIndexManager.CachedTypeResolver.ResolveType(parts[0]);
var queueSequenceNumber = int.Parse(parts[1]);

var runtime = ServiceProvider.GetRequiredService<IGrainRuntime>();

_base = new IndexWorkflowQueueHandlerBase(this.SiloIndexManager, grainInterfaceType, queueSequenceNumber,
oldParentGrainServiceRef.GrainServiceSiloAddress,
runtime.SiloAddress,
isDefinedAsFaultTolerantGrain: true /*otherwise it shouldn't have reached here!*/,
() => this.AsWeaklyTypedReference());
}
Expand Down
Loading