Skip to content

Commit

Permalink
Fix System.ObjectDisposedException raised from Dispose of `KNetComp…
Browse files Browse the repository at this point in the history
…actedReplicator` (#234)

* #233 (comment): fix wrong behavior on Dispose
  • Loading branch information
masesdevelopers authored Sep 6, 2023
1 parent 973a760 commit 871bbd8
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 28 deletions.
11 changes: 7 additions & 4 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,11 @@ void CallbackMessage(KNetConsumerRecord<K, V> message)
/// <inheritdoc cref="IDisposable.Dispose"/>
public override void Dispose()
{
base.Dispose();
IExecute("setCallback", null);
consumerCallback?.Dispose();
if (consumerCallback != null)
{
IExecute("setCallback", null);
consumerCallback?.Dispose();
}
threadRunning = false;
if (consumedRecords != null)
{
Expand All @@ -422,6 +424,7 @@ public override void Dispose()
_keyDeserializer?.Dispose();
_valueDeserializer?.Dispose();
}
base.Dispose();
}
/// <inheritdoc cref="IKNetConsumer{K, V}.SetCallback(Action{KNetConsumerRecord{K, V}})"/>
public void SetCallback(Action<KNetConsumerRecord<K, V>> cb)
Expand Down Expand Up @@ -451,7 +454,7 @@ void ConsumeHandler(object o)
System.Threading.Interlocked.Decrement(ref dequeing);
}
}
else
else if (threadRunning)
{
lock (consumedRecords)
{
Expand Down
51 changes: 32 additions & 19 deletions src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,24 @@ public void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)

Array.Copy(values.ToArray(), 0, array, arrayIndex, values.Count);
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
static void OnDemandRetrieve(IKNetConsumer<TKey, TValue> consumer, string topic, TKey key, ILocalDataStorage data)
{
var topicPartition = new TopicPartition(topic, data.Partition);
consumer.Assign(Collections.SingletonList(topicPartition));
consumer.Seek(topicPartition, data.Offset);
var results = consumer.Poll(TimeSpan.FromMinutes(1));
if (results == null) throw new InvalidOperationException("Failed to get records from remote.");
foreach (var result in results)
{
if (!Equals(result.Key, key)) continue;
if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}");
data.HasValue = true;
data.Value = result.Value;
break;
}
}
}

#endregion
Expand Down Expand Up @@ -480,23 +498,7 @@ public Func<IKNetCompactedReplicator<TKey, TValue>, bool, KeyValuePair<TKey, TVa
#endregion

#region Private methods
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
static void OnDemandRetrieve(IKNetConsumer<TKey, TValue> consumer, string topic, TKey key, ILocalDataStorage data)
{
var topicPartition = new TopicPartition(topic, data.Partition);
consumer.Assign(Collections.SingletonList(topicPartition));
consumer.Seek(topicPartition, data.Offset);
var results = consumer.Poll(TimeSpan.FromMinutes(1));
if (results == null) throw new InvalidOperationException("Failed to get records from remote.");
foreach (var result in results)
{
if (!Equals(result.Key, key)) continue;
if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}");
data.HasValue = true;
data.Value = result.Value;
break;
}
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
void CheckStarted()
{
Expand Down Expand Up @@ -752,7 +754,13 @@ void ConsumerPollHandler(object o)
try
{
_consumers[index].ConsumeAsync(100);
if (_assignmentWaiters[index].WaitOne(0))
bool execute = false;
lock (_assignmentWaiters[index])
{
if (_assignmentWaiters[index].SafeWaitHandle.IsClosed) continue;
else execute = _assignmentWaiters[index].WaitOne(0);
}
if (execute)
{
try
{
Expand Down Expand Up @@ -1121,14 +1129,19 @@ public void Dispose()
}
}

_onTheFlyConsumer?.Dispose();

_producer?.Flush();
_producer?.Dispose();

if (_assignmentWaiters != null)
{
foreach (var item in _assignmentWaiters)
{
item?.Close();
lock (item)
{
item?.Close();
}
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions tests/KNetCompactedReplicatorTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using Org.Apache.Kafka.Clients.Consumer;
using Org.Apache.Kafka.Clients.Producer;
using System;
using System.Diagnostics;
using System.Threading;

namespace MASES.KNetTest
Expand Down Expand Up @@ -71,13 +72,18 @@ static void Main(string[] args)
{
serverToUse = args[0];
}

TestValues("TestValues", 100, UpdateModeTypes.OnDelivery | UpdateModeTypes.Delayed);

var sw = Stopwatch.StartNew();
TestValues("TestValues", 100, UpdateModeTypes.OnDelivery);
sw.Stop();
Console.WriteLine($"End TestValues in {sw.Elapsed}");
sw = Stopwatch.StartNew();
Test("TestOnDelivery", 100, UpdateModeTypes.OnDelivery | UpdateModeTypes.Delayed);

sw.Stop();
Console.WriteLine($"End TestOnDelivery in {sw.Elapsed}");
sw = Stopwatch.StartNew();
Test("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed);

sw.Stop();
Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
resetEvent.WaitOne();
Expand Down

0 comments on commit 871bbd8

Please sign in to comment.