Skip to content

Commit

Permalink
Fix other possible Java.Lang.NullPointerException, startup synchron…
Browse files Browse the repository at this point in the history
…ization and minor changes (#260)

* Fix Java.Lang.NullPointerException and startup synchronization

* Override GroupId only if it does not exist in external supplied ConsumerConfig

* Added missing properties on KNetConsumerRecords and KNetConsumerRecord

* ConsumeAsync of KNetConsumer enqueue data only if contains information to avoid to waste CPU cycles and returns a boolean indicating if something was enqueued

* Added ConsumePollTimeout to manage timeout of ConsumeAsync, changed SyncWait behavior considering data buffered from ConsumeAsync and removed the case of ConsumeLag not available
  • Loading branch information
masesdevelopers authored Oct 10, 2023
1 parent 4e4acaf commit ea42082
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 68 deletions.
98 changes: 52 additions & 46 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
/// </summary>
bool IsCompleting { get; }
/// <summary>
/// <see langword="true"/> if the <see cref="IKNetConsumer{K, V}"/> instance has an empty setnof items in async operation
/// <see langword="true"/> if the <see cref="IKNetConsumer{K, V}"/> instance has an empty set of items in async operation
/// </summary>
bool IsEmpty { get; }
/// <summary>
Expand Down Expand Up @@ -71,7 +71,8 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
/// KNet async extension for <see cref="Org.Apache.Kafka.Clients.Consumer.Consumer.Poll(Duration)"/>
/// </summary>
/// <param name="timeoutMs">Timeout in milliseconds</param>
void ConsumeAsync(long timeoutMs);
/// <returns><see langword="true"/> if something was enqued for Async operations</returns>
bool ConsumeAsync(long timeoutMs);
/// <summary>
/// KNet sync extension for <see cref="Org.Apache.Kafka.Clients.Consumer.Consumer.Poll(Duration)"/>
/// </summary>
Expand All @@ -86,12 +87,12 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
/// <typeparam name="V">The value type</typeparam>
public class KNetConsumer<K, V> : KafkaConsumer<byte[], byte[]>, IKNetConsumer<K, V>
{
readonly bool autoCreateSerDes = false;
bool threadRunning = false;
long dequeing = 0;
readonly System.Threading.Thread consumeThread = null;
readonly ConcurrentQueue<KNetConsumerRecords<K, V>> consumedRecords = null;
readonly KNetConsumerCallback<K, V> consumerCallback = null;
readonly bool _autoCreateSerDes = false;
bool _threadRunning = false;
long _dequeing = 0;
readonly System.Threading.Thread _consumeThread = null;
readonly ConcurrentQueue<KNetConsumerRecords<K, V>> _consumedRecords = null;
readonly KNetConsumerCallback<K, V> _consumerCallback = null;
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
/// <summary>
Expand All @@ -106,19 +107,19 @@ public class KNetConsumer<K, V> : KafkaConsumer<byte[], byte[]>, IKNetConsumer<K
public KNetConsumer(Properties props, bool useJVMCallback = false)
: this(props, new KNetSerDes<K>(), new KNetSerDes<V>(), useJVMCallback)
{
autoCreateSerDes = true;
_autoCreateSerDes = true;

if (useJVMCallback)
{
consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", consumerCallback);
_consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", _consumerCallback);
}
else
{
consumedRecords = new();
threadRunning = true;
consumeThread = new(ConsumeHandler);
consumeThread.Start();
_consumedRecords = new();
_threadRunning = true;
_consumeThread = new(ConsumeHandler);
_consumeThread.Start();
}
}
/// <summary>
Expand All @@ -136,15 +137,15 @@ public KNetConsumer(Properties props, IKNetDeserializer<K> keyDeserializer, IKNe

if (useJVMCallback)
{
consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", consumerCallback);
_consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", _consumerCallback);
}
else
{
consumedRecords = new();
threadRunning = true;
consumeThread = new(ConsumeHandler);
consumeThread.Start();
_consumedRecords = new();
_threadRunning = true;
_consumeThread = new(ConsumeHandler);
_consumeThread.Start();
}
}

Expand Down Expand Up @@ -194,22 +195,22 @@ void CallbackMessage(KNetConsumerRecord<K, V> message)
/// <inheritdoc cref="IDisposable.Dispose"/>
public override void Dispose()
{
if (consumerCallback != null)
if (_consumerCallback != null)
{
IExecute("setCallback", null);
consumerCallback?.Dispose();
_consumerCallback?.Dispose();
}
threadRunning = false;
if (consumedRecords != null)
_threadRunning = false;
if (_consumedRecords != null)
{
lock (consumedRecords)
lock (_consumedRecords)
{
System.Threading.Monitor.Pulse(consumedRecords);
System.Threading.Monitor.Pulse(_consumedRecords);
}
if (IsCompleting) { consumeThread?.Join(); };
if (IsCompleting) { _consumeThread?.Join(); };
actionCallback = null;
}
if (autoCreateSerDes)
if (_autoCreateSerDes)
{
_keyDeserializer?.Dispose();
_valueDeserializer?.Dispose();
Expand All @@ -226,11 +227,11 @@ void ConsumeHandler(object o)
{
try
{
while (threadRunning)
while (_threadRunning)
{
if (consumedRecords.TryDequeue(out KNetConsumerRecords<K, V> records))
if (_consumedRecords.TryDequeue(out KNetConsumerRecords<K, V> records))
{
System.Threading.Interlocked.Increment(ref dequeing);
System.Threading.Interlocked.Increment(ref _dequeing);
try
{
foreach (var item in records)
Expand All @@ -241,40 +242,45 @@ void ConsumeHandler(object o)
catch { }
finally
{
System.Threading.Interlocked.Decrement(ref dequeing);
System.Threading.Interlocked.Decrement(ref _dequeing);
}
}
else if (threadRunning)
else if (_threadRunning)
{
lock (consumedRecords)
lock (_consumedRecords)
{
System.Threading.Monitor.Wait(consumedRecords);
System.Threading.Monitor.Wait(_consumedRecords);
}
}
}
}
catch { }
}
/// <inheritdoc cref="IKNetConsumer{K, V}.IsCompleting"/>
public bool IsCompleting => !consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref dequeing) != 0;
public bool IsCompleting => !_consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref _dequeing) != 0;
/// <inheritdoc cref="IKNetConsumer{K, V}.IsEmpty"/>
public bool IsEmpty => consumedRecords.IsEmpty;
public bool IsEmpty => _consumedRecords.IsEmpty;
/// <inheritdoc cref="IKNetConsumer{K, V}.WaitingMessages"/>
public int WaitingMessages => consumedRecords.Count;
public int WaitingMessages => _consumedRecords.Count;
/// <inheritdoc cref="IKNetConsumer{K, V}.ConsumeAsync(long)"/>
public void ConsumeAsync(long timeoutMs)
public bool ConsumeAsync(long timeoutMs)
{
Duration duration = TimeSpan.FromMilliseconds(timeoutMs);
if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true.");
if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running.");
if (_consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true.");
if (!_threadRunning) throw new InvalidOperationException("Dispatching thread is not running.");
try
{
var results = this.Poll(duration);
consumedRecords.Enqueue(results);
lock (consumedRecords)
bool isEmpty = results.IsEmpty;
if (!isEmpty)
{
System.Threading.Monitor.Pulse(consumedRecords);
_consumedRecords.Enqueue(results);
lock (_consumedRecords)
{
System.Threading.Monitor.Pulse(_consumedRecords);
}
}
return !isEmpty;
}
finally
{
Expand All @@ -285,7 +291,7 @@ public void ConsumeAsync(long timeoutMs)
public void Consume(long timeoutMs, Action<KNetConsumerRecord<K, V>> callback)
{
Duration duration = TimeSpan.FromMilliseconds(timeoutMs);
if (consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false.");
if (_consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false.");
try
{
actionCallback = callback;
Expand Down
2 changes: 2 additions & 0 deletions src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public KNetConsumerRecord(ConsumerRecord<byte[], byte[]> record, IKNetDeserializ
}
/// <inheritdoc cref="ConsumerRecord{K, V}.Topic"/>
public string Topic => _record.Topic();
/// <inheritdoc cref="ConsumerRecord{K, V}.LeaderEpoch"/>
public int? LeaderEpoch { get { var epoch = _record.LeaderEpoch(); return epoch.IsEmpty() ? null : epoch.Get(); } }
/// <inheritdoc cref="ConsumerRecord{K, V}.Partition"/>
public int Partition => _record.Partition();
/// <inheritdoc cref="ConsumerRecord{K, V}.Headers"/>
Expand Down
8 changes: 8 additions & 0 deletions src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public KNetConsumerRecords(ConsumerRecords<byte[], byte[]> records, IKNetDeseria
_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;
}
/// <summary>
/// <see langword="true"/> if the <see cref="KNetConsumerRecords{K, V}"/> is empty
/// </summary>
public bool IsEmpty => _records.IsEmpty();
/// <summary>
/// The number of elements in <see cref="KNetConsumerRecords{K, V}"/>
/// </summary>
public int Count => _records.Count();

IEnumerator<KNetConsumerRecord<K, V>> IEnumerable<KNetConsumerRecord<K, V>>.GetEnumerator()
{
Expand Down
Loading

0 comments on commit ea42082

Please sign in to comment.