Skip to content

Commit

Permalink
First draft of background worker
Browse files Browse the repository at this point in the history
  • Loading branch information
CatoTH committed Dec 1, 2024
1 parent 56c6f04 commit a40fb36
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 89 deletions.
2 changes: 1 addition & 1 deletion assets/db/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ CREATE TABLE `###TABLE_PREFIX###backgroundJob` (
`dateStarted` timestamp NULL DEFAULT NULL,
`dateUpdated` timestamp NULL DEFAULT NULL,
`dateFinished` timestamp NULL DEFAULT NULL,
`payload` mediumblob NOT NULL
`payload` mediumtext NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

--
Expand Down
4 changes: 1 addition & 3 deletions commands/BackgroundJobController.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ public function actionRun(): void
if ($row) {
$processor->processRow($row);
} else {
usleep(100000);
usleep(100_000);
}

file_put_contents('/tmp/memory_usage.log', date("Y-m-d H:i:s") . ": " . memory_get_peak_usage() . "\n", FILE_APPEND);
}
}

Expand Down
35 changes: 27 additions & 8 deletions components/BackgroundJobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace app\components;

use app\models\backgroundJobs\IBackgroundJob;
use yii\db\Connection;

class BackgroundJobProcessor
Expand All @@ -18,10 +19,10 @@ public function __construct(Connection $connection) {
$this->startedAt = new \DateTimeImmutable();
}

