From 5c03268f36a442f355c58978a96f9657ee667cfc Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Thu, 31 Aug 2023 12:16:48 +0300 Subject: [PATCH 1/2] Removed transport api from Cluster interface --- .../java/io/scalecube/cluster/Cluster.java | 42 ---------- .../io/scalecube/cluster/ClusterImpl.java | 20 ----- .../examples/ClusterMetadataExample.java | 14 ---- .../scalecube/examples/MessagingExample.java | 84 ------------------- .../examples/WebsocketMessagingExample.java | 84 ------------------- 5 files changed, 244 deletions(-) delete mode 100644 examples/src/main/java/io/scalecube/examples/MessagingExample.java delete mode 100644 examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java index b0f15582..c0df446b 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java @@ -16,48 +16,6 @@ public interface Cluster { */ Address address(); - /** - * Send a msg from this member (src) to target member (specified in parameters). - * - * @param member target member - * @param message msg - * @return promise telling success or failure - */ - Mono send(Member member, Message message); - - /** - * Send a msg from this member (src) to target member (specified in parameters). - * - * @param address target address - * @param message msg - * @return promise telling success or failure - */ - Mono send(Address address, Message message); - - /** - * Sends message to the given address. It will issue connect in case if no transport channel by - * given transport {@code address} exists already. Send is an async operation and expecting a - * response by a provided correlationId and sender address of the caller. - * - * @param address address where message will be sent - * @param request to send message must contain correlctionId and sender to handle reply. - * @return promise which will be completed with result of sending (message or exception) - * @throws IllegalArgumentException if {@code message} or {@code address} is null - */ - Mono requestResponse(Address address, Message request); - - /** - * Sends message to the given address. It will issue connect in case if no transport channel by - * given transport {@code address} exists already. Send is an async operation and expecting a - * response by a provided correlationId and sender address of the caller. - * - * @param member where message will be sent - * @param request to send message must contain correlctionId and sender to handle reply. - * @return promise which will be completed with result of sending (message or exception) - * @throws IllegalArgumentException if {@code message} or {@code address} is null - */ - Mono requestResponse(Member member, Message request); - /** * Spreads given message between cluster members using gossiping protocol. * diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 8419f358..4dee1227 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -389,26 +389,6 @@ public Address address() { return member().address(); } - @Override - public Mono send(Member member, Message message) { - return send(member.address(), message); - } - - @Override - public Mono send(Address address, Message message) { - return transport.send(address, message); - } - - @Override - public Mono requestResponse(Address address, Message request) { - return transport.requestResponse(address, request); - } - - @Override - public Mono requestResponse(Member member, Message request) { - return transport.requestResponse(member.address(), request); - } - @Override public Mono spreadGossip(Message message) { return gossip.spread(message); diff --git a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java index 95824c93..40fd88e9 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java @@ -8,7 +8,6 @@ import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.util.Collections; import java.util.Optional; -import java.util.concurrent.TimeUnit; /** * Using Cluster metadata: metadata is set of custom parameters that may be used by application @@ -49,18 +48,5 @@ public void onMessage(Message message) { System.err.println("### joeMemberOptional: " + joeMemberOptional); System.err.println("### joeMetadata: " + alice.metadata(joeMemberOptional.get())); - - // Send hello to Joe - joeMemberOptional.ifPresent( - member -> - alice - .send(member, Message.withData("Hello Joe").build()) - .subscribe( - null, - ex -> { - // no-op - })); - - TimeUnit.SECONDS.sleep(3); } } diff --git a/examples/src/main/java/io/scalecube/examples/MessagingExample.java b/examples/src/main/java/io/scalecube/examples/MessagingExample.java deleted file mode 100644 index b747b229..00000000 --- a/examples/src/main/java/io/scalecube/examples/MessagingExample.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.scalecube.examples; - -import io.scalecube.cluster.Cluster; -import io.scalecube.cluster.ClusterImpl; -import io.scalecube.cluster.ClusterMessageHandler; -import io.scalecube.cluster.transport.api.Message; -import io.scalecube.transport.netty.tcp.TcpTransportFactory; -import reactor.core.publisher.Flux; - -/** - * Basic example for member transport between cluster members to run the example Start ClusterNodeA - * and cluster ClusterNodeB A listen on transport messages B send message to member A. - * - * @author ronen hamias, Anton Kharenko - */ -public class MessagingExample { - - /** Main method. */ - public static void main(String[] args) throws Exception { - // Start cluster node Alice to listen and respond for incoming greeting messages - Cluster alice = - new ClusterImpl() - .transportFactory(TcpTransportFactory::new) - .handler( - cluster -> { - return new ClusterMessageHandler() { - @Override - public void onMessage(Message msg) { - System.out.println("Alice received: " + msg.data()); - cluster - .send(msg.sender(), Message.fromData("Greetings from Alice")) - .subscribe(null, Throwable::printStackTrace); - } - }; - }) - .startAwait(); - - // Join cluster node Bob to cluster with Alice, listen and respond for incoming greeting - // messages - Cluster bob = - new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) - .transportFactory(TcpTransportFactory::new) - .handler( - cluster -> { - return new ClusterMessageHandler() { - @Override - public void onMessage(Message msg) { - System.out.println("Bob received: " + msg.data()); - cluster - .send(msg.sender(), Message.fromData("Greetings from Bob")) - .subscribe(null, Throwable::printStackTrace); - } - }; - }) - .startAwait(); - - // Join cluster node Carol to cluster with Alice and Bob - Cluster carol = - new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address(), bob.address())) - .transportFactory(TcpTransportFactory::new) - .handler( - cluster -> { - return new ClusterMessageHandler() { - @Override - public void onMessage(Message msg) { - System.out.println("Carol received: " + msg.data()); - } - }; - }) - .startAwait(); - - // Send from Carol greeting message to all other cluster members (which is Alice and Bob) - Message greetingMsg = Message.fromData("Greetings from Carol"); - - Flux.fromIterable(carol.otherMembers()) - .flatMap(member -> carol.send(member, greetingMsg)) - .subscribe(null, Throwable::printStackTrace); - - // Avoid exit main thread immediately ]:-> - Thread.sleep(1000); - } -} diff --git a/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java b/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java deleted file mode 100644 index 7049e1a1..00000000 --- a/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.scalecube.examples; - -import io.scalecube.cluster.Cluster; -import io.scalecube.cluster.ClusterImpl; -import io.scalecube.cluster.ClusterMessageHandler; -import io.scalecube.cluster.transport.api.Message; -import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; -import reactor.core.publisher.Flux; - -/** - * Exactly the same example as {@link MessagingExample} but cluster transport is websocket. - * - * @see io.scalecube.transport.netty.websocket.WebsocketTransportFactory - * @author arvy - */ -public class WebsocketMessagingExample { - - /** Main method. */ - public static void main(String[] args) throws Exception { - // Start cluster node Alice to listen and respond for incoming greeting messages - Cluster alice = - new ClusterImpl() - .transportFactory(WebsocketTransportFactory::new) - .handler( - cluster -> { - return new ClusterMessageHandler() { - @Override - public void onMessage(Message msg) { - System.out.println("Alice received: " + msg.data()); - cluster - .send(msg.sender(), Message.fromData("Greetings from Alice")) - .subscribe(null, Throwable::printStackTrace); - } - }; - }) - .startAwait(); - - // Join cluster node Bob to cluster with Alice, listen and respond for incoming greeting - // messages - Cluster bob = - new ClusterImpl() - .transportFactory(WebsocketTransportFactory::new) - .membership(opts -> opts.seedMembers(alice.address())) - .handler( - cluster -> { - return new ClusterMessageHandler() { - @Override - public void onMessage(Message msg) { - System.out.println("Bob received: " + msg.data()); - cluster - .send(msg.sender(), Message.fromData("Greetings from Bob")) - .subscribe(null, Throwable::printStackTrace); - } - }; - }) - .startAwait(); - - // Join cluster node Carol to cluster with Alice and Bob - Cluster carol = - new ClusterImpl() - .transportFactory(WebsocketTransportFactory::new) - .membership(opts -> opts.seedMembers(alice.address(), bob.address())) - .handler( - cluster -> { - return new ClusterMessageHandler() { - @Override - public void onMessage(Message msg) { - System.out.println("Carol received: " + msg.data()); - } - }; - }) - .startAwait(); - - // Send from Carol greeting message to all other cluster members (which is Alice and Bob) - Message greetingMsg = Message.fromData("Greetings from Carol"); - - Flux.fromIterable(carol.otherMembers()) - .flatMap(member -> carol.send(member, greetingMsg)) - .subscribe(null, Throwable::printStackTrace); - - // Avoid exit main thread immediately ]:-> - Thread.sleep(1000); - } -} From d4fc4589f1f14922150aba8033f2f2413778eb6a Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Thu, 31 Aug 2023 12:17:55 +0300 Subject: [PATCH 2/2] Enabled logging :set back old versions for log4j and slf4j --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index ee828b01..e21c8b82 100644 --- a/pom.xml +++ b/pom.xml @@ -35,8 +35,8 @@ 1.0.24 - 2.0.7 - 2.20.0 + 1.7.36 + 2.17.2 2020.0.32 2.15.1