Skip to content

Commit

Permalink
Add better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxime Rainville committed Dec 2, 2024
1 parent e33f638 commit 0c755ac
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 13 deletions.
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
"php": "^8.1",
"psr/event-dispatcher": "^1.0",
"revolt/event-loop": "^1.0",
"amphp/amp": "^3.0"
"amphp/amp": "^3.0",
"psr/log": "^1.1 || ^2.0 || ^3.0"
},
"require-dev": {
"phpunit/phpunit": "^10.0",
"phpstan/phpstan": "^1.10",
"friendsofphp/php-cs-fixer": "^3.14"
"friendsofphp/php-cs-fixer": "^3.14",
"colinodell/psr-testlogger": "^1.3"
},
"autoload": {
"psr-4": {
Expand Down
14 changes: 14 additions & 0 deletions examples/basic-usage.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,17 @@ function () use ($event) {
$event = new UserCreatedEvent('789', '[email protected]');
$future = $dispatcher->dispatch($event, new TimeoutCancellation(30));
EventLoop::run();

// Set up logging for your dispatcher - all errors will be logged to PSR logger
$logger = new ColinODell\PsrTestLogger\TestLogger();
$dispatcher = new AsyncEventDispatcher(
$listenerProvider,
$logger
);

// Let errors bubble up. Useful for unit testing and debugging.
$dispatcher = new AsyncEventDispatcher(
$listenerProvider,
$logger,
AsyncEventDispatcher::THROW_ON_ERROR
);
36 changes: 32 additions & 4 deletions src/AsyncEventDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
namespace ArchiPro\EventDispatcher;

use function Amp\async;
use function Amp\Future\await;

use Amp\Cancellation;
use Amp\Future;

use function Amp\Future\awaitAll;

use Amp\NullCancellation;
use Closure;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\EventDispatcher\ListenerProviderInterface;
use Psr\EventDispatcher\StoppableEventInterface;
use Psr\Log\LoggerInterface;
use Throwable;

/**
* Asynchronous implementation of PSR-14 EventDispatcherInterface using Revolt and AMPHP.
Expand All @@ -24,11 +28,15 @@
*/
class AsyncEventDispatcher implements EventDispatcherInterface
{
const THROW_ON_ERROR = 0b0001;

/**
* @param ListenerProviderInterface $listenerProvider The provider of event listeners
*/
public function __construct(
private readonly ListenerProviderInterface $listenerProvider
private readonly ListenerProviderInterface $listenerProvider,
private readonly ?LoggerInterface $logger = null,
private readonly int $options = 0b0000
) {
}

Expand Down Expand Up @@ -85,7 +93,7 @@ private function dispatchStoppableEvent(
// that doesn't mean we want to block other listeners outside this loop.
$future = async(function () use ($event, $listener) {
$listener($event);
});
})->catch(Closure::fromCallable([$this, 'errorHandler']));

$future->await($cancellation);

Expand Down Expand Up @@ -120,14 +128,34 @@ private function dispatchNonStoppableEvent(
foreach ($listeners as $listener) {
$futures[] = async(function () use ($event, $listener) {
$listener($event);
});
})->catch(Closure::fromCallable([$this, 'errorHandler']));
}

// Wait for all listeners to complete
awaitAll($futures, $cancellation);
if ($this->options & self::THROW_ON_ERROR) {
// Let the errors bubble up
await($futures, $cancellation);
} else {
// Carry on despite errors
awaitAll($futures, $cancellation);
}

return $event;
});
}

/**
* Handler for errors thrown by listeners.
* Based on the settings provided to the constructor, this method can log the errors and/or rethrow them.
*/
protected function errorHandler(Throwable $exception): void
{
if ($this->logger) {
$this->logger->error('Error dispatching event', ['exception' => $exception]);
}

if (($this->options & self::THROW_ON_ERROR) === self::THROW_ON_ERROR) {
throw $exception;
}
}
}
47 changes: 40 additions & 7 deletions tests/AsyncEventDispatcherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use Exception;
use PHPUnit\Framework\TestCase;
use Revolt\EventLoop;

use ColinODell\PsrTestLogger\TestLogger;
use Throwable;

/**
Expand All @@ -33,19 +33,19 @@ class AsyncEventDispatcherTest extends TestCase
{
private ListenerProvider $listenerProvider;
private AsyncEventDispatcher $dispatcher;
private TestLogger $logger;

/**
* Sets up the test environment before each test.
*/
protected function setUp(): void
{
$this->listenerProvider = new ListenerProvider();
$this->dispatcher = new AsyncEventDispatcher($this->listenerProvider);

EventLoop::setErrorHandler(function (Throwable $err) {
throw $err;
});

$this->logger = new TestLogger();
$this->dispatcher = new AsyncEventDispatcher(
$this->listenerProvider,
$this->logger
);
}

/**
Expand Down Expand Up @@ -80,6 +80,8 @@ public function testDispatchEventToMultipleListeners(): void
$this->assertCount(2, $results);
$this->assertContains('listener1: test data', $results);
$this->assertContains('listener2: test data', $results);

$this->assertCount(0, $this->logger->records, 'No errors are logged');
}

/**
Expand All @@ -103,6 +105,8 @@ public function testSynchronousStoppableEvent(): void

$this->assertCount(1, $results);
$this->assertEquals(['listener1'], $results);

$this->assertCount(0, $this->logger->records, 'No errors are logged');
}

/**
Expand All @@ -114,6 +118,7 @@ public function testNoListenersForEvent(): void
$dispatchedEvent = $this->dispatcher->dispatch($event);

$this->assertSame($event, $dispatchedEvent->await());
$this->assertCount(0, $this->logger->records, 'No errors are logged');
}

/**
Expand Down Expand Up @@ -168,6 +173,12 @@ public function testDispatchesFailureInOneListenerDoesNotAffectOthers(): void
$futureEvent->calledTwice,
'The second listener should have been called despite the failure of the first listener'
);

$this->assertCount(
2,
$this->logger->records,
'Errors are logged to the logger'
);
}

public function testCancellationOfStoppableEvent(): void
Expand All @@ -187,6 +198,8 @@ public function testCancellationOfStoppableEvent(): void
$this->expectException(CancelledException::class);

$this->dispatcher->dispatch($event, $cancellation)->await();

$this->assertCount(0, $this->logger->records, 'No errors are logged');
}

public function testCancellationOfNonStoppableEvent(): void
Expand All @@ -206,6 +219,26 @@ public function testCancellationOfNonStoppableEvent(): void
$this->expectException(CancelledException::class);

$this->dispatcher->dispatch($event, $cancellation)->await();

$this->assertCount(0, $this->logger->records, 'No errors are logged');
}

public function testThrowsErrors(): void
{
$this->dispatcher = new AsyncEventDispatcher(
$this->listenerProvider,
$this->logger,
AsyncEventDispatcher::THROW_ON_ERROR
);

$event = new class () {};
$this->listenerProvider->addListener(get_class($event), function ($event) {
throw new Exception('This exception will bubble up because we set the THROW_ON_ERROR option');
});

$this->expectException(Exception::class);
$this->expectExceptionMessage('This exception will bubble up because we set the THROW_ON_ERROR option');

$this->dispatcher->dispatch($event)->await();
}
}

0 comments on commit 0c755ac

Please sign in to comment.