From 744c029ea08c536b3c2fd7f86bcf4ad04b5d311b Mon Sep 17 00:00:00 2001 From: bennidi Date: Sun, 2 Oct 2016 23:01:46 +0200 Subject: [PATCH] Version 1.3.0 fixes PR 127 --- README.md | 82 ++++++++++--------- build.gradle | 2 +- changelog/README.md | 10 +++ pom.xml | 11 ++- .../mbassy/bus/AbstractPubSubSupport.java | 2 +- .../bus/AbstractSyncAsyncMessageBus.java | 4 +- .../engio/mbassy/bus/IMessagePublication.java | 13 +-- .../java/net/engio/mbassy/bus/MBassador.java | 14 ++-- .../engio/mbassy/bus/MessagePublication.java | 28 +++++-- .../net/engio/mbassy/bus/SyncMessageBus.java | 13 +-- .../mbassy/bus/common/PubSubSupport.java | 4 +- .../mbassy/bus/error/MessageBusException.java | 3 - .../mbassy/bus/error/PublicationError.java | 49 ++++++----- .../bus/publication/IPublicationCommand.java | 4 +- .../bus/publication/SyncAsyncPostCommand.java | 4 +- .../AsynchronousHandlerInvocation.java | 5 +- .../dispatch/EnvelopedMessageDispatcher.java | 3 +- .../dispatch/FilteredMessageDispatcher.java | 3 +- .../mbassy/dispatch/HandlerInvocation.java | 4 +- .../mbassy/dispatch/IHandlerInvocation.java | 3 +- .../mbassy/dispatch/IMessageDispatcher.java | 3 +- .../mbassy/dispatch/MessageDispatcher.java | 8 +- .../dispatch/ReflectiveHandlerInvocation.java | 42 +++++----- .../SynchronizedHandlerInvocation.java | 5 +- .../mbassy/subscription/Subscription.java | 3 +- .../net/engio/mbassy/ConcurrentSetTest.java | 62 ++++++++++++-- .../net/engio/mbassy/MethodDispatchTest.java | 45 +++++----- .../engio/mbassy/common/AssertSupport.java | 4 + .../listeners/CustomInvocationListener.java | 3 +- 29 files changed, 271 insertions(+), 165 deletions(-) diff --git a/README.md b/README.md index 3a25b46f..86180ef2 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,15 @@ MBassador ========= -MBassador is a light-weight, high-performance message (event) bus implementation based on the [publish subscribe pattern](https://en.wikipedia.org/wiki/Publish-subscribe_pattern). It is designed for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance. +MBassador is a light-weight, high-performance event bus implementing the [publish subscribe pattern](https://en.wikipedia.org/wiki/Publish-subscribe_pattern). It is designed for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance. -The core of MBassador's high performance is a specialized data structure that minimizes lock contention such that performance degradation of concurrent read/write access is minimal. The advantages of this design are illustrated in the [eventbus-performance](https://github.com/bennidi/eventbus-performance) github repository. +The core of MBassador's high performance is a specialized data structure that provides non-blocking readers and minimizes lock contention for writers such that performance degradation of concurrent read/write access is minimal. The advantages of this design are illustrated in this [github repository](https://github.com/bennidi/eventbus-performance). -Read this introduction to get an overview of MBassadors features. There is also some documentation in the Wiki - although admittedly not enough to make a developer happy. Additionally, you can browse the [javadoc](http://bennidi.github.io/mbassador/) +The code is production ready: 86% instruction coverage, 82% branch coverage with highly randomized and concurrently run test sets, no severe bugs have been reported in the last 18 month. No modifications to the core will be made without thouroughly testing the code. -There is a [spring-extension](https://github.com/bennidi/mbassador-spring) available to support CDI-like transactional message sending in a Spring environment. This is a good example of integration with other frameworks. +For documentation you can browse the [javadoc](http://bennidi.github.io/mbassador/), read this overview, check out the wiki resources. -> ################ NOTE #################### - -> [15.10.2015] My spare time programming efforts have shifted to another open source project - [openmediaid](http://www.openmediaid.org). I will not be able to actively push development of this library anymore. Any developers interested in becoming co-maintainers of this library... you are very welcome! +There is a [spring-extension](https://github.com/bennidi/mbassador-spring) available to support CDI-like transactional message sending in a Spring environment. This is a good example of integration with other frameworks. An example of [Guice integration](https://github.com/bennidi/mbassador/wiki/Guice-Integration) also exists. Table of contents: * [Usage](#usage) @@ -30,24 +28,27 @@ Using MBassador in your project is very easy. Create as many instances of MBassa As a first reference, consider this illustrative example. You might want to have a look at the collection of [examples](./examples) to see its features on more detail. - // Define your listener - class SimpleFileListener{ - - @Handler - public void handle(File msg){ - // do something with the file - } - - } - - // somewhere else in your code - - MBassador bus = new MBassador(); - Object listener = new SimpleFileListener(); - bus.subscribe (listener); - bus.post(new File("/tmp/smallfile.csv")).now(); - bus.post(new File("/tmp/bigfile.csv")).asynchronously(); - +```java + +// Define your listener +class SimpleFileListener{ + + @Handler + public void handle(File msg){ + // do something with the file + } + +} + +// somewhere else in your code + +MBassador bus = new MBassador(); +Object listener = new SimpleFileListener(); +bus.subscribe (listener); +bus.post(new File("/tmp/smallfile.csv")).now(); +bus.post(new File("/tmp/bigfile.csv")).asynchronously(); + +``` ##Features @@ -56,24 +57,29 @@ As a first reference, consider this illustrative example. You might want to have |Annotation|Function| |:-----|:-----| -|`@Handler`|Defines and customizes a message handler. Any well-formed method annotated with `@Handler` will cause instances of the defining class to be treated as message listeners| +|`@Handler`|Mark a method as message handler| |`@Listener`|Can be used to customize listener wide configuration like the used reference type| |`@Enveloped`|A message envelope can be used to pass messages of different types into a single handler| |`@Filter`|Add filtering to prevent certain messages from being published| -> Delivers everything +> Delivers everything, respects type hierarchy Messages do not need to implement any interface and can be of any type. The class hierarchy of a message is considered during message delivery, such that handlers will also receive subtypes of the message type they consume for - e.g. a handler of Object.class receives everything. Messages that do not match any handler result in the publication of a `DeadMessage` object which wraps the original message. DeadMessage events can be handled by registering listeners that handle DeadMessage. > Synchronous and asynchronous message delivery -A handler can be invoked to handle a message either synchronously or asynchronously. This is configurable for each handler via annotations. Message publication itself supports synchronous (method blocks until messages are delivered to all handlers) or asynchronous (fire and forget) dispatch +There are two types of (a-)synchronicity when using MBassador: message dispatch and handler invocation. +For message dispatch _synchronous_ means that the publishing method blocks until messages are delivered to all handlers and _asynchronous_ means that the publish method returns immediately and the message will be dispatched in another thread (fire and forget). + +For handler invocation synchronous means that within a running publication all handlers are called sequentially. _Asynchronous_ means that the handler invocation is pushed into a queue and the next handler is invoked with waiting for the previous to finish. > Configurable reference types -By default, MBassador uses weak references for listeners to relieve the programmer of the need to explicitly unsubscribe listeners that are not used anymore and avoid memory-leaks. This is very comfortable in container managed environments where listeners are created and destroyed by frameworks, i.e. Spring, Guice etc. Just stuff everything into the message bus, it will ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job. Instead of using weak references, a listener can be configured to be referenced using strong references using `@Listener(references=References.Strong)`. Strongly referenced listeners will stick around until explicitly unsubscribed. +By default, MBassador uses **weak references** for listeners to relieve the programmer of the need to explicitly unsubscribe listeners that are not used anymore and **avoid memory-leaks**. This is very comfortable in container managed environments where listeners are created and destroyed by frameworks, i.e. Spring, Guice etc. Just add everything to the bus, it will ignore objects without handlers and automatically clean-up orphaned weak references after the garbage collector has done its job. + +Instead of using weak references, a listener can be configured to be referenced using strong references using `@Listener(references=References.Strong)`. Strongly referenced listeners will stick around until explicitly unsubscribed. -> Filtering +> Message filtering MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to a single message handler. Since version 1.2.0 Java EL expressions in `@Handler` are another way to define conditional message dispatch. Messages that have matching handlers but do not pass the configured filters result in the publication of a FilteredMessage object which wraps the original message. FilteredMessage events can be handled by registering listeners that handle FilteredMessage. @@ -96,11 +102,13 @@ MBassador is designed to be extensible with custom implementations of various co

Installation

MBassador is available from the Maven Central Repository using the following coordinates: ```xml - - net.engio - mbassador - {see.git.tags.for.latest.version} - + + + net.engio + mbassador + {see.git.tags.for.latest.version} + + ``` You can also download binary release and javadoc from the [maven central repository](http://search.maven.org/#search|ga|1|mbassador). Of course you can always clone the repository and build from source. @@ -119,7 +127,7 @@ I liked the simplicity of its design and I trust in the code quality of google l I want to thank the development team from [friendsurance](http://www.friendsurance.de) for their support and feedback on the bus implementation and the management for allowing me to publish the component as an open source project. -I also want to thank all githubbers who have made [contributions](https://github.com/bennidi/mbassador/pulls?q=is%3Apr+is%3Aclosed). It was always a pleasure to see how users got engaged into the libraries development and support. Special thanks go to +I also want to thank all githubbers who have made [contributions](https://github.com/bennidi/mbassador/pulls?q=is%3Apr+is%3Aclosed). Special thanks go to + [arne-vandamme](http://github.com/arne-vandamme) for adding support for [meta-annotations](https://github.com/bennidi/mbassador/pull/74) + [Bernd Rosstauscher](http://github.com/Rossi1337) for providing an initial integration with JUEL + [David Sowerby](http://github.com/davidsowerby) for answering user questions, his tutorial on [guice integration](bennidi/mbassador/wiki/guice-integration) and his various PRs @@ -129,7 +137,7 @@ I also want to thank all githubbers who have made [contributions](https://github Many thanks also to ej-technologies for providing me with an open source license of [![JProfiler](http://www.ej-technologies.com/images/banners/jprofiler_small.png)](http://www.ej-technologies.com/products/jprofiler/overview.html) and Jetbrains for a license of [IntelliJ IDEA](http://www.jetbrains.com/idea/) -Mbassador uses the following open source projects: +MBassador uses the following open source projects: * [jUnit](http://www.junit.org) * [maven](http://www.maven.org) diff --git a/build.gradle b/build.gradle index 66a2d2aa..fc72a799 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,7 @@ apply plugin: 'java' apply plugin: 'war' // needed for providedCompile group="org.mbassy" -version="1.2.4-SNAPSHOT" +version="1.2.5-SNAPSHOT" diff --git a/changelog/README.md b/changelog/README.md index a262424e..02092050 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -1,3 +1,13 @@ +### 1.3.0 + + Non-Breaking API changes + + Extended IMessagePublication to allow for error reporting using `hasError()` and `getError()` + + Any publication method now returns an IMessagePublication object. This resolves [PR-127](../pull/127). Any dispatched + message publication can now be inspected for execution error. Does not support collection of multiple errors due to implied + GC and memory allocation overhead in high-throughput scenarios. + + Breaking API changes + + Added MessagePublication to IHandlerInvocation.invoke(...) + + Added MessagePublication to IMessageDispatcher.dispatch(...) + ### 1.2.4.2 + Updated pom. Now using nexus-staging plugin + Removed pmd diff --git a/pom.xml b/pom.xml index dfded10f..5269fc22 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 net.engio mbassador - 1.2.5-SNAPSHOT + 1.3.0 bundle mbassador @@ -24,8 +24,8 @@ Documentation for this pom see http://central.sonatype.org/pages/apache-maven.html - mvn clean deploy - mvn nexus-staging:release + For deployment (and release of non SNAPSHOT): mvn clean deploy + For manual release of staging: mvn nexus-staging:release git push origin --> @@ -323,6 +323,11 @@ Sets the name of the property containing the settings for JaCoCo runtime agent. --> + surefireArgLine diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java index b52c35ca..8c6bd11f 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java @@ -80,7 +80,7 @@ public BusRuntime getRuntime() { return runtime; } - protected IMessagePublication createMessagePublication(T message) { + protected MessagePublication createMessagePublication(T message) { Collection subscriptions = getSubscriptionsByMessageType(message.getClass()); if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass() .equals(DeadMessage.class)) { diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java index 82c7e2c8..563dd881 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -82,7 +82,7 @@ public void run() { // this method queues a message delivery request - protected IMessagePublication addAsynchronousPublication(IMessagePublication publication) { + protected IMessagePublication addAsynchronousPublication(MessagePublication publication) { try { pendingMessages.put(publication); return publication.markScheduled(); @@ -93,7 +93,7 @@ protected IMessagePublication addAsynchronousPublication(IMessagePublication pub } // this method queues a message delivery request - protected IMessagePublication addAsynchronousPublication(IMessagePublication publication, long timeout, TimeUnit unit) { + protected IMessagePublication addAsynchronousPublication(MessagePublication publication, long timeout, TimeUnit unit) { try { return pendingMessages.offer(publication, timeout, unit) ? publication.markScheduled() diff --git a/src/main/java/net/engio/mbassy/bus/IMessagePublication.java b/src/main/java/net/engio/mbassy/bus/IMessagePublication.java index 47d63226..40b78264 100644 --- a/src/main/java/net/engio/mbassy/bus/IMessagePublication.java +++ b/src/main/java/net/engio/mbassy/bus/IMessagePublication.java @@ -1,5 +1,6 @@ package net.engio.mbassy.bus; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.subscription.Subscription; /** @@ -15,11 +16,6 @@ */ public interface IMessagePublication { - boolean add(Subscription subscription); // TODO: this method should not be part of the interface - - /* - TODO: document state transitions - */ void execute(); boolean isFinished(); @@ -28,9 +24,9 @@ public interface IMessagePublication { boolean isScheduled(); - void markDelivered(); // TODO: this method should not be part of the interface + boolean hasError(); - IMessagePublication markScheduled(); // TODO: this method should not be part of the interface + PublicationError getError(); boolean isDeadMessage(); @@ -38,7 +34,4 @@ public interface IMessagePublication { Object getMessage(); - - // TODO: This interface should only be used as return type to public API calls (clients). Internally the implementation - // of the interface should be used. This would allow to remove the unwanted methods from this interface. } diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java index a9f96285..90187fdd 100644 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ b/src/main/java/net/engio/mbassy/bus/MBassador.java @@ -45,10 +45,6 @@ public MBassador(IBusConfiguration configuration) { super(configuration); } - - - - public IMessagePublication publishAsync(T message) { return addAsynchronousPublication(createMessagePublication(message)); } @@ -64,17 +60,19 @@ public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit) * * @param message */ - public void publish(T message) { + public IMessagePublication publish(T message) { + IMessagePublication publication = createMessagePublication(message); try { - IMessagePublication publication = createMessagePublication(message); publication.execute(); } catch (Throwable e) { handlePublicationError(new PublicationError() .setMessage("Error during publication of message") .setCause(e) - .setPublishedMessage(message)); + .setPublication(publication)); + } + finally{ + return publication; } - } diff --git a/src/main/java/net/engio/mbassy/bus/MessagePublication.java b/src/main/java/net/engio/mbassy/bus/MessagePublication.java index eec2fd58..16a1775c 100644 --- a/src/main/java/net/engio/mbassy/bus/MessagePublication.java +++ b/src/main/java/net/engio/mbassy/bus/MessagePublication.java @@ -2,6 +2,7 @@ import net.engio.mbassy.bus.common.DeadMessage; import net.engio.mbassy.bus.common.FilteredMessage; +import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.subscription.Subscription; import java.util.Collection; @@ -23,8 +24,10 @@ public class MessagePublication implements IMessagePublication { private final Object message; // message publications can be referenced by multiple threads to query publication progress private volatile State state = State.Initial; - private volatile boolean delivered = false; + private volatile boolean dispatched = false; private final BusRuntime runtime; + private PublicationError error = null; + protected MessagePublication(BusRuntime runtime, Collection subscriptions, Object message, State initialState) { this.runtime = runtime; @@ -51,7 +54,7 @@ public void execute() { // This happens if subscriptions are empty (due to GC of weak listeners or explicit desubscription) // or if configured filters do not let a message pass. The flag is set by the dispatchers. // META: This seems to be a suboptimal design - if (!delivered) { + if (!dispatched) { if (!isFilteredMessage() && !isDeadMessage()) { runtime.getProvider().publish(new FilteredMessage(message)); } else if (!isDeadMessage()) { @@ -73,8 +76,20 @@ public boolean isScheduled() { return state.equals(State.Scheduled); } - public void markDelivered() { - delivered = true; + public boolean hasError() { + return this.error != null; + } + + @Override + public PublicationError getError() { + return error; + } + + public void markDispatched() { + dispatched = true; + } + public void markError(PublicationError error) { + this.error = error; } public MessagePublication markScheduled() { @@ -84,7 +99,6 @@ public MessagePublication markScheduled() { return this; } - public boolean isDeadMessage() { return DeadMessage.class.equals(message.getClass()); } @@ -98,12 +112,12 @@ public Object getMessage() { } private enum State { - Initial, Scheduled, Running, Finished, Error + Initial, Scheduled, Running, Finished } public static class Factory { - public IMessagePublication createPublication(BusRuntime runtime, Collection subscriptions, Object message) { + public MessagePublication createPublication(BusRuntime runtime, Collection subscriptions, Object message) { return new MessagePublication(runtime, subscriptions, message, State.Initial); } diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java index 49432176..5633f17e 100644 --- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java @@ -42,14 +42,17 @@ public SyncMessageBus(IBusConfiguration configuration) { } @Override - public void publish(T message) { + public IMessagePublication publish(T message) { + IMessagePublication publication = createMessagePublication(message); try { - IMessagePublication publication = createMessagePublication(message); publication.execute(); } catch (Throwable e) { handlePublicationError(new PublicationError().setMessage("Error during publication of message") .setCause(e) - .setPublishedMessage(message)); + .setPublication(publication)); + } + finally{ + return publication; } } @@ -67,8 +70,8 @@ public SyncPostCommand(T message) { } @Override - public void now() { - publish(message); + public IMessagePublication now() { + return publish(message); } } } diff --git a/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java b/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java index f0cebc1c..e791fd55 100644 --- a/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java +++ b/src/main/java/net/engio/mbassy/bus/common/PubSubSupport.java @@ -1,5 +1,7 @@ package net.engio.mbassy.bus.common; +import net.engio.mbassy.bus.IMessagePublication; + /** * This interface defines the very basic message publication semantics according to the publish subscribe pattern. * Listeners can be subscribed and unsubscribed using the corresponding methods. When a listener is subscribed its @@ -37,5 +39,5 @@ public interface PubSubSupport extends RuntimeProvider{ * * @param message */ - void publish(T message); + IMessagePublication publish(T message); } diff --git a/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java b/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java index bbc6b6fd..1c819bcb 100644 --- a/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java +++ b/src/main/java/net/engio/mbassy/bus/error/MessageBusException.java @@ -8,9 +8,6 @@ */ public class MessageBusException extends Exception{ - public MessageBusException() { - } - public MessageBusException(String message) { super(message); } diff --git a/src/main/java/net/engio/mbassy/bus/error/PublicationError.java b/src/main/java/net/engio/mbassy/bus/error/PublicationError.java index 42484660..a7c6198a 100644 --- a/src/main/java/net/engio/mbassy/bus/error/PublicationError.java +++ b/src/main/java/net/engio/mbassy/bus/error/PublicationError.java @@ -21,53 +21,55 @@ public class PublicationError{ // Internal state private Throwable cause; - private String message; + private String errorMsg; private Method handler; private Object listener; - private Object publishedMessage; + private IMessagePublication publication; + private Object message; + /** * Compound constructor, creating a PublicationError from the supplied objects. * * @param cause The Throwable giving rise to this PublicationError. - * @param message The message to send. + * @param errorMsg The message to send. * @param handler The method where the error was created. * @param listener The object in which the PublicationError was generated. - * @param publishedObject The published object which gave rise to the error. + * @param publication The publication that errored */ public PublicationError(final Throwable cause, - final String message, + final String errorMsg, final Method handler, final Object listener, - final Object publishedObject) { + final IMessagePublication publication) { this.cause = cause; - this.message = message; + this.errorMsg = errorMsg; this.handler = handler; this.listener = listener; - this.publishedMessage = publishedObject; + this.publication = publication; + this.message = publication != null ? publication.getMessage() : null; } public PublicationError(final Throwable cause, - final String message, + final String errorMsg, final IMessagePublication publication) { this.cause = cause; - this.message = message; - this.publishedMessage = publication != null ? publication.getMessage() : null; + this.errorMsg = errorMsg; } public PublicationError(final Throwable cause, - final String message, + final String errorMsg, final SubscriptionContext context) { this.cause = cause; - this.message = message; + this.errorMsg = errorMsg; this.handler = context.getHandler().getMethod(); } - public PublicationError(Throwable cause, String message) { + public PublicationError(Throwable cause, String errorMsg) { this.cause = cause; - this.message = message; + this.errorMsg = errorMsg; } @@ -97,10 +99,15 @@ public PublicationError setCause(Throwable cause) { } public String getMessage() { - return message; + return errorMsg; } public PublicationError setMessage(String message) { + this.errorMsg = message; + return this; + } + + public PublicationError setPublishedMessage(Object message) { this.message = message; return this; } @@ -124,11 +131,11 @@ public PublicationError setListener(Object listener) { } public Object getPublishedMessage() { - return publishedMessage; + return message; } - public PublicationError setPublishedMessage(Object publishedMessage) { - this.publishedMessage = publishedMessage; + public PublicationError setPublication(IMessagePublication publication) { + this.publication = publication; return this; } @@ -142,13 +149,13 @@ public String toString() { newLine + "\tcause=" + cause + newLine + - "\tmessage='" + message + '\'' + + "\tmessage='" + errorMsg + '\'' + newLine + "\thandler=" + handler + newLine + "\tlistener=" + listener + newLine + - "\tpublishedMessage=" + publishedMessage + + "\tpublishedMessage=" + getPublishedMessage() + '}'; } } diff --git a/src/main/java/net/engio/mbassy/bus/publication/IPublicationCommand.java b/src/main/java/net/engio/mbassy/bus/publication/IPublicationCommand.java index f26e6953..409bba50 100644 --- a/src/main/java/net/engio/mbassy/bus/publication/IPublicationCommand.java +++ b/src/main/java/net/engio/mbassy/bus/publication/IPublicationCommand.java @@ -1,5 +1,7 @@ package net.engio.mbassy.bus.publication; +import net.engio.mbassy.bus.IMessagePublication; + /** * A publication command is used as an intermediate object created by a call to the message bus' post method. * It encapsulates the message publication flavors provided by the message bus implementation that created the command. @@ -11,5 +13,5 @@ public interface IPublicationCommand { * Execute the message publication immediately. This call blocks until every matching message handler * has been invoked. */ - void now(); + IMessagePublication now(); } diff --git a/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java b/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java index 87601da6..bf5550e6 100644 --- a/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java +++ b/src/main/java/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.java @@ -22,8 +22,8 @@ public SyncAsyncPostCommand(MBassador mBassador, T message) { } @Override - public void now() { - mBassador.publish(message); + public IMessagePublication now() { + return mBassador.publish(message); } @Override diff --git a/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java index 47457292..4b6996cf 100644 --- a/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.bus.config.IBusConfiguration; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; @@ -27,11 +28,11 @@ public AsynchronousHandlerInvocation(IHandlerInvocation delegate) { * {@inheritDoc} */ @Override - public void invoke(final Object listener, final Object message){ + public void invoke(final Object listener, final Object message, final MessagePublication publication){ executor.execute(new Runnable() { @Override public void run() { - delegate.invoke(listener, message); + delegate.invoke(listener, message, publication); } }); } diff --git a/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java index f2eb79ba..44e6bc5b 100644 --- a/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java @@ -1,6 +1,7 @@ package net.engio.mbassy.dispatch; import net.engio.mbassy.bus.IMessagePublication; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.subscription.MessageEnvelope; /** @@ -20,7 +21,7 @@ public EnvelopedMessageDispatcher(IMessageDispatcher dispatcher) { } @Override - public void dispatch(IMessagePublication publication, Object message, Iterable listeners){ + public void dispatch(MessagePublication publication, Object message, Iterable listeners){ getDelegate().dispatch(publication, new MessageEnvelope(message), listeners); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java index fe53c6a4..522f25c4 100644 --- a/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java @@ -1,6 +1,7 @@ package net.engio.mbassy.dispatch; import net.engio.mbassy.bus.IMessagePublication; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.listener.IMessageFilter; /** @@ -36,7 +37,7 @@ private boolean passesFilter(Object message) { @Override - public void dispatch(IMessagePublication publication, Object message, Iterable listeners){ + public void dispatch(MessagePublication publication, Object message, Iterable listeners){ if (passesFilter(message)) { getDelegate().dispatch(publication, message, listeners); } diff --git a/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java index 118998ab..2cd36c76 100644 --- a/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.bus.error.IPublicationErrorHandler; import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; @@ -20,7 +21,8 @@ public HandlerInvocation(SubscriptionContext context) { super(context); } - protected final void handlePublicationError(PublicationError error){ + protected final void handlePublicationError(MessagePublication publication, PublicationError error){ + publication.markError(error); getContext().handleError(error); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java index 58468bcd..40ec4c2a 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.subscription.ISubscriptionContextAware; /** @@ -25,5 +26,5 @@ public interface IHandlerInvocation extends ISubscriptionConte * @param message The message to be delivered to the handler. This can be any object compatible with the object * type that the handler consumes */ - void invoke(HANDLER handler, MESSAGE message); + void invoke(HANDLER handler, MESSAGE message, MessagePublication publication); } diff --git a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java index 23fcafe7..deacb61b 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java @@ -1,6 +1,7 @@ package net.engio.mbassy.dispatch; import net.engio.mbassy.bus.IMessagePublication; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.subscription.ISubscriptionContextAware; /** @@ -29,7 +30,7 @@ public interface IMessageDispatcher extends ISubscriptionContextAware { * @param message The message that should be delivered to the listeners * @param listeners The listeners that should receive the message */ - void dispatch(IMessagePublication publication, Object message, Iterable listeners); + void dispatch(MessagePublication publication, Object message, Iterable listeners); /** * Get the handler invocation that will be used to deliver the diff --git a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java index 2aef9e96..d32a87ca 100644 --- a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.bus.IMessagePublication; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; import net.engio.mbassy.subscription.SubscriptionContext; @@ -24,10 +24,10 @@ public MessageDispatcher(SubscriptionContext context, IHandlerInvocation invocat } @Override - public void dispatch(final IMessagePublication publication, final Object message, final Iterable listeners){ - publication.markDelivered(); + public void dispatch(final MessagePublication publication, final Object message, final Iterable listeners){ + publication.markDispatched(); for (Object listener : listeners) { - getInvocation().invoke(listener, message); + getInvocation().invoke(listener, message, publication); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java index b4430a26..5ea96ddc 100644 --- a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.bus.error.PublicationError; import net.engio.mbassy.subscription.SubscriptionContext; @@ -18,34 +19,31 @@ public ReflectiveHandlerInvocation(SubscriptionContext context) { super(context); } - protected void invokeHandler(final Object message, final Object listener, Method handler){ + /** + * {@inheritDoc} + */ + @Override + public void invoke(final Object listener, final Object message, MessagePublication publication){ + final Method handler = getContext().getHandler().getMethod(); try { handler.invoke(listener, message); } catch (IllegalAccessException e) { - handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " + - "The class or method is not accessible", - handler, listener, message)); + handlePublicationError(publication, new PublicationError(e, "Error during invocation of message handler. " + + "The class or method is not accessible", + handler, listener, publication)); } catch (IllegalArgumentException e) { - handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0], - handler, listener, message)); + handlePublicationError(publication, new PublicationError(e, "Error during invocation of message handler. " + + "Wrong arguments passed to method. Was: " + message.getClass() + + "Expected: " + handler.getParameterTypes()[0], + handler, listener, publication)); } catch (InvocationTargetException e) { - handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " + - "Message handler threw exception", - handler, listener, message)); + handlePublicationError(publication, new PublicationError(e, "Error during invocation of message handler. " + + "There might be an access rights problem. Do you use non public inner classes?", + handler, listener, publication)); } catch (Throwable e) { - handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " + - "The handler code threw an exception", - handler, listener, message)); + handlePublicationError(publication, new PublicationError(e, "Error during invocation of message handler. " + + "The handler code threw an exception", + handler, listener, publication)); } } - - /** - * {@inheritDoc} - */ - @Override - public void invoke(final Object listener, final Object message){ - invokeHandler(message, listener, getContext().getHandler().getMethod()); - } } diff --git a/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java index 9213f911..67e78546 100644 --- a/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; /** @@ -21,9 +22,9 @@ public SynchronizedHandlerInvocation(IHandlerInvocation delegate) { * {@inheritDoc} */ @Override - public void invoke(final Object listener, final Object message){ + public void invoke(final Object listener, final Object message, MessagePublication publication){ synchronized (listener){ - delegate.invoke(listener, message); + delegate.invoke(listener, message, publication); } } diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index c5487f38..b1e34e7b 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -1,6 +1,7 @@ package net.engio.mbassy.subscription; import net.engio.mbassy.bus.IMessagePublication; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.dispatch.IMessageDispatcher; import java.util.ArrayList; @@ -66,7 +67,7 @@ public Class[] getHandledMessageTypes(){ } - public void publish(IMessagePublication publication, Object message){ + public void publish(MessagePublication publication, Object message){ if(!listeners.isEmpty()) dispatcher.dispatch(publication, message, listeners); } diff --git a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java index cfffeb1e..97514975 100644 --- a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java @@ -7,6 +7,7 @@ import org.junit.Test; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; @@ -27,6 +28,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { protected final int numberOfElements = 100000; protected final int numberOfThreads = 50; + // needed to avoid premature garbage collection for weakly referenced listeners protected Set gcProtector = new HashSet(); @Before @@ -38,6 +40,59 @@ public void beforeTest(){ protected abstract Collection createSet(); + @Test + public void testAddAll() { + final Collection testSet = createSet(); + final List notFound = new CopyOnWriteArrayList(); + // insert all elements (containing duplicates) into the set + final Random rand = new Random(); + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + final List source = new LinkedList(); + for (int i = 0; i < numberOfElements; i++) { + source.add(rand.nextDouble()); + } + testSet.addAll(source); + for (Number src : source) { + if(!testSet.contains(src)){ + notFound.add(src); + } + } + } + }, numberOfThreads); + + // check that the control set and the test set contain the exact same elements + assertEquals(notFound.size(), 0); + } + + @Test + public void testAdd() { + final Collection testSet = createSet(); + final List notFound = new CopyOnWriteArrayList(); + final Random rand = new Random(); + // insert all elements (containing duplicates) into the set + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + final List source = new LinkedList(); + for (int i = 0; i < numberOfElements; i++) { + source.add(rand.nextDouble()); + } + for (Number src : source) { + testSet.add(src); + if(!testSet.contains(src)){ + notFound.add(src); + } + } + } + }, numberOfThreads); + + // check that the control set and the test set contain the exact same elements + assertEquals(notFound.size(), 0); + } + + @Test public void testUniqueness() { final LinkedList duplicates = new LinkedList(); @@ -158,7 +213,7 @@ public void run() { // ensure that the test set does not contain any of the elements that have been removed from it for (Object tar : testSet) { - Assert.assertTrue(!toRemove.contains(tar)); + assertTrue(!toRemove.contains(tar)); } // ensure that the test set still contains all objects from the source set that have not been marked // for removal @@ -352,9 +407,4 @@ public Set createWithRandomIntegers(int size, List excluding){ result.remove(excluded); return result; } - - protected void protectFromGarbageCollector(Set elements){ - for(Object element : elements) - gcProtector.add(element); - } } diff --git a/src/test/java/net/engio/mbassy/MethodDispatchTest.java b/src/test/java/net/engio/mbassy/MethodDispatchTest.java index 6de45a91..3a5d9656 100644 --- a/src/test/java/net/engio/mbassy/MethodDispatchTest.java +++ b/src/test/java/net/engio/mbassy/MethodDispatchTest.java @@ -1,5 +1,6 @@ package net.engio.mbassy; +import net.engio.mbassy.bus.IMessagePublication; import net.engio.mbassy.bus.common.IMessageBus; import net.engio.mbassy.bus.config.Feature; import net.engio.mbassy.common.MessageBusTest; @@ -15,17 +16,17 @@ * @author bennidi * Date: 1/17/13 */ -public class MethodDispatchTest extends MessageBusTest{ +public class MethodDispatchTest extends MessageBusTest { - private boolean listener1Called = false; - private boolean listener2Called = false; + private boolean listener1Called = false; + private boolean listener2Called = false; // a simple event listener public class EventListener1 { @Handler public void handleString(String s) { - listener1Called = true; + listener1Called = true; } } @@ -35,44 +36,48 @@ public class EventListener2 extends EventListener1 { // redefine handler implementation (not configuration) public void handleString(String s) { - listener2Called = true; + listener2Called = true; } } @Test - public void testDispatch1(){ + public void testDispatchWithSequentialSubscription() { IMessageBus bus = createBus(SyncAsync()); EventListener2 listener2 = new EventListener2(); bus.subscribe(listener2); - bus.post("jfndf").now(); + bus.post("only one listener registered").now(); assertTrue(listener2Called); assertFalse(listener1Called); EventListener1 listener1 = new EventListener1(); bus.subscribe(listener1); - bus.post("jfndf").now(); + bus.post("second listener registered and called").now(); assertTrue(listener1Called); } @Test - public void testAsyncDispatchAfterExceptionInErrorHandler() throws InterruptedException - { - IMessageBus bus = createBus(SyncAsync(true /*configures an error handler that throws exception*/).addFeature(Feature.AsynchronousMessageDispatch.Default().setNumberOfMessageDispatchers(1))); - final AtomicInteger msgHandlerCounter=new AtomicInteger(0); - bus.subscribe(new Object() - { + public void testAsyncDispatchAfterExceptionInErrorHandler() throws InterruptedException { + IMessageBus bus = createBus(SyncAsync(false /*configures an error handler that throws exception*/).addFeature(Feature.AsynchronousMessageDispatch.Default().setNumberOfMessageDispatchers(1))); + final AtomicInteger msgHandlerCounter = new AtomicInteger(0); + bus.subscribe(new Object() { @Handler - public void handleAndThrowException(String s) throws Exception - { + public void handleAndThrowException(String s) throws Exception { msgHandlerCounter.incrementAndGet(); throw new Exception("error in msg handler on call no. " + msgHandlerCounter.get()); } }); - bus.post("first event - event handler will raise exception followed by another exception in the error handler").asynchronously(); - bus.post("second event - expecting that msg dispatcher will still dispatch this after encountering exception in error handler").asynchronously(); + IMessagePublication first = bus.post("first event - event handler will raise exception followed by another exception in the error handler").asynchronously(); + IMessagePublication second = bus.post("second event - expecting that msg dispatcher will still dispatch this after encountering exception in error handler").asynchronously(); pause(200); - Assert.assertEquals("msg handler is called also on the 2nd event after an exception in error handler following 1st event error", 2, msgHandlerCounter.get()); - Assert.assertFalse("no more messages left to process", bus.hasPendingMessages()); + assertEquals("msg handler is called also on the 2nd event after an exception in error handler following 1st event error", 2, msgHandlerCounter.get()); + assertFalse("no more messages left to process", bus.hasPendingMessages()); + assertTrue(first.hasError()); + assertTrue(first.isFinished()); + assertNotNull(first.getError().toString()); + assertEquals(first.getError().getPublishedMessage(),"first event - event handler will raise exception followed by another exception in the error handler" ); + assertTrue(second.hasError()); + assertTrue(second.isFinished()); + assertNotNull(second.getError().toString()); } } diff --git a/src/test/java/net/engio/mbassy/common/AssertSupport.java b/src/test/java/net/engio/mbassy/common/AssertSupport.java index f02560ef..f0e26881 100644 --- a/src/test/java/net/engio/mbassy/common/AssertSupport.java +++ b/src/test/java/net/engio/mbassy/common/AssertSupport.java @@ -92,6 +92,10 @@ public void assertFalse(String message, Boolean condition) { Assert.assertFalse(message, condition); } + public void assertEquals(String message, Object expected, Object actual) { + Assert.assertEquals(message, expected, actual); + } + public void assertEquals(Object expected, Object actual) { Assert.assertEquals(expected, actual); } diff --git a/src/test/java/net/engio/mbassy/listeners/CustomInvocationListener.java b/src/test/java/net/engio/mbassy/listeners/CustomInvocationListener.java index 72792f0d..b85abfb9 100644 --- a/src/test/java/net/engio/mbassy/listeners/CustomInvocationListener.java +++ b/src/test/java/net/engio/mbassy/listeners/CustomInvocationListener.java @@ -1,5 +1,6 @@ package net.engio.mbassy.listeners; +import net.engio.mbassy.bus.MessagePublication; import net.engio.mbassy.dispatch.HandlerInvocation; import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listener.Listener; @@ -27,7 +28,7 @@ public HandleSubTestEventInvocation(SubscriptionContext context) { } @Override - public void invoke(CustomInvocationListener listener, StandardMessage message) { + public void invoke(CustomInvocationListener listener, StandardMessage message, MessagePublication publication) { listener.handle(message); } }