diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..ae35877 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,26 @@ +; This file is for unifying the coding style for different editors and IDEs. +; More information at http://editorconfig.org + +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.bat] +end_of_line = crlf + +[*.yml] +indent_size = 2 + +[*.xml] +indent_size = 2 + +[Makefile] +indent_style = tab + +[*.neon] +indent_style = tab \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..46cfe56 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,112 @@ +name: CI + +on: + push: + + pull_request: + branches: + - '*' + +jobs: + testsuite: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + php-version: ['8.1', '8.2', '8.3'] + db-type: [sqlite, mysql, pgsql] + prefer-lowest: [''] + + steps: + - name: Setup MySQL latest + if: matrix.db-type == 'mysql' + run: docker run --rm --name=mysqld -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=cakephp -p 3306:3306 -d mysql --default-authentication-plugin=mysql_native_password --disable-log-bin + + - name: Setup PostgreSQL latest + if: matrix.db-type == 'pgsql' + run: docker run --rm --name=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=cakephp -p 5432:5432 -d postgres + + - uses: actions/checkout@v4 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php-version }} + extensions: mbstring, intl, apcu, sqlite, pdo_sqlite, pdo_${{ matrix.db-type }}, ${{ matrix.db-type }} + ini-values: apc.enable_cli = 1 + coverage: pcov + + - name: Get composer cache directory + id: composer-cache + run: echo "dir=$(composer config cache-files-dir)" >> $GITHUB_OUTPUT + + - name: Get date part for cache key + id: key-date + run: echo "date=$(date +'%Y-%m')" >> $GITHUB_OUTPUT + + - name: Cache composer dependencies + uses: actions/cache@v4 + with: + path: ${{ steps.composer-cache.outputs.dir }} + key: ${{ runner.os }}-composer-${{ steps.key-date.outputs.date }}-${{ hashFiles('composer.json') }}-${{ matrix.prefer-lowest }} + + - name: composer install + run: | + if ${{ matrix.prefer-lowest == 'prefer-lowest' }}; then + composer update --prefer-lowest --prefer-stable + else + composer update + fi + + - name: Setup problem matchers for PHPUnit + if: matrix.php-version == '8.1' && matrix.db-type == 'mysql' + run: echo "::add-matcher::${{ runner.tool_cache }}/phpunit.json" + + - name: Run PHPUnit + run: | + if [[ ${{ matrix.db-type }} == 'sqlite' ]]; then export DB_URL='sqlite:///:memory:'; fi + if [[ ${{ matrix.db-type }} == 'mysql' ]]; then export DB_URL='mysql://root:root@127.0.0.1/cakephp?encoding=utf8'; fi + if [[ ${{ matrix.db-type }} == 'pgsql' ]]; then export DB_URL='postgres://postgres:postgres@127.0.0.1/postgres'; fi + vendor/bin/phpunit + + cs-stan: + name: Coding Standard & Static Analysis + runs-on: ubuntu-22.04 + + steps: + - uses: actions/checkout@v4 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: '8.1' + extensions: mbstring, intl, apcu + coverage: none + + - name: Get composer cache directory + id: composer-cache + run: echo "dir=$(composer config cache-files-dir)" >> $GITHUB_OUTPUT + + - name: Get date part for cache key + id: key-date + run: echo "date=$(date +'%Y-%m')" >> $GITHUB_OUTPUT + + - name: Cache composer dependencies + uses: actions/cache@v4 + with: + path: ${{ steps.composer-cache.outputs.dir }} + key: ${{ runner.os }}-composer-${{ steps.key-date.outputs.date }}-${{ hashFiles('composer.json') }}-${{ matrix.prefer-lowest }} + + - name: composer install + run: composer stan-setup + + - name: Run PHP CodeSniffer + run: composer cs-check + + - name: Run psalm + if: success() || failure() + run: vendor/bin/psalm.phar --output-format=github + + - name: Run phpstan + if: success() || failure() + run: composer stan diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..c7536ff --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,18 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [2.0] - 2024-04-17 + +### Added + +- First release for CakePHP 5.0 + +## [1.0] - 2024-04-17 + +### Added + +- First release for CakePHP 4.4 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..7e20154 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,6 @@ +Contributing +============ + +This repository follows the [CakeDC Plugin Standard](https://www.cakedc.com/plugin-standard). If you'd like to +contribute new features, enhancements or bug fixes to the plugin, please read our +[Contribution Guidelines](https://www.cakedc.com/contribution-guidelines) for detailed instructions. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a64ec0e --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Cake Development Corporation + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f4f70c9 --- /dev/null +++ b/README.md @@ -0,0 +1,115 @@ +# CakeDC Queue Monitor Plugin for CakePHP + +## Versions and branches +| CakePHP | CakeDC Queue Monitor Plugin | Tag | Notes | +|:-------:|:--------------------------------------------------------------------------:|:------------:|:-------| +| ^5.0 | [2.0.0](https://github.com/CakeDC/cakephp-queue-monitor/tree/2.next-cake5) | 2.next-cake5 | stable | +| ^4.4 | [1.0.0](https://github.com/CakeDC/cakephp-queue-monitor/tree/1.next-cake4) | 1.next-cake4 | stable | + +## Overview + +The CakeDC Queue Monitor Plugin adds the ability to monitor jobs in queues that are handled by the +[CakePHP Queue Plugin](https://github.com/cakephp/queue). This plugin checks the duration of work of +individual Jobs and sends a notification when this time is exceeded by a configurable value. + +## Requirements +* CakePHP 4.4 +* PHP 8.1+ + +## Installation + +You can install this plugin into your CakePHP application using [composer](https://getcomposer.org). + +The recommended way to install composer package is: +``` +composer require cakedc/queue-monitor +``` + +## Configuration + +Add QueueMonitorPlugin to your `Application::bootstrap`: +```php +use Cake\Http\BaseApplication; +use CakeDC\QueueMonitor\QueueMonitorPlugin; + +class Application extends BaseApplication +{ + // ... + + public function bootstrap(): void + { + parent::bootstrap(); + + $this->addPlugin(QueueMonitorPlugin::class); + } + + // ... +} + +``` + +Set up the QueueMonitor configuration in your `config/app_local.php`: +```php +// ... + 'QueueMonitor' => [ + // mailer config, the default is `default` mailer, you can ommit + // this setting if you use default value + 'mailerConfig' => 'myCustomMailer', + + // the default is 30 minutes, you can ommit this setting if you + // use the default value + 'longJobInMinutes' => 45, + + // the default is 30 days, you can ommit this setting if you + // its advised to set this value correctly after queue usage analysis to avoid + // high space usage in db + 'purgeLogsOlderThanDays' => 10, + + // comma separated list of recipients of notification about long running queue jobs + 'notificationRecipients' => 'recipient1@yourdomain.com,recipient2@yourdomain.com,recipient3@yourdomain.com', + ], +// ... +``` + +Run the required migrations +```shell +bin/cake migrations migrate -p CakeDC/QueueMonitor +``` + +For each queue configuration add `listener` setting +```php +// ... + 'Queue' => [ + 'default' => [ + // ... + 'listener' => \CakeDC\QueueMonitor\Listener\QueueMonitorListener::class, + // ... + ] + ], +// ... +``` + +## Notification command + +To set up notifications when there are long running or possible stuck jobs please use command +```shell +bin/cake queue_monitor notify +``` + +This command will send notification emails to recipients specified in `QueueMonitor.notificationRecipients`. Best is +to use it as a cronjob + +## Purge command + +The logs table may grow overtime, to keep it slim you can use the purge command: +```shell +bin/cake queue_monitor purge +``` + +This command will purge logs older than value specified in `QueueMonitor.purgeLogsOlderThanDays`, the value is in +days, the default is 30 days. Best is to use it as a cronjob + +## Important + +Make sure your Job classes have a property value of maxAttempts because if it's missing, the log table can quickly +grow to gigantic size in the event of an uncaught exception in Job, Job is re-queued indefinitely in such a case. diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..4444909 --- /dev/null +++ b/composer.json @@ -0,0 +1,77 @@ +{ + "name": "cakedc/queue-monitor", + "description": "CakeDC Queue Monitor plugin for CakePHP", + "type": "cakephp-plugin", + "license": "MIT", + "keywords": [ + "cakephp", + "queue", + "queue monitoring", + "queue monitor" + ], + "homepage": "https://github.com/CakeDC/cakephp-queue-monitor", + "authors": [ + { + "name": "CakeDC", + "homepage": "https://www.cakedc.com", + "role": "Author" + }, + { + "name": "Others", + "homepage": "https://github.com/CakeDC/cakephp-queue-monitor/graphs/contributors" + } + ], + "support": { + "issues": "https://github.com/CakeDC/cakephp-queue-monitor/issues", + "source": "https://github.com/CakeDC/cakephp-queue-monitor" + }, + "require": { + "php": ">=8.1", + "cakephp/cakephp": "^4.4", + "cakephp/queue": "^1.1", + "ext-json": "*" + }, + "require-dev": { + "phpunit/phpunit": "^9.6", + "cakephp/migrations": "^3.9", + "cakephp/cakephp-codesniffer": "^4.5" + }, + "autoload": { + "psr-4": { + "CakeDC\\QueueMonitor\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "CakeDC\\QueueMonitor\\Test\\": "tests/", + "Cake\\Test\\": "vendor/cakephp/cakephp/tests/" + } + }, + "scripts": { + "analyse": [ + "@stan", + "@psalm" + ], + "check": [ + "@cs-check", + "@test", + "@analyse" + ], + "cs-check": "phpcs -n -p ./src ./tests", + "cs-fix": "phpcbf ./src ./tests", + "test": "phpunit --stderr", + "stan": "phpstan analyse src/", + "psalm": "php vendor/psalm/phar/psalm.phar --show-info=false src/ ", + "stan-setup": "cp composer.json composer.backup && composer require --dev phpstan/phpstan:~1.9.0 psalm/phar:~5.1.0 && mv composer.backup composer.json", + "stan-rebuild-baseline": "phpstan analyse --configuration phpstan.neon --error-format baselineNeon src/ > phpstan-baseline.neon", + "psalm-rebuild-baseline": "php vendor/psalm/phar/psalm.phar --show-info=false --set-baseline=psalm-baseline.xml src/", + "rector": "rector process src/", + "rector-setup": "cp composer.json composer.backup && composer require --dev rector/rector:^0.11.2 && mv composer.backup composer.json", + "coverage-test": "phpunit --stderr --coverage-clover=clover.xml" + }, + "config": { + "allow-plugins": { + "dealerdirect/phpcodesniffer-composer-installer": true + } + } +} diff --git a/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php b/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php new file mode 100644 index 0000000..335b709 --- /dev/null +++ b/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php @@ -0,0 +1,108 @@ +table('queue_monitoring_logs', ['id' => false, 'primary_key' => ['id']]) + ->addColumn('id', 'uuid', [ + 'default' => null, + 'limit' => null, + 'null' => false, + ]) + ->addColumn('created', 'datetime', [ + 'default' => null, + 'limit' => null, + 'null' => false, + 'precision' => 6, + ]) + ->addColumn('message_id', 'uuid', [ + 'default' => null, + 'limit' => null, + 'null' => false, + ]) + ->addColumn('message_timestamp', 'datetime', [ + 'default' => null, + 'limit' => null, + 'null' => false, + ]) + ->addColumn('event', 'tinyinteger', [ + 'default' => null, + 'limit' => null, + 'null' => false, + ]) + ->addColumn('job', 'string', [ + 'default' => null, + 'limit' => 255, + 'null' => true, + ]) + ->addColumn('exception', 'string', [ + 'default' => null, + 'limit' => 255, + 'null' => true, + ]) + ->addColumn('content', 'text', [ + 'default' => null, + 'limit' => 4294967295, + 'null' => false, + ]) + ->addIndex( + [ + 'created', + ], + [ + 'name' => 'logs_created_index', + ] + ) + ->addIndex( + [ + 'event', + ], + [ + 'name' => 'logs_event_index', + ] + ) + ->addIndex( + [ + 'message_id', + ], + [ + 'name' => 'logs_message_id_index', + ] + ) + ->create(); + } + + /** + * Down Method. + * + * More information on this method is available here: + * https://book.cakephp.org/phinx/0/en/migrations.html#the-down-method + * + * @return void + */ + public function down(): void + { + $this->table('queue_monitoring_logs')->drop()->save(); + } +} diff --git a/phpcs.xml b/phpcs.xml new file mode 100644 index 0000000..e46a08d --- /dev/null +++ b/phpcs.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon new file mode 100644 index 0000000..e69de29 diff --git a/phpstan.neon b/phpstan.neon new file mode 100644 index 0000000..7dc051f --- /dev/null +++ b/phpstan.neon @@ -0,0 +1,10 @@ +includes: + - phpstan-baseline.neon +parameters: + level: 8 + checkMissingIterableValueType: false + checkGenericClassInNonGenericObjectType: false + bootstrapFiles: + - tests/bootstrap.php + paths: + - src/ diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..5262acf --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + + + + + tests/TestCase/ + + + + + + + + + + + src/ + + + diff --git a/psalm.xml b/psalm.xml new file mode 100644 index 0000000..2119034 --- /dev/null +++ b/psalm.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/Command/NotifyCommand.php b/src/Command/NotifyCommand.php new file mode 100644 index 0000000..3301af1 --- /dev/null +++ b/src/Command/NotifyCommand.php @@ -0,0 +1,79 @@ +setDescription(__('Queue Monitoring notifier')); + } + + /** + * @inheritDoc + */ + public function execute(Arguments $args, ConsoleIo $io) + { + try { + $this->queueMonitoringService->notifyAboutLongRunningJobs( + (int)Configure::read( + 'QueueMonitor.longJobInMinutes', + self::DEFAULT_LONG_JOB_IN_MINUTES + ) + ); + } catch (Exception $e) { + $this->log("Failed to send queue stuck notifications, reason: {$e->getMessage()}"); + + return self::CODE_ERROR; + } + + return self::CODE_SUCCESS; + } +} diff --git a/src/Command/PurgeCommand.php b/src/Command/PurgeCommand.php new file mode 100644 index 0000000..221da97 --- /dev/null +++ b/src/Command/PurgeCommand.php @@ -0,0 +1,89 @@ +setDescription(__('Queue Monitoring log purger')); + } + + /** + * @inheritDoc + */ + public function execute(Arguments $args, ConsoleIo $io) + { + $purgeToDate = $this->queueMonitoringService->getPurgeToDate( + (int)Configure::read( + 'QueueMonitor.purgeLogsOlderThanDays', + self::DEFAULT_PURGE_DAYS_OLD + ) + ); + $this->log( + "Purging queue logs older than {$purgeToDate->toDateTimeString()} UTC", + LogLevel::INFO + ); + try { + $rowCount = $this->queueMonitoringService->purgeLogs(self::DEFAULT_PURGE_DAYS_OLD); + $this->log( + "Purged $rowCount queue messages older than {$purgeToDate->toDateTimeString()} UTC", + LogLevel::INFO + ); + } catch (Exception $e) { + $this->log("Failed puring `queue stuck` logs, reason: {$e->getMessage()}"); + + return self::CODE_ERROR; + } + + return self::CODE_SUCCESS; + } +} diff --git a/src/Exception/QueueMonitorException.php b/src/Exception/QueueMonitorException.php new file mode 100644 index 0000000..911566b --- /dev/null +++ b/src/Exception/QueueMonitorException.php @@ -0,0 +1,22 @@ +QueueMonitoringLogs = $this->fetchTable(LogsTable::class); + } + + /** + * Implemented events + */ + public function implementedEvents(): array + { + /** + * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleMessageEvent() + * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleException() + * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleSeen() + */ + return [ + 'Processor.message.exception' => 'handleException', + 'Processor.message.invalid' => 'handleMessageEvent', + 'Processor.message.reject' => 'handleMessageEvent', + 'Processor.message.success' => 'handleMessageEvent', + 'Processor.message.failure' => 'handleMessageEvent', + 'Processor.message.seen' => 'handleSeen', + 'Processor.message.start' => 'handleMessageEvent', + ]; + } + + /** + * Handle event `Processor.message.exception` + */ + public function handleException(EventInterface $event, ?Message $message, ?Throwable $exception = null): void + { + try { + $message = $this->validateQueueMessage($message); + + if (!$exception) { + throw new QueueMonitorException( + 'Queue Exception is null, ensure that the queue job is set up correctly' + ); + } + + $this->storeEvent( + $event->getName(), + implode('::', $message->getTarget()), + $message->getOriginalMessage(), + $exception + ); + } catch (Exception $e) { + $this->log("Unable to handle queue monitoring exception message event, reason: {$e->getMessage()}"); + } + } + + /** + * Handle events + * `Processor.message.invalid` + * `Processor.message.reject` + * `Processor.message.success` + * `Processor.message.failure` + * `Processor.message.start` + */ + public function handleMessageEvent(EventInterface $event, ?Message $message): void + { + try { + $message = $this->validateQueueMessage($message); + + $this->storeEvent( + $event->getName(), + implode('::', $message->getTarget()), + $message->getOriginalMessage() + ); + } catch (Exception $e) { + $this->log('Unable to handle queue monitoring message event ' . + "`{$event->getName()}`, reason: {$e->getMessage()}"); + } + } + + /** + * Handle event `Processor.message.seen` + */ + public function handleSeen(EventInterface $event, ?QueueMessage $queueMessage): void + { + try { + $queueMessage = $this->validateInteropQueueMessage($queueMessage); + $messageBody = json_decode($queueMessage->getBody(), true); + $target = is_array($messageBody) ? + implode('::', Hash::get($messageBody, 'class')) : + ''; + + $this->storeEvent( + $event->getName(), + $target, + $queueMessage + ); + } catch (Exception $e) { + $this->log('Unable to handle queue monitoring message event ' . + "`{$event->getName()}`, reason: {$e->getMessage()}"); + } + } + + /** + * @throws \Exception + */ + private function storeEvent( + string $eventName, + string $target, + QueueMessage $queueMessage, + ?Throwable $exception = null + ): void { + if (!$queueMessage->getMessageId()) { + throw new QueueMonitorException('Missing message id in queue message'); + } + if (!$queueMessage->getTimestamp()) { + throw new QueueMonitorException('Missing timestamp in queue message'); + } + + /** @var \CakeDC\QueueMonitor\Model\Entity\Log $queueMonitoringLog */ + $queueMonitoringLog = $this->QueueMonitoringLogs->newEmptyEntity(); + + $queueMonitoringLog->message_id = (string)$queueMessage->getMessageId(); + $queueMonitoringLog->message_timestamp = FrozenTime::createFromTimestamp( + (int)$queueMessage->getTimestamp(), + 'UTC' + ); + $queueMonitoringLog->event = MessageEvent::from($eventName)->getEventAsInt(); + $queueMonitoringLog->job = $target; + $queueMonitoringLog->exception = $exception ? get_class($exception) : null; + $queueMonitoringLog->content = (string)json_encode([ + 'body' => json_decode($queueMessage->getBody(), true), + 'headers' => $queueMessage->getHeaders(), + 'properties' => $queueMessage->getProperties(), + ]); + + $this->QueueMonitoringLogs->saveOrFail($queueMonitoringLog); + } + + /** + * Validate queue message + * + * @throws \CakeDC\QueueMonitor\Exception\QueueMonitorException + */ + public function validateQueueMessage(?Message $message): Message + { + if (!($message instanceof Message)) { + throw new QueueMonitorException( + 'Message is not an instance of \Cake\Queue\Job\Message, ' . + 'ensure that the queue job is set up correctly' + ); + } + + return $message; + } + + /** + * Validate Interop Queue Message + * + * @throws \CakeDC\QueueMonitor\Exception\QueueMonitorException + */ + public function validateInteropQueueMessage(?QueueMessage $queueMessage): QueueMessage + { + if (!($queueMessage instanceof QueueMessage)) { + throw new QueueMonitorException( + 'Interop QueueMessage is not an instance of \Interop\Queue\Message, ' . + 'ensure that the queue job is set up correctly' + ); + } + + return $queueMessage; + } +} diff --git a/src/Model/Entity/Log.php b/src/Model/Entity/Log.php new file mode 100644 index 0000000..bd3e1a2 --- /dev/null +++ b/src/Model/Entity/Log.php @@ -0,0 +1,51 @@ + + */ + protected $_accessible = [ + 'created' => true, + 'message_id' => true, + 'message_timestamp' => true, + 'event' => true, + 'job' => true, + 'exception' => true, + 'content' => true, + ]; +} diff --git a/src/Model/Status/MessageEvent.php b/src/Model/Status/MessageEvent.php new file mode 100644 index 0000000..3142d85 --- /dev/null +++ b/src/Model/Status/MessageEvent.php @@ -0,0 +1,77 @@ + 1, + self::Invalid => 2, + self::Start => 3, + self::Exception => 4, + self::Success => 5, + self::Reject => 6, + self::Failure => 7, + }; + } + + /** + * Get as options + */ + public static function getOptions(): array + { + return collection(self::cases()) + ->combine( + fn (MessageEvent $messageEvent) => $messageEvent->getEventAsInt(), + fn (MessageEvent $messageEvent) => $messageEvent->name + ) + ->toArray(); + } + + /** + * Get events that indicates that job ended + */ + public static function getNotEndingEvents(): array + { + return [ + self::Seen, + self::Start, + ]; + } + + /** + * Get events that indicates that job ended (int array) + */ + public static function getNotEndingEventsAsInts(): array + { + return collection(self::getNotEndingEvents()) + ->map(fn (self $messageEvent): int => $messageEvent->getEventAsInt()) + ->toList(); + } +} diff --git a/src/Model/Table/LogsTable.php b/src/Model/Table/LogsTable.php new file mode 100644 index 0000000..22f80a8 --- /dev/null +++ b/src/Model/Table/LogsTable.php @@ -0,0 +1,105 @@ +setTable('queue_monitoring_logs'); + $this->setDisplayField('event'); + $this->setPrimaryKey('id'); + $this->addBehavior('Timestamp'); + } + + /** + * Find entity with last event + * + * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findLastEvent() + */ + public function findWithLastEvent(Query $query): Query + { + return $query + ->find('lastEvent') + ->select($this); + } + + /** + * Find last event + */ + public function findLastEvent(Query $query): Query + { + return $query + ->select([ + 'last_event' => $query->func()->max($this->aliasField('event'), ['integer']), + 'last_created' => $query->func()->max($this->aliasField('created'), ['datetime']), + 'message_timestamp', + ]) + ->group($this->aliasField('message_id')); + } + + /** + * Find stuck jobs + * + * @throws \Exception + * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findLastEvent() + */ + public function findStuckJobs(Query $query, array $options): Query + { + if (!array_key_exists('olderThan', $options)) { + throw new QueueMonitorException('Missing `olderThan` option'); + } + $olderThan = $options['olderThan']; + + if (!($olderThan instanceof ChronosInterface)) { + throw new QueueMonitorException( + 'Option `olderThan` should be an instance of \Cake\Chronos\ChronosInterface' + ); + } + + return $query + ->find('lastEvent') + ->having(fn (QueryExpression $queryExpression): QueryExpression => $queryExpression + ->in('last_event', MessageEvent::getNotEndingEventsAsInts()) + ->lte('last_created', $olderThan->toDateTimeString())); + } +} diff --git a/src/QueueMonitorPlugin.php b/src/QueueMonitorPlugin.php new file mode 100644 index 0000000..37e3084 --- /dev/null +++ b/src/QueueMonitorPlugin.php @@ -0,0 +1,69 @@ +add('queue_monitor purge', PurgeCommand::class) + ->add('queue_monitor notify', NotifyCommand::class); + } + + /** + * @inheritDoc + */ + public function services(ContainerInterface $container): void + { + $container->add(QueueMonitoringService::class); + $container + ->add(PurgeCommand::class) + ->addArguments([ + QueueMonitoringService::class, + ]); + $container + ->add(NotifyCommand::class) + ->addArguments([ + QueueMonitoringService::class, + ]); + } +} diff --git a/src/Service/QueueMonitoringService.php b/src/Service/QueueMonitoringService.php new file mode 100644 index 0000000..ea41529 --- /dev/null +++ b/src/Service/QueueMonitoringService.php @@ -0,0 +1,110 @@ +QueueMonitoringLogsTable = $this->fetchTable(LogsTable::class); + } + + /** + * Get purge `to date` value + */ + public function getPurgeToDate(int $daysOld): FrozenTime + { + return FrozenTime::now('UTC') + ->subDays($daysOld) + ->endOfDay(); + } + + /** + * Purge old logs + */ + public function purgeLogs(int $daysOld): int + { + return $this->QueueMonitoringLogsTable->deleteAll( + fn (QueryExpression $queryExpression): QueryExpression => $queryExpression->lte( + $this->QueueMonitoringLogsTable->aliasField('message_timestamp'), + $this->getPurgeToDate($daysOld), + TableSchemaInterface::TYPE_DATETIME + ) + ); + } + + /** + * get the list of jobs that are have last event older than 30 minutes and event type is not finished + * in any way (seen, start) + * + * @throws \Exception + */ + public function notifyAboutLongRunningJobs(int $longJobsInMinutes): void + { + $olderThan = FrozenTime::now('UTC')->subMinutes($longJobsInMinutes); + + /** + * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findStuckJobs() + */ + $runningJobs = $this->QueueMonitoringLogsTable + ->find('stuckJobs', compact('olderThan')) + ->all(); + + if ($runningJobs->count()) { + $notifyEmails = Configure::read('QueueMonitor.notificationRecipients'); + if (!$notifyEmails) { + throw new QueueMonitorException( + 'Missing `QueueMonitor.notificationRecipients` configuration' + ); + } + $notifyEmails = explode(',', $notifyEmails); + $mailerConfig = Configure::read('QueueMonitor.mailerConfig', 'default'); + $mailer = new Mailer($mailerConfig); + foreach ($notifyEmails as $notifyEmail) { + if (!Validation::email($notifyEmail)) { + throw new QueueMonitorException( + 'Invalid notification email in `QueueMonitor.notificationRecipients`' + ); + } + $mailer->addTo(trim($notifyEmail)); + } + $mailer->setSubject('Emergency. There are jobs stuck in queue.') + ->deliver('This is automated message about queue job stuck in queue engine.' . + " \n\nThere are {$runningJobs->count()} stuck in queue for the last $longJobsInMinutes " . + 'minutes and more.'); + } + } +} diff --git a/tests/TestCase/Listener/QueueMonitorListenerTest.php b/tests/TestCase/Listener/QueueMonitorListenerTest.php new file mode 100644 index 0000000..8209bf8 --- /dev/null +++ b/tests/TestCase/Listener/QueueMonitorListenerTest.php @@ -0,0 +1,215 @@ + [static::class, $method], + 'data' => ['sample_data' => 'a value', 'key' => md5($method)], + ]; + $connectionFactory = new NullConnectionFactory(); + $context = $connectionFactory->createContext(); + $queueMessage = new NullMessage(json_encode($messageBody)); + + $events = new EventList(); + $logger = new ArrayLog(); + $processor = new Processor($logger); + $processor->getEventManager()->setEventList($events); + $processor->getEventManager()->on(new QueueMonitorListener()); + + $result = $processor->process($queueMessage, $context); + $this->assertEquals(InteropProcessor::REQUEUE, $result); + } + + /** + * Test process method + * + * @dataProvider dataProviderTestProcess + */ + public function testProcess($jobMethod, $expected, $logMessage, $dispatchedEvent) + { + $messageBody = [ + 'class' => [static::class, $jobMethod], + 'args' => [], + ]; + $connectionFactory = new NullConnectionFactory(); + $context = $connectionFactory->createContext(); + $queueMessage = new NullMessage(json_encode($messageBody)); + $message = new Message($queueMessage, $context); + + $events = new EventList(); + $logger = new ArrayLog(); + $processor = new Processor($logger); + $processor->getEventManager()->setEventList($events); + $processor->getEventManager()->on(new QueueMonitorListener()); + + $actual = $processor->process($queueMessage, $context); + $this->assertSame($expected, $actual); + + $logs = $logger->read(); + $this->assertCount(1, $logs); + $this->assertStringContainsString('debug', $logs[0]); + $this->assertStringContainsString($logMessage, $logs[0]); + + $this->assertSame(3, $events->count()); + $this->assertSame('Processor.message.seen', $events[0]->getName()); + $this->assertEquals(['queueMessage' => $queueMessage], $events[0]->getData()); + + // Events should contain a message with the same payload. + $this->assertSame('Processor.message.start', $events[1]->getName()); + $data = $events[1]->getData(); + $this->assertArrayHasKey('message', $data); + $this->assertSame($message->jsonSerialize(), $data['message']->jsonSerialize()); + + $this->assertSame($dispatchedEvent, $events[2]->getName()); + $data = $events[2]->getData(); + $this->assertArrayHasKey('message', $data); + $this->assertSame($message->jsonSerialize(), $data['message']->jsonSerialize()); + } + + /** + * Job to be used in test testHandleException + * + * @throws \Exception + */ + public static function processAndThrowException(Message $message) + { + throw new Exception('Something went wrong'); + } + + /** + * Job to be used in test testHandleException + * + * @throws \TypeError + */ + public static function processAndThrowTypeError(Message $message) + { + throw new TypeError('Type error'); + } + + /** + * Job to be used in test testHandleException + * + * @throws \Error + */ + public static function processAndThrowError(Message $message) + { + throw new Error('Error'); + } + + /** + * Data provider used by testHandleException + * + * @return array[] + */ + public function dataProviderTestHandleException(): array + { + return [ + ['processAndThrowException'], + ['processAndThrowTypeError'], + ['processAndThrowError'], + ]; + } + + /** + * Data provider for testProcess method + * + * @return array + */ + public function dataProviderTestProcess(): array + { + return [ + 'ack' => ['processReturnAck', InteropProcessor::ACK, 'Message processed successfully', 'Processor.message.success'], + 'null' => ['processReturnNull', InteropProcessor::ACK, 'Message processed successfully', 'Processor.message.success'], + 'reject' => ['processReturnReject', InteropProcessor::REJECT, 'Message processed with rejection', 'Processor.message.reject'], + 'requeue' => ['processReturnRequeue', InteropProcessor::REQUEUE, 'Message processed with failure, requeuing', 'Processor.message.failure'], + 'string' => ['processReturnString', InteropProcessor::REQUEUE, 'Message processed with failure, requeuing', 'Processor.message.failure'], + ]; + } + + /** + * Job to be used in test testProcess + */ + public static function processReturnNull(Message $message) + { + static::$lastProcessMessage = $message; + + return null; + } + + /** + * Job to be used in test testProcess + */ + public static function processReturnReject(Message $message) + { + static::$lastProcessMessage = $message; + + return InteropProcessor::REJECT; + } + + /** + * Job to be used in test testProcess + */ + public static function processReturnAck(Message $message) + { + static::$lastProcessMessage = $message; + + return InteropProcessor::ACK; + } + + /** + * Job to be used in test testProcess + */ + public static function processReturnRequeue(Message $message) + { + static::$lastProcessMessage = $message; + + return InteropProcessor::REQUEUE; + } + + /** + * Job to be used in test testProcess + */ + public static function processReturnString(Message $message) + { + static::$lastProcessMessage = $message; + + return 'invalid value'; + } +} diff --git a/tests/bootstrap.php b/tests/bootstrap.php new file mode 100644 index 0000000..f417bf3 --- /dev/null +++ b/tests/bootstrap.php @@ -0,0 +1,74 @@ + getenv('db_dsn'), + 'timezone' => 'UTC', +]); + +/** + * Load schema from a SQL dump file. + * + * If your plugin does not use database fixtures you can + * safely delete this. + * + * If you want to support multiple databases, consider + * using migrations to provide schema for your plugin, + * and using \Migrations\TestSuite\Migrator to load schema. + */ +(new Migrator())->run(); diff --git a/tests/schema.sql b/tests/schema.sql new file mode 100644 index 0000000..08835b7 --- /dev/null +++ b/tests/schema.sql @@ -0,0 +1 @@ +-- Test database schema for QueueMonitorTemplate