Skip to content

Commit

Permalink
Code refinement
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers committed Sep 26, 2023
1 parent 18aa570 commit 87cd45f
Show file tree
Hide file tree
Showing 12 changed files with 465 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<None Include="..\..\documentation\articles\usageSerDes.md" Pack="true" PackagePath="\" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Apache.Avro" Version="1.11.1" />
<PackageReference Include="Apache.Avro" Version="1.11.3" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<PackageReference Include="System.Memory" Version="4.5.5" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="MessagePack" Version="2.5.108" />
<PackageReference Include="MessagePack" Version="2.5.124" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<None Include="..\..\documentation\articles\usageSerDes.md" Pack="true" PackagePath="\" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.23.3" />
<PackageReference Include="Google.Protobuf" Version="3.24.3" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
210 changes: 0 additions & 210 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,216 +31,6 @@

namespace MASES.KNet.Consumer
{
/// <summary>
/// KNet extension of <see cref="ConsumerRecord{K, V}"/>
/// </summary>
/// <typeparam name="K">The key type</typeparam>
/// <typeparam name="V">The value type</typeparam>
public class KNetConsumerRecord<K, V>
{
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
readonly ConsumerRecord<byte[], byte[]> _record;
/// <summary>
/// Initialize a new <see cref="KNetConsumerRecord{K, V}"/>
/// </summary>
/// <param name="record">The <see cref="ConsumerRecord{K, V}"/> to use for initialization</param>
/// <param name="keyDeserializer">Key serializer base on <see cref="KNetSerDes{K}"/></param>
/// <param name="valueDeserializer">Value serializer base on <see cref="KNetSerDes{K}"/></param>
public KNetConsumerRecord(ConsumerRecord<byte[], byte[]> record, IKNetDeserializer<K> keyDeserializer, IKNetDeserializer<V> valueDeserializer)
{
_record = record;
_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;
}
/// <inheritdoc cref="ConsumerRecord{K, V}.Topic"/>
public string Topic => _record.Topic();
/// <inheritdoc cref="ConsumerRecord{K, V}.Partition"/>
public int Partition => _record.Partition();
/// <inheritdoc cref="ConsumerRecord{K, V}.Headers"/>
public Headers Headers => _record.Headers();
/// <inheritdoc cref="ConsumerRecord{K, V}.Offset"/>
public long Offset => _record.Offset();
/// <inheritdoc cref="ConsumerRecord{K, V}.DateTime"/>
public System.DateTime DateTime => _record.DateTime;
/// <inheritdoc cref="ConsumerRecord{K, V}.Timestamp"/>
public long Timestamp => _record.Timestamp();
/// <inheritdoc cref="ConsumerRecord{K, V}.TimestampType"/>
public TimestampType TimestampType => _record.TimestampType();
/// <inheritdoc cref="ConsumerRecord{K, V}.SerializedKeySize"/>
public int SerializedKeySize => _record.SerializedKeySize();
/// <inheritdoc cref="ConsumerRecord{K, V}.SerializedValueSize"/>
public int SerializedValueSize => _record.SerializedValueSize();

bool _localKeyDes = false;
K _localKey = default;
/// <inheritdoc cref="ConsumerRecord{K, V}.Key"/>
public K Key
{
get
{
if (!_localKeyDes)
{
_localKey = _keyDeserializer.UseHeaders ? _keyDeserializer.DeserializeWithHeaders(Topic, Headers, _record.Key()) : _keyDeserializer.Deserialize(Topic, _record.Key());
_localKeyDes = true;
}
return _localKey;
}
}

bool _localValueDes = false;
V _localValue = default;
/// <inheritdoc cref="ConsumerRecord{K, V}.Value"/>
public V Value
{
get
{
if (!_localValueDes)
{
_localValue = _valueDeserializer.UseHeaders ? _valueDeserializer.DeserializeWithHeaders(Topic, Headers, _record.Value()) : _valueDeserializer.Deserialize(Topic, _record.Value());
_localValueDes = true;
}
return _localValue;
}
}
/// <inheritdoc cref="object.ToString"/>
public override string ToString()
{
return $"Topic: {Topic} - Partition {Partition} - Offset {Offset} - Key {Key} - Value {Value}";
}
}

class KNetConsumerRecordsEnumerator<K, V> : IEnumerator<KNetConsumerRecord<K, V>>, IAsyncEnumerator<KNetConsumerRecord<K, V>>
{
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
readonly CancellationToken _cancellationToken;
readonly ConsumerRecords<byte[], byte[]> _records;
IEnumerator<ConsumerRecord<byte[], byte[]>> _recordEnumerator;
IAsyncEnumerator<ConsumerRecord<byte[], byte[]>> _recordAsyncEnumerator;

public KNetConsumerRecordsEnumerator(ConsumerRecords<byte[], byte[]> records, IKNetDeserializer<K> keyDeserializer, IKNetDeserializer<V> valueDeserializer)
{
_records = records;
_recordEnumerator = _records.GetEnumerator();
_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;
}

public KNetConsumerRecordsEnumerator(ConsumerRecords<byte[], byte[]> records, IKNetDeserializer<K> keyDeserializer, IKNetDeserializer<V> valueDeserializer, CancellationToken cancellationToken)
{
_records = records;
_recordAsyncEnumerator = _records.GetAsyncEnumerator(cancellationToken);
_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;
_cancellationToken = cancellationToken;
}

KNetConsumerRecord<K, V> IAsyncEnumerator<KNetConsumerRecord<K, V>>.Current => new KNetConsumerRecord<K, V>(_recordAsyncEnumerator.Current, _keyDeserializer, _valueDeserializer);

KNetConsumerRecord<K, V> IEnumerator<KNetConsumerRecord<K, V>>.Current => new KNetConsumerRecord<K, V>(_recordEnumerator.Current, _keyDeserializer, _valueDeserializer);

object System.Collections.IEnumerator.Current => (_recordEnumerator as System.Collections.IEnumerator)?.Current;

public void Dispose()
{

}

public ValueTask DisposeAsync()
{
return _recordAsyncEnumerator.DisposeAsync();
}

public bool MoveNext()
{
return _recordEnumerator.MoveNext();
}

public ValueTask<bool> MoveNextAsync()
{
return _recordAsyncEnumerator.MoveNextAsync();
}

public void Reset()
{
_recordEnumerator = _records.GetEnumerator();
}
}
/// <summary>
/// KNet extension of <see cref="ConsumerRecords{K, V}"/>
/// </summary>
/// <typeparam name="K">The key type</typeparam>
/// <typeparam name="V">The value type</typeparam>
public class KNetConsumerRecords<K, V> : IEnumerable<KNetConsumerRecord<K, V>>, IAsyncEnumerable<KNetConsumerRecord<K, V>>
{
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
readonly ConsumerRecords<byte[], byte[]> _records;
/// <summary>
/// Initialize a new <see cref="KNetConsumerRecord{K, V}"/>
/// </summary>
/// <param name="records">The <see cref="ConsumerRecords{K, V}"/> to use for initialization</param>
/// <param name="keyDeserializer">Key serializer base on <see cref="KNetSerDes{K}"/></param>
/// <param name="valueDeserializer">Value serializer base on <see cref="KNetSerDes{K}"/></param>
public KNetConsumerRecords(ConsumerRecords<byte[], byte[]> records, IKNetDeserializer<K> keyDeserializer, IKNetDeserializer<V> valueDeserializer)
{
_records = records;
_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;
}

IEnumerator<KNetConsumerRecord<K, V>> IEnumerable<KNetConsumerRecord<K, V>>.GetEnumerator()
{
return new KNetConsumerRecordsEnumerator<K, V>(_records, _keyDeserializer, _valueDeserializer);
}

System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return new KNetConsumerRecordsEnumerator<K, V>(_records, _keyDeserializer, _valueDeserializer);
}

IAsyncEnumerator<KNetConsumerRecord<K, V>> IAsyncEnumerable<KNetConsumerRecord<K, V>>.GetAsyncEnumerator(CancellationToken cancellationToken)
{
return new KNetConsumerRecordsEnumerator<K, V>(_records, _keyDeserializer, _valueDeserializer, cancellationToken);
}
}

