Skip to content

Commit

Permalink
Updates on serializers retrieve (#453)
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers authored May 16, 2024
1 parent 6aa6784 commit b274190
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,33 @@
*/

using MASES.JCOBridge.C2JBridge;
using MASES.JCOBridge.C2JBridge.JVMInterop;

namespace Org.Apache.Kafka.Common.Serialization
{
/// <summary>
/// <summary>
/// Listener for Kafka Serializer. Extends <see cref="JVMBridgeListener"/>. Implements <see cref="ISerializer{T}"/>
/// </summary>
/// <remarks>Dispose the object to avoid a resource leak, the object contains a reference to the corresponding JVM object</remarks>
public partial class Serde<T> : ISerde<T>
public partial class SerdeDirect<T> : ISerde<T>
{
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/3.6.2/org/apache/kafka/common/serialization/Serde.html#deserializer--"/>
/// </summary>
/// <returns><see cref="Org.Apache.Kafka.Common.Serialization.Deserializer"/></returns>
public Org.Apache.Kafka.Common.Serialization.Deserializer<T> DeserializerDirect()
{
return new DeserializerDirect<T>();
var res = this.IExecute("deserializer") as IJavaObject;
return WrapsDirect<DeserializerDirect<T>>(res);
}
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/3.6.2/org/apache/kafka/common/serialization/Serde.html#serializer--"/>
/// </summary>
/// <returns><see cref="Org.Apache.Kafka.Common.Serialization.Serializer"/></returns>
public Org.Apache.Kafka.Common.Serialization.Serializer<T> SerializerDirect()
{
return new SerializerDirect<T>();
var res = this.IExecute("serializer") as IJavaObject;
return WrapsDirect<SerializerDirect<T>>(res);
}
}
}
58 changes: 19 additions & 39 deletions src/net/KNet/Specific/Serialization/SerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,81 +114,61 @@ public SerDes()
throw new InvalidOperationException($"{_SerializationType} is not valid.");
}

SerdeDirect<TJVMT> kSerde = null;

if (IsDirectBuffered)
{
_KafkaSerializer = new KNetByteBufferSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new KNetByteBufferDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new KNetSerdes.ByteBufferSerde().CastDirect<Serde<TJVMT>>();
kSerde = new KNetSerdes.ByteBufferSerde().CastTo<SerdeDirect<TJVMT>>();
}
else
{
switch (_JVMSerializationType)
{
case KNetSerialization.SerializationType.Boolean:
_KafkaSerializer = new BooleanSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new BooleanDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.BooleanSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.BooleanSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.ByteArray:
_KafkaSerializer = new ByteArraySerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new ByteArrayDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.ByteArraySerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.ByteArraySerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.ByteBuffer:
_KafkaSerializer = new ByteBufferSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new ByteBufferDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.ByteBufferSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.ByteBufferSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Bytes:
_KafkaSerializer = new BytesSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new BytesDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.BytesSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.BytesSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Double:
_KafkaSerializer = new DoubleSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new DoubleDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.DoubleSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.DoubleSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Float:
_KafkaSerializer = new FloatSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new FloatDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.FloatSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.FloatSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Integer:
_KafkaSerializer = new IntegerSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new IntegerDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.IntegerSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.IntegerSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Long:
_KafkaSerializer = new LongSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new LongDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.LongSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.LongSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Short:
_KafkaSerializer = new ShortSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new ShortDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.ShortSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.ShortSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.String:
_KafkaSerializer = new StringSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new StringDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.StringSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.StringSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Guid:
_KafkaSerializer = new UUIDSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new UUIDDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.UUIDSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.UUIDSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.Void:
_KafkaSerializer = new VoidSerializer().CastDirect<Serializer<TJVMT>>();
_KafkaDeserializer = new VoidDeserializer().CastDirect<Deserializer<TJVMT>>();
_KafkaSerde = new Serdes.VoidSerde().CastDirect<Serde<TJVMT>>();
kSerde = new Serdes.VoidSerde().CastTo<SerdeDirect<TJVMT>>();
break;
case KNetSerialization.SerializationType.External:
default:
throw new InvalidOperationException($"{typeof(T)} needs an external serializer: set {nameof(OnSerialize)} or {nameof(OnSerializeWithHeaders)}.");
}
}

_KafkaSerde = kSerde;
_KafkaSerializer = kSerde.SerializerDirect();
_KafkaDeserializer = kSerde.DeserializerDirect();
}
/// <summary>
/// Finalizer
Expand Down

0 comments on commit b274190

Please sign in to comment.