From c1cea572c6bb6b2a961c1087a6ea3f13ff0fbad9 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Wed, 14 Apr 2021 15:38:25 +0300 Subject: [PATCH] Fixing ci2 (#360) * Enhanced error messages * Made `transportFactory` as non-autocomputable * Cleaned all reactor deprecations * Updated `reactor` (to `2020.0.6`) * Fixed CI (@segabriel ) --- .github/workflows/branch-ci.yml | 4 +- .github/workflows/pre-release-ci.yml | 1 + .github/workflows/release-ci.yml | 4 +- cluster-api/pom.xml | 4 +- cluster/pom.xml | 4 +- .../io/scalecube/cluster/ClusterImpl.java | 103 ++++++++----- .../fdetector/FailureDetectorImpl.java | 57 ++++--- .../cluster/gossip/GossipProtocolImpl.java | 50 +++--- .../membership/MembershipProtocolImpl.java | 128 ++++++++++------ .../cluster/metadata/MetadataStoreImpl.java | 22 ++- .../java/io/scalecube/cluster/BaseTest.java | 6 +- .../cluster/ClusterNamespacesTest.java | 4 +- .../io/scalecube/cluster/ClusterTest.java | 64 ++++++-- .../cluster/gossip/GossipProtocolTest.java | 3 +- .../membership/MembershipProtocolTest.java | 68 ++++---- codec-parent/pom.xml | 4 +- .../examples/ClusterJoinExamples.java | 19 ++- .../ClusterJoinNamespacesExamples.java | 8 + .../examples/ClusterMetadataExample.java | 4 +- .../io/scalecube/examples/GossipExample.java | 10 +- .../examples/MembershipEventsExample.java | 4 + .../scalecube/examples/MessagingExample.java | 4 + transport-parent/pom.xml | 4 +- .../transport/api/TransportFactory.java | 4 - .../transport/netty/TransportImpl.java | 145 +++++++++++------- .../transport/netty/tcp/TcpReceiver.java | 20 +-- .../transport/netty/tcp/TcpSender.java | 7 +- .../netty/websocket/WebsocketReceiver.java | 19 +-- .../netty/websocket/WebsocketSender.java | 4 +- .../transport/netty/tcp/TcpTransportTest.java | 10 +- .../websocket/WebsocketTransportTest.java | 10 +- .../src/test/resources/log4j2-test.xml | 2 +- 32 files changed, 504 insertions(+), 296 deletions(-) diff --git a/.github/workflows/branch-ci.yml b/.github/workflows/branch-ci.yml index b6399688..603791e5 100644 --- a/.github/workflows/branch-ci.yml +++ b/.github/workflows/branch-ci.yml @@ -33,4 +33,6 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.ORGANIZATION_TOKEN }} - name: Maven Verify - run: mvn verify -B + run: | + sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts + mvn verify -B diff --git a/.github/workflows/pre-release-ci.yml b/.github/workflows/pre-release-ci.yml index a7f35887..4948f5bc 100644 --- a/.github/workflows/pre-release-ci.yml +++ b/.github/workflows/pre-release-ci.yml @@ -25,6 +25,7 @@ jobs: server-password: GITHUB_TOKEN - name: Deploy pre-release version to GitHub Packages run: | + sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts pre_release_version=${{ github.event.release.tag_name }} echo Pre-release version $pre_release_version mvn versions:set -DnewVersion=$pre_release_version -DgenerateBackupPoms=false diff --git a/.github/workflows/release-ci.yml b/.github/workflows/release-ci.yml index f4c7b083..9cc975bc 100644 --- a/.github/workflows/release-ci.yml +++ b/.github/workflows/release-ci.yml @@ -31,7 +31,9 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.ORGANIZATION_TOKEN }} - name: Maven Verify - run: mvn verify -B + run: | + sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts + mvn verify -B - name: Configure git run: | git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com" diff --git a/cluster-api/pom.xml b/cluster-api/pom.xml index 139d5bf8..874c7001 100644 --- a/cluster-api/pom.xml +++ b/cluster-api/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 diff --git a/cluster/pom.xml b/cluster/pom.xml index 01a94fd0..06c159cf 100644 --- a/cluster/pom.xml +++ b/cluster/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 47ea78da..49b3f84c 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -1,5 +1,7 @@ package io.scalecube.cluster; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; + import io.scalecube.cluster.fdetector.FailureDetectorConfig; import io.scalecube.cluster.fdetector.FailureDetectorImpl; import io.scalecube.cluster.gossip.GossipConfig; @@ -43,11 +45,12 @@ import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.Exceptions; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -79,17 +82,17 @@ public final class ClusterImpl implements Cluster { cluster -> new ClusterMessageHandler() {}; // Subject - private final DirectProcessor membershipEvents = DirectProcessor.create(); - private final FluxSink membershipSink = membershipEvents.sink(); + private final Sinks.Many membershipSink = + Sinks.many().multicast().directBestEffort(); // Disposables private final Disposable.Composite actionsDisposables = Disposables.composite(); // Lifecycle - private final MonoProcessor start = MonoProcessor.create(); - private final MonoProcessor onStart = MonoProcessor.create(); - private final MonoProcessor shutdown = MonoProcessor.create(); - private final MonoProcessor onShutdown = MonoProcessor.create(); + private final Sinks.One start = Sinks.one(); + private final Sinks.One onStart = Sinks.one(); + private final Sinks.One shutdown = Sinks.one(); + private final Sinks.One onShutdown = Sinks.one(); // Cluster components private Transport transport; @@ -119,14 +122,16 @@ private ClusterImpl(ClusterImpl that) { private void initLifecycle() { start + .asMono() .then(doStart()) - .doOnSuccess(avoid -> onStart.onComplete()) - .doOnError(onStart::onError) + .doOnSuccess(avoid -> onStart.emitEmpty(RetryEmitFailureHandler.INSTANCE)) + .doOnError(th -> onStart.emitError(th, RetryEmitFailureHandler.INSTANCE)) .subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th)); shutdown + .asMono() .then(doShutdown()) - .doFinally(s -> onShutdown.onComplete()) + .doFinally(s -> onShutdown.emitEmpty(RetryEmitFailureHandler.INSTANCE)) .subscribe( null, th -> @@ -232,8 +237,8 @@ public ClusterImpl handler(Function handler) { public Mono start() { return Mono.defer( () -> { - start.onComplete(); - return onStart.thenReturn(this); + start.emitEmpty(RetryEmitFailureHandler.INSTANCE); + return onStart.asMono().thenReturn(this); }); } @@ -248,9 +253,9 @@ private Mono doStart() { private Mono doStart0() { return TransportImpl.bind(config.transportConfig()) .flatMap( - transport1 -> { - localMember = createLocalMember(transport1.address()); - transport = new SenderAwareTransport(transport1, localMember.address()); + boundTransport -> { + localMember = createLocalMember(boundTransport.address()); + transport = new SenderAwareTransport(boundTransport, localMember.address()); cidGenerator = new CorrelationIdGenerator(localMember.id()); scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true); @@ -260,7 +265,7 @@ private Mono doStart0() { new FailureDetectorImpl( localMember, transport, - membershipEvents.onBackpressureBuffer(), + membershipSink.asFlux().onBackpressureBuffer(), config.failureDetectorConfig(), scheduler, cidGenerator); @@ -269,7 +274,7 @@ private Mono doStart0() { new GossipProtocolImpl( localMember, transport, - membershipEvents.onBackpressureBuffer(), + membershipSink.asFlux().onBackpressureBuffer(), config.gossipConfig(), scheduler); @@ -294,8 +299,11 @@ private Mono doStart0() { membership .listen() /*.publishOn(scheduler)*/ - // Dont uncomment, already beign executed inside sc-cluster thread - .subscribe(membershipSink::next, this::onError, membershipSink::complete)); + // Dont uncomment, already beign executed inside scalecube-cluster thread + .subscribe( + event -> membershipSink.emitNext(event, RetryEmitFailureHandler.INSTANCE), + ex -> LOGGER.error("[{}][membership][error] cause:", localMember, ex), + () -> membershipSink.emitComplete(RetryEmitFailureHandler.INSTANCE))); return Mono.fromRunnable(() -> failureDetector.start()) .then(Mono.fromRunnable(() -> gossip.start())) @@ -317,30 +325,45 @@ private void validateConfiguration() { if (metadataCodec == null) { Object metadata = config.metadata(); if (metadata != null && !(metadata instanceof Serializable)) { - throw new IllegalArgumentException( - "Invalid cluster configuration: metadata must be Serializable"); + throw new IllegalArgumentException("Invalid cluster config: metadata must be Serializable"); } } + Objects.requireNonNull( + config.transportConfig().transportFactory(), + "Invalid cluster config: transportFactory must be specified"); + Objects.requireNonNull( config.transportConfig().messageCodec(), - "Invalid cluster configuration: transport.messageCodec must be specified"); + "Invalid cluster config: messageCodec must be specified"); Objects.requireNonNull( config.membershipConfig().namespace(), - "Invalid cluster configuration: membership.namespace must be specified"); + "Invalid cluster config: membership namespace must be specified"); if (!NAMESPACE_PATTERN.matcher(config.membershipConfig().namespace()).matches()) { throw new IllegalArgumentException( - "Invalid cluster config: membership.namespace format is invalid"); + "Invalid cluster config: membership namespace format is invalid"); } } private void startHandler() { ClusterMessageHandler handler = this.handler.apply(this); - actionsDisposables.add(listenMessage().subscribe(handler::onMessage, this::onError)); - actionsDisposables.add(listenMembership().subscribe(handler::onMembershipEvent, this::onError)); - actionsDisposables.add(listenGossip().subscribe(handler::onGossip, this::onError)); + actionsDisposables.add( + listenMessage() + .subscribe( + handler::onMessage, + ex -> LOGGER.error("[{}][onMessage][error] cause:", localMember, ex))); + actionsDisposables.add( + listenMembership() + .subscribe( + handler::onMembershipEvent, + ex -> LOGGER.error("[{}][onMembershipEvent][error] cause:", localMember, ex))); + actionsDisposables.add( + listenGossip() + .subscribe( + handler::onGossip, + ex -> LOGGER.error("[{}][onGossip][error] cause:", localMember, ex))); } private void startJmxMonitor() { @@ -357,10 +380,6 @@ private void startJmxMonitor() { } } - private void onError(Throwable th) { - LOGGER.error("[{}] Received unexpected error:", localMember, th); - } - private Flux listenMessage() { // filter out system messages return transport.listen().filter(msg -> !SYSTEM_MESSAGES.contains(msg.qualifier())); @@ -373,7 +392,7 @@ private Flux listenGossip() { private Flux listenMembership() { // listen on live stream - return membershipEvents.onBackpressureBuffer(); + return membershipSink.asFlux().onBackpressureBuffer(); } /** @@ -481,7 +500,7 @@ public Mono updateMetadata(T metadata) { @Override public void shutdown() { - shutdown.onComplete(); + shutdown.emitEmpty(RetryEmitFailureHandler.INSTANCE); } private Mono doShutdown() { @@ -524,12 +543,12 @@ private Mono dispose() { @Override public Mono onShutdown() { - return onShutdown; + return onShutdown.asMono(); } @Override public boolean isShutdown() { - return onShutdown.isDisposed(); + return onShutdown.asMono().toFuture().isDone(); } private static class SenderAwareTransport implements Transport { @@ -581,4 +600,14 @@ private Message enhanceWithSender(Message message) { return Message.with(message).sender(address).build(); } } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } } diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index 7c72de85..df243f9d 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -1,5 +1,7 @@ package io.scalecube.cluster.fdetector; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; + import io.scalecube.cluster.CorrelationIdGenerator; import io.scalecube.cluster.Member; import io.scalecube.cluster.fdetector.PingData.AckType; @@ -20,10 +22,11 @@ import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxProcessor; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.core.scheduler.Scheduler; public final class FailureDetectorImpl implements FailureDetector { @@ -45,8 +48,8 @@ public final class FailureDetectorImpl implements FailureDetector { // State + private final List pingMembers = new ArrayList<>(); private long currentPeriod = 0; - private List pingMembers = new ArrayList<>(); private int pingMemberIndex = 0; // index for sequential ping member selection // Disposables @@ -54,12 +57,11 @@ public final class FailureDetectorImpl implements FailureDetector { private final Disposable.Composite actionsDisposables = Disposables.composite(); // Subject - private final FluxProcessor subject = - DirectProcessor.create().serialize(); - private final FluxSink sink = subject.sink(); + private final Sinks.Many sink = Sinks.many().multicast().directBestEffort(); // Scheduled + private final Scheduler scheduler; /** @@ -89,13 +91,24 @@ public FailureDetectorImpl( // Subscribe actionsDisposables.addAll( Arrays.asList( - membershipProcessor // Listen membership events to update remoteMembers - .publishOn(scheduler) - .subscribe(this::onMemberEvent, this::onError), transport .listen() // Listen failure detector requests .publishOn(scheduler) - .subscribe(this::onMessage, this::onError))); + .subscribe( + this::onMessage, + ex -> + LOGGER.error( + "[{}][{}][onMessage][error] cause:", localMember, currentPeriod, ex)), + membershipProcessor // Listen membership events to update remoteMembers + .publishOn(scheduler) + .subscribe( + this::onMembershipEvent, + ex -> + LOGGER.error( + "[{}][{}][onMembershipEvent][error] cause:", + localMember, + currentPeriod, + ex)))); } @Override @@ -111,12 +124,12 @@ public void stop() { actionsDisposables.dispose(); // Stop publishing events - sink.complete(); + sink.emitComplete(RetryEmitFailureHandler.INSTANCE); } @Override public Flux listen() { - return subject.onBackpressureBuffer(); + return sink.asFlux().onBackpressureBuffer(); } // ================================================ @@ -314,11 +327,7 @@ private void onTransitPingAck(Message message) { ex.toString())); } - private void onError(Throwable throwable) { - LOGGER.error("[{}][{}] Received unexpected error:", localMember, currentPeriod, throwable); - } - - private void onMemberEvent(MembershipEvent event) { + private void onMembershipEvent(MembershipEvent event) { Member member = event.member(); if (event.isRemoved()) { boolean removed = pingMembers.remove(member); @@ -376,7 +385,7 @@ private List selectPingReqMembers(Member pingMember) { private void publishPingResult(long period, Member member, MemberStatus status) { LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status); - sink.next(new FailureDetectorEvent(member, status)); + sink.emitNext(new FailureDetectorEvent(member, status), RetryEmitFailureHandler.INSTANCE); } private MemberStatus computeMemberStatus(Message message, long period) { @@ -424,4 +433,14 @@ private boolean isTransitPingAck(Message message) { Transport getTransport() { return transport; } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } } diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index d6edd8b8..7b63f5c1 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -1,5 +1,7 @@ package io.scalecube.cluster.gossip; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; + import io.scalecube.cluster.ClusterMath; import io.scalecube.cluster.Member; import io.scalecube.cluster.membership.MembershipEvent; @@ -21,12 +23,13 @@ import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxProcessor; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.core.scheduler.Scheduler; public final class GossipProtocolImpl implements GossipProtocol { @@ -60,10 +63,7 @@ public final class GossipProtocolImpl implements GossipProtocol { // Subject - private final FluxProcessor subject = - DirectProcessor.create().serialize(); - - private final FluxSink sink = subject.sink(); + private final Sinks.Many sink = Sinks.many().multicast().directBestEffort(); // Scheduled @@ -95,12 +95,16 @@ public GossipProtocolImpl( Arrays.asList( membershipProcessor // Listen membership events to update remoteMembers .publishOn(scheduler) - .subscribe(this::onMemberEvent, this::onError), + .subscribe( + this::onMembershipEvent, + ex -> LOGGER.error("[{}][onMembershipEvent][error] cause:", localMember, ex)), transport .listen() // Listen gossip requests .publishOn(scheduler) - .filter(this::isGossipReq) - .subscribe(this::onGossipReq, this::onError))); + .filter(this::isGossipRequest) + .subscribe( + this::onGossipRequest, + ex -> LOGGER.error("[{}][onGossipRequest][error] cause:", localMember, ex)))); } @Override @@ -119,7 +123,7 @@ public void stop() { actionsDisposables.dispose(); // Stop publishing events - sink.complete(); + sink.emitComplete(RetryEmitFailureHandler.INSTANCE); } @Override @@ -131,7 +135,7 @@ public Mono spread(Message message) { @Override public Flux listen() { - return subject.onBackpressureBuffer(); + return sink.asFlux().onBackpressureBuffer(); } // ================================================ @@ -198,7 +202,7 @@ private String createAndPutGossip(Message message) { return gossip.gossipId(); } - private void onGossipReq(Message message) { + private void onGossipRequest(Message message) { final long period = this.currentPeriod; final GossipRequest gossipRequest = message.data(); for (Gossip gossip : gossipRequest.gossips()) { @@ -207,7 +211,7 @@ private void onGossipReq(Message message) { if (gossipState == null) { // new gossip gossipState = new GossipState(gossip, period); gossips.put(gossip.gossipId(), gossipState); - sink.next(gossip.message()); + sink.emitNext(gossip.message(), RetryEmitFailureHandler.INSTANCE); } gossipState.addToInfected(gossipRequest.from()); } @@ -235,7 +239,7 @@ private void checkGossipSegmentation() { } } - private void onMemberEvent(MembershipEvent event) { + private void onMembershipEvent(MembershipEvent event) { Member member = event.member(); if (event.isRemoved()) { boolean removed = remoteMembers.remove(member); @@ -260,15 +264,11 @@ private void onMemberEvent(MembershipEvent event) { } } - private void onError(Throwable throwable) { - LOGGER.error("[{}][{}] Received unexpected error:", localMember, currentPeriod, throwable); - } - // ================================================ // ============== Helper Methods ================== // ================================================ - private boolean isGossipReq(Message message) { + private boolean isGossipRequest(Message message) { return GOSSIP_REQ.equals(message.qualifier()); } @@ -384,4 +384,14 @@ Transport getTransport() { Member getMember() { return localMember; } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } } diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 3a8a8c3c..9e2f2b2f 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -3,6 +3,7 @@ import static io.scalecube.cluster.membership.MemberStatus.ALIVE; import static io.scalecube.cluster.membership.MemberStatus.DEAD; import static io.scalecube.cluster.membership.MemberStatus.LEAVING; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterMath; @@ -44,12 +45,13 @@ import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxProcessor; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.core.scheduler.Scheduler; public final class MembershipProtocolImpl implements MembershipProtocol { @@ -93,9 +95,7 @@ private enum MembershipUpdateReason { // Subject - private final FluxProcessor subject = - DirectProcessor.create().serialize(); - private final FluxSink sink = subject.sink(); + private final Sinks.Many sink = Sinks.many().multicast().directBestEffort(); // Disposables private final Disposable.Composite actionsDisposables = Disposables.composite(); @@ -154,15 +154,23 @@ public MembershipProtocolImpl( transport .listen() // Listen to incoming SYNC and SYNC ACK requests from other members .publishOn(scheduler) - .subscribe(this::onMessage, this::onError), + .subscribe( + this::onMessage, + ex -> LOGGER.error("[{}][onMessage][error] cause:", localMember, ex)), failureDetector .listen() // Listen to events from failure detector .publishOn(scheduler) - .subscribe(this::onFailureDetectorEvent, this::onError), + .subscribe( + this::onFailureDetectorEvent, + ex -> + LOGGER.error( + "[{}][onFailureDetectorEvent][error] cause:", localMember, ex)), gossipProtocol .listen() // Listen to membership gossips .publishOn(scheduler) - .subscribe(this::onMembershipGossip, this::onError), + .subscribe( + this::onMembershipGossip, + ex -> LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex)), listen() // Listen removed members for monitoring .filter(MembershipEvent::isRemoved) .subscribe(this::onMemberRemoved))); @@ -204,7 +212,7 @@ private boolean checkAddressesNotEqual(Address address0, Address address1) { @Override public Flux listen() { - return subject.onBackpressureBuffer(); + return sink.asFlux().onBackpressureBuffer(); } /** @@ -273,7 +281,12 @@ private void start0(MonoSink sink) { address -> transport .requestResponse(address, prepareSyncDataMsg(SYNC, cidGenerator.nextCid())) - .doOnError(this::onSyncError) + .doOnError( + ex -> + LOGGER.warn( + "[{}] Exception on initial Sync, cause: {}", + localMember, + ex.toString())) .onErrorResume(Exception.class, e -> Mono.empty())) .toArray(Mono[]::new); @@ -288,7 +301,11 @@ private void start0(MonoSink sink) { schedulePeriodicSync(); sink.success(); }) - .subscribe(null, this::onSyncAckError); + .subscribe( + null, + ex -> + LOGGER.warn( + "[{}] Exception on initial SyncAck, cause: {}", localMember, ex.toString())); } @Override @@ -307,7 +324,7 @@ public void stop() { suspicionTimeoutTasks.clear(); // Stop publishing events - sink.complete(); + sink.emitComplete(RetryEmitFailureHandler.INSTANCE); } @Override @@ -363,10 +380,12 @@ private void doSync() { private void onMessage(Message message) { if (isSync(message)) { - onSync(message).subscribe(null, this::onError); + onSync(message) + .subscribe(null, ex -> LOGGER.error("[{}][onSync][error] cause:", localMember, ex)); } else if (isSyncAck(message)) { if (message.correlationId() == null) { // filter out initial sync - onSyncAck(message, false).subscribe(null, this::onError); + onSyncAck(message, false) + .subscribe(null, ex -> LOGGER.error("[{}][onSyncAck][error] cause:", localMember, ex)); } } } @@ -445,7 +464,13 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { MembershipRecord record = new MembershipRecord(r0.member(), fdEvent.status(), r0.incarnation()); updateMembership(record, MembershipUpdateReason.FAILURE_DETECTOR_EVENT) - .subscribe(null, this::onError); + .subscribe( + null, + ex -> + LOGGER.error( + "[{}][onFailureDetectorEvent][updateMembership][error] cause:", + localMember, + ex)); } } @@ -455,7 +480,11 @@ private void onMembershipGossip(Message message) { MembershipRecord record = message.data(); LOGGER.debug("[{}] Received membership gossip: {}", localMember, record); updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP) - .subscribe(null, this::onError); + .subscribe( + null, + ex -> + LOGGER.error( + "[{}][onMembershipGossip][updateMembership][error] cause:", localMember, ex)); } } @@ -501,7 +530,13 @@ private Mono syncMembership(SyncData syncData, boolean onStart) { .map( r1 -> updateMembership(r1, reason) - .doOnError(ex -> onSyncMembershipError(reason, ex)) + .doOnError( + ex -> + LOGGER.warn( + "[{}][syncMembership][{}][error] cause: {}", + localMember, + reason, + ex.toString())) .onErrorResume(ex -> Mono.empty())) .toArray(Mono[]::new); @@ -536,31 +571,6 @@ private static boolean areNamespacesRelated(String namespace1, String namespace2 return areNamespacesRelated; } - private void onSyncMembershipError(MembershipUpdateReason reason, Throwable ex) { - LOGGER.debug( - "[{}][syncMembership][{}] Exception occurred, cause: {}", - localMember, - reason, - ex.toString()); - } - - private void onSyncError(Throwable ex) { - LOGGER.debug("[{}] Exception on initial Sync, cause: {}", localMember, ex.toString()); - } - - private void onSyncAckError(Throwable ex) { - LOGGER.debug("[{}] Exception on initial SyncAck, cause: {}", localMember, ex.toString()); - } - - private void onError(Throwable throwable) { - LOGGER.error("[{}] Received unexpected error:", localMember, throwable); - } - - @SuppressWarnings("unused") - private void onErrorIgnore(Throwable throwable) { - // no-op - } - /** * Try to update membership table with the given record. * @@ -704,7 +714,12 @@ private Mono onSelfMemberDetected( r1, r2); - spreadMembershipGossip(r2).doOnError(this::onErrorIgnore).subscribe(); + spreadMembershipGossip(r2) + .subscribe( + null, + th -> { + // no-op + }); }); } @@ -735,7 +750,7 @@ private Mono onLeavingDetected(MembershipRecord r0, MembershipRecord r1) { private void publishEvent(MembershipEvent event) { LOGGER.info("[{}][publishEvent] {}", localMember, event); - sink.next(event); + sink.emitNext(event, RetryEmitFailureHandler.INSTANCE); } private Mono onDeadMemberDetected(MembershipRecord r1) { @@ -830,7 +845,11 @@ private void onSuspicionTimeout(String memberId) { LOGGER.debug("[{}] Declare SUSPECTED member {} as DEAD by timeout", localMember, r); MembershipRecord deadRecord = new MembershipRecord(r.member(), DEAD, r.incarnation()); updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT) - .subscribe(null, this::onError); + .subscribe( + null, + ex -> + LOGGER.error( + "[{}][onSuspicionTimeout][updateMembership][error] cause:", localMember, ex)); } } @@ -839,7 +858,12 @@ private void spreadMembershipGossipUnlessGossiped( // Spread gossip (unless already gossiped) if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP && reason != MembershipUpdateReason.INITIAL_SYNC) { - spreadMembershipGossip(r).doOnError(this::onErrorIgnore).subscribe(); + spreadMembershipGossip(r) + .subscribe( + null, + th -> { + // no-op + }); } } @@ -942,4 +966,14 @@ private void onMemberRemoved(MembershipEvent event) { removedMembersHistory.remove(0); } } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } } diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index 1cb0ebb3..1f0f599d 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -80,7 +80,12 @@ public void start() { // Subscribe actionsDisposables.add( // Listen to incoming get_metadata requests from other members - transport.listen().publishOn(scheduler).subscribe(this::onMessage, this::onError)); + transport + .listen() + .publishOn(scheduler) + .subscribe( + this::onMessage, + ex -> LOGGER.error("[{}][onMessage][error] cause:", localMember, ex))); } @Override @@ -194,10 +199,6 @@ private void onMessage(Message message) { } } - private void onError(Throwable throwable) { - LOGGER.error("[{}] Received unexpected error:", localMember, throwable); - } - private void onMetadataRequest(Message message) { final Address sender = message.sender(); LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender); @@ -240,7 +241,16 @@ private void onMetadataRequest(Message message) { } private ByteBuffer encodeMetadata() { - ByteBuffer result = config.metadataCodec().serialize(localMetadata); + ByteBuffer result = null; + try { + result = config.metadataCodec().serialize(localMetadata); + } catch (Exception e) { + LOGGER.error( + "[{}] Failed to encode metadata: {}, cause: {}", + localMember, + localMetadata, + e.toString()); + } return Optional.ofNullable(result).orElse(EMPTY_BUFFER); } } diff --git a/cluster/src/test/java/io/scalecube/cluster/BaseTest.java b/cluster/src/test/java/io/scalecube/cluster/BaseTest.java index 4d30e4a3..617626f6 100644 --- a/cluster/src/test/java/io/scalecube/cluster/BaseTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/BaseTest.java @@ -6,6 +6,7 @@ import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.transport.netty.TransportImpl; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.time.Duration; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; @@ -47,11 +48,12 @@ protected void awaitSuspicion(int clusterSize) { } protected NetworkEmulatorTransport createTransport() { - return new NetworkEmulatorTransport(TransportImpl.bindAwait()); + return createTransport(TransportConfig.defaultConfig()); } protected NetworkEmulatorTransport createTransport(TransportConfig transportConfig) { - return new NetworkEmulatorTransport(TransportImpl.bindAwait(transportConfig)); + return new NetworkEmulatorTransport( + TransportImpl.bindAwait(transportConfig.transportFactory(new TcpTransportFactory()))); } protected void destroyTransport(Transport transport) { diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java index 1d493649..bbcbd1ab 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java @@ -31,10 +31,11 @@ public void testInvalidNamespaceFormat(String namespace) { Assertions.assertAll( () -> assertEquals( - "Invalid cluster config: membership.namespace format is invalid", + "Invalid cluster config: membership namespace format is invalid", actualException.getMessage())); } + @SuppressWarnings("unused") public static Stream testInvalidNamespaceFormat() { return Stream.of( of(""), @@ -231,6 +232,7 @@ public void testIsolatedParentNamespaces() { parent1.address(), parent2.address(), bob.address(), carol.address())) .startAwait(); + //noinspection unused Cluster eve = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index ff3b6f4a..1b7b94e0 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -8,6 +8,7 @@ import io.scalecube.cluster.membership.MembershipEvent.Type; import io.scalecube.cluster.metadata.MetadataCodec; import io.scalecube.net.Address; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.net.InetAddress; import java.time.Duration; import java.util.ArrayList; @@ -25,7 +26,7 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Sinks; public class ClusterTest extends BaseTest { @@ -44,6 +45,7 @@ public void testStartStopRepeatedly() throws Exception { .membership(opts -> opts.syncInterval(100)) .transport(opts -> opts.port(address.port())) .transport(opts -> opts.connectTimeout(CONNECT_TIMEOUT)) + .transportFactory(TcpTransportFactory::new) .startAwait(); Cluster otherNode = @@ -53,6 +55,7 @@ public void testStartStopRepeatedly() throws Exception { .failureDetector(opts -> opts.pingInterval(100)) .membership(opts -> opts.syncInterval(100)) .transport(opts -> opts.connectTimeout(CONNECT_TIMEOUT)) + .transportFactory(TcpTransportFactory::new) .startAwait(); assertEquals(2, seedNode.members().size()); @@ -69,6 +72,7 @@ public void testStartStopRepeatedly() throws Exception { .membership(opts -> opts.syncInterval(100)) .transport(opts -> opts.port(address.port())) .transport(opts -> opts.connectTimeout(CONNECT_TIMEOUT)) + .transportFactory(TcpTransportFactory::new) .startAwait(); TimeUnit.SECONDS.sleep(1); @@ -81,9 +85,12 @@ public void testStartStopRepeatedly() throws Exception { @Test public void testMembersAccessFromScheduler() { // Start seed node - Cluster seedNode = new ClusterImpl().startAwait(); + Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); Cluster otherNode = - new ClusterImpl().membership(opts -> opts.seedMembers(seedNode.address())).startAwait(); + new ClusterImpl() + .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) + .startAwait(); assertEquals(2, seedNode.members().size()); assertEquals(2, otherNode.members().size()); @@ -115,6 +122,7 @@ public void testJoinLocalhostIgnored() throws InterruptedException { new ClusterImpl() .transport(opts -> opts.port(4801).connectTimeout(CONNECT_TIMEOUT)) .membership(opts -> opts.seedMembers(addresses)) + .transportFactory(TcpTransportFactory::new) .startAwait(); Thread.sleep(CONNECT_TIMEOUT); @@ -141,6 +149,7 @@ public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException { new ClusterImpl(new ClusterConfig().externalHost("localhost").externalPort(7878)) .transport(opts -> opts.port(7878).connectTimeout(CONNECT_TIMEOUT)) .membership(opts -> opts.seedMembers(addresses)) + .transportFactory(TcpTransportFactory::new) .startAwait(); Thread.sleep(CONNECT_TIMEOUT); @@ -152,7 +161,7 @@ public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException { @Test public void testJoinDynamicPort() { // Start seed node - Cluster seedNode = new ClusterImpl().startAwait(); + Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); int membersNum = 10; List otherNodes = new ArrayList<>(membersNum); @@ -163,6 +172,7 @@ public void testJoinDynamicPort() { otherNodes.add( new ClusterImpl() .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .startAwait()); } LOGGER.info("Start up time: {} ms", System.currentTimeMillis() - startAt); @@ -178,7 +188,7 @@ public void testJoinDynamicPort() { @Test public void testUpdateMetadata() throws Exception { // Start seed member - Cluster seedNode = new ClusterImpl().startAwait(); + Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); Cluster metadataNode = null; int testMembersNum = 10; @@ -193,6 +203,7 @@ public void testUpdateMetadata() throws Exception { new ClusterImpl() .config(opts -> opts.metadata(metadata)) .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Start other test members @@ -201,6 +212,7 @@ public void testUpdateMetadata() throws Exception { integer -> new ClusterImpl() .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> new ClusterMessageHandler() { @@ -248,7 +260,7 @@ public void onMembershipEvent(MembershipEvent event) { @Test public void testUpdateMetadataProperty() throws Exception { // Start seed member - Cluster seedNode = new ClusterImpl().startAwait(); + Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); Cluster metadataNode = null; int testMembersNum = 10; @@ -264,6 +276,7 @@ public void testUpdateMetadataProperty() throws Exception { new ClusterImpl() .config(opts -> opts.metadata(metadata)) .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Start other test members @@ -272,6 +285,7 @@ public void testUpdateMetadataProperty() throws Exception { integer -> new ClusterImpl() .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> new ClusterMessageHandler() { @@ -324,7 +338,7 @@ public void onMembershipEvent(MembershipEvent event) { @Test public void testRemoveMetadataProperty() throws Exception { // Start seed member - Cluster seedNode = new ClusterImpl().startAwait(); + Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); Cluster metadataNode = null; int testMembersNum = 10; @@ -340,6 +354,7 @@ public void testRemoveMetadataProperty() throws Exception { new ClusterImpl() .config(opts -> opts.metadata(metadata)) .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Start other test members @@ -348,6 +363,7 @@ public void testRemoveMetadataProperty() throws Exception { integer -> new ClusterImpl() .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> new ClusterMessageHandler() { @@ -417,22 +433,29 @@ public void onMembershipEvent(MembershipEvent event) { }; // Start seed member - final Cluster seedNode = new ClusterImpl().handler(cluster -> listener).startAwait(); + final Cluster seedNode = + new ClusterImpl() + .transportFactory(TcpTransportFactory::new) + .handler(cluster -> listener) + .startAwait(); // Start nodes final Cluster node1 = new ClusterImpl() .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); final Cluster node2 = new ClusterImpl() .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); final Cluster node3 = new ClusterImpl() .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); @@ -449,18 +472,19 @@ public void onMembershipEvent(MembershipEvent event) { @Test public void testMemberMetadataRemoved() throws InterruptedException { // Start seed member - ReplayProcessor seedEvents = ReplayProcessor.create(); + Sinks.Many seedEvents = Sinks.many().replay().all(); Map seedMetadata = new HashMap<>(); seedMetadata.put("seed", "shmid"); final Cluster seedNode = new ClusterImpl() .config(opts -> opts.metadata(seedMetadata)) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> new ClusterMessageHandler() { @Override public void onMembershipEvent(MembershipEvent event) { - seedEvents.onNext(event); + seedEvents.tryEmitNext(event); } }) .startAwait(); @@ -469,26 +493,29 @@ public void onMembershipEvent(MembershipEvent event) { // Start member with metadata Map node1Metadata = new HashMap<>(); node1Metadata.put("node", "shmod"); - ReplayProcessor node1Events = ReplayProcessor.create(); + Sinks.Many node1Events = Sinks.many().replay().all(); final Cluster node1 = new ClusterImpl() .config(opts -> opts.metadata(node1Metadata)) .membership(opts -> opts.seedMembers(seedNode.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> new ClusterMessageHandler() { @Override public void onMembershipEvent(MembershipEvent event) { - node1Events.onNext(event); + node1Events.tryEmitNext(event); } }) .startAwait(); // Check events - MembershipEvent nodeAddedEvent = seedEvents.as(Mono::from).block(Duration.ofSeconds(3)); + MembershipEvent nodeAddedEvent = + seedEvents.asFlux().as(Mono::from).block(Duration.ofSeconds(3)); assertEquals(Type.ADDED, nodeAddedEvent.type()); - MembershipEvent seedAddedEvent = node1Events.as(Mono::from).block(Duration.ofSeconds(3)); + MembershipEvent seedAddedEvent = + node1Events.asFlux().as(Mono::from).block(Duration.ofSeconds(3)); assertEquals(Type.ADDED, seedAddedEvent.type()); // Check metadata @@ -499,6 +526,7 @@ public void onMembershipEvent(MembershipEvent event) { CountDownLatch latch = new CountDownLatch(1); AtomicReference> removedMetadata = new AtomicReference<>(); seedEvents + .asFlux() .filter(MembershipEvent::isRemoved) .subscribe( event -> { @@ -518,13 +546,17 @@ public void onMembershipEvent(MembershipEvent event) { @Test public void testJoinSeedClusterWithNoExistingSeedMember() { // Start seed node - Cluster seedNode = new ClusterImpl().startAwait(); + Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); Address nonExistingSeed1 = Address.from("localhost:1234"); Address nonExistingSeed2 = Address.from("localhost:5678"); Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()}; - Cluster otherNode = new ClusterImpl().membership(opts -> opts.seedMembers(seeds)).startAwait(); + Cluster otherNode = + new ClusterImpl() + .membership(opts -> opts.seedMembers(seeds)) + .transportFactory(TcpTransportFactory::new) + .startAwait(); assertEquals(otherNode.member(), seedNode.otherMembers().iterator().next()); assertEquals(seedNode.member(), otherNode.otherMembers().iterator().next()); diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index 192110dc..384c0c23 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -86,6 +86,7 @@ class GossipProtocolTest extends BaseTest { // } // } + @SuppressWarnings("unused") static Stream experiment() { return experiments.stream().map(objects -> Arguments.of(objects[0], objects[1], objects[2])); } @@ -291,7 +292,7 @@ private void destroyGossipProtocols(List gossipProtocols) { // Await a bit try { - Thread.sleep(gossipProtocols.size() * 20); + Thread.sleep(gossipProtocols.size() * 20L); } catch (InterruptedException ignore) { // ignore } diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index d5035504..63cea74a 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -32,11 +32,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.Exceptions; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; @@ -586,8 +584,8 @@ public void testRestartStoppedMembers() { .then() .block(TIMEOUT); - ReplayProcessor cmA_RemovedHistory = startRecordingRemoved(cmA); - ReplayProcessor cmB_RemovedHistory = startRecordingRemoved(cmB); + Sinks.Many cmA_RemovedHistory = startRecordingRemoved(cmA); + Sinks.Many cmB_RemovedHistory = startRecordingRemoved(cmB); stop(cmC); stop(cmD); @@ -668,8 +666,8 @@ public void testRestartStoppedMembersOnSameAddresses() { assertTrusted(cmC, cmB.member(), cmA.member(), cmD.member()); assertTrusted(cmD, cmB.member(), cmC.member(), cmA.member()); - ReplayProcessor cmA_RemovedHistory = startRecordingRemoved(cmA); - ReplayProcessor cmB_RemovedHistory = startRecordingRemoved(cmB); + Sinks.Many cmA_RemovedHistory = startRecordingRemoved(cmA); + Sinks.Many cmB_RemovedHistory = startRecordingRemoved(cmB); stop(cmC); stop(cmD); @@ -867,9 +865,9 @@ public void testNetworkPartitionDueNoInboundThenRemoved() { assertTrusted(cmB, cmA.member(), cmC.member()); assertTrusted(cmC, cmB.member(), cmA.member()); - ReplayProcessor cmA_RemovedHistory = startRecordingRemoved(cmA); - ReplayProcessor cmB_RemovedHistory = startRecordingRemoved(cmB); - ReplayProcessor cmC_RemovedHistory = startRecordingRemoved(cmC); + Sinks.Many cmA_RemovedHistory = startRecordingRemoved(cmA); + Sinks.Many cmB_RemovedHistory = startRecordingRemoved(cmB); + Sinks.Many cmC_RemovedHistory = startRecordingRemoved(cmC); // block inbound msgs from all c.networkEmulator().blockAllInbound(); @@ -907,9 +905,9 @@ public void testNetworkPartitionDueNoInboundUntilRemovedThenInboundRecover() { assertTrusted(cmB, cmA.member(), cmC.member()); assertTrusted(cmC, cmB.member(), cmA.member()); - ReplayProcessor cmA_RemovedHistory = startRecordingRemoved(cmA); - ReplayProcessor cmB_RemovedHistory = startRecordingRemoved(cmB); - ReplayProcessor cmC_RemovedHistory = startRecordingRemoved(cmC); + Sinks.Many cmA_RemovedHistory = startRecordingRemoved(cmA); + Sinks.Many cmB_RemovedHistory = startRecordingRemoved(cmB); + Sinks.Many cmC_RemovedHistory = startRecordingRemoved(cmC); // block inbound msgs from all c.networkEmulator().blockAllInbound(); @@ -1059,10 +1057,10 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() { assertTrusted(cmD, cmA.member(), cmB.member(), cmC.member()); assertNoSuspected(cmD); - ReplayProcessor cmA_removedHistory = startRecordingRemoved(cmA); - ReplayProcessor cmB_removedHistory = startRecordingRemoved(cmB); - ReplayProcessor cmC_removedHistory = startRecordingRemoved(cmC); - ReplayProcessor cmD_removedHistory = startRecordingRemoved(cmD); + Sinks.Many cmA_removedHistory = startRecordingRemoved(cmA); + Sinks.Many cmB_removedHistory = startRecordingRemoved(cmB); + Sinks.Many cmC_removedHistory = startRecordingRemoved(cmC); + Sinks.Many cmD_removedHistory = startRecordingRemoved(cmD); // Split into several clusters Stream.of(a, b, c, d) @@ -1129,8 +1127,7 @@ private MembershipProtocolImpl createMembership( private MembershipProtocolImpl createMembership(Transport transport, ClusterConfig config) { Member localMember = new Member(newMemberId(), null, transport.address(), NAMESPACE); - DirectProcessor membershipProcessor = DirectProcessor.create(); - FluxSink membershipSink = membershipProcessor.sink(); + Sinks.Many membershipProcessor = Sinks.many().multicast().directBestEffort(); CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id()); @@ -1138,14 +1135,18 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf new FailureDetectorImpl( localMember, transport, - membershipProcessor, + membershipProcessor.asFlux().onBackpressureBuffer(), config.failureDetectorConfig(), scheduler, cidGenerator); GossipProtocolImpl gossipProtocol = new GossipProtocolImpl( - localMember, transport, membershipProcessor, config.gossipConfig(), scheduler); + localMember, + transport, + membershipProcessor.asFlux().onBackpressureBuffer(), + config.gossipConfig(), + scheduler); MetadataStoreImpl metadataStore = new MetadataStoreImpl(localMember, transport, null, config, scheduler, cidGenerator); @@ -1162,7 +1163,12 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf cidGenerator, new ClusterMonitorModel.Builder()); - membership.listen().subscribe(membershipSink::next); + membership + .listen() + .subscribe( + membershipProcessor::tryEmitNext, + membershipProcessor::tryEmitError, + membershipProcessor::tryEmitComplete); try { failureDetector.start(); @@ -1239,9 +1245,9 @@ private void assertSuspected(MembershipProtocolImpl membership, Member... expect } } - private void assertRemoved(ReplayProcessor recording, Member... expected) { + private void assertRemoved(Sinks.Many recording, Member... expected) { List actual = new ArrayList<>(); - recording.map(MembershipEvent::member).subscribe(actual::add); + recording.asFlux().map(MembershipEvent::member).subscribe(actual::add); assertEquals( expected.length, actual.size(), @@ -1261,10 +1267,6 @@ private void assertSelfTrusted(MembershipProtocolImpl membership) { assertTrusted(membership); } - private void assertNoRemoved(ReplayProcessor recording) { - assertRemoved(recording); - } - private void assertMemberAndType( MembershipEvent membershipEvent, String expectedMemberId, MembershipEvent.Type expectedType) { @@ -1283,10 +1285,12 @@ private List membersByStatus(MembershipProtocolImpl membership, MemberSt .collect(Collectors.toList()); } - private ReplayProcessor startRecordingRemoved( - MembershipProtocolImpl membership) { - ReplayProcessor recording = ReplayProcessor.create(); - membership.listen().filter(MembershipEvent::isRemoved).subscribe(recording); + private Sinks.Many startRecordingRemoved(MembershipProtocolImpl membership) { + Sinks.Many recording = Sinks.many().replay().all(); + membership + .listen() + .filter(MembershipEvent::isRemoved) + .subscribe(recording::tryEmitNext, recording::tryEmitError, recording::tryEmitComplete); return recording; } } diff --git a/codec-parent/pom.xml b/codec-parent/pom.xml index ab173007..b707c1b7 100644 --- a/codec-parent/pom.xml +++ b/codec-parent/pom.xml @@ -1,5 +1,7 @@ - + scalecube-cluster-parent diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java index 0c9fc72f..cff607f2 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java @@ -6,6 +6,7 @@ import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterImpl; import io.scalecube.cluster.Member; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.util.Collections; import java.util.Map; @@ -19,13 +20,18 @@ public class ClusterJoinExamples { /** Main method. */ public static void main(String[] args) { // Start seed member Alice - Cluster alice = new ClusterImpl().config(opts -> opts.memberAlias("Alice")).startAwait(); + Cluster alice = + new ClusterImpl() + .config(opts -> opts.memberAlias("Alice")) + .transportFactory(TcpTransportFactory::new) + .startAwait(); // Join Bob to cluster with Alice Cluster bob = new ClusterImpl() .config(opts -> opts.memberAlias("Bob")) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Join Carol to cluster with metadata @@ -34,6 +40,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Carol").metadata(metadata)) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Start Dan on port 3000 @@ -42,7 +49,10 @@ public static void main(String[] args) { .memberAlias("Dan") .membership(opts -> opts.seedMembers(alice.address())) .transport(opts -> opts.port(3000)); - Cluster dan = new ClusterImpl(configWithFixedPort).startAwait(); + Cluster dan = + new ClusterImpl(configWithFixedPort) + .transportFactory(TcpTransportFactory::new) + .startAwait(); // Start Eve in separate cluster (separate sync group) ClusterConfig configWithSyncGroup = @@ -56,7 +66,10 @@ public static void main(String[] args) { carol.address(), dan.address()) // won't join anyway .namespace("another-cluster")); - Cluster eve = new ClusterImpl(configWithSyncGroup).startAwait(); + Cluster eve = + new ClusterImpl(configWithSyncGroup) + .transportFactory(TcpTransportFactory::new) + .startAwait(); // Print cluster members of each node diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java index 33fc1adb..bb1910b1 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java @@ -5,6 +5,7 @@ import io.scalecube.cluster.Cluster; import io.scalecube.cluster.ClusterImpl; import io.scalecube.cluster.Member; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; public class ClusterJoinNamespacesExamples { @@ -15,6 +16,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Alice")) .membership(opts -> opts.namespace("alice")) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Join Bob to cluster (seed: Alice) @@ -23,6 +25,7 @@ public static void main(String[] args) { .config(opts -> opts.memberAlias("Bob")) .membership(opts -> opts.namespace("alice/bob-and-carol")) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Join Carol to cluster (seed: Alice and Bob) @@ -31,6 +34,7 @@ public static void main(String[] args) { .config(opts -> opts.memberAlias("Carol")) .membership(opts -> opts.namespace("alice/bob-and-carol")) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); Cluster bobAndCarolChild1 = @@ -38,6 +42,7 @@ public static void main(String[] args) { .config(opts -> opts.memberAlias("Bob-and-Carol-Child-1")) .membership(opts -> opts.namespace("alice/bob-and-carol/child-1")) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); Cluster carolChild2 = @@ -45,6 +50,7 @@ public static void main(String[] args) { .config(opts -> opts.memberAlias("Bob-and-Carol-Child-2")) .membership(opts -> opts.namespace("alice/bob-and-carol/child-2")) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Join Dan to cluster @@ -53,6 +59,7 @@ public static void main(String[] args) { .config(opts -> opts.memberAlias("Dan")) .membership(opts -> opts.namespace("alice/dan-and-eve")) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Join Eve to cluster @@ -61,6 +68,7 @@ public static void main(String[] args) { .config(opts -> opts.memberAlias("Eve")) .membership(opts -> opts.namespace("alice/dan-and-eve")) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .startAwait(); // Print cluster members of each node diff --git a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java index ac632a29..95824c93 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java @@ -5,6 +5,7 @@ import io.scalecube.cluster.ClusterMessageHandler; import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.util.Collections; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -22,7 +23,7 @@ public class ClusterMetadataExample { /** Main method. */ public static void main(String[] args) throws Exception { // Start seed cluster member Alice - Cluster alice = new ClusterImpl().startAwait(); + Cluster alice = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); // Join Joe to cluster with metadata and listen for incoming messages and print them to stdout //noinspection unused @@ -30,6 +31,7 @@ public static void main(String[] args) throws Exception { new ClusterImpl() .config(opts -> opts.metadata(Collections.singletonMap("name", "Joe"))) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { //noinspection CodeBlock2Expr diff --git a/examples/src/main/java/io/scalecube/examples/GossipExample.java b/examples/src/main/java/io/scalecube/examples/GossipExample.java index d33ebaaf..2135880c 100644 --- a/examples/src/main/java/io/scalecube/examples/GossipExample.java +++ b/examples/src/main/java/io/scalecube/examples/GossipExample.java @@ -4,6 +4,7 @@ import io.scalecube.cluster.ClusterImpl; import io.scalecube.cluster.ClusterMessageHandler; import io.scalecube.cluster.transport.api.Message; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; /** * Basic example for member gossiping between cluster members. to run the example Start ClusterNodeA @@ -18,6 +19,7 @@ public static void main(String[] args) throws Exception { // Start cluster nodes and subscribe on listening gossips Cluster alice = new ClusterImpl() + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { @@ -33,6 +35,7 @@ public void onGossip(Message gossip) { Cluster bob = new ClusterImpl() .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { @@ -48,6 +51,7 @@ public void onGossip(Message gossip) { Cluster carol = new ClusterImpl() .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { @@ -63,6 +67,7 @@ public void onGossip(Message gossip) { Cluster dan = new ClusterImpl() .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { @@ -76,7 +81,10 @@ public void onGossip(Message gossip) { // Start cluster node Eve that joins cluster and spreads gossip Cluster eve = - new ClusterImpl().membership(opts -> opts.seedMembers(alice.address())).startAwait(); + new ClusterImpl() + .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) + .startAwait(); eve.spreadGossip(Message.fromData("Gossip from Eve")) .doOnError(System.err::println) .subscribe(null, Throwable::printStackTrace); diff --git a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java index 20bdd636..2040588f 100644 --- a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java +++ b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java @@ -7,6 +7,7 @@ import io.scalecube.cluster.fdetector.FailureDetectorConfig; import io.scalecube.cluster.membership.MembershipConfig; import io.scalecube.cluster.membership.MembershipEvent; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; @@ -28,6 +29,7 @@ public static void main(String[] args) throws Exception { new ClusterImpl() .config(opts -> opts.memberAlias("Alice")) .config(opts -> opts.metadata(Collections.singletonMap("name", "Alice"))) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { @@ -46,6 +48,7 @@ public void onMembershipEvent(MembershipEvent event) { .config(opts -> opts.memberAlias("Bob")) .config(opts -> opts.metadata(Collections.singletonMap("name", "Bob"))) .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { @@ -64,6 +67,7 @@ public void onMembershipEvent(MembershipEvent event) { .config(opts -> opts.memberAlias("Carol")) .config(opts -> opts.metadata(Collections.singletonMap("name", "Carol"))) .membership(opts -> opts.seedMembers(bob.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { diff --git a/examples/src/main/java/io/scalecube/examples/MessagingExample.java b/examples/src/main/java/io/scalecube/examples/MessagingExample.java index 8a695a38..b747b229 100644 --- a/examples/src/main/java/io/scalecube/examples/MessagingExample.java +++ b/examples/src/main/java/io/scalecube/examples/MessagingExample.java @@ -4,6 +4,7 @@ import io.scalecube.cluster.ClusterImpl; import io.scalecube.cluster.ClusterMessageHandler; import io.scalecube.cluster.transport.api.Message; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; import reactor.core.publisher.Flux; /** @@ -19,6 +20,7 @@ public static void main(String[] args) throws Exception { // Start cluster node Alice to listen and respond for incoming greeting messages Cluster alice = new ClusterImpl() + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { @@ -38,6 +40,7 @@ public void onMessage(Message msg) { Cluster bob = new ClusterImpl() .membership(opts -> opts.seedMembers(alice.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { @@ -56,6 +59,7 @@ public void onMessage(Message msg) { Cluster carol = new ClusterImpl() .membership(opts -> opts.seedMembers(alice.address(), bob.address())) + .transportFactory(TcpTransportFactory::new) .handler( cluster -> { return new ClusterMessageHandler() { diff --git a/transport-parent/pom.xml b/transport-parent/pom.xml index 09fb2923..6d0f5e91 100644 --- a/transport-parent/pom.xml +++ b/transport-parent/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportFactory.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportFactory.java index 64a746cb..f806934c 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportFactory.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportFactory.java @@ -1,10 +1,6 @@ package io.scalecube.cluster.transport.api; -import io.scalecube.utils.ServiceLoaderUtil; - public interface TransportFactory { - TransportFactory INSTANCE = ServiceLoaderUtil.findFirst(TransportFactory.class).orElse(null); - Transport createTransport(TransportConfig config); } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index 3385cd44..95fcac1f 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -1,36 +1,37 @@ package io.scalecube.transport.netty; +import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.EncoderException; +import io.netty.util.ReferenceCountUtil; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.MessageCodec; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; -import io.scalecube.cluster.transport.api.TransportFactory; import io.scalecube.net.Address; -import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.Exceptions; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.netty.Connection; import reactor.netty.DisposableServer; import reactor.netty.resources.LoopResources; @@ -42,12 +43,11 @@ public final class TransportImpl implements Transport { private final MessageCodec messageCodec; // Subject - private final DirectProcessor subject = DirectProcessor.create(); - private final FluxSink sink = subject.sink(); + private final Sinks.Many sink = Sinks.many().multicast().directBestEffort(); // Close handler - private final MonoProcessor stop = MonoProcessor.create(); - private final MonoProcessor onStop = MonoProcessor.create(); + private final Sinks.One stop = Sinks.one(); + private final Sinks.One onStop = Sinks.one(); // Server private Address address; @@ -73,12 +73,13 @@ public TransportImpl(MessageCodec messageCodec, Receiver receiver, Sender sender } private static Address prepareAddress(DisposableServer server) { - InetAddress address = ((InetSocketAddress) server.address()).getAddress(); - int port = ((InetSocketAddress) server.address()).getPort(); - if (address.isAnyLocalAddress()) { + final InetSocketAddress serverAddress = (InetSocketAddress) server.address(); + InetAddress inetAddress = serverAddress.getAddress(); + int port = serverAddress.getPort(); + if (inetAddress.isAnyLocalAddress()) { return Address.create(Address.getLocalIpAddress().getHostAddress(), port); } else { - return Address.create(address.getHostAddress(), port); + return Address.create(inetAddress.getHostAddress(), port); } } @@ -86,8 +87,9 @@ private void init(DisposableServer server) { this.server = server; this.address = prepareAddress(server); // Setup cleanup - stop.then(doStop()) - .doFinally(s -> onStop.onComplete()) + stop.asMono() + .then(doStop()) + .doFinally(s -> onStop.emitEmpty(RetryEmitFailureHandler.INSTANCE)) .subscribe( null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString())); } @@ -134,11 +136,8 @@ public static Mono bind() { * @return promise for bind operation */ public static Mono bind(TransportConfig config) { - return Optional.ofNullable( - Optional.ofNullable(config.transportFactory()).orElse(TransportFactory.INSTANCE)) - .orElse(new TcpTransportFactory()) - .createTransport(config) - .start(); + Objects.requireNonNull(config.transportFactory(), "[bind] transportFactory"); + return config.transportFactory().createTransport(config).start(); } /** @@ -148,17 +147,17 @@ public static Mono bind(TransportConfig config) { */ @Override public Mono start() { - return Mono.deferWithContext(context -> receiver.bind()) + return Mono.deferContextual(context -> receiver.bind()) .doOnNext(this::init) - .doOnSuccess(t -> LOGGER.info("[bind0][{}] Bound cluster transport", t.address())) - .doOnError(ex -> LOGGER.error("[bind0][{}] Exception occurred: {}", address, ex.toString())) + .doOnSuccess(t -> LOGGER.info("[start][{}] Bound cluster transport", t.address())) + .doOnError(ex -> LOGGER.error("[start][{}] Exception occurred: {}", address, ex.toString())) .thenReturn(this) .cast(Transport.class) - .subscriberContext( + .contextWrite( context -> context.put( ReceiverContext.class, - new ReceiverContext(loopResources, this::toMessage, sink::next))); + new ReceiverContext(address, sink, loopResources, this::decodeMessage))); } @Override @@ -168,15 +167,15 @@ public Address address() { @Override public boolean isStopped() { - return onStop.isDisposed(); + return onStop.asMono().toFuture().isDone(); } @Override public final Mono stop() { return Mono.defer( () -> { - stop.onComplete(); - return onStop; + stop.emitEmpty(RetryEmitFailureHandler.INSTANCE); + return onStop.asMono(); }); } @@ -185,7 +184,7 @@ private Mono doStop() { () -> { LOGGER.info("[{}][doStop] Stopping", address); // Complete incoming messages observable - sink.complete(); + sink.emitComplete(RetryEmitFailureHandler.INSTANCE); return Flux.concatDelayError(closeServer(), shutdownLoopResources()) .then() .doFinally(s -> connections.clear()) @@ -195,20 +194,20 @@ private Mono doStop() { @Override public final Flux listen() { - return subject.onBackpressureBuffer(); + return sink.asFlux().onBackpressureBuffer(); } @Override public Mono send(Address address, Message message) { - return Mono.deferWithContext(context -> connections.computeIfAbsent(address, this::connect0)) + return Mono.deferContextual(context -> connections.computeIfAbsent(address, this::connect)) .flatMap( connection -> - Mono.deferWithContext(context -> sender.send(message)) - .subscriberContext(context -> context.put(Connection.class, connection))) - .subscriberContext( + Mono.deferContextual(context -> sender.send(message)) + .contextWrite(context -> context.put(Connection.class, connection))) + .contextWrite( context -> context.put( - SenderContext.class, new SenderContext(loopResources, this::toByteBuf))); + SenderContext.class, new SenderContext(loopResources, this::encodeMessage))); } @Override @@ -238,7 +237,7 @@ public Mono requestResponse(Address address, final Message request) { }); } - private Message toMessage(ByteBuf byteBuf) { + private Message decodeMessage(ByteBuf byteBuf) { try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, true)) { return messageCodec.deserialize(stream); } catch (Exception e) { @@ -247,7 +246,7 @@ private Message toMessage(ByteBuf byteBuf) { } } - private ByteBuf toByteBuf(Message message) { + private ByteBuf encodeMessage(Message message) { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); ByteBufOutputStream stream = new ByteBufOutputStream(byteBuf); try { @@ -260,20 +259,29 @@ private ByteBuf toByteBuf(Message message) { return byteBuf; } - private Mono connect0(Address address1) { + private Mono connect(Address remoteAddress) { return sender - .connect(address1) + .connect(remoteAddress) .doOnSuccess( connection -> { - connection.onDispose().doOnTerminate(() -> connections.remove(address1)).subscribe(); + connection + .onDispose() + .doOnTerminate(() -> connections.remove(remoteAddress)) + .subscribe(); LOGGER.debug( - "[{}][connected][{}] Channel: {}", address, address1, connection.channel()); + "[{}][connect][success] remoteAddress: {}, channel: {}", + address, + remoteAddress, + connection.channel()); }) .doOnError( th -> { LOGGER.warn( - "[{}][connect0][{}] Exception occurred: {}", address, address1, th.toString()); - connections.remove(address1); + "[{}][connect][error] remoteAddress: {}, cause: {}", + address, + remoteAddress, + th.toString()); + connections.remove(remoteAddress); }) .cache(); } @@ -301,29 +309,46 @@ private Mono shutdownLoopResources() { public static final class ReceiverContext { + private final Address address; + private final Sinks.Many sink; private final LoopResources loopResources; private final Function messageDecoder; - private final Consumer messageConsumer; private ReceiverContext( + Address address, + Sinks.Many sink, LoopResources loopResources, - Function messageDecoder, - Consumer messageConsumer) { + Function messageDecoder) { + this.address = address; + this.sink = sink; this.loopResources = loopResources; this.messageDecoder = messageDecoder; - this.messageConsumer = messageConsumer; } public LoopResources loopResources() { return loopResources; } - public Function messageDecoder() { - return messageDecoder; - } - - public void onMessage(Message message) { - messageConsumer.accept(message); + /** + * Inbound message handler method. Filters out junk byteBufs, decodes accepted byteBufs into + * messages, and then publishing messages on sink instance. + * + * @param byteBuf byteBuf + */ + public void onMessage(ByteBuf byteBuf) { + try { + if (byteBuf == Unpooled.EMPTY_BUFFER) { + return; + } + if (!byteBuf.isReadable()) { + ReferenceCountUtil.safeRelease(byteBuf); + return; + } + final Message message = messageDecoder.apply(byteBuf); + sink.emitNext(message, RetryEmitFailureHandler.INSTANCE); + } catch (Exception e) { + LOGGER.error("[{}][onMessage] Exception occurred:", address, e); + } } } @@ -345,4 +370,14 @@ public Function messageEncoder() { return messageEncoder; } } + + private static class RetryEmitFailureHandler implements EmitFailureHandler { + + private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler(); + + @Override + public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + return emitResult == FAIL_NON_SERIALIZED; + } + } } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpReceiver.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpReceiver.java index f2cb8c4e..5d9192dd 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpReceiver.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpReceiver.java @@ -4,7 +4,6 @@ import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.transport.netty.Receiver; import io.scalecube.transport.netty.TransportImpl.ReceiverContext; -import java.net.InetSocketAddress; import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; @@ -19,17 +18,11 @@ final class TcpReceiver implements Receiver { @Override public Mono bind() { - return Mono.deferWithContext(context -> Mono.just(context.get(ReceiverContext.class))) + return Mono.deferContextual(context -> Mono.just(context.get(ReceiverContext.class))) .flatMap( context -> newTcpServer(context) - .handle( - (in, out) -> - in.receive() - .retain() - .map(context.messageDecoder()) - .doOnNext(context::onMessage) - .then()) + .handle((in, out) -> in.receive().retain().doOnNext(context::onMessage).then()) .bind() .cast(DisposableServer.class)); } @@ -37,14 +30,13 @@ public Mono bind() { private TcpServer newTcpServer(ReceiverContext context) { return TcpServer.create() .runOn(context.loopResources()) - .bindAddress(() -> new InetSocketAddress(config.port())) + .port(config.port()) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) .doOnChannelInit( - (connectionObserver, channel, remoteAddress) -> { - new TcpChannelInitializer(config.maxFrameLength()) - .accept(connectionObserver, channel); - }); + (connectionObserver, channel, remoteAddress) -> + new TcpChannelInitializer(config.maxFrameLength()) + .accept(connectionObserver, channel)); } } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java index 5cfa2c29..50e68002 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java @@ -8,7 +8,6 @@ import io.scalecube.transport.netty.TransportImpl.SenderContext; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.netty.resources.ConnectionProvider; import reactor.netty.tcp.TcpClient; final class TcpSender implements Sender { @@ -21,14 +20,14 @@ final class TcpSender implements Sender { @Override public Mono connect(Address address) { - return Mono.deferWithContext(context -> Mono.just(context.get(SenderContext.class))) + return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class))) .map(context -> newTcpClient(context, address)) .flatMap(TcpClient::connect); } @Override public Mono send(Message message) { - return Mono.deferWithContext( + return Mono.deferContextual( context -> { Connection connection = context.get(Connection.class); SenderContext senderContext = context.get(SenderContext.class); @@ -41,7 +40,7 @@ public Mono send(Message message) { private TcpClient newTcpClient(SenderContext context, Address address) { TcpClient tcpClient = - TcpClient.create(ConnectionProvider.newConnection()) + TcpClient.newConnection() .runOn(context.loopResources()) .host(address.host()) .port(address.port()) diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java index e204ac3b..e15860a7 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java @@ -1,11 +1,9 @@ package io.scalecube.transport.netty.websocket; import io.netty.channel.ChannelOption; -import io.netty.util.ReferenceCountUtil; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.transport.netty.Receiver; import io.scalecube.transport.netty.TransportImpl.ReceiverContext; -import java.net.InetSocketAddress; import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; @@ -24,7 +22,7 @@ final class WebsocketReceiver implements Receiver { @Override public Mono bind() { - return Mono.deferWithContext(context -> Mono.just(context.get(ReceiverContext.class))) + return Mono.deferContextual(context -> Mono.just(context.get(ReceiverContext.class))) .flatMap( context -> newHttpServer(context) @@ -36,7 +34,7 @@ public Mono bind() { private HttpServer newHttpServer(ReceiverContext context) { return HttpServer.create() .runOn(context.loopResources()) - .bindAddress(() -> new InetSocketAddress(config.port())) + .port(config.port()) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true); @@ -48,17 +46,6 @@ private Mono onMessage( HttpServerResponse response) { return response.sendWebsocket( (WebsocketInbound inbound, WebsocketOutbound outbound) -> - inbound - .receive() - .retain() - .doOnNext( - byteBuf -> { - if (!byteBuf.isReadable()) { - ReferenceCountUtil.safeRelease(byteBuf); - return; - } - context.onMessage(context.messageDecoder().apply(byteBuf)); - }) - .then()); + inbound.receive().retain().doOnNext(context::onMessage).then()); } } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java index 14f66500..34b134d4 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java @@ -22,14 +22,14 @@ final class WebsocketSender implements Sender { @Override public Mono connect(Address address) { - return Mono.deferWithContext(context -> Mono.just(context.get(SenderContext.class))) + return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class))) .map(context -> newWebsocketSender(context, address)) .flatMap(sender -> sender.uri("/").connect()); } @Override public Mono send(Message message) { - return Mono.deferWithContext( + return Mono.deferContextual( context -> { Connection connection = context.get(Connection.class); SenderContext senderContext = context.get(SenderContext.class); diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java index f3da9803..f8bb8daa 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java @@ -22,7 +22,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import reactor.core.publisher.Flux; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; public class TcpTransportTest extends BaseTest { @@ -322,8 +322,10 @@ public void testBlockAndUnblockTraffic() throws Exception { server.listen().subscribe(message -> server.send(message.sender(), message).subscribe()); - ReplayProcessor responses = ReplayProcessor.create(); - client.listen().subscribe(responses); + Sinks.Many responses = Sinks.many().replay().all(); + client + .listen() + .subscribe(responses::tryEmitNext, responses::tryEmitError, responses::tryEmitComplete); // test at unblocked transport send(client, server.address(), Message.fromQualifier("q/unblocked")).subscribe(); @@ -333,7 +335,7 @@ public void testBlockAndUnblockTraffic() throws Exception { client.networkEmulator().blockOutbound(server.address()); send(client, server.address(), Message.fromQualifier("q/blocked")).subscribe(); - StepVerifier.create(responses) + StepVerifier.create(responses.asFlux()) .assertNext(message -> assertEquals("q/unblocked", message.qualifier())) .expectNoEvent(Duration.ofMillis(300)) .thenCancel() diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java index 88495dcf..050474af 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java @@ -22,7 +22,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import reactor.core.publisher.Flux; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; public class WebsocketTransportTest extends BaseTest { @@ -322,8 +322,10 @@ public void testBlockAndUnblockTraffic() throws Exception { server.listen().subscribe(message -> server.send(message.sender(), message).subscribe()); - ReplayProcessor responses = ReplayProcessor.create(); - client.listen().subscribe(responses); + Sinks.Many responses = Sinks.many().replay().all(); + client + .listen() + .subscribe(responses::tryEmitNext, responses::tryEmitError, responses::tryEmitComplete); // test at unblocked transport send(client, server.address(), Message.fromQualifier("q/unblocked")).subscribe(); @@ -333,7 +335,7 @@ public void testBlockAndUnblockTraffic() throws Exception { client.networkEmulator().blockOutbound(server.address()); send(client, server.address(), Message.fromQualifier("q/blocked")).subscribe(); - StepVerifier.create(responses) + StepVerifier.create(responses.asFlux()) .assertNext(message -> assertEquals("q/unblocked", message.qualifier())) .expectNoEvent(Duration.ofMillis(300)) .thenCancel() diff --git a/transport-parent/transport-netty/src/test/resources/log4j2-test.xml b/transport-parent/transport-netty/src/test/resources/log4j2-test.xml index 372d711c..fe7356a9 100644 --- a/transport-parent/transport-netty/src/test/resources/log4j2-test.xml +++ b/transport-parent/transport-netty/src/test/resources/log4j2-test.xml @@ -17,7 +17,7 @@ - +