interface IKNetConsumerCallback<K, V> : IJVMBridgeBase
{
void RecordReady(KNetConsumerRecord<K, V> message);
}

class KNetConsumerCallback<K, V> : JVMBridgeListener, IKNetConsumerCallback<K, V>
{
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeListener_BridgeClassName.htm"/>
/// </summary>
public sealed override string BridgeClassName => "org.mases.knet.clients.consumer.KNetConsumerCallback";

readonly Action<KNetConsumerRecord<K, V>> recordReadyFunction = null;
public virtual Action<KNetConsumerRecord<K, V>> OnRecordReady { get { return recordReadyFunction; } }
public KNetConsumerCallback(Action<KNetConsumerRecord<K, V>> recordReady, IKNetDeserializer<K> keyDeserializer, IKNetDeserializer<V> valueDeserializer)
{
if (recordReady != null) recordReadyFunction = recordReady;
else recordReadyFunction = RecordReady;

_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;

AddEventHandler("recordReady", new EventHandler<CLRListenerEventArgs<CLREventData>>(OnRecordReadyEventHandler));
}

void OnRecordReadyEventHandler(object sender, CLRListenerEventArgs<CLREventData> data)
{
var record = this.BridgeInstance.Invoke<ConsumerRecord<byte[], byte[]>>("getRecord");
OnRecordReady(new KNetConsumerRecord<K, V>(record, _keyDeserializer, _valueDeserializer));
}

public virtual void RecordReady(KNetConsumerRecord<K, V> message) { }
}
/// <summary>
/// KNet extension of <see cref="IConsumer{K, V}"/>
/// </summary>
Expand Down
61 changes: 61 additions & 0 deletions src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.JCOBridge.C2JBridge;
using System;
using Org.Apache.Kafka.Clients.Consumer;
using MASES.KNet.Serialization;

