Skip to content

Commit

Permalink
Added some new fix for Java.Lang.NullPointerException (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers authored Oct 9, 2023
1 parent 03f8e78 commit 7264351
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 56 deletions.
4 changes: 3 additions & 1 deletion src/documentation/articles/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ namespace MASES.KNetTemplate.KNetConsumer

using (var consumer = new KafkaConsumer<string, string>(props))
{
consumer.Subscribe(Collections.singleton(topicToUse));
var topics = Collections.Singleton(topicToUse);
consumer.Subscribe(topics);
while (!resetEvent.WaitOne(0))
{
var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
Expand All @@ -293,6 +294,7 @@ namespace MASES.KNetTemplate.KNetConsumer
Console.WriteLine($"Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}");
}
}
topics?.Dispose(); // needed to avoid Java.Lang.NullPointerException in some conditions where .NET GC retires topics too early
}
}

Expand Down
15 changes: 11 additions & 4 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,18 @@ public void ConsumeAsync(long timeoutMs)
Duration duration = TimeSpan.FromMilliseconds(timeoutMs);
if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true.");
if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running.");
var results = this.Poll(duration);
consumedRecords.Enqueue(results);
lock (consumedRecords)
try
{
System.Threading.Monitor.Pulse(consumedRecords);
var results = this.Poll(duration);
consumedRecords.Enqueue(results);
lock (consumedRecords)
{
System.Threading.Monitor.Pulse(consumedRecords);
}
}
finally
{
duration?.Dispose();
}
}
/// <inheritdoc cref="IKNetConsumer{K, V}.Consume(long, Action{KNetConsumerRecord{K, V}})"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,12 @@ public static async Task<DescribeTopicsResult> DescribeTopicsAsync(this IAdmin a
/// </summary>
public static async Task<DescribeTopicsResult> DescribeTopicAsync(this IAdmin admin, string topicName)
{
return await Execute(admin.DescribeTopics, Collections.Singleton(topicName));
var topics = Collections.Singleton(topicName);
try
{
return await Execute(admin.DescribeTopics, topics);
}
finally { topics?.Dispose(); }
}
/// <summary>
/// Async version of <see cref="IAdmin.DescribeTopics(Collection{string}, DescribeTopicsOptions)"/>
Expand Down
101 changes: 64 additions & 37 deletions src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Refer to LICENSE for more information.
*/

