Skip to content

Commit

Permalink
Version 1.3.0 fixes PR 127
Browse files Browse the repository at this point in the history
  • Loading branch information
bennidi committed Oct 2, 2016
1 parent 96986c3 commit 744c029
Show file tree
Hide file tree
Showing 29 changed files with 271 additions and 165 deletions.
82 changes: 45 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
MBassador
=========

MBassador is a light-weight, high-performance message (event) bus implementation based on the [publish subscribe pattern](https://en.wikipedia.org/wiki/Publish-subscribe_pattern). It is designed for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance.
MBassador is a light-weight, high-performance event bus implementing the [publish subscribe pattern](https://en.wikipedia.org/wiki/Publish-subscribe_pattern). It is designed for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance.

The core of MBassador's high performance is a specialized data structure that minimizes lock contention such that performance degradation of concurrent read/write access is minimal. The advantages of this design are illustrated in the [eventbus-performance](https://github.com/bennidi/eventbus-performance) github repository.
The core of MBassador's high performance is a specialized data structure that provides non-blocking readers and minimizes lock contention for writers such that performance degradation of concurrent read/write access is minimal. The advantages of this design are illustrated in this [github repository](https://github.com/bennidi/eventbus-performance).

Read this introduction to get an overview of MBassadors features. There is also some documentation in the Wiki - although admittedly not enough to make a developer happy. Additionally, you can browse the [javadoc](http://bennidi.github.io/mbassador/)
The code is production ready: 86% instruction coverage, 82% branch coverage with highly randomized and concurrently run test sets, no severe bugs have been reported in the last 18 month. No modifications to the core will be made without thouroughly testing the code.

There is a [spring-extension](https://github.com/bennidi/mbassador-spring) available to support CDI-like transactional message sending in a Spring environment. This is a good example of integration with other frameworks.
For documentation you can browse the [javadoc](http://bennidi.github.io/mbassador/), read this overview, check out the wiki resources.

> ################ NOTE ####################
> [15.10.2015] My spare time programming efforts have shifted to another open source project - [openmediaid](http://www.openmediaid.org). I will not be able to actively push development of this library anymore. Any developers interested in becoming co-maintainers of this library... you are very welcome!
There is a [spring-extension](https://github.com/bennidi/mbassador-spring) available to support CDI-like transactional message sending in a Spring environment. This is a good example of integration with other frameworks. An example of [Guice integration](https://github.com/bennidi/mbassador/wiki/Guice-Integration) also exists.

Table of contents:
* [Usage](#usage)
Expand All @@ -30,24 +28,27 @@ Using MBassador in your project is very easy. Create as many instances of MBassa

As a first reference, consider this illustrative example. You might want to have a look at the collection of [examples](./examples) to see its features on more detail.

// Define your listener
class SimpleFileListener{
@Handler
public void handle(File msg){
// do something with the file
}
}

// somewhere else in your code

MBassador bus = new MBassador();
Object listener = new SimpleFileListener();
bus.subscribe (listener);
bus.post(new File("/tmp/smallfile.csv")).now();
bus.post(new File("/tmp/bigfile.csv")).asynchronously();

```java

// Define your listener
class SimpleFileListener{

@Handler
public void handle(File msg){
// do something with the file
}

}

// somewhere else in your code

MBassador bus = new MBassador();
Object listener = new SimpleFileListener();
bus.subscribe (listener);
bus.post(new File("/tmp/smallfile.csv")).now();
bus.post(new File("/tmp/bigfile.csv")).asynchronously();

```

##Features

Expand All @@ -56,24 +57,29 @@ As a first reference, consider this illustrative example. You might want to have
|Annotation|Function|
|:-----|:-----|
|`@Handler`|Defines and customizes a message handler. Any well-formed method annotated with `@Handler` will cause instances of the defining class to be treated as message listeners|
|`@Handler`|Mark a method as message handler|
|`@Listener`|Can be used to customize listener wide configuration like the used reference type|
|`@Enveloped`|A message envelope can be used to pass messages of different types into a single handler|
|`@Filter`|Add filtering to prevent certain messages from being published|

> Delivers everything
> Delivers everything, respects type hierarchy
Messages do not need to implement any interface and can be of any type. The class hierarchy of a message is considered during message delivery, such that handlers will also receive subtypes of the message type they consume for - e.g. a handler of Object.class receives everything. Messages that do not match any handler result in the publication of a `DeadMessage` object which wraps the original message. DeadMessage events can be handled by registering listeners that handle DeadMessage.

> Synchronous and asynchronous message delivery
A handler can be invoked to handle a message either synchronously or asynchronously. This is configurable for each handler via annotations. Message publication itself supports synchronous (method blocks until messages are delivered to all handlers) or asynchronous (fire and forget) dispatch
There are two types of (a-)synchronicity when using MBassador: message dispatch and handler invocation.
For message dispatch _synchronous_ means that the publishing method blocks until messages are delivered to all handlers and _asynchronous_ means that the publish method returns immediately and the message will be dispatched in another thread (fire and forget).

For handler invocation synchronous means that within a running publication all handlers are called sequentially. _Asynchronous_ means that the handler invocation is pushed into a queue and the next handler is invoked with waiting for the previous to finish.

> Configurable reference types
By default, MBassador uses weak references for listeners to relieve the programmer of the need to explicitly unsubscribe listeners that are not used anymore and avoid memory-leaks. This is very comfortable in container managed environments where listeners are created and destroyed by frameworks, i.e. Spring, Guice etc. Just stuff everything into the message bus, it will ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job. Instead of using weak references, a listener can be configured to be referenced using strong references using `@Listener(references=References.Strong)`. Strongly referenced listeners will stick around until explicitly unsubscribed.
By default, MBassador uses **weak references** for listeners to relieve the programmer of the need to explicitly unsubscribe listeners that are not used anymore and **avoid memory-leaks**. This is very comfortable in container managed environments where listeners are created and destroyed by frameworks, i.e. Spring, Guice etc. Just add everything to the bus, it will ignore objects without handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.

Instead of using weak references, a listener can be configured to be referenced using strong references using `@Listener(references=References.Strong)`. Strongly referenced listeners will stick around until explicitly unsubscribed.

> Filtering
> Message filtering
MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to a single message handler. Since version 1.2.0 Java EL expressions in `@Handler` are another way to define conditional message dispatch. Messages that have matching handlers but do not pass the configured filters result in the publication of a FilteredMessage object which wraps the original message. FilteredMessage events can be handled by registering listeners that handle FilteredMessage.

Expand All @@ -96,11 +102,13 @@ MBassador is designed to be extensible with custom implementations of various co
<h2>Installation</h2>
MBassador is available from the Maven Central Repository using the following coordinates:
```xml
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>{see.git.tags.for.latest.version}</version>
</dependency>

<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>{see.git.tags.for.latest.version}</version>
</dependency>

```

You can also download binary release and javadoc from the [maven central repository](http://search.maven.org/#search|ga|1|mbassador). Of course you can always clone the repository and build from source.
Expand All @@ -119,7 +127,7 @@ I liked the simplicity of its design and I trust in the code quality of google l

I want to thank the development team from [friendsurance](http://www.friendsurance.de) for their support and feedback on the bus implementation and the management for allowing me to publish the component as an open source project.

I also want to thank all githubbers who have made [contributions](https://github.com/bennidi/mbassador/pulls?q=is%3Apr+is%3Aclosed). It was always a pleasure to see how users got engaged into the libraries development and support. Special thanks go to
I also want to thank all githubbers who have made [contributions](https://github.com/bennidi/mbassador/pulls?q=is%3Apr+is%3Aclosed). Special thanks go to
+ [arne-vandamme](http://github.com/arne-vandamme) for adding support for [meta-annotations](https://github.com/bennidi/mbassador/pull/74)
+ [Bernd Rosstauscher](http://github.com/Rossi1337) for providing an initial integration with JUEL
+ [David Sowerby](http://github.com/davidsowerby) for answering user questions, his tutorial on [guice integration](bennidi/mbassador/wiki/guice-integration) and his various PRs
Expand All @@ -129,7 +137,7 @@ I also want to thank all githubbers who have made [contributions](https://github
Many thanks also to ej-technologies for providing me with an open source license of
[![JProfiler](http://www.ej-technologies.com/images/banners/jprofiler_small.png)](http://www.ej-technologies.com/products/jprofiler/overview.html) and Jetbrains for a license of [IntelliJ IDEA](http://www.jetbrains.com/idea/)

Mbassador uses the following open source projects:
MBassador uses the following open source projects:

* [jUnit](http://www.junit.org)
* [maven](http://www.maven.org)
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apply plugin: 'java'
apply plugin: 'war' // needed for providedCompile

group="org.mbassy"
version="1.2.4-SNAPSHOT"
version="1.2.5-SNAPSHOT"



Expand Down
10 changes: 10 additions & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
### 1.3.0
+ Non-Breaking API changes
+ Extended IMessagePublication to allow for error reporting using `hasError()` and `getError()`
+ Any publication method now returns an IMessagePublication object. This resolves [PR-127](../pull/127). Any dispatched
message publication can now be inspected for execution error. Does not support collection of multiple errors due to implied
GC and memory allocation overhead in high-throughput scenarios.
+ Breaking API changes
+ Added MessagePublication to IHandlerInvocation.invoke(...)
+ Added MessagePublication to IMessageDispatcher.dispatch(...)

### 1.2.4.2
+ Updated pom. Now using nexus-staging plugin
+ Removed pmd
Expand Down
11 changes: 8 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.2.5-SNAPSHOT</version>
<version>1.3.0</version>
<packaging>bundle</packaging>
<name>mbassador</name>
<description>
Expand All @@ -24,8 +24,8 @@
Documentation for this pom see
http://central.sonatype.org/pages/apache-maven.html
mvn clean deploy
mvn nexus-staging:release
For deployment (and release of non SNAPSHOT): mvn clean deploy
For manual release of staging: mvn nexus-staging:release
git push origin <tag>
-->
Expand Down Expand Up @@ -323,6 +323,11 @@
Sets the name of the property containing the settings
for JaCoCo runtime agent.
-->
<!--
<excludes>
<exclude>net.engio.mbassy.bus.error.*</exclude>
</excludes>
-->
<propertyName>surefireArgLine</propertyName>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public BusRuntime getRuntime() {
return runtime;
}

protected IMessagePublication createMessagePublication(T message) {
protected MessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass()
.equals(DeadMessage.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void run() {


// this method queues a message delivery request
protected IMessagePublication addAsynchronousPublication(IMessagePublication publication) {
protected IMessagePublication addAsynchronousPublication(MessagePublication publication) {
try {
pendingMessages.put(publication);
return publication.markScheduled();
Expand All @@ -93,7 +93,7 @@ protected IMessagePublication addAsynchronousPublication(IMessagePublication pub
}

// this method queues a message delivery request
protected IMessagePublication addAsynchronousPublication(IMessagePublication publication, long timeout, TimeUnit unit) {
protected IMessagePublication addAsynchronousPublication(MessagePublication publication, long timeout, TimeUnit unit) {
try {
return pendingMessages.offer(publication, timeout, unit)
? publication.markScheduled()
Expand Down
13 changes: 3 additions & 10 deletions src/main/java/net/engio/mbassy/bus/IMessagePublication.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.engio.mbassy.bus;

import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.Subscription;

/**
Expand All @@ -15,11 +16,6 @@
*/
public interface IMessagePublication {

boolean add(Subscription subscription); // TODO: this method should not be part of the interface

/*
TODO: document state transitions
*/
void execute();

boolean isFinished();
Expand All @@ -28,17 +24,14 @@ public interface IMessagePublication {

boolean isScheduled();

void markDelivered(); // TODO: this method should not be part of the interface
boolean hasError();

IMessagePublication markScheduled(); // TODO: this method should not be part of the interface
PublicationError getError();

boolean isDeadMessage();

boolean isFilteredMessage();

Object getMessage();


// TODO: This interface should only be used as return type to public API calls (clients). Internally the implementation
// of the interface should be used. This would allow to remove the unwanted methods from this interface.
}
14 changes: 6 additions & 8 deletions src/main/java/net/engio/mbassy/bus/MBassador.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ public MBassador(IBusConfiguration configuration) {
super(configuration);
}





public IMessagePublication publishAsync(T message) {
return addAsynchronousPublication(createMessagePublication(message));
}
Expand All @@ -64,17 +60,19 @@ public IMessagePublication publishAsync(T message, long timeout, TimeUnit unit)
*
* @param message
*/
public void publish(T message) {
public IMessagePublication publish(T message) {
IMessagePublication publication = createMessagePublication(message);
try {
IMessagePublication publication = createMessagePublication(message);
publication.execute();
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
.setPublishedMessage(message));
.setPublication(publication));
}
finally{
return publication;
}

}


Expand Down
28 changes: 21 additions & 7 deletions src/main/java/net/engio/mbassy/bus/MessagePublication.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.common.FilteredMessage;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.Subscription;

import java.util.Collection;
Expand All @@ -23,8 +24,10 @@ public class MessagePublication implements IMessagePublication {
private final Object message;
// message publications can be referenced by multiple threads to query publication progress
private volatile State state = State.Initial;
private volatile boolean delivered = false;
private volatile boolean dispatched = false;
private final BusRuntime runtime;
private PublicationError error = null;


protected MessagePublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message, State initialState) {
this.runtime = runtime;
Expand All @@ -51,7 +54,7 @@ public void execute() {
// This happens if subscriptions are empty (due to GC of weak listeners or explicit desubscription)
// or if configured filters do not let a message pass. The flag is set by the dispatchers.
// META: This seems to be a suboptimal design
if (!delivered) {
if (!dispatched) {
if (!isFilteredMessage() && !isDeadMessage()) {
runtime.getProvider().publish(new FilteredMessage(message));
} else if (!isDeadMessage()) {
Expand All @@ -73,8 +76,20 @@ public boolean isScheduled() {
return state.equals(State.Scheduled);
}

public void markDelivered() {
delivered = true;
public boolean hasError() {
return this.error != null;
}

@Override
public PublicationError getError() {
return error;
}

public void markDispatched() {
dispatched = true;
}
public void markError(PublicationError error) {
this.error = error;
}

public MessagePublication markScheduled() {
Expand All @@ -84,7 +99,6 @@ public MessagePublication markScheduled() {
return this;
}


public boolean isDeadMessage() {
return DeadMessage.class.equals(message.getClass());
}
Expand All @@ -98,12 +112,12 @@ public Object getMessage() {
}

private enum State {
Initial, Scheduled, Running, Finished, Error
Initial, Scheduled, Running, Finished
}

public static class Factory {

public IMessagePublication createPublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message) {
public MessagePublication createPublication(BusRuntime runtime, Collection<Subscription> subscriptions, Object message) {
return new MessagePublication(runtime, subscriptions, message, State.Initial);
}

Expand Down
Loading

0 comments on commit 744c029

Please sign in to comment.