namespace MASES.KNet.Consumer
{
interface IKNetConsumerCallback<K, V> : IJVMBridgeBase
{
void RecordReady(KNetConsumerRecord<K, V> message);
}

class KNetConsumerCallback<K, V> : JVMBridgeListener, IKNetConsumerCallback<K, V>
{
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeListener_BridgeClassName.htm"/>
/// </summary>
public sealed override string BridgeClassName => "org.mases.knet.clients.consumer.KNetConsumerCallback";

readonly Action<KNetConsumerRecord<K, V>> recordReadyFunction = null;
public virtual Action<KNetConsumerRecord<K, V>> OnRecordReady { get { return recordReadyFunction; } }
public KNetConsumerCallback(Action<KNetConsumerRecord<K, V>> recordReady, IKNetDeserializer<K> keyDeserializer, IKNetDeserializer<V> valueDeserializer)
{
if (recordReady != null) recordReadyFunction = recordReady;
else recordReadyFunction = RecordReady;

_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;

AddEventHandler("recordReady", new EventHandler<CLRListenerEventArgs<CLREventData>>(OnRecordReadyEventHandler));
}

void OnRecordReadyEventHandler(object sender, CLRListenerEventArgs<CLREventData> data)
{
var record = this.BridgeInstance.Invoke<ConsumerRecord<byte[], byte[]>>("getRecord");
OnRecordReady(new KNetConsumerRecord<K, V>(record, _keyDeserializer, _valueDeserializer));
}

public virtual void RecordReady(KNetConsumerRecord<K, V> message) { }
}
}
Loading

0 comments on commit 87cd45f

Please sign in to comment.