using Java.Time;
using Java.Util;
using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Admin;
Expand Down Expand Up @@ -175,13 +176,13 @@ public interface IKNetCompactedReplicator<TKey, TValue> : IDictionary<TKey, TVal
/// </summary>
ProducerConfigBuilder ProducerConfig { get; }
/// <summary>
/// Get or set <see cref="KNetSerDes{TKey}"/> to use in <see cref="KNetCompactedReplicator{TKey, TValue}"/>, by default it creates a default one based on <typeparamref name="TKey"/>
/// Get or set an instance of <see cref="IKNetSerDes{TKey}"/> to use in <see cref="KNetCompactedReplicator{TKey, TValue}"/>, by default it creates a default one based on <typeparamref name="TKey"/>
/// </summary>
KNetSerDes<TKey> KeySerDes { get; }
IKNetSerDes<TKey> KeySerDes { get; }
/// <summary>
/// Get or set <see cref="KNetSerDes{TValue}"/> to use in <see cref="KNetCompactedReplicator{TKey, TValue}"/>, by default it creates a default one based on <typeparamref name="TValue"/>
/// Get or set an instance of <see cref="IKNetSerDes{TValue}"/> to use in <see cref="KNetCompactedReplicator{TKey, TValue}"/>, by default it creates a default one based on <typeparamref name="TValue"/>
/// </summary>
KNetSerDes<TValue> ValueSerDes { get; }
IKNetSerDes<TValue> ValueSerDes { get; }
/// <summary>
/// <see langword="true"/> if the instance was started
/// </summary>
Expand Down Expand Up @@ -383,17 +384,28 @@ public void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)
static void OnDemandRetrieve(IKNetConsumer<TKey, TValue> consumer, string topic, TKey key, ILocalDataStorage data)
{
var topicPartition = new TopicPartition(topic, data.Partition);
consumer.Assign(Collections.SingletonList(topicPartition));
consumer.Seek(topicPartition, data.Offset);
var results = consumer.Poll(TimeSpan.FromMinutes(1));
if (results == null) throw new InvalidOperationException("Failed to get records from remote.");
foreach (var result in results)
var topics = Collections.SingletonList(topicPartition);
Duration duration = TimeSpan.FromMinutes(1);
try
{
if (!Equals(result.Key, key)) continue;
if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}");
data.HasValue = true;
data.Value = result.Value;
break;
consumer.Assign(topics);
consumer.Seek(topicPartition, data.Offset);
var results = consumer.Poll(duration);
if (results == null) throw new InvalidOperationException("Failed to get records from remote.");
foreach (var result in results)
{
if (!Equals(result.Key, key)) continue;
if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}");
data.HasValue = true;
data.Value = result.Value;
break;
}
}
finally
{
topicPartition?.Dispose();
topics?.Dispose();
duration?.Dispose();
}
}
}
Expand Down Expand Up @@ -464,8 +476,8 @@ public override void OnPartitionsLost(Java.Util.Collection<Org.Apache.Kafka.Comm
private ManualResetEvent[] _assignmentWaiters;
private long[] _lastPartitionLags = null;

private KNetSerDes<TKey> _keySerDes = null;
private KNetSerDes<TValue> _valueSerDes = null;
private IKNetSerDes<TKey> _keySerDes = null;
private IKNetSerDes<TValue> _valueSerDes = null;

private bool _started = false;

Expand Down Expand Up @@ -529,10 +541,10 @@ public Func<IKNetCompactedReplicator<TKey, TValue>, bool, KeyValuePair<TKey, TVa
public ProducerConfigBuilder ProducerConfig { get { return _producerConfig; } set { CheckStarted(); _producerConfig = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.KeySerDes"/>
public KNetSerDes<TKey> KeySerDes { get { return _keySerDes; } set { CheckStarted(); _keySerDes = value; } }
public IKNetSerDes<TKey> KeySerDes { get { return _keySerDes; } set { CheckStarted(); _keySerDes = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.ValueSerDes"/>
public KNetSerDes<TValue> ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } }
public IKNetSerDes<TValue> ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.IsStarted"/>
public bool IsStarted => _started;
Expand Down Expand Up @@ -820,35 +832,45 @@ void BuildProducer()
void ConsumerPollHandler(object o)
{
int index = (int)o;
_consumers[index].Subscribe(Collections.Singleton(StateName), _consumerListeners[index]);
while (_consumerPollRun)
var topics = Collections.Singleton(StateName);
try
{
try
_consumers[index].Subscribe(topics, _consumerListeners[index]);
while (_consumerPollRun)
{
_consumers[index].ConsumeAsync(100);
lock (_consumerAssociatedPartition)
try
{
foreach (var partitionIndex in _consumerAssociatedPartition[index])
_consumers[index].ConsumeAsync(100);
lock (_consumerAssociatedPartition)
{
bool execute = false;
if (_assignmentWaiters[partitionIndex].SafeWaitHandle.IsClosed) continue;
else execute = _assignmentWaiters[partitionIndex].WaitOne(0);
if (execute)
foreach (var partitionIndex in _consumerAssociatedPartition[index])
{
try
{
var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex));
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : -1);
}
catch (Java.Lang.IllegalStateException)
bool execute = false;
if (_assignmentWaiters[partitionIndex].SafeWaitHandle.IsClosed) continue;
else execute = _assignmentWaiters[partitionIndex].WaitOne(0);
if (execute)
{
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], -1);
try
{
var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex));
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : -1);
}
catch (Java.Lang.IllegalStateException)
{
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], -1);
}
}
}
}
}
catch { }
}
catch { }
}
catch { }
finally
{
_consumers[index].Unsubscribe();
topics?.Dispose();
}
}

