Skip to content

Commit

Permalink
Update management of synchronization check (#252)
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers authored Oct 5, 2023
1 parent f4ab886 commit be86db7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
29 changes: 18 additions & 11 deletions src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,10 @@ public interface IKNetCompactedReplicator<TKey, TValue> : IDictionary<TKey, TVal
/// Start this <see cref="KNetCompactedReplicator{TKey, TValue}"/>: create the <see cref="StateName"/> topic if not available, allocates Producer and Consumers, sets serializer/deserializer
/// Then waits its synchronization with <see cref="StateName"/> topic which stores dictionary data
/// </summary>
/// <param name="timeout">The number of milliseconds to wait, or <see cref="Timeout.Infinite"/> to wait indefinitely</param>
/// <returns><see langword="true"/> if the current instance synchronize within the given <paramref name="timeout"/>; otherwise, <see langword="false"/></returns>
/// <exception cref="InvalidOperationException">Some errors occurred or the provided <see cref="AccessRights"/> do not include the <see cref="AccessRightsType.Read"/> flag</exception>
void StartAndWait(int timeout = Timeout.Infinite);
bool StartAndWait(int timeout = Timeout.Infinite);
/// <summary>
/// Waits for all paritions assignment of the <see cref="StateName"/> topic which stores dictionary data
/// </summary>
Expand All @@ -218,7 +220,7 @@ public interface IKNetCompactedReplicator<TKey, TValue> : IDictionary<TKey, TVal
/// <param name="timeout">The number of milliseconds to wait, or <see cref="Timeout.Infinite"/> to wait indefinitely</param>
/// <returns><see langword="true"/> if the current instance synchronize within the given <paramref name="timeout"/>; otherwise, <see langword="false"/></returns>
/// <exception cref="InvalidOperationException">The provided <see cref="AccessRights"/> do not include the <see cref="AccessRightsType.Read"/> flag</exception>
void SyncWait(int timeout = Timeout.Infinite);
bool SyncWait(int timeout = Timeout.Infinite);
/// <summary>
/// Waits until all outstanding produce requests and delivery report callbacks are completed
/// </summary>
Expand Down Expand Up @@ -733,9 +735,10 @@ private void ValidateAccessRights(AccessRightsType rights)
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
void BuildConsumers()
{
_consumerConfig ??= ConsumerConfigBuilder.Create().WithEnableAutoCommit(true)
.WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
.WithAllowAutoCreateTopics(false);
_consumerConfig ??= ConsumerConfigBuilder.Create()
.WithEnableAutoCommit(true)
.WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
.WithAllowAutoCreateTopics(false);

ConsumerConfig.BootstrapServers = BootstrapServers;
ConsumerConfig.GroupId = GroupId;
Expand All @@ -760,7 +763,7 @@ void BuildConsumers()

for (int i = 0; i < Partitions; i++)
{
_lastPartitionLags[i] = -1;
_lastPartitionLags[i] = -2;
_assignmentWaiters[i] = new ManualResetEvent(false);
}

Expand Down Expand Up @@ -925,12 +928,12 @@ public void Start()
}

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.StartAndWait(int)"/>
public void StartAndWait(int timeout = Timeout.Infinite)
public bool StartAndWait(int timeout = Timeout.Infinite)
{
ValidateAccessRights(AccessRightsType.Read);
Start();
WaitForStateAssignment(timeout);
SyncWait(timeout);
return SyncWait(timeout);
}

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.WaitForStateAssignment(int)"/>
Expand All @@ -941,23 +944,27 @@ public bool WaitForStateAssignment(int timeout = Timeout.Infinite)
}

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.SyncWait(int)"/>
public void SyncWait(int timeout = Timeout.Infinite)
public bool SyncWait(int timeout = Timeout.Infinite)
{
ValidateAccessRights(AccessRightsType.Read);
Stopwatch watcher = Stopwatch.StartNew();
bool sync = false;
while (!sync && watcher.ElapsedMilliseconds < (uint)timeout)
{
bool[] syncs = new bool[ConsumersToAllocate()];
for (int i = 0; i < ConsumersToAllocate(); i++)
{
bool lagInSync = true;
foreach (var partitionIndex in _consumerAssociatedPartition[i])
{
lagInSync &= Interlocked.Read(ref _lastPartitionLags[partitionIndex]) == 0 || Interlocked.Read(ref _lastPartitionLags[partitionIndex]) == -1;
var partitionLag = Interlocked.Read(ref _lastPartitionLags[partitionIndex]);
lagInSync &= partitionLag == 0 || partitionLag == -1;
}
sync = _consumers[i].IsEmpty && lagInSync;
syncs[i] = _consumers[i].IsEmpty && lagInSync;
}
sync = syncs.All(x => x == true);
}
return sync;
}

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.Flush"/>
Expand Down
25 changes: 25 additions & 0 deletions tests/KNetCompactedReplicatorTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ static void Main(string[] args)
sw.Stop();
Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
sw = Stopwatch.StartNew();
TestOnlyRead("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5);
sw.Stop();
Console.WriteLine($"End TestOnlyRead for TestOnConsume in {sw.Elapsed}");
sw = Stopwatch.StartNew();
Test("TestOnConsumeLessConsumers", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5, 2);
sw.Stop();
Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
Expand Down Expand Up @@ -144,5 +148,26 @@ private static void Test(string topicName, int length, UpdateModeTypes type, int
}
}
}

private static void TestOnlyRead(string topicName, int length, UpdateModeTypes type, int partitions, int? consumers = null)
{
using (var replicator = new KNetCompactedReplicator<int, TestType>()
{
Partitions = partitions,
ConsumerInstances = consumers,
UpdateMode = type,
BootstrapServers = serverToUse,
StateName = topicName,
ValueSerDes = new JsonSerDes<TestType>(),
})
{
replicator.StartAndWait();

foreach (var item in replicator)
{
Console.WriteLine($"Key: {item.Key} - Value: {item.Value}");
}
}
}
}
}

0 comments on commit be86db7

Please sign in to comment.