Skip to content

Commit

Permalink
Merge pull request #991 from CatoTH/v4-background-jobs
Browse files Browse the repository at this point in the history
Background Jobs
  • Loading branch information
CatoTH authored Dec 25, 2024
2 parents cb64810 + 6834667 commit 4d56bd4
Show file tree
Hide file tree
Showing 18 changed files with 608 additions and 83 deletions.
4 changes: 4 additions & 0 deletions History.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Version history

## Version 4.16.0 [not released yet]

- An optional mechanism for background job execution is introduced, making it possible to send e-mails asynchronously (therefore not blocking regular requests).

### Version 4.15.1 [not released yet]

- Next to the status-dropdown for motions and amendments, there is now a link to a reference page, explaining the uses of these different statuses.
Expand Down
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ Instead of "antragsgruen_sites", a custom plugin managing the authentication and

Redis can be used to cache the changes in amendments, user sessions, and many other aspects of the site. To enable redis, simply add a `redis` configuration key to the `config.json` and point it to your setup:

Add the following settings to your config.json (and adapt them to your needs):
Add the following settings to your `config.json` (and adapt them to your needs):
```json
{
"redis": {
Expand All @@ -292,6 +292,27 @@ Add the following settings to your config.json (and adapt them to your needs):
}
```

### Enable background job processing

Some processes that are potentially blocking or long-running can be executed as background jobs, by using a permanently running worker-job that executes these jobs asynchonously.

