diff --git a/src/net/KNet.Serialization.Avro/KNet.Serialization.Avro.csproj b/src/net/KNet.Serialization.Avro/KNet.Serialization.Avro.csproj index 5e605ae454..ae7205e2ae 100644 --- a/src/net/KNet.Serialization.Avro/KNet.Serialization.Avro.csproj +++ b/src/net/KNet.Serialization.Avro/KNet.Serialization.Avro.csproj @@ -18,7 +18,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/net/KNet.Serialization.MessagePack/KNet.Serialization.MessagePack.csproj b/src/net/KNet.Serialization.MessagePack/KNet.Serialization.MessagePack.csproj index 37f113cf65..2bdf4cddc8 100644 --- a/src/net/KNet.Serialization.MessagePack/KNet.Serialization.MessagePack.csproj +++ b/src/net/KNet.Serialization.MessagePack/KNet.Serialization.MessagePack.csproj @@ -21,7 +21,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/net/KNet.Serialization.Protobuf/KNet.Serialization.Protobuf.csproj b/src/net/KNet.Serialization.Protobuf/KNet.Serialization.Protobuf.csproj index 477ab7cc48..d7b13c99a1 100644 --- a/src/net/KNet.Serialization.Protobuf/KNet.Serialization.Protobuf.csproj +++ b/src/net/KNet.Serialization.Protobuf/KNet.Serialization.Protobuf.csproj @@ -18,7 +18,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/net/KNet/Specific/Consumer/KNetConsumer.cs b/src/net/KNet/Specific/Consumer/KNetConsumer.cs index 50212a3f1a..dae85e7262 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumer.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumer.cs @@ -31,216 +31,6 @@ namespace MASES.KNet.Consumer { - /// - /// KNet extension of - /// - /// The key type - /// The value type - public class KNetConsumerRecord - { - readonly IKNetDeserializer _keyDeserializer; - readonly IKNetDeserializer _valueDeserializer; - readonly ConsumerRecord _record; - /// - /// Initialize a new - /// - /// The to use for initialization - /// Key serializer base on - /// Value serializer base on - public KNetConsumerRecord(ConsumerRecord record, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) - { - _record = record; - _keyDeserializer = keyDeserializer; - _valueDeserializer = valueDeserializer; - } - /// - public string Topic => _record.Topic(); - /// - public int Partition => _record.Partition(); - /// - public Headers Headers => _record.Headers(); - /// - public long Offset => _record.Offset(); - /// - public System.DateTime DateTime => _record.DateTime; - /// - public long Timestamp => _record.Timestamp(); - /// - public TimestampType TimestampType => _record.TimestampType(); - /// - public int SerializedKeySize => _record.SerializedKeySize(); - /// - public int SerializedValueSize => _record.SerializedValueSize(); - - bool _localKeyDes = false; - K _localKey = default; - /// - public K Key - { - get - { - if (!_localKeyDes) - { - _localKey = _keyDeserializer.UseHeaders ? _keyDeserializer.DeserializeWithHeaders(Topic, Headers, _record.Key()) : _keyDeserializer.Deserialize(Topic, _record.Key()); - _localKeyDes = true; - } - return _localKey; - } - } - - bool _localValueDes = false; - V _localValue = default; - /// - public V Value - { - get - { - if (!_localValueDes) - { - _localValue = _valueDeserializer.UseHeaders ? _valueDeserializer.DeserializeWithHeaders(Topic, Headers, _record.Value()) : _valueDeserializer.Deserialize(Topic, _record.Value()); - _localValueDes = true; - } - return _localValue; - } - } - /// - public override string ToString() - { - return $"Topic: {Topic} - Partition {Partition} - Offset {Offset} - Key {Key} - Value {Value}"; - } - } - - class KNetConsumerRecordsEnumerator : IEnumerator>, IAsyncEnumerator> - { - readonly IKNetDeserializer _keyDeserializer; - readonly IKNetDeserializer _valueDeserializer; - readonly CancellationToken _cancellationToken; - readonly ConsumerRecords _records; - IEnumerator> _recordEnumerator; - IAsyncEnumerator> _recordAsyncEnumerator; - - public KNetConsumerRecordsEnumerator(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) - { - _records = records; - _recordEnumerator = _records.GetEnumerator(); - _keyDeserializer = keyDeserializer; - _valueDeserializer = valueDeserializer; - } - - public KNetConsumerRecordsEnumerator(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer, CancellationToken cancellationToken) - { - _records = records; - _recordAsyncEnumerator = _records.GetAsyncEnumerator(cancellationToken); - _keyDeserializer = keyDeserializer; - _valueDeserializer = valueDeserializer; - _cancellationToken = cancellationToken; - } - - KNetConsumerRecord IAsyncEnumerator>.Current => new KNetConsumerRecord(_recordAsyncEnumerator.Current, _keyDeserializer, _valueDeserializer); - - KNetConsumerRecord IEnumerator>.Current => new KNetConsumerRecord(_recordEnumerator.Current, _keyDeserializer, _valueDeserializer); - - object System.Collections.IEnumerator.Current => (_recordEnumerator as System.Collections.IEnumerator)?.Current; - - public void Dispose() - { - - } - - public ValueTask DisposeAsync() - { - return _recordAsyncEnumerator.DisposeAsync(); - } - - public bool MoveNext() - { - return _recordEnumerator.MoveNext(); - } - - public ValueTask MoveNextAsync() - { - return _recordAsyncEnumerator.MoveNextAsync(); - } - - public void Reset() - { - _recordEnumerator = _records.GetEnumerator(); - } - } - /// - /// KNet extension of - /// - /// The key type - /// The value type - public class KNetConsumerRecords : IEnumerable>, IAsyncEnumerable> - { - readonly IKNetDeserializer _keyDeserializer; - readonly IKNetDeserializer _valueDeserializer; - readonly ConsumerRecords _records; - /// - /// Initialize a new - /// - /// The to use for initialization - /// Key serializer base on - /// Value serializer base on - public KNetConsumerRecords(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) - { - _records = records; - _keyDeserializer = keyDeserializer; - _valueDeserializer = valueDeserializer; - } - - IEnumerator> IEnumerable>.GetEnumerator() - { - return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer); - } - - System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() - { - return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer); - } - - IAsyncEnumerator> IAsyncEnumerable>.GetAsyncEnumerator(CancellationToken cancellationToken) - { - return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer, cancellationToken); - } - } - - interface IKNetConsumerCallback : IJVMBridgeBase - { - void RecordReady(KNetConsumerRecord message); - } - - class KNetConsumerCallback : JVMBridgeListener, IKNetConsumerCallback - { - readonly IKNetDeserializer _keyDeserializer; - readonly IKNetDeserializer _valueDeserializer; - /// - /// - /// - public sealed override string BridgeClassName => "org.mases.knet.clients.consumer.KNetConsumerCallback"; - - readonly Action> recordReadyFunction = null; - public virtual Action> OnRecordReady { get { return recordReadyFunction; } } - public KNetConsumerCallback(Action> recordReady, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) - { - if (recordReady != null) recordReadyFunction = recordReady; - else recordReadyFunction = RecordReady; - - _keyDeserializer = keyDeserializer; - _valueDeserializer = valueDeserializer; - - AddEventHandler("recordReady", new EventHandler>(OnRecordReadyEventHandler)); - } - - void OnRecordReadyEventHandler(object sender, CLRListenerEventArgs data) - { - var record = this.BridgeInstance.Invoke>("getRecord"); - OnRecordReady(new KNetConsumerRecord(record, _keyDeserializer, _valueDeserializer)); - } - - public virtual void RecordReady(KNetConsumerRecord message) { } - } /// /// KNet extension of /// diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs b/src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs new file mode 100644 index 0000000000..869f69e6da --- /dev/null +++ b/src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs @@ -0,0 +1,61 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; +using System; +using Org.Apache.Kafka.Clients.Consumer; +using MASES.KNet.Serialization; + +namespace MASES.KNet.Consumer +{ + interface IKNetConsumerCallback : IJVMBridgeBase + { + void RecordReady(KNetConsumerRecord message); + } + + class KNetConsumerCallback : JVMBridgeListener, IKNetConsumerCallback + { + readonly IKNetDeserializer _keyDeserializer; + readonly IKNetDeserializer _valueDeserializer; + /// + /// + /// + public sealed override string BridgeClassName => "org.mases.knet.clients.consumer.KNetConsumerCallback"; + + readonly Action> recordReadyFunction = null; + public virtual Action> OnRecordReady { get { return recordReadyFunction; } } + public KNetConsumerCallback(Action> recordReady, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) + { + if (recordReady != null) recordReadyFunction = recordReady; + else recordReadyFunction = RecordReady; + + _keyDeserializer = keyDeserializer; + _valueDeserializer = valueDeserializer; + + AddEventHandler("recordReady", new EventHandler>(OnRecordReadyEventHandler)); + } + + void OnRecordReadyEventHandler(object sender, CLRListenerEventArgs data) + { + var record = this.BridgeInstance.Invoke>("getRecord"); + OnRecordReady(new KNetConsumerRecord(record, _keyDeserializer, _valueDeserializer)); + } + + public virtual void RecordReady(KNetConsumerRecord message) { } + } +} diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs new file mode 100644 index 0000000000..4ccff2baff --- /dev/null +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs @@ -0,0 +1,104 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using Org.Apache.Kafka.Clients.Consumer; +using MASES.KNet.Serialization; +using Org.Apache.Kafka.Common.Header; +using Org.Apache.Kafka.Common.Record; + +namespace MASES.KNet.Consumer +{ + /// + /// KNet extension of + /// + /// The key type + /// The value type + public class KNetConsumerRecord + { + readonly IKNetDeserializer _keyDeserializer; + readonly IKNetDeserializer _valueDeserializer; + readonly ConsumerRecord _record; + /// + /// Initialize a new + /// + /// The to use for initialization + /// Key serializer base on + /// Value serializer base on + public KNetConsumerRecord(ConsumerRecord record, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) + { + _record = record; + _keyDeserializer = keyDeserializer; + _valueDeserializer = valueDeserializer; + } + /// + public string Topic => _record.Topic(); + /// + public int Partition => _record.Partition(); + /// + public Headers Headers => _record.Headers(); + /// + public long Offset => _record.Offset(); + /// + public System.DateTime DateTime => _record.DateTime; + /// + public long Timestamp => _record.Timestamp(); + /// + public TimestampType TimestampType => _record.TimestampType(); + /// + public int SerializedKeySize => _record.SerializedKeySize(); + /// + public int SerializedValueSize => _record.SerializedValueSize(); + + bool _localKeyDes = false; + K _localKey = default; + /// + public K Key + { + get + { + if (!_localKeyDes) + { + _localKey = _keyDeserializer.UseHeaders ? _keyDeserializer.DeserializeWithHeaders(Topic, Headers, _record.Key()) : _keyDeserializer.Deserialize(Topic, _record.Key()); + _localKeyDes = true; + } + return _localKey; + } + } + + bool _localValueDes = false; + V _localValue = default; + /// + public V Value + { + get + { + if (!_localValueDes) + { + _localValue = _valueDeserializer.UseHeaders ? _valueDeserializer.DeserializeWithHeaders(Topic, Headers, _record.Value()) : _valueDeserializer.Deserialize(Topic, _record.Value()); + _localValueDes = true; + } + return _localValue; + } + } + /// + public override string ToString() + { + return $"Topic: {Topic} - Partition {Partition} - Offset {Offset} - Key {Key} - Value {Value}"; + } + } +} diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs new file mode 100644 index 0000000000..0ece4ee60c --- /dev/null +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs @@ -0,0 +1,64 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using Org.Apache.Kafka.Clients.Consumer; +using MASES.KNet.Serialization; +using System.Collections.Generic; +using System.Threading; + +namespace MASES.KNet.Consumer +{ + /// + /// KNet extension of + /// + /// The key type + /// The value type + public class KNetConsumerRecords : IEnumerable>, IAsyncEnumerable> + { + readonly IKNetDeserializer _keyDeserializer; + readonly IKNetDeserializer _valueDeserializer; + readonly ConsumerRecords _records; + /// + /// Initialize a new + /// + /// The to use for initialization + /// Key serializer base on + /// Value serializer base on + public KNetConsumerRecords(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) + { + _records = records; + _keyDeserializer = keyDeserializer; + _valueDeserializer = valueDeserializer; + } + + IEnumerator> IEnumerable>.GetEnumerator() + { + return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer); + } + + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() + { + return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer); + } + + IAsyncEnumerator> IAsyncEnumerable>.GetAsyncEnumerator(CancellationToken cancellationToken) + { + return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer, cancellationToken); + } + } +} diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecordsEnumerator.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecordsEnumerator.cs new file mode 100644 index 0000000000..4f29ef5a70 --- /dev/null +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecordsEnumerator.cs @@ -0,0 +1,84 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using Org.Apache.Kafka.Clients.Consumer; +using MASES.KNet.Serialization; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace MASES.KNet.Consumer +{ + class KNetConsumerRecordsEnumerator : IEnumerator>, IAsyncEnumerator> + { + readonly IKNetDeserializer _keyDeserializer; + readonly IKNetDeserializer _valueDeserializer; + readonly CancellationToken _cancellationToken; + readonly ConsumerRecords _records; + IEnumerator> _recordEnumerator; + IAsyncEnumerator> _recordAsyncEnumerator; + + public KNetConsumerRecordsEnumerator(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) + { + _records = records; + _recordEnumerator = _records.GetEnumerator(); + _keyDeserializer = keyDeserializer; + _valueDeserializer = valueDeserializer; + } + + public KNetConsumerRecordsEnumerator(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer, CancellationToken cancellationToken) + { + _records = records; + _recordAsyncEnumerator = _records.GetAsyncEnumerator(cancellationToken); + _keyDeserializer = keyDeserializer; + _valueDeserializer = valueDeserializer; + _cancellationToken = cancellationToken; + } + + KNetConsumerRecord IAsyncEnumerator>.Current => new KNetConsumerRecord(_recordAsyncEnumerator.Current, _keyDeserializer, _valueDeserializer); + + KNetConsumerRecord IEnumerator>.Current => new KNetConsumerRecord(_recordEnumerator.Current, _keyDeserializer, _valueDeserializer); + + object System.Collections.IEnumerator.Current => (_recordEnumerator as System.Collections.IEnumerator)?.Current; + + public void Dispose() + { + + } + + public ValueTask DisposeAsync() + { + return _recordAsyncEnumerator.DisposeAsync(); + } + + public bool MoveNext() + { + return _recordEnumerator.MoveNext(); + } + + public ValueTask MoveNextAsync() + { + return _recordAsyncEnumerator.MoveNextAsync(); + } + + public void Reset() + { + _recordEnumerator = _records.GetEnumerator(); + } + } +} diff --git a/src/net/KNet/Specific/GenericConfigBuilder.cs b/src/net/KNet/Specific/GenericConfigBuilder.cs index 2ce09d52af..40a00e6317 100644 --- a/src/net/KNet/Specific/GenericConfigBuilder.cs +++ b/src/net/KNet/Specific/GenericConfigBuilder.cs @@ -58,6 +58,16 @@ public static T CreateFrom(T origin) public event System.ComponentModel.PropertyChangedEventHandler PropertyChanged; System.Collections.Generic.Dictionary _options = new(); + + /// + /// Verify if the was previously set + /// + /// The property name to be verified + /// if property exists, otherwise + public bool ExistProperty(string propertyName) + { + return _options.ContainsKey(propertyName); + } /// /// Reads the set /// diff --git a/src/net/KNet/Specific/Producer/KNetProducer.cs b/src/net/KNet/Specific/Producer/KNetProducer.cs index 7b0c22c786..44e5e03815 100644 --- a/src/net/KNet/Specific/Producer/KNetProducer.cs +++ b/src/net/KNet/Specific/Producer/KNetProducer.cs @@ -16,7 +16,6 @@ * Refer to LICENSE for more information. */ -using Org.Apache.Kafka.Common.Serialization; using Java.Util; using Org.Apache.Kafka.Common.Header; using Org.Apache.Kafka.Clients.Producer; @@ -28,122 +27,6 @@ namespace MASES.KNet.Producer { - /// - /// KNet extension of - /// - public class KNetProducerRecord - { - /// - /// Initialize a new - /// - public KNetProducerRecord() - { - } - /// - /// Initialize a new - /// - public KNetProducerRecord(string topic, int partition, long timestamp, K key, V value, Headers headers) - { - Topic = topic; - Partition = partition; - Timestamp = timestamp; - Key = key; - Value = value; - Headers = headers; - } - /// - /// Initialize a new - /// - public KNetProducerRecord(string topic, int partition, System.DateTime timestamp, K key, V value, Headers headers) - { - Topic = topic; - Partition = partition; - Timestamp = new System.DateTimeOffset(timestamp).ToUnixTimeMilliseconds(); - Key = key; - Value = value; - Headers = headers; - } - /// - /// Initialize a new - /// - public KNetProducerRecord(string topic, int partition, long timestamp, K key, V value) - { - Topic = topic; - Partition = partition; - Timestamp = timestamp; - Key = key; - Value = value; - } - /// - /// Initialize a new - /// - public KNetProducerRecord(string topic, int partition, System.DateTime timestamp, K key, V value) - { - Topic = topic; - Partition = partition; - Timestamp = new System.DateTimeOffset(timestamp).ToUnixTimeMilliseconds(); - Key = key; - Value = value; - } - /// - /// Initialize a new - /// - public KNetProducerRecord(string topic, int partition, K key, V value, Headers headers) - { - Topic = topic; - Partition = partition; - Key = key; - Value = value; - Headers = headers; - } - /// - /// Initialize a new - /// - public KNetProducerRecord(string topic, int partition, K key, V value) - { - Topic = topic; - Partition = partition; - Key = key; - Value = value; - } - /// - /// Initialize a new - /// - public KNetProducerRecord(string topic, K key, V value) - { - Topic = topic; - Key = key; - Value = value; - } - /// - /// Initialize a new - /// - public KNetProducerRecord(string topic, V value) - { - Topic = topic; - Value = value; - } - /// - public string Topic { get; private set; } - /// - public int? Partition { get; private set; } - /// - public K Key { get; private set; } - /// - public V Value { get; private set; } - /// - public long? Timestamp { get; private set; } - /// - public System.DateTime? DateTime => Timestamp.HasValue ? System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp.Value).DateTime : null; - /// - public Headers Headers { get; private set; } - /// - public override string ToString() - { - return $"Topic: {Topic} - Partition {Partition} - Key {Key} - Value {Value}"; - } - } - /// /// Extends adding less intrusive methods which performs better in high throughput applications /// diff --git a/src/net/KNet/Specific/Producer/KNetProducerRecord.cs b/src/net/KNet/Specific/Producer/KNetProducerRecord.cs new file mode 100644 index 0000000000..751c500030 --- /dev/null +++ b/src/net/KNet/Specific/Producer/KNetProducerRecord.cs @@ -0,0 +1,139 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using Org.Apache.Kafka.Common.Header; +using Org.Apache.Kafka.Clients.Producer; + +namespace MASES.KNet.Producer +{ + /// + /// KNet extension of + /// + public class KNetProducerRecord + { + /// + /// Initialize a new + /// + public KNetProducerRecord() + { + } + /// + /// Initialize a new + /// + public KNetProducerRecord(string topic, int partition, long timestamp, K key, V value, Headers headers) + { + Topic = topic; + Partition = partition; + Timestamp = timestamp; + Key = key; + Value = value; + Headers = headers; + } + /// + /// Initialize a new + /// + public KNetProducerRecord(string topic, int partition, System.DateTime timestamp, K key, V value, Headers headers) + { + Topic = topic; + Partition = partition; + Timestamp = new System.DateTimeOffset(timestamp).ToUnixTimeMilliseconds(); + Key = key; + Value = value; + Headers = headers; + } + /// + /// Initialize a new + /// + public KNetProducerRecord(string topic, int partition, long timestamp, K key, V value) + { + Topic = topic; + Partition = partition; + Timestamp = timestamp; + Key = key; + Value = value; + } + /// + /// Initialize a new + /// + public KNetProducerRecord(string topic, int partition, System.DateTime timestamp, K key, V value) + { + Topic = topic; + Partition = partition; + Timestamp = new System.DateTimeOffset(timestamp).ToUnixTimeMilliseconds(); + Key = key; + Value = value; + } + /// + /// Initialize a new + /// + public KNetProducerRecord(string topic, int partition, K key, V value, Headers headers) + { + Topic = topic; + Partition = partition; + Key = key; + Value = value; + Headers = headers; + } + /// + /// Initialize a new + /// + public KNetProducerRecord(string topic, int partition, K key, V value) + { + Topic = topic; + Partition = partition; + Key = key; + Value = value; + } + /// + /// Initialize a new + /// + public KNetProducerRecord(string topic, K key, V value) + { + Topic = topic; + Key = key; + Value = value; + } + /// + /// Initialize a new + /// + public KNetProducerRecord(string topic, V value) + { + Topic = topic; + Value = value; + } + /// + public string Topic { get; private set; } + /// + public int? Partition { get; private set; } + /// + public K Key { get; private set; } + /// + public V Value { get; private set; } + /// + public long? Timestamp { get; private set; } + /// + public System.DateTime? DateTime => Timestamp.HasValue ? System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp.Value).DateTime : null; + /// + public Headers Headers { get; private set; } + /// + public override string ToString() + { + return $"Topic: {Topic} - Partition {Partition} - Key {Key} - Value {Value}"; + } + } +} diff --git a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs index 117f466cfc..1ade25a809 100644 --- a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs +++ b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs @@ -16,7 +16,6 @@ * Refer to LICENSE for more information. */ -using Java.Time; using Java.Util; using MASES.JCOBridge.C2JBridge; using MASES.KNet.Admin; @@ -29,7 +28,6 @@ using Org.Apache.Kafka.Clients.Consumer; using Org.Apache.Kafka.Clients.Producer; using Org.Apache.Kafka.Common; -using Org.Apache.Kafka.Common.Config; using Org.Apache.Kafka.Common.Errors; using System; using System.Collections; @@ -38,7 +36,6 @@ using System.Diagnostics; using System.Linq; using System.Threading; -using static Javax.Swing.Text.Html.HTML; namespace MASES.KNet.Replicator {