Skip to content

Commit

Permalink
Fix ObjectDisposedException, management of Thread Join in Dispose and…
Browse files Browse the repository at this point in the history
… blocked execution of public methods if the istance was not started (#259)
  • Loading branch information
masesdevelopers authored Oct 10, 2023
1 parent 4800029 commit 222cb42
Showing 1 changed file with 40 additions and 3 deletions.
43 changes: 40 additions & 3 deletions src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,10 @@ private void OnTopicPartitionsAssigned(KNetCompactedConsumerRebalanceListener li
{
_consumerAssociatedPartition[listener.ConsumerIndex].Add(partition);
}
_assignmentWaiters[partition].Set();
if (!_assignmentWaiters[partition].SafeWaitHandle.IsClosed)
{
_assignmentWaiters[partition].Set();
}
}
}

Expand All @@ -640,7 +643,10 @@ private void OnTopicPartitionsRevoked(KNetCompactedConsumerRebalanceListener lis
{
_consumerAssociatedPartition[listener.ConsumerIndex].Remove(partition);
}
_assignmentWaiters[topicPartition.Partition()].Reset();
if (!_assignmentWaiters[partition].SafeWaitHandle.IsClosed)
{
_assignmentWaiters[partition].Reset();
}
}
}

Expand All @@ -653,14 +659,18 @@ private void OnTopicPartitionsLost(KNetCompactedConsumerRebalanceListener listen
{
_consumerAssociatedPartition[listener.ConsumerIndex].Remove(partition);
}
_assignmentWaiters[topicPartition.Partition()].Reset();
if (!_assignmentWaiters[partition].SafeWaitHandle.IsClosed)
{
_assignmentWaiters[partition].Reset();
}
}
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
private void AddOrUpdate(TKey key, TValue value)
{
ValidateAccessRights(AccessRightsType.Write);
ValidateStarted();

if (key == null)
throw new ArgumentNullException(nameof(key));
Expand Down Expand Up @@ -743,6 +753,12 @@ private void ValidateAccessRights(AccessRightsType rights)
if (!_accessrights.HasFlag(rights))
throw new InvalidOperationException($"{rights} access flag not set");
}
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
private void ValidateStarted()
{
if (!IsStarted)
throw new InvalidOperationException("The instance was not started");
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
void BuildConsumers()
Expand Down Expand Up @@ -967,13 +983,15 @@ public bool StartAndWait(int timeout = Timeout.Infinite)
public bool WaitForStateAssignment(int timeout = Timeout.Infinite)
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
return WaitHandle.WaitAll(_assignmentWaiters, timeout);
}

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.SyncWait(int)"/>
public bool SyncWait(int timeout = Timeout.Infinite)
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
Stopwatch watcher = Stopwatch.StartNew();
bool sync = false;
while (!sync && watcher.ElapsedMilliseconds < (uint)timeout)
Expand All @@ -998,6 +1016,7 @@ public bool SyncWait(int timeout = Timeout.Infinite)
public void Flush()
{
ValidateAccessRights(AccessRightsType.Write);
ValidateStarted();
_producer?.Flush();
}

Expand All @@ -1019,6 +1038,7 @@ public TValue this[TKey key]
get
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
BuildOnTheFlyConsumer();
if (!new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).TryGetValue(key, out var data))
{
Expand All @@ -1039,6 +1059,7 @@ public System.Collections.Generic.ICollection<TKey> Keys
get
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
return _dictionary.Keys;
}
}
Expand All @@ -1053,6 +1074,7 @@ public System.Collections.Generic.ICollection<TValue> Values
get
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
BuildOnTheFlyConsumer();
return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).Values();
}
Expand All @@ -1068,6 +1090,7 @@ public int Count
get
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
return _dictionary.Count;
}
}
Expand Down Expand Up @@ -1110,6 +1133,7 @@ public void Add(KeyValuePair<TKey, TValue> item)
public void Clear()
{
ValidateAccessRights(AccessRightsType.Write);
ValidateStarted();
_dictionary.Clear();
}

Expand All @@ -1123,6 +1147,7 @@ public void Clear()
public bool Contains(KeyValuePair<TKey, TValue> item)
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
BuildOnTheFlyConsumer();
return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).Contains(item);
}
Expand All @@ -1137,6 +1162,7 @@ public bool Contains(KeyValuePair<TKey, TValue> item)
public bool ContainsKey(TKey key)
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
return _dictionary.ContainsKey(key);
}

Expand All @@ -1155,6 +1181,7 @@ public bool ContainsKey(TKey key)
public void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
BuildOnTheFlyConsumer();
new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).CopyTo(array, arrayIndex);
}
Expand All @@ -1167,6 +1194,7 @@ public void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)
public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
BuildOnTheFlyConsumer();
return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName);
}
Expand Down Expand Up @@ -1211,6 +1239,7 @@ public bool Remove(KeyValuePair<TKey, TValue> item)
public bool TryGetValue(TKey key, out TValue value)
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
BuildOnTheFlyConsumer();
return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).TryGetValue(key, out value);
}
Expand All @@ -1219,6 +1248,7 @@ public bool TryGetValue(TKey key, out TValue value)
IEnumerator IEnumerable.GetEnumerator()
{
ValidateAccessRights(AccessRightsType.Read);
ValidateStarted();
BuildOnTheFlyConsumer();
return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName);
}
Expand All @@ -1234,6 +1264,11 @@ public void Dispose()
{
_consumerPollRun = false;

foreach (var consumerPollThread in _consumerPollThreads)
{
consumerPollThread.Join();
}

if (_consumers != null)
{
foreach (var item in _consumers)
Expand All @@ -1257,6 +1292,8 @@ public void Dispose()
}
}
}

_started = false;
}

#endregion
Expand Down

0 comments on commit 222cb42

Please sign in to comment.