The following example on how to run the background job processor uses [Supervisord](http://supervisord.org), but it is just as possible running it via any other process manager.
- Copy [supervisor.conf](docs/supervisor.conf) to your supervisord configuration directory, modify it to your needs, and run it.
- Create an API key for the health checks (optional) and its hash (via `password_encode($password, PASSWORD_DEFAULT)`).
- Enable background jobs by adding the following settings to your `config.json`.
- Set up a cronjob to clean the database at least once a day by executing `yii background-job/cleanup`.

```json
{
"backgroundJobs": {
"notifications": true
},
"healthCheckKey": "$2y$12$...."
}
```

Currently, this only affects the sending of e-mails.

### File-based View Caching (very large consultations)

Antragsgrün already does a decent amount of caching by default, and even more when enabling Redis. An even more aggressive caching mode that caches some fully rendered HTML pages and PDFs can be enabled by enabling the following option in the `config.json`:
Expand Down
41 changes: 41 additions & 0 deletions assets/db/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,23 @@ CREATE TABLE `###TABLE_PREFIX###votingQuestion` (
`votingData` text DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

--
-- Table structure for table `backgroundJob`
--

CREATE TABLE `###TABLE_PREFIX###backgroundJob` (
`id` bigint UNSIGNED NOT NULL,
`siteId` int DEFAULT NULL,
`consultationId` int DEFAULT NULL,
`type` varchar(150) NOT NULL,
`dateCreation` timestamp NOT NULL,
`dateStarted` timestamp NULL DEFAULT NULL,
`dateUpdated` timestamp NULL DEFAULT NULL,
`dateFinished` timestamp NULL DEFAULT NULL,
`payload` mediumtext NOT NULL,
`error` text DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

--
-- Indexes for dumped tables
--
Expand Down Expand Up @@ -1155,6 +1172,16 @@ ALTER TABLE `###TABLE_PREFIX###votingQuestion`
ADD KEY `fk_question_block` (`votingBlockId`),
ADD KEY `fk_question_consultation` (`consultationId`);

--
-- Indexes for table `backgroundJob`
--
ALTER TABLE `###TABLE_PREFIX###backgroundJob`
ADD PRIMARY KEY (`id`),
ADD KEY `fk_background_site` (`siteId`),
ADD KEY `fk_background_consultation` (`consultationId`),
ADD KEY `ix_background_pending` (`dateStarted`,`id`),
ADD KEY `ix_background_todelete` (`dateFinished`);

--
-- AUTO_INCREMENT for dumped tables
--
Expand Down Expand Up @@ -1320,6 +1347,13 @@ ALTER TABLE `###TABLE_PREFIX###votingBlock`
--
ALTER TABLE `###TABLE_PREFIX###votingQuestion`
MODIFY `id` int(11) NOT NULL AUTO_INCREMENT;

--
-- AUTO_INCREMENT for table `backgroundJob`
--
ALTER TABLE `###TABLE_PREFIX###backgroundJob`
MODIFY `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT;

--
-- Constraints for dumped tables
--
Expand Down Expand Up @@ -1629,6 +1663,13 @@ ALTER TABLE `###TABLE_PREFIX###votingQuestion`
ADD CONSTRAINT `fk_question_block` FOREIGN KEY (`votingBlockId`) REFERENCES `###TABLE_PREFIX###votingBlock` (`id`),
ADD CONSTRAINT `fk_question_consultation` FOREIGN KEY (`consultationId`) REFERENCES `###TABLE_PREFIX###consultation` (`id`);

--
-- Constraints for table `backgroundJob`
--
ALTER TABLE `###TABLE_PREFIX###backgroundJob`
ADD CONSTRAINT `fk_background_consultation` FOREIGN KEY (`consultationId`) REFERENCES `###TABLE_PREFIX###consultation` (`id`),
ADD CONSTRAINT `fk_background_site` FOREIGN KEY (`siteId`) REFERENCES `###TABLE_PREFIX###site` (`id`);

SET SQL_MODE = @OLD_SQL_MODE;
SET FOREIGN_KEY_CHECKS = @OLD_FOREIGN_KEY_CHECKS;
SET UNIQUE_CHECKS = @OLD_UNIQUE_CHECKS;
3 changes: 2 additions & 1 deletion assets/db/data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ INSERT INTO `migration` (`version`, `apply_time`) VALUES
('m240427_090527_motion_status_index', '1714209051'),
('m240830_181716_user_secret_key', '1725041937'),
('m241013_105549_pages_files', '1728817360'),
('m241027_074032_pages_policies', '1730015023')
('m241027_074032_pages_policies', '1730015023'),
('m241201_100317_background_jobs', '1733052690')
;

SET SQL_MODE = @OLD_SQL_MODE;
Expand Down
1 change: 1 addition & 0 deletions assets/db/delete.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ DROP TABLE IF EXISTS `###TABLE_PREFIX###consultationMotionType`;
DROP TABLE IF EXISTS `###TABLE_PREFIX###consultationSettingsTag`;
DROP TABLE IF EXISTS `###TABLE_PREFIX###consultationLog`;
DROP TABLE IF EXISTS `###TABLE_PREFIX###texTemplate`;
DROP TABLE IF EXISTS `###TABLE_PREFIX###backgroundJob`;

SET SQL_MODE=@OLD_SQL_MODE;
SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS;
Expand Down
88 changes: 88 additions & 0 deletions commands/BackgroundJobController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace app\commands;

use app\components\BackgroundJobProcessor;
use app\components\BackgroundJobScheduler;
use Yii;
use yii\console\Controller;

/**
* Run background commends
*/
class BackgroundJobController extends Controller
{
private const DEFAULT_MAX_EVENTS = 1000;
private const DEFAULT_MAX_RUNTIME_SECONDS = 600;
private const DEFAULT_MAX_MEMORY_USAGE = 64_000_000;

private const MAX_RETENTION_PERIOD_HOURS = 24 * 3;

protected int $maxEvents = self::DEFAULT_MAX_EVENTS;
protected int $maxRuntimeSeconds = self::DEFAULT_MAX_RUNTIME_SECONDS;
protected int $maxMemoryUsage = self::DEFAULT_MAX_MEMORY_USAGE;

public function options($actionID): array
{
return ['maxEvents', 'maxRuntimeSeconds', 'maxMemoryUsage'];
}

/**
* Runs the background job processor
* Options:
* --max-runtime-seconds 600
* --max-events 1000
* --max-memory-usage 64000000
*/
public function actionRun(): void
{
echo "Starting background job processor at: " . (new \DateTimeImmutable())->format("Y-m-d H:i:s.u") . "\n";

$connection = \Yii::$app->getDb();
$connection->enableLogging = false;

$processor = new BackgroundJobProcessor($connection);
while (!$this->needsRestart($processor)) {
$row = $processor->getJobAndSetStarted();
if ($row) {
$processor->processRow($row);
} else {
usleep(100_000);
}
}

echo "Stopping background job processor at: " . (new \DateTimeImmutable())->format("Y-m-d H:i:s.u") . "\n";
}

private function needsRestart(BackgroundJobProcessor $processor): bool
{
if ($processor->getProcessedEvents() >= $this->maxEvents) {
echo "Stopping because maximum number of processed events has been reached.\n";
return true;
}

if ($processor->getRuntimeInSeconds() >= $this->maxRuntimeSeconds) {
echo "Stopping because maximum runtime has been reached.\n";
return true;
}

if (memory_get_peak_usage() >= $this->maxMemoryUsage) {
echo "Stopping because maximum memory usage has been reached.\n";
return true;
}

return false;
}

/**
* Cleans up old tasks from database
*/
public function actionCleanup(): void
{
$deletedJobs = BackgroundJobScheduler::cleanup(self::MAX_RETENTION_PERIOD_HOURS);

echo "Deleted $deletedJobs jobs.\n";
}
}
76 changes: 76 additions & 0 deletions components/BackgroundJobProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

declare(strict_types=1);

namespace app\components;

use app\models\backgroundJobs\IBackgroundJob;
use yii\db\Connection;

class BackgroundJobProcessor
{
private Connection $connection;

private int $processedEvents = 0;
private \DateTimeImmutable $startedAt;

public function __construct(Connection $connection) {
$this->connection = $connection;
$this->startedAt = new \DateTimeImmutable();
}

public function getJobAndSetStarted(): ?IBackgroundJob {
$foundJob = null;

$this->connection->transaction(function () use (&$foundJob) {
$command = $this->connection->createCommand('SELECT * FROM backgroundJob WHERE dateStarted IS NULL ORDER BY id ASC LIMIT 0,1 FOR UPDATE');
$foundRows = $command->queryAll();
if (empty($foundRows)) {
return;
}

$foundRow = $foundRows[0];
$this->connection->createCommand('UPDATE backgroundJob SET dateStarted = NOW() WHERE id = :id', ['id' => $foundRow['id']])->execute();

$foundJob = IBackgroundJob::fromJson(
intval($foundRow['id']),
$foundRow['type'],
($foundRow['siteId'] > 0 ? $foundRow['siteId'] : null),
($foundRow['consultationId'] > 0 ? $foundRow['consultationId'] : null),
$foundRow['payload']
);
});

return $foundJob;
}

public function processRow(IBackgroundJob $job): void
{
$this->connection->createCommand(
'UPDATE backgroundJob SET dateUpdated = NOW() WHERE id = :id',
['id' => $job->getId()]
)->execute();

try {
$job->execute();

$this->connection->createCommand(
'UPDATE backgroundJob SET dateFinished = NOW() WHERE id = :id',
['id' => $job->getId()]
)->execute();
} catch (\Throwable $exception) {
$this->connection->createCommand(
'UPDATE backgroundJob SET error = :error WHERE id = :id',
[':error' => $exception->getMessage() . PHP_EOL . $exception->getTraceAsString(), ':id' => $job->getId()]
)->execute();
}
}

public function getProcessedEvents(): int {
return $this->processedEvents;
}

public function getRuntimeInSeconds(): int {
return (new \DateTimeImmutable())->getTimestamp() - $this->startedAt->getTimestamp();
}
}
75 changes: 75 additions & 0 deletions components/BackgroundJobScheduler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

declare(strict_types=1);

namespace app\components;

use app\models\backgroundJobs\IBackgroundJob;
use app\models\settings\AntragsgruenApp;

class BackgroundJobScheduler
{
public const HEALTH_MAX_AGE_SECONDS = 120;

public static function executeOrScheduleJob(IBackgroundJob $job): void
{
if (isset(AntragsgruenApp::getInstance()->backgroundJobs['notifications']) && AntragsgruenApp::getInstance()->backgroundJobs['notifications']) {
\Yii::$app->getDb()->createCommand(
'INSERT INTO `backgroundJob` (`siteId`, `consultationId`, `type`, `dateCreation`, `payload`) VALUES (:siteId, :consultationId, :type, NOW(), :payload)',
[
':siteId' => $job->getSite()?->id,
':consultationId' => $job->getConsultation()?->id,
':type' => $job->getTypeId(),
':payload' => $job->toJson(),
]
)->execute();
} else {
$job->execute();
}
}

/**
* @return array{healthy: bool|null, data: array<string, mixed>}
*/
public static function getDiagnostics(): array
{
if (!isset(AntragsgruenApp::getInstance()->backgroundJobs['notifications']) || !AntragsgruenApp::getInstance()->backgroundJobs['notifications']) {
return [
'healthy' => null,
'data' => [],
];
}

$command = \Yii::$app->getDb()->createCommand('SELECT MIN(dateCreation) minAge, COUNT(*) num FROM backgroundJob WHERE dateStarted IS NULL');
$result = $command->queryAll()[0];
$unstarted = [
'num' => intval($result['num']),
'age' => ($result['minAge'] ? (time() - Tools::dateSql2timestamp($result['minAge'])) : 0),
];

$command = \Yii::$app->getDb()->createCommand('SELECT MIN(dateCreation) minAge, COUNT(*) num FROM backgroundJob WHERE dateFinished IS NULL');
$result = $command->queryAll()[0];
$unfinished = [
'num' => intval($result['num']),
'age' => ($result['minAge'] ? (time() - Tools::dateSql2timestamp($result['minAge'])) : 0),
];

return [
'healthy' => ($unstarted['age'] <= self::HEALTH_MAX_AGE_SECONDS && $unfinished['age'] <= self::HEALTH_MAX_AGE_SECONDS),
'data' => [
'unstarted' => $unstarted,
'unfinished' => $unfinished,
],
];
}

public static function cleanup(int $maxHageHours): int
{
$command = \Yii::$app->getDb()->createCommand(
'DELETE FROM backgroundJob WHERE dateFinished < NOW() - INTERVAL :hours HOUR',
[':hours' => $maxHageHours]
);

return $command->execute();
}
}
10 changes: 10 additions & 0 deletions components/SitePurger.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public static function purgeConsultation(int $consultationId): void
[':conId' => $consultationId]
)->execute();

$connection->createCommand(
'DELETE FROM backgroundJob WHERE consultationId = :conId',
[':conId' => $consultationId]
)->execute();

$connection->createCommand(
'UPDATE site SET currentConsultationId = NULL WHERE currentConsultationId = :conId',
[':conId' => $consultationId]
Expand Down Expand Up @@ -164,6 +169,11 @@ public static function purgeSite(int $siteId): void
[':siteId' => $siteId]
)->execute();

$connection->createCommand(
'DELETE FROM backgroundJob WHERE siteId = :siteId',
[':siteId' => $siteId]
)->execute();

$connection->createCommand(
'DELETE FROM site WHERE id = :siteId',
[':siteId' => $siteId]
Expand Down
Loading

0 comments on commit 4d56bd4

Please sign in to comment.