diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs index 00a5cbe3d9..1ff0c84670 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs @@ -19,10 +19,12 @@ using Java.Lang; using Java.Util; using MASES.JCOBridge.C2JBridge.JVMInterop; +using MASES.JNet.Extensions; using MASES.KNet.Common.Config; using MASES.KNet.Connect.Connector; using System; using System.Collections.Concurrent; +using System.Collections.Generic; namespace MASES.KNet.Connect { @@ -31,6 +33,10 @@ namespace MASES.KNet.Connect /// public interface IKNetConnector : IConnector { + /// + /// The properties retrieved from + /// + IReadOnlyDictionary Properties { get; } /// /// Allocates a task object based on /// @@ -46,11 +52,16 @@ public interface IKNetConnector : IConnector /// Type TaskClassType { get; } /// + /// Implement the method to execute the start action + /// + /// The set of properties returned from Apache Kafka Connect framework: the contains the same info from configuration file. + void Start(IReadOnlyDictionary props); + /// /// Invoked during allocation of tasks from Apache Kafka Connect /// /// The actual index - /// The to be filled in with properties for the task: the same will be received from - void TaskConfigs(int index, Map config); + /// The to be filled in with properties for the task: the same will be received from + void TaskConfigs(int index, IDictionary config); } /// /// The generic class which is the base of both source or sink connectors @@ -125,6 +136,9 @@ protected T Context() return ExecuteOnConnector("getContext"); } + /// + public IReadOnlyDictionary Properties { get; private set; } + /// public object AllocateTask(long taskId) { @@ -146,20 +160,21 @@ public object AllocateTask(long taskId) public void Initialize(ConnectorContext ctx) => throw new NotImplementedException("Invoked in Java before any initialization."); - public void Initialize(ConnectorContext ctx, List> taskConfigs) => throw new NotImplementedException("Invoked in Java before any initialization."); + public void Initialize(ConnectorContext ctx, Java.Util.List> taskConfigs) => throw new NotImplementedException("Invoked in Java before any initialization."); /// /// Public method used from Java to trigger /// public void StartInternal() { Map props = DataToExchange>(); - Start(props); + Properties = props.ToDictiony(); + Start(Properties); } - /// - /// Implement the method to execute the start action - /// - /// The set of properties returned from Apache Kafka Connect framework: the contains the same info from configuration file. - public abstract void Start(Map props); + + public void Start(Map props) => throw new NotImplementedException("Local version with a different signature"); + + /// + public abstract void Start(IReadOnlyDictionary props); public void Reconfigure(Map props) => throw new NotImplementedException("Invoked in Java before any initialization."); @@ -170,12 +185,18 @@ public void StartInternal() public void TaskConfigsInternal(int index) { Map props = DataToExchange>(); - TaskConfigs(index, props); + System.Collections.Generic.Dictionary dict = new System.Collections.Generic.Dictionary(props.ToDictiony()); + TaskConfigs(index, dict); + props.Clear(); + foreach (var item in dict) + { + props.Put(item.Key, item.Value); + } } - /// - public abstract void TaskConfigs(int index, Map config); + /// + public abstract void TaskConfigs(int index, IDictionary config); - public List> TaskConfigs(int maxTasks) => throw new NotImplementedException("Invoked using the other signature."); + public Java.Util.List> TaskConfigs(int maxTasks) => throw new NotImplementedException("Invoked using the other signature."); /// /// Public method used from Java to trigger /// diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs index 28239307bb..d71fc6a6a0 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs @@ -17,7 +17,9 @@ */ using Java.Util; +using MASES.JNet.Extensions; using MASES.KNet.Connect.Sink; +using System.Collections.Generic; namespace MASES.KNet.Connect { @@ -42,12 +44,13 @@ public abstract class KNetSinkTask : KNetTask public void PutInternal() { Collection collection = DataToExchange>(); - Put(collection); + var list = collection.ToList(); + Put(list); } /// /// Implement the method to execute the Put action /// /// The set of from Apache Kafka Connect framework - public abstract void Put(Collection collection); + public abstract void Put(IEnumerable collection); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs index 7b2c1fb2b6..9c1d2d181e 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs @@ -16,7 +16,6 @@ * Refer to LICENSE for more information. */ -using Java.Util; using MASES.KNet.Connect.Source; using System; @@ -44,9 +43,9 @@ public ExactlyOnceSupport ExactlyOnceSupportInternal() return ExactlyOnceSupport(); } /// - /// Implement the method to execute the exactlyOnceSupport action + /// Implement the method to return the value /// - public abstract ExactlyOnceSupport ExactlyOnceSupport(); + public virtual ExactlyOnceSupport ExactlyOnceSupport() => KNet.Connect.Source.ExactlyOnceSupport.UNSUPPORTED; /// /// Public method used from Java to trigger @@ -56,9 +55,9 @@ public ConnectorTransactionBoundaries CanDefineTransactionBoundariesInternal() return CanDefineTransactionBoundaries(); } /// - /// Implement the method to execute the canDefineTransactionBoundaries action + /// Implement the method to return the value /// - public abstract ConnectorTransactionBoundaries CanDefineTransactionBoundaries(); + public virtual ConnectorTransactionBoundaries CanDefineTransactionBoundaries() => ConnectorTransactionBoundaries.UNSUPPORTED; /// /// Set the of the connector to a fixed value diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs index 13ef328b90..5a66219006 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs @@ -17,7 +17,9 @@ */ using Java.Util; +using MASES.JNet.Extensions; using MASES.KNet.Connect.Source; +using System.Collections.Generic; namespace MASES.KNet.Connect { @@ -46,12 +48,12 @@ public abstract class KNetSourceTask : KNetTask public void PollInternal() { var result = Poll(); - DataToExchange(result); + DataToExchange(result.ToJCollection()); } /// /// Implement the method to execute the Poll action /// /// The list of to return to Apache Kafka Connect framework - public abstract List Poll(); + public abstract IList Poll(); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs index a5dcdc9818..b76808a4a7 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs @@ -18,8 +18,10 @@ using Java.Util; using MASES.JCOBridge.C2JBridge.JVMInterop; +using MASES.JNet.Extensions; using MASES.KNet.Connect.Connector; using System; +using System.Collections.Generic; namespace MASES.KNet.Connect { @@ -28,6 +30,10 @@ namespace MASES.KNet.Connect /// public interface IKNetTask : ITask { + /// + /// The properties retrieved from + /// + IReadOnlyDictionary Properties { get; } /// /// The associated /// @@ -36,6 +42,11 @@ public interface IKNetTask : ITask /// The id received during initialization /// long TaskId { get; } + /// + /// Implement the method to execute the start action + /// + /// The set of properties returned from Apache Kafka Connect framework: the contains the info from . + void Start(IReadOnlyDictionary props); } /// /// The generic class which is the base of both source or sink task @@ -90,6 +101,9 @@ protected T Context() { return (reflectedTask != null) ? reflectedTask.Invoke("getContext") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); } + + /// + public IReadOnlyDictionary Properties { get; private set; } /// public IKNetConnector Connector => connector; /// @@ -103,14 +117,15 @@ protected T Context() /// public void StartInternal() { - Map props = DataToExchange>(); - Start(props); + Map props = DataToExchange>(); + Properties = props.ToDictiony(); + Start(Properties); } - /// - /// Implement the method to execute the start action - /// - /// The set of properties returned from Apache Kafka Connect framework: the contains the info from . - public abstract void Start(Map props); + + public void Start(Map props) => throw new NotImplementedException("Local version with a different signature"); + + /// + public abstract void Start(IReadOnlyDictionary props); /// /// Public method used from Java to trigger /// diff --git a/src/net/KNet/KNet.csproj b/src/net/KNet/KNet.csproj index cec683f550..5696842442 100644 --- a/src/net/KNet/KNet.csproj +++ b/src/net/KNet/KNet.csproj @@ -18,7 +18,7 @@ - + All None diff --git a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs index 5f0fb5d306..1593b398bc 100644 --- a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs +++ b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs @@ -1,42 +1,42 @@ -using Java.Util; -using MASES.KNet.Connect; +using MASES.KNet.Connect; using MASES.KNet.Connect.Sink; +using System.Collections.Generic; namespace MASES.KNetTemplate.KNetConnect { public class KNetConnectSink : KNetSinkConnector { - public override void Start(Map props) + public override void Start(IReadOnlyDictionary props) { - + // starts the connector, the method receives the configuration properties } public override void Stop() { - + // stops the connector } - public override void TaskConfigs(int index, Map config) + public override void TaskConfigs(int index, IDictionary config) { - + // fill in the properties for task configuration } } public class KNetConnectSinkTask : KNetSinkTask { - public override void Put(Collection collection) + public override void Put(IEnumerable collection) { - + // receives the records from Apache Kafka Connect to be used from connector } - public override void Start(Map props) + public override void Start(IReadOnlyDictionary props) { - + // starts the task with the configuration set from connector } public override void Stop() { - + // stops the task } } } diff --git a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs index f6c9a1322a..0f75633729 100644 --- a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs +++ b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs @@ -1,52 +1,43 @@ -using Java.Util; -using MASES.KNet.Connect; +using MASES.KNet.Connect; using MASES.KNet.Connect.Source; +using System.Collections.Generic; namespace MASES.KNetTemplate.KNetConnect { public class KNetConnectSource : KNetSourceConnector { - public override void Start(Map props) + public override void Start(IReadOnlyDictionary props) { - + // starts the connector, the method receives the configuration properties } public override void Stop() { - - } - - public override void TaskConfigs(int index, Map config) - { - + // stops the connector } - public override ExactlyOnceSupport ExactlyOnceSupport() + public override void TaskConfigs(int index, IDictionary config) { - return KNet.Connect.Source.ExactlyOnceSupport.UNSUPPORTED; - } - - public override ConnectorTransactionBoundaries CanDefineTransactionBoundaries() - { - return ConnectorTransactionBoundaries.UNSUPPORTED; + // fill in the properties for task configuration } } public class KNetConnectSourceTask : KNetSourceTask { - public override List Poll() + public override IList Poll() { + // returns the records to Apache Kafka Connect to be used from connector return null; } - public override void Start(Map props) + public override void Start(IReadOnlyDictionary props) { - + // starts the task with the configuration set from connector } public override void Stop() { - + // stops the task } } } diff --git a/tests/KNetConnectTest/KNetConnectSink.cs b/tests/KNetConnectTest/KNetConnectSink.cs index eedbd43f44..10b7cfb251 100644 --- a/tests/KNetConnectTest/KNetConnectSink.cs +++ b/tests/KNetConnectTest/KNetConnectSink.cs @@ -19,12 +19,13 @@ using Java.Util; using MASES.KNet.Connect; using MASES.KNet.Connect.Sink; +using System.Collections.Generic; namespace MASES.KNetConnectTest { public class KNetSinkTestConnector : KNetSinkConnector { - public override void Start(Map props) + public override void Start(IReadOnlyDictionary props) { } @@ -34,7 +35,7 @@ public override void Stop() } - public override void TaskConfigs(int index, Map config) + public override void TaskConfigs(int index, IDictionary config) { } @@ -42,12 +43,12 @@ public override void TaskConfigs(int index, Map config) public class KNetSinkTestTask : KNetSinkTask { - public override void Put(Collection collection) + public override void Put(IEnumerable collection) { } - public override void Start(Map props) + public override void Start(IReadOnlyDictionary props) { } diff --git a/tests/KNetConnectTest/KNetConnectSource.cs b/tests/KNetConnectTest/KNetConnectSource.cs index 22585685f3..7260b75ebb 100644 --- a/tests/KNetConnectTest/KNetConnectSource.cs +++ b/tests/KNetConnectTest/KNetConnectSource.cs @@ -16,15 +16,15 @@ * Refer to LICENSE for more information. */ -using Java.Util; using MASES.KNet.Connect; using MASES.KNet.Connect.Source; +using System.Collections.Generic; namespace MASES.KNetConnectTest { public class KNetSourceTestConnector : KNetSourceConnector { - public override void Start(Map props) + public override void Start(IReadOnlyDictionary props) { } @@ -34,32 +34,22 @@ public override void Stop() } - public override void TaskConfigs(int index, Map config) + public override void TaskConfigs(int index, IDictionary config) { } - - public override ExactlyOnceSupport ExactlyOnceSupport() - { - return KNet.Connect.Source.ExactlyOnceSupport.UNSUPPORTED; - } - - public override ConnectorTransactionBoundaries CanDefineTransactionBoundaries() - { - return ConnectorTransactionBoundaries.UNSUPPORTED; - } } public class KNetSourceTestTask : KNetSourceTask { - public override List Poll() + public override IList Poll() { - ArrayList records = new(); + System.Collections.Generic.List records = new(); return records; } - public override void Start(Map props) + public override void Start(IReadOnlyDictionary props) { }