Expand Down Expand Up @@ -882,10 +904,11 @@ public void Start()
}
catch (TopicExistsException)
{
var topics = Collections.Singleton(StateName);
// recover partitions of the topic
try
{
var result = _admin.DescribeTopics(Collections.Singleton(StateName));
var result = _admin.DescribeTopics(topics);
if (result != null)
{
var map = result.AllTopicNames().Get();
Expand All @@ -900,6 +923,10 @@ public void Start()
{

}
finally
{
topics?.Dispose();
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions tests/KNetBenchmark/ProgramKNet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,11 @@ static Stopwatch ConsumeKNet(int testNum, string topicName, int length, int nump
};

var consumer = KNetConsumer();
var topics = Collections.Singleton(topicName);
try
{
int counter = 0;
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
consumer.Subscribe(topics, rebalanceListener);
consumer.SetCallback((message) =>
{
if (CheckOnConsume)
Expand Down Expand Up @@ -268,6 +269,7 @@ static Stopwatch ConsumeKNet(int testNum, string topicName, int length, int nump
{
if (!SharedObjects) consumer.Dispose();
rebalanceListener?.Dispose();
topics?.Dispose();
}
}
catch (Java.Util.Concurrent.ExecutionException ex)
Expand Down Expand Up @@ -296,10 +298,11 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,

var consumer = KNetConsumer();
var producer = KNetProducer();
var topics = Collections.Singleton(topicName);
try
{
int counter = 0;
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
consumer.Subscribe(topics, rebalanceListener);
while (true)
{
var records = consumer.Poll(TimeSpan.FromMinutes(1));
Expand Down Expand Up @@ -331,6 +334,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,
consumer.Dispose();
producer.Dispose();
}
topics?.Dispose();
}
}
catch (Java.Util.Concurrent.ExecutionException ex)
Expand All @@ -351,6 +355,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,
System.Threading.Thread thread = new System.Threading.Thread(() =>
{
ConsumerRebalanceListener rebalanceListener = null;
var topics = Collections.Singleton(topicName);
try
{
rebalanceListener = new()
Expand All @@ -365,7 +370,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,
startEvent.Set();
}
};
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
consumer.Subscribe(topics, rebalanceListener);
Java.Time.Duration duration = TimeSpan.FromSeconds(1);
int counter = 0;
while (true)
Expand Down Expand Up @@ -399,6 +404,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,
consumer.Dispose();
}
startEvent.Set();
topics?.Dispose();
}
});

Expand Down
14 changes: 10 additions & 4 deletions tests/KNetBenchmark/ProgramKafka.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num
};

var consumer = KafkaConsumer();
Java.Time.Duration duration = TimeSpan.FromMinutes(1);
var topics = Collections.Singleton(topicName);
try
{
Java.Time.Duration duration = TimeSpan.FromMinutes(1);
int counter = 0;
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
consumer.Subscribe(topics, rebalanceListener);
while (true)
{
var records = consumer.Poll(duration);
Expand Down Expand Up @@ -318,6 +319,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num
{
if (!SharedObjects) consumer.Dispose();
rebalanceListener?.Dispose();
duration?.Dispose();
topics?.Dispose();
}
}
catch (Java.Util.Concurrent.ExecutionException ex)
Expand All @@ -337,6 +340,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num

System.Threading.Thread thread = new System.Threading.Thread(() =>
{
Java.Time.Duration duration = TimeSpan.FromSeconds(1);
var topics = Collections.Singleton(topicName);
ConsumerRebalanceListener rebalanceListener = null;
try
{
Expand All @@ -353,8 +358,7 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num
}
};
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
Java.Time.Duration duration = TimeSpan.FromSeconds(1);
consumer.Subscribe(topics, rebalanceListener);
int counter = 0;
while (true)
{
Expand Down Expand Up @@ -402,6 +406,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num
consumer.Dispose();
}
startEvent.Set();
duration?.Dispose();
topics?.Dispose();
}
});
thread.Start();
Expand Down
Loading

0 comments on commit 7264351

Please sign in to comment.