public function getJobAndSetStarted(): ?array {
$foundRow = null;
public function getJobAndSetStarted(): ?IBackgroundJob {
$foundJob = null;

$this->connection->transaction(function () use (&$foundRow) {
$this->connection->transaction(function () use (&$foundJob) {
$command = $this->connection->createCommand('SELECT * FROM backgroundJob WHERE dateStarted IS NULL ORDER BY id ASC LIMIT 0,1 FOR UPDATE');
$foundRows = $command->queryAll();
if (empty($foundRows)) {
Expand All @@ -30,16 +31,34 @@ public function getJobAndSetStarted(): ?array {

$foundRow = $foundRows[0];
$this->connection->createCommand('UPDATE backgroundJob SET dateStarted = NOW() WHERE id = :id', ['id' => $foundRow['id']])->execute();

$foundJob = IBackgroundJob::fromJson(
intval($foundRow['id']),
$foundRow['type'],
($foundRow['siteId'] > 0 ? $foundRow['siteId'] : null),
($foundRow['consultationId'] > 0 ? $foundRow['consultationId'] : null),
$foundRow['payload']
);
});

return $foundRow;
return $foundJob;
}

public function processRow(array $row): void
public function processRow(IBackgroundJob $job): void
{
echo "Processing row: " . $row['id'] . "\n";
sleep(2);
$this->connection->createCommand('UPDATE backgroundJob SET dateFinished = NOW() WHERE id = :id', ['id' => $row['id']])->execute();
echo "Processing row: " . $job->getId() . "\n";

$this->connection->createCommand(
'UPDATE backgroundJob SET dateUpdated = NOW() WHERE id = :id',
['id' => $job->getId()]
)->execute();

$job->execute();

$this->connection->createCommand(
'UPDATE backgroundJob SET dateFinished = NOW() WHERE id = :id',
['id' => $job->getId()]
)->execute();
}

public function getProcessedEvents(): int {
Expand Down
23 changes: 23 additions & 0 deletions components/BackgroundJobScheduler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace app\components;

use app\models\backgroundJobs\IBackgroundJob;

class BackgroundJobScheduler
{
public static function executeOrScheduleJob(IBackgroundJob $job): void
{
\Yii::$app->getDb()->createCommand(
'INSERT INTO backgroundJob (`siteId`, `consultationId`, `type`, `dateCreation`, `payload`) VALUES (:siteId, :consultationId, :type, NOW(), :payload)',
[
':siteId' => $job->getSite()?->id,
':consultationId' => $job->getConsultation()?->id,
':type' => $job->getTypeId(),
':payload' => $job->toJson(),
]
)->execute();
}
}
94 changes: 18 additions & 76 deletions components/mail/Tools.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

namespace app\components\mail;

use app\components\RequestContext;
use app\components\BackgroundJobScheduler;
use app\models\backgroundJobs\SendNotification;
use app\models\settings\AntragsgruenApp;
use app\models\db\{Consultation, EMailLog, IMotion, User};
use app\models\exceptions\{MailNotSent, ServerConfiguration};
use Symfony\Component\Mailer\Exception\TransportExceptionInterface;
use app\models\db\{Consultation, IMotion, User};

class Tools
{
Expand Down Expand Up @@ -53,10 +52,6 @@ public static function getDefaultReplyTo(?IMotion $imotion = null, ?Consultation
return $replyTo;
}

/**
* @throws MailNotSent
* @throws ServerConfiguration
*/
public static function sendWithLog(
int $mailType,
?Consultation $fromConsultation,
Expand All @@ -70,79 +65,26 @@ public static function sendWithLog(
?string $replyTo = null
): void {
$params = AntragsgruenApp::getInstance();
$mailer = Base::createMailer($params->mailService);
if (!$mailer) {
throw new MailNotSent('E-Mail not configured');
}

$sendTextPlain = ($noLogReplaces ? str_replace(
array_keys($noLogReplaces),
array_values($noLogReplaces),
$textPlain
) : $textPlain);
$sendTextHtml = ($noLogReplaces ? str_replace(
array_keys($noLogReplaces),
array_values($noLogReplaces),
$textHtml
) : $textHtml);

$fromEmail = $params->mailFromEmail;
if (!$fromName) {
$fromName = static::getDefaultMailFromName($fromConsultation);
$fromName = Tools::getDefaultMailFromName($fromConsultation);
}
if (!$replyTo) {
$replyTo = static::getDefaultReplyTo(null, $fromConsultation);
}

$exception = null;
$messageId = '';
try {
$message = $mailer->createMessage(
$subject,
$sendTextPlain,
$sendTextHtml,
$fromName,
$fromEmail,
$replyTo,
$fromConsultation
);
$result = $mailer->send($message, $toEmail);
if (is_string($result)) {
$status = EMailLog::STATUS_SENT;
$messageId = $result;
} else {
$status = $result;
}
} catch (TransportExceptionInterface $e) {
$status = EMailLog::STATUS_DELIVERY_ERROR;
$exception = $e;
$replyTo = Tools::getDefaultReplyTo(null, $fromConsultation);
}

$obj = new EMailLog();
if ($toPersonId) {
$obj->toUserId = $toPersonId;
}
if ($fromConsultation) {
$obj->fromSiteId = $fromConsultation->siteId;
}
$obj->toEmail = $toEmail;
$obj->type = $mailType;
$obj->fromEmail = $fromName . ' <' . $fromEmail . '>';
$obj->subject = mb_substr($subject, 0, 190);
$obj->text = $textPlain;
$obj->dateSent = date('Y-m-d H:i:s');
$obj->status = $status;
$obj->messageId = $messageId;
$obj->save();

if ($exception) {
\Yii::error($exception->getMessage());
throw new MailNotSent($exception->getMessage());
}

if (YII_ENV === 'test') {
$pre = RequestContext::getSession()->getFlash('email', '');
RequestContext::getSession()->setFlash('email', $pre . 'E-Mail sent to: ' . $toEmail . " (Type $mailType)\n");
}
BackgroundJobScheduler::executeOrScheduleJob(new SendNotification(
$fromConsultation,
$mailType,
$toEmail,
$toPersonId,
$subject,
$textPlain,
$textHtml,
$noLogReplaces,
$fromEmail,
$fromName,
$replyTo
));
}
}
2 changes: 1 addition & 1 deletion migrations/m241201_100317_background_jobs.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public function safeUp(): void
'dateStarted' => 'TIMESTAMP DEFAULT NULL',
'dateUpdated' => 'TIMESTAMP DEFAULT NULL',
'dateFinished' => 'TIMESTAMP DEFAULT NULL',
'payload' => 'MEDIUMBLOB NOT NULL',
'payload' => 'MEDIUMTEXT NOT NULL',
]);

$this->addForeignKey('fk_background_site', 'backgroundJob', 'siteId', 'site', 'id');
Expand Down
80 changes: 80 additions & 0 deletions models/backgroundJobs/IBackgroundJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php

declare(strict_types=1);

namespace app\models\backgroundJobs;

use app\components\Tools;
use app\models\db\{Consultation, Site};
use Symfony\Component\Serializer\Annotation\Ignore;

abstract class IBackgroundJob
{
protected ?Consultation $consultation = null;
protected ?Site $site = null;
protected ?int $id = null;

abstract public function getTypeId(): string;

abstract public function execute(): void;

/**
* @return array<string, class-string<IBackgroundJob>>
*/
public static function getAllBackgroundJobs(): array
{
return [
SendNotification::TYPE_ID => SendNotification::class,
];
}

public function toJson(): string
{
$serializer = Tools::getSerializer();

return $serializer->serialize($this, 'json');
}

/**
* @Ignore
*/
public function getConsultation(): ?Consultation
{
return $this->consultation;
}

/**
* @Ignore
*/
public function getSite(): ?Site
{
return $this->site;
}

/**
* @Ignore
*/
public function getId(): ?int
{
return $this->id;
}

public static function fromJson(int $id, string $typeId, ?int $siteId, ?int $consultationId, string $json): IBackgroundJob
{
$serializer = Tools::getSerializer();

$class = self::getAllBackgroundJobs()[$typeId];

/** @var IBackgroundJob $job */
$job = $serializer->deserialize($json, $class, 'json');
$job->id = $id;
if ($siteId !== null) {
$job->site = Site::findOne(['id' => $siteId]);
}
if ($consultationId !== null) {
$job->consultation = Consultation::findOne(['id' => $consultationId]);
}

return $job;
}
}
Loading

0 comments on commit a40fb36

Please sign in to comment.