From 4fd05a54b2871bc233c885a5ee239fdd8550903b Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Thu, 10 Mar 2022 02:37:51 +0100 Subject: [PATCH 1/8] Reduced download time from repo --- .github/workflows/build.yaml | 1 + .github/workflows/pullrequest.yaml | 1 + .github/workflows/release.yaml | 1 + 3 files changed, 3 insertions(+) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index c116e3a1b9..473f535787 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -55,6 +55,7 @@ jobs: # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - uses: actions/checkout@v2 with: + fetch-depth: '1' submodules: 'true' - name: Cache local Maven repository diff --git a/.github/workflows/pullrequest.yaml b/.github/workflows/pullrequest.yaml index 44bab1979c..20fbf8f957 100644 --- a/.github/workflows/pullrequest.yaml +++ b/.github/workflows/pullrequest.yaml @@ -23,6 +23,7 @@ jobs: # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - uses: actions/checkout@v2 with: + fetch-depth: '1' submodules: 'true' - name: Cache local Maven repository diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index be208d7496..e10879e4fb 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -25,6 +25,7 @@ jobs: # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - uses: actions/checkout@v2 with: + fetch-depth: '1' submodules: 'true' - name: Pre compile From ecad29c9164778c3d8cb5a8559008af9bc76f076 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Thu, 10 Mar 2022 17:37:50 +0100 Subject: [PATCH 2/8] Version upgrade --- src/java/kafkabridge/pom.xml | 2 +- src/net/KafkaBridge/KafkaBridge.csproj | 2 +- src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj | 2 +- src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec | 2 +- src/net/templates/templatepack.csproj | 2 +- .../kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj | 2 +- .../kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj | 2 +- .../kafkabridgeProducerApp/kafkabridgeProducerApp.csproj | 2 +- tests/KafkaBridgeTest/KafkaBridgeTest.csproj | 2 +- tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj | 2 +- tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/java/kafkabridge/pom.xml b/src/java/kafkabridge/pom.xml index 57a145e280..c0cf5bd53e 100644 --- a/src/java/kafkabridge/pom.xml +++ b/src/java/kafkabridge/pom.xml @@ -9,7 +9,7 @@ mases.kafkabridge Apache Kafka interface bridging implementation https://github.com/masesgroup/KafkaBridge - 1.1.10.0 + 1.1.11.0 diff --git a/src/net/KafkaBridge/KafkaBridge.csproj b/src/net/KafkaBridge/KafkaBridge.csproj index 5d121ac8fb..c2d585c329 100644 --- a/src/net/KafkaBridge/KafkaBridge.csproj +++ b/src/net/KafkaBridge/KafkaBridge.csproj @@ -9,7 +9,7 @@ MASES s.r.l. MASES s.r.l. MASES s.r.l. - 1.1.10.0 + 1.1.11.0 KafkaBridge true net461;netcoreapp3.1;net5.0;net6.0 diff --git a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj index 691cc4aaca..c19aa4daa4 100644 --- a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj +++ b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj @@ -10,7 +10,7 @@ MASES s.r.l. MASES s.r.l. MASES s.r.l. - 1.1.10.0 + 1.1.11.0 KafkaBridgeCLI true net461;netcoreapp3.1;net5.0;net6.0 diff --git a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec index 8c7d82526f..7e68af14bf 100644 --- a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec +++ b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec @@ -2,7 +2,7 @@ MASES.KafkaBridgeCLI - 1.1.10 + 1.1.11 KafkaBridgeCLI - CLI interface of KafkaBridge MASES s.r.l. MASES s.r.l. diff --git a/src/net/templates/templatepack.csproj b/src/net/templates/templatepack.csproj index 350ea8918a..d32a6f996a 100644 --- a/src/net/templates/templatepack.csproj +++ b/src/net/templates/templatepack.csproj @@ -1,7 +1,7 @@ Template - 1.1.10.0 + 1.1.11.0 MASES.KafkaBridge.Templates KafkaBridge Templates - Templates to use the KafkaBridge MASES s.r.l. diff --git a/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj b/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj index 662db80f5c..8bc4220306 100644 --- a/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj +++ b/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj @@ -12,6 +12,6 @@ - + diff --git a/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj b/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj index 662db80f5c..8bc4220306 100644 --- a/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj +++ b/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj @@ -12,6 +12,6 @@ - + diff --git a/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj b/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj index 662db80f5c..8bc4220306 100644 --- a/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj +++ b/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj @@ -12,6 +12,6 @@ - + diff --git a/tests/KafkaBridgeTest/KafkaBridgeTest.csproj b/tests/KafkaBridgeTest/KafkaBridgeTest.csproj index 4de65f5acd..57111246ad 100644 --- a/tests/KafkaBridgeTest/KafkaBridgeTest.csproj +++ b/tests/KafkaBridgeTest/KafkaBridgeTest.csproj @@ -8,7 +8,7 @@ Copyright © MASES s.r.l. 2022 MASES s.r.l. MASES s.r.l. - 1.1.10.0 + 1.1.11.0 net461;netcoreapp3.1;net5.0;net6.0 ..\..\bin\ latest diff --git a/tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj b/tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj index b0fe26f1e4..0fcc372133 100644 --- a/tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj +++ b/tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj @@ -8,7 +8,7 @@ Copyright © MASES s.r.l. 2022 MASES s.r.l. MASES s.r.l. - 1.1.10.0 + 1.1.11.0 net461;netcoreapp3.1;net5.0;net6.0 ..\..\bin\ latest diff --git a/tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj b/tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj index 1e8b2aac66..52afad8683 100644 --- a/tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj +++ b/tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj @@ -8,7 +8,7 @@ Copyright © MASES s.r.l. 2022 MASES s.r.l. MASES s.r.l. - 1.1.10.0 + 1.1.11.0 net461;netcoreapp3.1;net5.0;net6.0 ..\..\bin\ latest From 9e679501721f200a1680d89dbf87621abfec7b5d Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Mon, 14 Mar 2022 22:05:54 +0100 Subject: [PATCH 3/8] Class fix --- .../Clients/Consumer/ConsumerRecords.cs | 21 ++++++++++++++++--- .../Clients/Consumer/KafkaConsumer.cs | 19 +++++++++++------ .../Clients/Producer/KafkaProducer.cs | 21 ++++++++++++------- .../Streams/KafkaClientSupplier.cs | 10 ++++----- .../BridgedClasses/Streams/KafkaStreams.cs | 2 +- .../Streams/StoreQueryParameters.cs | 1 - 6 files changed, 51 insertions(+), 23 deletions(-) diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerRecords.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerRecords.cs index cad5954602..562f1009bd 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerRecords.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerRecords.cs @@ -16,15 +16,30 @@ * Refer to LICENSE for more information. */ +using Java.Lang; +using Java.Util; +using MASES.KafkaBridge.Common; + namespace MASES.KafkaBridge.Clients.Consumer { public class ConsumerRecords : JCOBridge.C2JBridge.JVMBridgeBaseEnumerable, ConsumerRecord> { public override string ClassName => "org.apache.kafka.clients.consumer.ConsumerRecords"; - } - public class ConsumerRecords : ConsumerRecords - { + public List> Records(TopicPartition partition) + { + return IExecute>>("records", partition); + } + + public Iterable> Records(String topic) + { + return IExecute>>("records", topic); + } + + public Set partitions => IExecute>("partitions"); + + public Iterator> Iterator => IExecute>>("iterator"); + public int Count => IExecute("count"); } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/KafkaConsumer.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/KafkaConsumer.cs index e907d967e2..66c07551e7 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/KafkaConsumer.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/KafkaConsumer.cs @@ -24,7 +24,7 @@ namespace MASES.KafkaBridge.Clients.Consumer { - public class KafkaConsumer : JCOBridge.C2JBridge.JVMBridgeBase>, IConsumer + public class KafkaConsumer : JCOBridge.C2JBridge.JVMBridgeBase { public override bool IsCloseable => true; @@ -34,6 +34,18 @@ public KafkaConsumer() { } + public KafkaConsumer(params object[] args) + : base(args) + { + } + } + + public class KafkaConsumer : KafkaConsumer, IConsumer + { + public KafkaConsumer() + { + } + public KafkaConsumer(Properties props) : base(props) { @@ -257,9 +269,4 @@ public void Wakeup() IExecute("wakeup"); } } - - public class KafkaConsumer : KafkaConsumer - { - public KafkaConsumer(Properties props) : base(props) { } - } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Producer/KafkaProducer.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Producer/KafkaProducer.cs index 813ae6241e..6a1497903b 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Producer/KafkaProducer.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Producer/KafkaProducer.cs @@ -24,14 +24,24 @@ namespace MASES.KafkaBridge.Clients.Producer { - public class KafkaProducer : JCOBridge.C2JBridge.JVMBridgeBase>, IProducer + public class KafkaProducer : JCOBridge.C2JBridge.JVMBridgeBase { public override bool IsCloseable => true; public override string ClassName => "org.apache.kafka.clients.producer.KafkaProducer"; - public Map Metrics => IExecute>("metrics"); + public KafkaProducer() + { + } + + public KafkaProducer(params object[] args) + : base(args) + { + } + } + public class KafkaProducer : KafkaProducer, IProducer + { public KafkaProducer() { } @@ -46,6 +56,8 @@ public KafkaProducer(Properties props, Serializer keySerializer, Serializer Metrics => IExecute>("metrics"); + public void InitTransactions() { IExecute("initTransactions"); @@ -106,9 +118,4 @@ public List PartitionsFor(string topic) return IExecute>("partitionsFor", topic); } } - - public class KafkaProducer : KafkaProducer - { - public KafkaProducer(Properties props) : base(props) { } - } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KafkaClientSupplier.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KafkaClientSupplier.cs index 16ada561ae..61fee10288 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KafkaClientSupplier.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KafkaClientSupplier.cs @@ -85,11 +85,11 @@ public class KafkaClientSupplier : JVMBridgeListener, IKafkaClientSupplier /// The to be executed /// Set to false to disable attach of and set an own one public KafkaClientSupplier(Func, Clients.Admin.IAdmin> onGetAdmin = null, - Func, IProducer> onGetProducer = null, - Func, IConsumer> onGetConsumer = null, - Func, IConsumer> onGetRestoreConsumer = null, - Func, IConsumer> onGetGlobalConsumer = null, - bool attachEventHandler = true) + Func, IProducer> onGetProducer = null, + Func, IConsumer> onGetConsumer = null, + Func, IConsumer> onGetRestoreConsumer = null, + Func, IConsumer> onGetGlobalConsumer = null, + bool attachEventHandler = true) { if (onGetAdmin != null) GetAdminFunction = onGetAdmin; else GetAdminFunction = GetAdmin; diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KafkaStreams.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KafkaStreams.cs index 1381427853..17e817b92e 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KafkaStreams.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KafkaStreams.cs @@ -83,7 +83,7 @@ void EventHandler(object sender, CLRListenerEventArgs> d public virtual void OnChange(StateType newState, StateType oldState) { } } - public class KafkaStreams : JCOBridge.C2JBridge.JVMBridgeBase + public class KafkaStreams : JVMBridgeBase { public override bool IsCloseable => true; diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StoreQueryParameters.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StoreQueryParameters.cs index 80068a4f36..bfbbeb71cc 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StoreQueryParameters.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StoreQueryParameters.cs @@ -48,6 +48,5 @@ public StoreQueryParameters WithPartition(int partition) public int Partition => IExecute("partition"); public bool StaleStoresEnabled => IExecute("staleStoresEnabled"); - } } From 46a0ac0ba649f4282470026554fe8448ac91d99f Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Mon, 14 Mar 2022 22:06:29 +0100 Subject: [PATCH 4/8] #43: added some API with .NET style --- .../Extensions/KafkaAdminClientExtensions.cs | 546 ++++++++++++++++++ .../Extensions/KafkaConsumerExtensions.cs | 74 +++ .../Extensions/KafkaProducerExtensions.cs | 112 ++++ .../Extensions/KafkaStreamsExtensions.cs | 24 + tests/KafkaBridgeTest/Program.cs | 3 +- 5 files changed, 758 insertions(+), 1 deletion(-) create mode 100644 src/net/KafkaBridge/Extensions/KafkaAdminClientExtensions.cs create mode 100644 src/net/KafkaBridge/Extensions/KafkaConsumerExtensions.cs create mode 100644 src/net/KafkaBridge/Extensions/KafkaProducerExtensions.cs create mode 100644 src/net/KafkaBridge/Extensions/KafkaStreamsExtensions.cs diff --git a/src/net/KafkaBridge/Extensions/KafkaAdminClientExtensions.cs b/src/net/KafkaBridge/Extensions/KafkaAdminClientExtensions.cs new file mode 100644 index 0000000000..256bf906f7 --- /dev/null +++ b/src/net/KafkaBridge/Extensions/KafkaAdminClientExtensions.cs @@ -0,0 +1,546 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using Java.Util; +using MASES.KafkaBridge.Clients.Admin; +using MASES.KafkaBridge.Clients.Consumer; +using MASES.KafkaBridge.Common; +using MASES.KafkaBridge.Common.Acl; +using MASES.KafkaBridge.Common.Config; +using MASES.KafkaBridge.Common.Quota; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MASES.KafkaBridge.Extensions +{ + public static class KafkaAdminClientExtensions + { + async static Task Execute(Func func) + { + Task task = Task.Run(() => + { + return func(); + }); + + await task; + if (task.Status == TaskStatus.Faulted && task.Exception != null) + { + throw task.Exception.Flatten().InnerException; + } + return task.Result; + } + + async static Task Execute(Func func, TIn input) + { + Task task = Task.Run(() => + { + return func(input); + }); + + await task; + if (task.Status == TaskStatus.Faulted && task.Exception != null) + { + throw task.Exception.Flatten().InnerException; + } + return task.Result; + } + + async static Task Execute(Func func, TIn1 input1, TIn2 input2) + { + Task task = Task.Run(() => + { + return func(input1, input2); + }); + + await task; + if (task.Status == TaskStatus.Faulted && task.Exception != null) + { + throw task.Exception.Flatten().InnerException; + } + return task.Result; + } + + async static Task Execute(Func func, TIn1 input1, TIn2 input2, Tin3 input3) + { + Task task = Task.Run(() => + { + return func(input1, input2, input3); + }); + + await task; + if (task.Status == TaskStatus.Faulted && task.Exception != null) + { + throw task.Exception.Flatten().InnerException; + } + return task.Result; + } + + public static async Task CreateTopicsAsync(this IAdmin admin, Collection newTopics) + { + return await Execute(admin.CreateTopics, newTopics); + } + + public static CreateTopicsResult CreateTopic(this IAdmin admin, string topicName, int numPartitions = 1, short replicationFactor = 1) + { + return admin.CreateTopics(Collections.Singleton(new NewTopic(topicName, numPartitions, replicationFactor))); + } + + public static async Task CreateTopicAsync(this IAdmin admin, string topicName, int numPartitions = 1, short replicationFactor = 1) + { + return await CreateTopicsAsync(admin, Collections.Singleton(new NewTopic(topicName, numPartitions, replicationFactor))); + } + + public static async Task CreateTopicsAsync(this IAdmin admin, Collection newTopics, CreateTopicsOptions options) + { + return await Execute(admin.CreateTopics, newTopics, options); + } + + public static async Task DeleteTopicsAsync(this IAdmin admin, Collection topics) + { + return await Execute(admin.DeleteTopics, topics); + } + + public static async Task DeleteTopicAsync(this IAdmin admin, string topicName) + { + return await DeleteTopicsAsync(admin, Collections.Singleton(topicName)); + } + + public static async Task DeleteTopicsAsync(this IAdmin admin, Collection topics, DeleteTopicsOptions options) + { + return await Execute(admin.DeleteTopics, topics, options); + } + + public static async Task DeleteTopicsAsync(this IAdmin admin, TopicCollection topics) + { + return await Execute(admin.DeleteTopics, topics); + } + + public static async Task DeleteTopicsAsync(this IAdmin admin, TopicCollection topics, DeleteTopicsOptions options) + { + return await Execute(admin.DeleteTopics, topics, options); + } + + public static async Task ListTopicsAsync(this IAdmin admin) + { + return await Execute(admin.ListTopics); + } + + public static async Task ListTopicsAsync(this IAdmin admin, ListTopicsOptions options) + { + return await Execute(admin.ListTopics, options); + } + + public static async Task DescribeTopicsAsync(this IAdmin admin, Collection topicNames) + { + return await Execute(admin.DescribeTopics, topicNames); + } + + public static async Task DescribeTopicAsync(this IAdmin admin, string topicName) + { + return await Execute(admin.DescribeTopics, Collections.Singleton(topicName)) ; + } + + public static async Task DescribeTopicsAsync(this IAdmin admin, Collection topicNames, DescribeTopicsOptions options) + { + return await Execute(admin.DescribeTopics, topicNames, options); + } + + public static async Task DescribeClusterAsync(this IAdmin admin) + { + return await Execute(admin.DescribeCluster); + } + + public static async Task DescribeClusterAsync(this IAdmin admin, DescribeClusterOptions options) + { + return await Execute(admin.DescribeCluster, options); + } + + public static async Task DescribeAclsAsync(this IAdmin admin, AclBindingFilter filter) + { + return await Execute(admin.DescribeAcls, filter); + } + + public static async Task DescribeAclsAsync(this IAdmin admin, AclBindingFilter filter, DescribeAclsOptions options) + { + return await Execute(admin.DescribeAcls, filter, options); + } + + public static async Task CreateAclsAsync(this IAdmin admin, Collection acls) + { + return await Execute(admin.CreateAcls, acls); + } + + public static async Task CreateAclsAsync(this IAdmin admin, Collection acls, CreateAclsOptions options) + { + return await Execute(admin.CreateAcls, acls, options); + } + + public static async Task DeleteAclsAsync(this IAdmin admin, Collection filters) + { + return await Execute(admin.DeleteAcls, filters); + } + + public static async Task DeleteAclsAsync(this IAdmin admin, Collection filters, DeleteAclsOptions options) + { + return await Execute(admin.DeleteAcls, filters, options); + } + + public static async Task DescribeConfigsAsync(this IAdmin admin, Collection resources) + { + return await Execute(admin.DescribeConfigs, resources); + } + + public static async Task DescribeConfigsAsync(this IAdmin admin, Collection resources, DescribeConfigsOptions options) + { + return await Execute(admin.DescribeConfigs, resources, options); + } + + public static async Task IncrementalAlterConfigsAsync(this IAdmin admin, Map> configs) + { + return await Execute(admin.IncrementalAlterConfigs, configs); + } + + public static async Task IncrementalAlterConfigsAsync(this IAdmin admin, Map> configs, AlterConfigsOptions options) + { + return await Execute(admin.IncrementalAlterConfigs, configs, options); + } + + public static async Task AlterReplicaLogDirsAsync(this IAdmin admin, Map replicaAssignment) + { + return await Execute(admin.AlterReplicaLogDirs, replicaAssignment); + } + + public static async Task AlterReplicaLogDirsAsync(this IAdmin admin, Map replicaAssignment, AlterReplicaLogDirsOptions options) + { + return await Execute(admin.AlterReplicaLogDirs, replicaAssignment, options); + } + + public static async Task DescribeLogDirsAsync(this IAdmin admin, Collection brokers) + { + return await Execute(admin.DescribeLogDirs, brokers); + } + + public static async Task DescribeLogDirsAsync(this IAdmin admin, Collection brokers, DescribeLogDirsOptions options) + { + return await Execute(admin.DescribeLogDirs, brokers, options); + } + + public static async Task DescribeReplicaLogDirsAsync(this IAdmin admin, Collection replicas) + { + return await Execute(admin.DescribeReplicaLogDirs, replicas); + } + + public static async Task DescribeReplicaLogDirsAsync(this IAdmin admin, Collection replicas, DescribeReplicaLogDirsOptions options) + { + return await Execute(admin.DescribeReplicaLogDirs, replicas, options); + } + + public static async Task CreatePartitionsAsync(this IAdmin admin, Map newPartitions) + { + return await Execute(admin.CreatePartitions, newPartitions); + } + + public static async Task CreatePartitionsAsync(this IAdmin admin, Map newPartitions, CreatePartitionsOptions options) + { + return await Execute(admin.CreatePartitions, newPartitions, options); + } + + public static async Task DeleteRecordsAsync(this IAdmin admin, Map recordsToDelete) + { + return await Execute(admin.DeleteRecords, recordsToDelete); + } + + public static async Task DeleteRecordsAsync(this IAdmin admin, Map recordsToDelete, DeleteRecordsOptions options) + { + return await Execute(admin.DeleteRecords, recordsToDelete, options); + } + + public static async Task CreateDelegationTokenAsync(this IAdmin admin) + { + return await Execute(admin.CreateDelegationToken); + } + + public static async Task CreateDelegationTokenAsync(this IAdmin admin, CreateDelegationTokenOptions options) + { + return await Execute(admin.CreateDelegationToken, options); + } + + public static async Task RenewDelegationTokenAsync(this IAdmin admin, byte[] hmac) + { + return await Execute(admin.RenewDelegationToken, hmac); + } + + public static async Task RenewDelegationTokenAsync(this IAdmin admin, byte[] hmac, RenewDelegationTokenOptions options) + { + return await Execute(admin.RenewDelegationToken, hmac, options); + } + + public static async Task ExpireDelegationTokenAsync(this IAdmin admin, byte[] hmac) + { + return await Execute(admin.ExpireDelegationToken, hmac); + } + + public static async Task ExpireDelegationTokenAsync(this IAdmin admin, byte[] hmac, ExpireDelegationTokenOptions options) + { + return await Execute(admin.ExpireDelegationToken, hmac, options); + } + + public static async Task DescribeDelegationTokenAsync(this IAdmin admin) + { + return await Execute(admin.DescribeDelegationToken); + } + + public static async Task DescribeDelegationTokenAsync(this IAdmin admin, DescribeDelegationTokenOptions options) + { + return await Execute(admin.DescribeDelegationToken, options); + } + + public static async Task DescribeConsumerGroupsAsync(this IAdmin admin, Collection groupIds, DescribeConsumerGroupsOptions options) + { + return await Execute(admin.DescribeConsumerGroups, groupIds, options); + } + + public static async Task DescribeConsumerGroupsAsync(this IAdmin admin, Collection groupIds) + { + return await Execute(admin.DescribeConsumerGroups, groupIds); + } + + public static async Task ListConsumerGroupsAsync(this IAdmin admin, ListConsumerGroupsOptions options) + { + return await Execute(admin.ListConsumerGroups, options); + } + + public static async Task ListConsumerGroupsAsync(this IAdmin admin) + { + return await Execute(admin.ListConsumerGroups); + } + + public static async Task ListConsumerGroupOffsetsAsync(this IAdmin admin, string groupId, ListConsumerGroupOffsetsOptions options) + { + return await Execute(admin.ListConsumerGroupOffsets, groupId, options); + } + + public static async Task ListConsumerGroupOffsetsAsync(this IAdmin admin, string groupId) + { + return await Execute(admin.ListConsumerGroupOffsets, groupId); + } + + public static async Task DeleteConsumerGroupsAsync(this IAdmin admin, Collection groupIds, DeleteConsumerGroupsOptions options) + { + return await Execute(admin.DeleteConsumerGroups, groupIds, options); + } + + public static async Task DeleteConsumerGroupsAsync(this IAdmin admin, Collection groupIds) + { + return await Execute(admin.DeleteConsumerGroups, groupIds); + } + + public static async Task DeleteConsumerGroupOffsetsAsync(this IAdmin admin, string groupId, Set partitions, DeleteConsumerGroupOffsetsOptions options) + { + return await Execute(admin.DeleteConsumerGroupOffsets, groupId, partitions, options); + } + + public static async Task DeleteConsumerGroupOffsetsAsync(this IAdmin admin, string groupId, Set partitions) + { + return await Execute(admin.DeleteConsumerGroupOffsets, groupId, partitions); + } + + public static async Task ElectLeadersAsync(this IAdmin admin, ElectionType electionType, Set partitions) + { + return await Execute(admin.ElectLeaders, electionType, partitions); + } + + public static async Task ElectLeadersAsync(this IAdmin admin, ElectionType electionType, Set partitions, ElectLeadersOptions options) + { + return await Execute(admin.ElectLeaders, electionType, partitions, options); + } + + public static async Task AlterPartitionReassignmentsAsync(this IAdmin admin, Map> reassignments) + { + return await Execute(admin.AlterPartitionReassignments, reassignments); + } + + public static async Task AlterPartitionReassignmentsAsync(this IAdmin admin, Map> reassignments, AlterPartitionReassignmentsOptions options) + { + return await Execute(admin.AlterPartitionReassignments, reassignments, options); + } + + public static async Task ListPartitionReassignmentsAsync(this IAdmin admin) + { + return await Execute(admin.ListPartitionReassignments); + } + + public static async Task ListPartitionReassignmentsAsync(this IAdmin admin, Set partitions) + { + return await Execute(admin.ListPartitionReassignments, partitions); + } + + public static async Task ListPartitionReassignmentsAsync(this IAdmin admin, Set partitions, ListPartitionReassignmentsOptions options) + { + return await Execute(admin.ListPartitionReassignments, partitions, options); + } + + public static async Task ListPartitionReassignmentsAsync(this IAdmin admin, ListPartitionReassignmentsOptions options) + { + return await Execute(admin.ListPartitionReassignments, options); + } + + public static async Task ListPartitionReassignmentsAsync(this IAdmin admin, Optional> partitions, ListPartitionReassignmentsOptions options) + { + return await Execute(admin.ListPartitionReassignments, partitions, options); + } + + public static async Task RemoveMembersFromConsumerGroupAsync(this IAdmin admin, string groupId, RemoveMembersFromConsumerGroupOptions options) + { + return await Execute(admin.RemoveMembersFromConsumerGroup, groupId, options); + } + + public static async Task AlterConsumerGroupOffsetsAsync(this IAdmin admin, string groupId, Map offsets) + { + return await Execute(admin.AlterConsumerGroupOffsets, groupId, offsets); + } + + public static async Task AlterConsumerGroupOffsetsAsync(this IAdmin admin, string groupId, Map offsets, AlterConsumerGroupOffsetsOptions options) + { + return await Execute(admin.AlterConsumerGroupOffsets, groupId, offsets, options); + } + + public static async Task ListOffsetsAsync(this IAdmin admin, Map topicPartitionOffsets) + { + return await Execute(admin.ListOffsets, topicPartitionOffsets); + } + + public static async Task ListOffsetsAsync(this IAdmin admin, Map topicPartitionOffsets, ListOffsetsOptions options) + { + return await Execute(admin.ListOffsets, topicPartitionOffsets, options); + } + + public static async Task DescribeClientQuotasAsync(this IAdmin admin, ClientQuotaFilter filter) + { + return await Execute(admin.DescribeClientQuotas, filter); + } + + public static async Task DescribeClientQuotasAsync(this IAdmin admin, ClientQuotaFilter filter, DescribeClientQuotasOptions options) + { + return await Execute(admin.DescribeClientQuotas, filter, options); + } + + public static async Task AlterClientQuotasAsync(this IAdmin admin, Collection entries) + { + return await Execute(admin.AlterClientQuotas, entries); + } + + public static async Task AlterClientQuotasAsync(this IAdmin admin, Collection entries, AlterClientQuotasOptions options) + { + return await Execute(admin.AlterClientQuotas, entries, options); + } + + public static async Task DescribeUserScramCredentialsAsync(this IAdmin admin) + { + return await Execute(admin.DescribeUserScramCredentials); + } + + public static async Task DescribeUserScramCredentialsAsync(this IAdmin admin, Java.Util.List users) + { + return await Execute(admin.DescribeUserScramCredentials, users); + } + + public static async Task DescribeUserScramCredentialsAsync(this IAdmin admin, Java.Util.List users, DescribeUserScramCredentialsOptions options) + { + return await Execute(admin.DescribeUserScramCredentials, users, options); + } + + public static async Task AlterUserScramCredentialsAsync(this IAdmin admin, Java.Util.List alterations) + { + return await Execute(admin.AlterUserScramCredentials, alterations); + } + + public static async Task AlterUserScramCredentialsAsync(this IAdmin admin, Java.Util.List alterations, AlterUserScramCredentialsOptions options) + { + return await Execute(admin.AlterUserScramCredentials, alterations, options); + } + + public static async Task DescribeFeaturesAsync(this IAdmin admin) + { + return await Execute(admin.DescribeFeatures); + } + + public static async Task DescribeFeaturesAsync(this IAdmin admin, DescribeFeaturesOptions options) + { + return await Execute(admin.DescribeFeatures, options); + } + + public static async Task UpdateFeaturesAsync(this IAdmin admin, Map featureUpdates, UpdateFeaturesOptions options) + { + return await Execute(admin.UpdateFeatures, featureUpdates, options); + } + + public static async Task UnregisterBrokerAsync(this IAdmin admin, int brokerId) + { + return await Execute(admin.UnregisterBroker, brokerId); + } + + public static async Task UnregisterBrokerAsync(this IAdmin admin, int brokerId, UnregisterBrokerOptions options) + { + return await Execute(admin.UnregisterBroker, brokerId, options); + } + + public static async Task DescribeProducersAsync(this IAdmin admin, Collection partitions) + { + return await Execute(admin.DescribeProducers, partitions); + } + + public static async Task DescribeProducersAsync(this IAdmin admin, Collection partitions, DescribeProducersOptions options) + { + return await Execute(admin.DescribeProducers, partitions, options); + } + + public static async Task DescribeTransactionsAsync(this IAdmin admin, Collection transactionalIds) + { + return await Execute(admin.DescribeTransactions, transactionalIds); + } + + public static async Task DescribeTransactionsAsync(this IAdmin admin, Collection transactionalIds, DescribeTransactionsOptions options) + { + return await Execute(admin.DescribeTransactions, transactionalIds, options); + } + + public static async Task AbortTransactionAsync(this IAdmin admin, AbortTransactionSpec spec) + { + return await Execute(admin.AbortTransaction, spec); + } + + public static async Task AbortTransactionAsync(this IAdmin admin, AbortTransactionSpec spec, AbortTransactionOptions options) + { + return await Execute(admin.AbortTransaction, spec, options); + } + + public static async Task ListTransactionsAsync(this IAdmin admin) + { + return await Execute(admin.ListTransactions); + } + + public static async Task ListTransactionsAsync(this IAdmin admin, ListTransactionsOptions options) + { + return await Execute(admin.ListTransactions, options); + } + } +} diff --git a/src/net/KafkaBridge/Extensions/KafkaConsumerExtensions.cs b/src/net/KafkaBridge/Extensions/KafkaConsumerExtensions.cs new file mode 100644 index 0000000000..7d6f332818 --- /dev/null +++ b/src/net/KafkaBridge/Extensions/KafkaConsumerExtensions.cs @@ -0,0 +1,74 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Clients.Consumer; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace MASES.KafkaBridge.Extensions +{ + public static class KafkaConsumerExtensions + { + const int internalMs = 100; + /// + /// Consumes from an instance of + /// + /// The key type + /// The value type + /// The consumer instance + /// The to use to abort waiting + /// The received + public static ConsumerRecords Consume(this IConsumer consumer, CancellationToken token) + { + token.ThrowIfCancellationRequested(); + while (true) + { + try + { + var records = consumer.Poll(TimeSpan.FromMilliseconds(internalMs)); + if (records.Count == 0) continue; + return records; + } + catch (OperationCanceledException) + { + return null; + } + } + } + /// + /// Execute the consume in async mode + /// + /// The key type + /// The value type + /// The consumer instance + /// The received + public static async Task> ConsumeAsync(this IConsumer consumer) + { + return await Task.Run(() => + { + while (true) + { + var records = consumer.Poll(TimeSpan.FromMilliseconds(internalMs)); + if (records.Count == 0) continue; + return records; + } + }); + } + } +} diff --git a/src/net/KafkaBridge/Extensions/KafkaProducerExtensions.cs b/src/net/KafkaBridge/Extensions/KafkaProducerExtensions.cs new file mode 100644 index 0000000000..211dd7417b --- /dev/null +++ b/src/net/KafkaBridge/Extensions/KafkaProducerExtensions.cs @@ -0,0 +1,112 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using Java.Util.Concurrent; +using MASES.JCOBridge.C2JBridge; +using MASES.KafkaBridge.Clients.Producer; +using System; +using System.Threading.Tasks; + +namespace MASES.KafkaBridge.Extensions +{ + public static class KafkaProducerExtensions + { + public static void Produce(this IProducer producer, string topic, K key, V value, Action action = null) + { + Produce(producer, new ProducerRecord(topic, key, value), action); + } + + public static void Produce(this IProducer producer, string topic, int partition, K key, V value, Action action = null) + { + Produce(producer, new ProducerRecord(topic, partition, key, value), action); + } + + public static void Produce(this IProducer producer, string topic, int partition, long timestamp, K key, V value, Action action = null) + { + Produce(producer, new ProducerRecord(topic, partition, timestamp, key, value), action); + } + + public static void Produce(this IProducer producer, string topic, int partition, DateTime timestamp, K key, V value, Action action = null) + { + Produce(producer, new ProducerRecord(topic, partition, timestamp, key, value), action); + } + + public static void Produce(this IProducer producer, ProducerRecord record, Action action = null) + { + Callback cb = null; + + try + { + Future result; + if (action != null) + { + cb = new Callback(action); + result = producer.Send(record, cb); + } + else + { + result = producer.Send(record); + } + result.Get(); + } + catch (ExecutionException e) + { + throw e.InnerException; + } + finally + { + cb?.Dispose(); + } + } + + public static async Task ProduceAsync(this IProducer producer, string topic, K key, V value, Action action = null) + { + await ProduceAsync(producer, new ProducerRecord(topic, key, value), action); + } + + public static async Task ProduceAsync(this IProducer producer, string topic, int partition, K key, V value, Action action = null) + { + await ProduceAsync(producer, new ProducerRecord(topic, partition, key, value), action); + } + + public static async Task ProduceAsync(this IProducer producer, string topic, int partition, long timestamp, K key, V value, Action action = null) + { + await ProduceAsync(producer, new ProducerRecord(topic, partition, timestamp, key, value), action); + } + + public static async Task ProduceAsync(this IProducer producer, string topic, int partition, DateTime timestamp, K key, V value, Action action = null) + { + await ProduceAsync(producer, new ProducerRecord(topic, partition, timestamp, key, value), action); + } + + public static async Task ProduceAsync(this IProducer producer, ProducerRecord record, Action action = null) + { + Task task = Task.Factory.StartNew(() => + { + Produce(producer, record, action); + return Task.CompletedTask; + }); + + await task; + if (task.Result.Status == TaskStatus.Faulted && task.Result.Exception != null) + { + throw task.Result.Exception.Flatten().InnerException; + } + } + } +} diff --git a/src/net/KafkaBridge/Extensions/KafkaStreamsExtensions.cs b/src/net/KafkaBridge/Extensions/KafkaStreamsExtensions.cs new file mode 100644 index 0000000000..807fa0d384 --- /dev/null +++ b/src/net/KafkaBridge/Extensions/KafkaStreamsExtensions.cs @@ -0,0 +1,24 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +namespace MASES.KafkaBridge.Extensions +{ + public static class KafkaStreamsExtensions + { + } +} diff --git a/tests/KafkaBridgeTest/Program.cs b/tests/KafkaBridgeTest/Program.cs index 675f9495ec..566fb69d2a 100644 --- a/tests/KafkaBridgeTest/Program.cs +++ b/tests/KafkaBridgeTest/Program.cs @@ -32,6 +32,7 @@ using System; using System.Text; using System.Threading; +using MASES.KafkaBridge.Extensions; namespace MASES.KafkaBridgeTest { @@ -100,7 +101,7 @@ static void CreateTopic() Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(serverToUse).ToProperties(); - using (var admin = KafkaAdminClient.Create(props)) + using (IAdmin admin = KafkaAdminClient.Create(props)) { // Create a compacted topic CreateTopicsResult result = admin.CreateTopics(coll); From 32e954812180519705d942a17a0e3e842b464261 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 15 Mar 2022 04:18:22 +0100 Subject: [PATCH 5/8] #1: improvements of Consumer APIs --- .../consumer/ConsumerInterceptorImpl.java | 59 +++++++ .../consumer/ConsumerInterceptorImplTest.java | 18 ++ .../Clients/Consumer/ConsumerInterceptor.cs | 157 ++++++++++++++++++ .../Consumer/ConsumerRebalanceListener.cs | 11 +- 4 files changed, 241 insertions(+), 4 deletions(-) create mode 100644 src/java/kafkabridge/src/main/java/org/mases/kafkabridge/clients/consumer/ConsumerInterceptorImpl.java create mode 100644 src/java/kafkabridge/src/test/java/org/mases/kafkabridge/clients/consumer/ConsumerInterceptorImplTest.java create mode 100644 src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerInterceptor.cs diff --git a/src/java/kafkabridge/src/main/java/org/mases/kafkabridge/clients/consumer/ConsumerInterceptorImpl.java b/src/java/kafkabridge/src/main/java/org/mases/kafkabridge/clients/consumer/ConsumerInterceptorImpl.java new file mode 100644 index 0000000000..c42ee2d848 --- /dev/null +++ b/src/java/kafkabridge/src/main/java/org/mases/kafkabridge/clients/consumer/ConsumerInterceptorImpl.java @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2022 MASES s.r.l. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.mases.kafkabridge.clients.consumer; + +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.mases.jcobridge.*; + +import java.util.Map; + +public final class ConsumerInterceptorImpl extends JCListener implements ConsumerInterceptor { + public ConsumerInterceptorImpl(String key) throws JCNativeException { + super(key); + } + + @Override + public void configure(Map configs) { + raiseEvent("configure", configs); + } + + @Override + public ConsumerRecords onConsume(ConsumerRecords records) { + raiseEvent("onConsume", records); + Object retVal = getReturnData(); + return (ConsumerRecords) retVal; + } + + @Override + public void onCommit(Map offsets) { + raiseEvent("onCommit", offsets); + } + + @Override + public void close() { + raiseEvent("close"); + } +} diff --git a/src/java/kafkabridge/src/test/java/org/mases/kafkabridge/clients/consumer/ConsumerInterceptorImplTest.java b/src/java/kafkabridge/src/test/java/org/mases/kafkabridge/clients/consumer/ConsumerInterceptorImplTest.java new file mode 100644 index 0000000000..cd7da0ea35 --- /dev/null +++ b/src/java/kafkabridge/src/test/java/org/mases/kafkabridge/clients/consumer/ConsumerInterceptorImplTest.java @@ -0,0 +1,18 @@ +package org.mases.kafkabridge.clients.consumer; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * Unit test for simple App. + */ +public class ConsumerInterceptorImplTest { + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() { + assertTrue(true); + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerInterceptor.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerInterceptor.cs new file mode 100644 index 0000000000..92bb07c8d9 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerInterceptor.cs @@ -0,0 +1,157 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; +using MASES.KafkaBridge.Common; +using Java.Util; +using System; + +namespace MASES.KafkaBridge.Clients.Consumer +{ + /// + /// Listener for Kafka ConsumerInterceptor. Extends + /// + public interface IConsumerInterceptor : IJVMBridgeBase + { + /// + /// Configure this class with the given key-value pairs + /// + /// The configuration + void Configure(Map configs); + /// + /// This is called just before the records are returned by + /// + /// records records to be consumed by the client or records returned by the previous interceptors in the list + /// records that are either modified by the interceptor or same as records passed to this method + ConsumerRecords OnConsume(ConsumerRecords records); + /// + /// This is called when offsets get committed. + /// + /// A of offsets by partition with associated metadata + void OnCommit(Map offsets); + /// + /// This is called when interceptor is closed + /// + void Close(); + } + + /// + /// Listener for Kafka ConsumerRebalanceListener. Extends , implements + /// + /// Remember to Dispose the object otherwise there is a resource leak, the object contains a reference to the the corresponding JVM object + public class ConsumerInterceptor : JVMBridgeListener, IConsumerInterceptor + { + /// + public sealed override string ClassName => "org.mases.kafkabridge.clients.consumer.ConsumerInterceptorImpl"; + + readonly Action> configureFunction = null; + readonly Func, ConsumerRecords> onConsumeFunction = null; + readonly Action> onCommitFunction = null; + readonly Action closeFunction = null; + /// + /// The to be executed on Configure + /// + public virtual Action> OnConfigure { get { return configureFunction; } } + /// + /// The to be executed on OnConsume + /// + public virtual Func, ConsumerRecords> OnOnConsume { get { return onConsumeFunction; } } + /// + /// The to be executed on OnCommit + /// + public virtual Action> OnOnCommit { get { return onCommitFunction; } } + /// + /// The to be executed on Close + /// + public virtual Action OnClose { get { return closeFunction; } } + /// + /// Initialize a new instance of + /// + /// The to be executed on Configure + /// The to be executed on OnConsume + /// The to be executed on OnCommit + /// The to be executed on Close + /// Set to false to disable attach of and set an own one + public ConsumerInterceptor(Action> configure = null, + Func, ConsumerRecords> onConsume = null, + Action> onCommit = null, + Action onClose = null, + bool attachEventHandler = true) + { + if (configure != null) configureFunction = configure; + else configureFunction = Configure; + if (onConsume != null) onConsumeFunction = onConsume; + else onConsumeFunction = OnConsume; + if (onCommit != null) onCommitFunction = onCommit; + else onCommitFunction = OnCommit; + if (onClose != null) closeFunction = onClose; + else closeFunction = Close; + + if (attachEventHandler) + { + AddEventHandler("configure", new EventHandler>>>(EventHandlerConfigure)); + AddEventHandler("onConsume", new EventHandler>>>(EventHandlerOnConsume)); + AddEventHandler("onCommit", new EventHandler>>>(EventHandlerOnCommit)); + AddEventHandler("close", new EventHandler>(EventHandlerClose)); + } + } + + void EventHandlerConfigure(object sender, CLRListenerEventArgs>> data) + { + OnConfigure(data.EventData.TypedEventData); + } + + void EventHandlerOnConsume(object sender, CLRListenerEventArgs>> data) + { + var result = OnOnConsume(data.EventData.TypedEventData); + data.SetReturnValue(result); + } + + void EventHandlerOnCommit(object sender, CLRListenerEventArgs>> data) + { + OnOnCommit(data.EventData.TypedEventData); + } + + void EventHandlerClose(object sender, CLRListenerEventArgs data) + { + OnClose(); + } + + /// + public virtual void Configure(Map configs) + { + + } + + /// + public virtual ConsumerRecords OnConsume(ConsumerRecords records) + { + return default; + } + + /// + public virtual void OnCommit(Map offsets) + { + } + + /// + public virtual void Close() + { + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerRebalanceListener.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerRebalanceListener.cs index bab75871f0..4b7a4f73a0 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerRebalanceListener.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerRebalanceListener.cs @@ -64,15 +64,18 @@ public class ConsumerRebalanceListener : JVMBridgeListener, IConsumerRebalanceLi /// /// The to be executed on revoked partitions /// The to be executed on assigned partitions - public ConsumerRebalanceListener(Action> revoked = null, Action> assigned = null) + /// Set to false to disable attach of and set an own one + public ConsumerRebalanceListener(Action> revoked = null, Action> assigned = null, bool attachEventHandler = true) { if (revoked != null) revokedFunction = revoked; else revokedFunction = OnPartitionsRevoked; if (assigned != null) assignedFunction = assigned; else assignedFunction = OnPartitionsAssigned; - - AddEventHandler("onPartitionsRevoked", new EventHandler>>>(EventHandlerRevoked)); - AddEventHandler("onPartitionsAssigned", new EventHandler>>>(EventHandlerAssigned)); + if (attachEventHandler) + { + AddEventHandler("onPartitionsRevoked", new EventHandler>>>(EventHandlerRevoked)); + AddEventHandler("onPartitionsAssigned", new EventHandler>>>(EventHandlerAssigned)); + } } void EventHandlerRevoked(object sender, CLRListenerEventArgs>> data) From 3b8e15a0455a28d626c3bace72a9e2e70ba16bc9 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 15 Mar 2022 15:27:56 +0100 Subject: [PATCH 6/8] https://github.com/masesgroup/KafkaBridge/issues/33#issuecomment-1068055414 --- src/net/KafkaBridge/KafkaBridge.csproj | 2 +- src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj | 2 +- src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec | 2 +- src/net/templates/templatepack.csproj | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/net/KafkaBridge/KafkaBridge.csproj b/src/net/KafkaBridge/KafkaBridge.csproj index c2d585c329..93e3dc2feb 100644 --- a/src/net/KafkaBridge/KafkaBridge.csproj +++ b/src/net/KafkaBridge/KafkaBridge.csproj @@ -15,7 +15,7 @@ net461;netcoreapp3.1;net5.0;net6.0 ..\..\..\bin\ true - false + true https://github.com/masesgroup/KafkaBridge/ https://github.com/masesgroup/KafkaBridge https://github.com/masesgroup/KafkaBridge/releases diff --git a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj index c19aa4daa4..fa346e8292 100644 --- a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj +++ b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj @@ -16,7 +16,7 @@ net461;netcoreapp3.1;net5.0;net6.0 ..\..\..\bin\ - false + true https://github.com/masesgroup/KafkaBridge/ https://github.com/masesgroup/KafkaBridge https://github.com/masesgroup/KafkaBridge/releases diff --git a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec index 7e68af14bf..aff4c57bce 100644 --- a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec +++ b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec @@ -6,7 +6,7 @@ KafkaBridgeCLI - CLI interface of KafkaBridge MASES s.r.l. MASES s.r.l. - false + true https://github.com/masesgroup/KafkaBridge/ KafkaBridgeCLI - CLI interface of KafkaBridge diff --git a/src/net/templates/templatepack.csproj b/src/net/templates/templatepack.csproj index d32a6f996a..a247dc808c 100644 --- a/src/net/templates/templatepack.csproj +++ b/src/net/templates/templatepack.csproj @@ -13,7 +13,7 @@ net6.0;net5.0;netcoreapp3.1;net461 ..\..\..\bin\ true - false + true https://github.com/masesgroup/KafkaBridge/ https://github.com/masesgroup/KafkaBridge https://github.com/masesgroup/KafkaBridge/releases From 27e0ae4f372f8201994f58e51378d570c5a01fa0 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 15 Mar 2022 16:34:57 +0100 Subject: [PATCH 7/8] #43: refinement --- .../Common/Serialization/Serde.cs | 9 +++- .../Extensions/KafkaAdminClientExtensions.cs | 44 +++++++++++++++---- tests/KafkaBridgeTest/Program.cs | 3 ++ 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Serialization/Serde.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Serialization/Serde.cs index 4ea42cd4fe..1357328e6a 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Serialization/Serde.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Serialization/Serde.cs @@ -43,12 +43,19 @@ public interface ISerde : ISerde Deserializer Deserializer { get; } } - public class Serde : Serde, ISerde + public class Serde : JVMBridgeBase>, ISerde { public override string ClassName => "org.apache.kafka.common.serialization.Serde"; + public static implicit operator Serde(Serde serde) { return Wraps(serde.Instance); } + public Serializer Serializer => IExecute>("serializer"); public Deserializer Deserializer => IExecute>("deserializer"); + + public void Configure(Map configs, bool isKey) + { + IExecute("configure", configs, isKey); + } } } diff --git a/src/net/KafkaBridge/Extensions/KafkaAdminClientExtensions.cs b/src/net/KafkaBridge/Extensions/KafkaAdminClientExtensions.cs index 256bf906f7..cdd18fad93 100644 --- a/src/net/KafkaBridge/Extensions/KafkaAdminClientExtensions.cs +++ b/src/net/KafkaBridge/Extensions/KafkaAdminClientExtensions.cs @@ -17,6 +17,7 @@ */ using Java.Util; +using Java.Util.Concurrent; using MASES.KafkaBridge.Clients.Admin; using MASES.KafkaBridge.Clients.Consumer; using MASES.KafkaBridge.Common; @@ -24,9 +25,6 @@ using MASES.KafkaBridge.Common.Config; using MASES.KafkaBridge.Common.Quota; using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading.Tasks; namespace MASES.KafkaBridge.Extensions @@ -43,7 +41,12 @@ async static Task Execute(Func func) await task; if (task.Status == TaskStatus.Faulted && task.Exception != null) { - throw task.Exception.Flatten().InnerException; + var innerException = task.Exception.Flatten().InnerException; + if (innerException is ExecutionException exException) + { + throw exException.InnerException; + } + else throw innerException; } return task.Result; } @@ -58,7 +61,12 @@ async static Task Execute(Func func, TIn in await task; if (task.Status == TaskStatus.Faulted && task.Exception != null) { - throw task.Exception.Flatten().InnerException; + var innerException = task.Exception.Flatten().InnerException; + if (innerException is ExecutionException exException) + { + throw exException.InnerException; + } + else throw innerException; } return task.Result; } @@ -73,7 +81,12 @@ async static Task Execute(Func Execute(Func CreateTopicsAsync(this IAdmin admin return await Execute(admin.CreateTopics, newTopics); } - public static CreateTopicsResult CreateTopic(this IAdmin admin, string topicName, int numPartitions = 1, short replicationFactor = 1) + public static void CreateTopic(this IAdmin admin, string topicName, int numPartitions = 1, short replicationFactor = 1) { - return admin.CreateTopics(Collections.Singleton(new NewTopic(topicName, numPartitions, replicationFactor))); + try + { + var res = admin.CreateTopics(Collections.Singleton(new NewTopic(topicName, numPartitions, replicationFactor))); + res.All.Get(); + } + catch (ExecutionException ex) + { + throw ex.InnerException; + } } public static async Task CreateTopicAsync(this IAdmin admin, string topicName, int numPartitions = 1, short replicationFactor = 1) diff --git a/tests/KafkaBridgeTest/Program.cs b/tests/KafkaBridgeTest/Program.cs index 566fb69d2a..148ad577b6 100644 --- a/tests/KafkaBridgeTest/Program.cs +++ b/tests/KafkaBridgeTest/Program.cs @@ -103,6 +103,7 @@ static void CreateTopic() using (IAdmin admin = KafkaAdminClient.Create(props)) { + /******* standard // Create a compacted topic CreateTopicsResult result = admin.CreateTopics(coll); @@ -112,6 +113,8 @@ static void CreateTopic() // Call get() to block until the topic creation is complete or has failed // if creation failed the ExecutionException wraps the underlying cause. future.Get(); + ********/ + admin.CreateTopic(topicName, partitions, replicationFactor); } } catch (Java.Util.Concurrent.ExecutionException ex) From 1b43e197ee64bd042b54a2353865509a6837a12e Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 15 Mar 2022 16:55:06 +0100 Subject: [PATCH 8/8] Upgrade templates --- src/net/KafkaBridge/KafkaBridge.csproj | 2 +- .../kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj | 2 +- .../kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj | 2 +- .../kafkabridgeProducerApp/kafkabridgeProducerApp.csproj | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/net/KafkaBridge/KafkaBridge.csproj b/src/net/KafkaBridge/KafkaBridge.csproj index 93e3dc2feb..8307cc1cab 100644 --- a/src/net/KafkaBridge/KafkaBridge.csproj +++ b/src/net/KafkaBridge/KafkaBridge.csproj @@ -58,7 +58,7 @@ All None - + diff --git a/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj b/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj index 8bc4220306..b4e3d8173f 100644 --- a/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj +++ b/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj @@ -12,6 +12,6 @@ - + diff --git a/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj b/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj index 8bc4220306..b4e3d8173f 100644 --- a/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj +++ b/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj @@ -12,6 +12,6 @@ - + diff --git a/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj b/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj index 8bc4220306..b4e3d8173f 100644 --- a/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj +++ b/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj @@ -12,6 +12,6 @@ - +