Skip to content

Commit

Permalink
Remove dependencies from knet connect sdk of jnet types (Map, Collect…
Browse files Browse the repository at this point in the history
…ion, etc) (#127)

* #92: upgrade to JNet 1.4.15

* #125: upgrade API to remove usage of Java types like Map, Collection and so on

* #125: test upgrade
  • Loading branch information
masesdevelopers authored Nov 21, 2022
1 parent b94dd99 commit 6a8ead8
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 83 deletions.
47 changes: 34 additions & 13 deletions src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
using Java.Lang;
using Java.Util;
using MASES.JCOBridge.C2JBridge.JVMInterop;
using MASES.JNet.Extensions;
using MASES.KNet.Common.Config;
using MASES.KNet.Connect.Connector;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace MASES.KNet.Connect
{
Expand All @@ -31,6 +33,10 @@ namespace MASES.KNet.Connect
/// </summary>
public interface IKNetConnector : IConnector
{
/// <summary>
/// The properties retrieved from <see cref="KNetConnector.StartInternal"/>
/// </summary>
IReadOnlyDictionary<string, string> Properties { get; }
/// <summary>
/// Allocates a task object based on <see cref="KNetTask"/>
/// </summary>
Expand All @@ -46,11 +52,16 @@ public interface IKNetConnector : IConnector
/// </summary>
Type TaskClassType { get; }
/// <summary>
/// Implement the method to execute the start action
/// </summary>
/// <param name="props">The set of properties returned from Apache Kafka Connect framework: the <see cref="IReadOnlyDictionary{string, string}"/> contains the same info from configuration file.</param>
void Start(IReadOnlyDictionary<string, string> props);
/// <summary>
/// Invoked during allocation of tasks from Apache Kafka Connect
/// </summary>
/// <param name="index">The actual index</param>
/// <param name="config">The <see cref="Map{string, string}"/> to be filled in with properties for the task: the same will be received from <see cref="KNetTask.Start(Map{string, string})"/></param>
void TaskConfigs(int index, Map<string, string> config);
/// <param name="config">The <see cref="IDictionary{string, string}"/> to be filled in with properties for the task: the same will be received from <see cref="KNetTask.Start(IReadOnlyDictionary{string, string})"/></param>
void TaskConfigs(int index, IDictionary<string, string> config);
}
/// <summary>
/// The generic class which is the base of both source or sink connectors
Expand Down Expand Up @@ -125,6 +136,9 @@ protected T Context<T>()
return ExecuteOnConnector<T>("getContext");
}

/// <inheritdoc cref="IKNetConnector.Properties"/>
public IReadOnlyDictionary<string, string> Properties { get; private set; }

/// <inheritdoc cref="IKNetConnector.AllocateTask(long)"/>
public object AllocateTask(long taskId)
{
Expand All @@ -146,20 +160,21 @@ public object AllocateTask(long taskId)

public void Initialize(ConnectorContext ctx) => throw new NotImplementedException("Invoked in Java before any initialization.");

public void Initialize(ConnectorContext ctx, List<Map<string, string>> taskConfigs) => throw new NotImplementedException("Invoked in Java before any initialization.");
public void Initialize(ConnectorContext ctx, Java.Util.List<Map<string, string>> taskConfigs) => throw new NotImplementedException("Invoked in Java before any initialization.");
/// <summary>
/// Public method used from Java to trigger <see cref="Start(Map{string, string})"/>
/// </summary>
public void StartInternal()
{
Map<string, string> props = DataToExchange<Map<string, string>>();
Start(props);
Properties = props.ToDictiony();
Start(Properties);
}
/// <summary>
/// Implement the method to execute the start action
/// </summary>
/// <param name="props">The set of properties returned from Apache Kafka Connect framework: the <see cref="Map{string, string}"/> contains the same info from configuration file.</param>
public abstract void Start(Map<string, string> props);

public void Start(Map<string, string> props) => throw new NotImplementedException("Local version with a different signature");

/// <inheritdoc cref="IKNetConnector.Start(IReadOnlyDictionary{string, string})"/>
public abstract void Start(IReadOnlyDictionary<string, string> props);

public void Reconfigure(Map<string, string> props) => throw new NotImplementedException("Invoked in Java before any initialization.");

Expand All @@ -170,12 +185,18 @@ public void StartInternal()
public void TaskConfigsInternal(int index)
{
Map<string, string> props = DataToExchange<Map<string, string>>();
TaskConfigs(index, props);
System.Collections.Generic.Dictionary<string, string> dict = new System.Collections.Generic.Dictionary<string, string>(props.ToDictiony());
TaskConfigs(index, dict);
props.Clear();
foreach (var item in dict)
{
props.Put(item.Key, item.Value);
}
}
/// <inheritdoc cref="IKNetConnector.TaskConfigs(int, Map{string, string})"/>
public abstract void TaskConfigs(int index, Map<string, string> config);
/// <inheritdoc cref="IKNetConnector.TaskConfigs(int, IDictionary{string, string})"/>
public abstract void TaskConfigs(int index, IDictionary<string, string> config);

public List<Map<string, string>> TaskConfigs(int maxTasks) => throw new NotImplementedException("Invoked using the other signature.");
public Java.Util.List<Map<string, string>> TaskConfigs(int maxTasks) => throw new NotImplementedException("Invoked using the other signature.");
/// <summary>
/// Public method used from Java to trigger <see cref="Stop"/>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/

using Java.Util;
using MASES.JNet.Extensions;
using MASES.KNet.Connect.Sink;
using System.Collections.Generic;

namespace MASES.KNet.Connect
{
Expand All @@ -42,12 +44,13 @@ public abstract class KNetSinkTask<TTask> : KNetTask<TTask>
public void PutInternal()
{
Collection<SinkRecord> collection = DataToExchange<Collection<SinkRecord>>();
Put(collection);
var list = collection.ToList();
Put(list);
}
/// <summary>
/// Implement the method to execute the Put action
/// </summary>
/// <param name="collection">The set of <see cref="SinkRecord"/> from Apache Kafka Connect framework</param>
public abstract void Put(Collection<SinkRecord> collection);
public abstract void Put(IEnumerable<SinkRecord> collection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* Refer to LICENSE for more information.
*/

using Java.Util;
using MASES.KNet.Connect.Source;
using System;

Expand Down Expand Up @@ -44,9 +43,9 @@ public ExactlyOnceSupport ExactlyOnceSupportInternal()
return ExactlyOnceSupport();
}
/// <summary>
/// Implement the method to execute the exactlyOnceSupport action
/// Implement the method to return the <see cref="KNet.Connect.Source.ExactlyOnceSupport"/> value
/// </summary>
public abstract ExactlyOnceSupport ExactlyOnceSupport();
public virtual ExactlyOnceSupport ExactlyOnceSupport() => KNet.Connect.Source.ExactlyOnceSupport.UNSUPPORTED;

/// <summary>
/// Public method used from Java to trigger <see cref="CanDefineTransactionBoundaries"/>
Expand All @@ -56,9 +55,9 @@ public ConnectorTransactionBoundaries CanDefineTransactionBoundariesInternal()
return CanDefineTransactionBoundaries();
}
/// <summary>
/// Implement the method to execute the canDefineTransactionBoundaries action
/// Implement the method to return the <see cref="ConnectorTransactionBoundaries"/> value
/// </summary>
public abstract ConnectorTransactionBoundaries CanDefineTransactionBoundaries();
public virtual ConnectorTransactionBoundaries CanDefineTransactionBoundaries() => ConnectorTransactionBoundaries.UNSUPPORTED;

/// <summary>
/// Set the <see cref="ReflectedConnectorClassName"/> of the connector to a fixed value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/

using Java.Util;
using MASES.JNet.Extensions;
using MASES.KNet.Connect.Source;
using System.Collections.Generic;

namespace MASES.KNet.Connect
{
Expand Down Expand Up @@ -46,12 +48,12 @@ public abstract class KNetSourceTask<TTask> : KNetTask<TTask>
public void PollInternal()
{
var result = Poll();
DataToExchange(result);
DataToExchange(result.ToJCollection());
}
/// <summary>
/// Implement the method to execute the Poll action
/// </summary>
/// <returns>The list of <see cref="SourceRecord"/> to return to Apache Kafka Connect framework</returns>
public abstract List<SourceRecord> Poll();
public abstract IList<SourceRecord> Poll();
}
}
29 changes: 22 additions & 7 deletions src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

using Java.Util;
using MASES.JCOBridge.C2JBridge.JVMInterop;
using MASES.JNet.Extensions;
using MASES.KNet.Connect.Connector;
using System;
using System.Collections.Generic;

namespace MASES.KNet.Connect
{
Expand All @@ -28,6 +30,10 @@ namespace MASES.KNet.Connect
/// </summary>
public interface IKNetTask : ITask
{
/// <summary>
/// The properties retrieved from <see cref="KNetTask.StartInternal"/>
/// </summary>
IReadOnlyDictionary<string, string> Properties { get; }
/// <summary>
/// The associated <see cref="IConnector"/>
/// </summary>
Expand All @@ -36,6 +42,11 @@ public interface IKNetTask : ITask
/// The id received during initialization
/// </summary>
long TaskId { get; }
/// <summary>
/// Implement the method to execute the start action
/// </summary>
/// <param name="props">The set of properties returned from Apache Kafka Connect framework: the <see cref="IReadOnlyDictionary{string, string}"/> contains the info from <see cref="IKNetConnector.TaskConfigs(int, Map{string, string})"/>.</param>
void Start(IReadOnlyDictionary<string, string> props);
}
/// <summary>
/// The generic class which is the base of both source or sink task
Expand Down Expand Up @@ -90,6 +101,9 @@ protected T Context<T>()
{
return (reflectedTask != null) ? reflectedTask.Invoke<T>("getContext") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM");
}

/// <inheritdoc cref="IKNetTask.Properties"/>
public IReadOnlyDictionary<string, string> Properties { get; private set; }
/// <inheritdoc cref="IKNetTask.Connector"/>
public IKNetConnector Connector => connector;
/// <inheritdoc cref="IKNetTask.TaskId"/>
Expand All @@ -103,14 +117,15 @@ protected T Context<T>()
/// </summary>
public void StartInternal()
{
Map<string, string> props = DataToExchange<Map<string, string>>();
Start(props);
Map<string, string> props = DataToExchange<Map<string, string>>();
Properties = props.ToDictiony();
Start(Properties);
}
/// <summary>
/// Implement the method to execute the start action
/// </summary>
/// <param name="props">The set of properties returned from Apache Kafka Connect framework: the <see cref="Map{string, string}"/> contains the info from <see cref="IKNetConnector.TaskConfigs(int, Map{string, string})"/>.</param>
public abstract void Start(Map<string, string> props);

public void Start(Map<string, string> props) => throw new NotImplementedException("Local version with a different signature");

/// <inheritdoc cref="IKNetTask.Start(IReadOnlyDictionary{string, string})"/>
public abstract void Start(IReadOnlyDictionary<string, string> props);
/// <summary>
/// Public method used from Java to trigger <see cref="Stop"/>
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/net/KNet/KNet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<None Include="..\Documentation\articles\usage.md" Pack="true" PackagePath="\" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="MASES.JNet" Version="1.4.14">
<PackageReference Include="MASES.JNet" Version="1.4.15">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
Expand Down
24 changes: 12 additions & 12 deletions src/net/templates/templates/knetConnectSink/KNetConnectSink.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
using Java.Util;
using MASES.KNet.Connect;
using MASES.KNet.Connect;
using MASES.KNet.Connect.Sink;
using System.Collections.Generic;

namespace MASES.KNetTemplate.KNetConnect
{
public class KNetConnectSink : KNetSinkConnector<KNetConnectSink, KNetConnectSinkTask>
{
public override void Start(Map<string, string> props)
public override void Start(IReadOnlyDictionary<string, string> props)
{

// starts the connector, the method receives the configuration properties
}

public override void Stop()
{

// stops the connector
}

public override void TaskConfigs(int index, Map<string, string> config)
public override void TaskConfigs(int index, IDictionary<string, string> config)
{

// fill in the properties for task configuration
}
}

public class KNetConnectSinkTask : KNetSinkTask<KNetConnectSinkTask>
{
public override void Put(Collection<SinkRecord> collection)
public override void Put(IEnumerable<SinkRecord> collection)
{

// receives the records from Apache Kafka Connect to be used from connector
}

public override void Start(Map<string, string> props)
public override void Start(IReadOnlyDictionary<string, string> props)
{

// starts the task with the configuration set from connector
}

public override void Stop()
{

// stops the task
}
}
}
33 changes: 12 additions & 21 deletions src/net/templates/templates/knetConnectSource/KNetConnectSource.cs
Original file line number Diff line number Diff line change
@@ -1,52 +1,43 @@
using Java.Util;
using MASES.KNet.Connect;
using MASES.KNet.Connect;
using MASES.KNet.Connect.Source;
using System.Collections.Generic;

namespace MASES.KNetTemplate.KNetConnect
{
public class KNetConnectSource : KNetSourceConnector<KNetConnectSource, KNetConnectSourceTask>
{
public override void Start(Map<string, string> props)
public override void Start(IReadOnlyDictionary<string, string> props)
{

// starts the connector, the method receives the configuration properties
}

public override void Stop()
{

}

public override void TaskConfigs(int index, Map<string, string> config)
{

// stops the connector
}

public override ExactlyOnceSupport ExactlyOnceSupport()
public override void TaskConfigs(int index, IDictionary<string, string> config)
{
return KNet.Connect.Source.ExactlyOnceSupport.UNSUPPORTED;
}

public override ConnectorTransactionBoundaries CanDefineTransactionBoundaries()
{
return ConnectorTransactionBoundaries.UNSUPPORTED;
// fill in the properties for task configuration
}
}

public class KNetConnectSourceTask : KNetSourceTask<KNetConnectSourceTask>
{
public override List<SourceRecord> Poll()
public override IList<SourceRecord> Poll()
{
// returns the records to Apache Kafka Connect to be used from connector
return null;
}

public override void Start(Map<string, string> props)
public override void Start(IReadOnlyDictionary<string, string> props)
{

// starts the task with the configuration set from connector
}

public override void Stop()
{

// stops the task
}
}
}
Loading

0 comments on commit 6a8ead8

Please sign in to comment.