Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add cdc pattern example #369

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/backoffice/backend/src/BackofficeBackendKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use function dirname;

class BackofficeBackendKernel extends Kernel
final class BackofficeBackendKernel extends Kernel
rgomezcasas marked this conversation as resolved.
Show resolved Hide resolved
{
use MicroKernelTrait;

Expand All @@ -36,7 +36,7 @@ public function getProjectDir(): string
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
{
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
$container->setParameter('container.dumper.inline_class_loader', true);
$container->setParameter('.container.dumper.inline_class_loader', true);
$confDir = $this->getProjectDir() . '/config';

$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');
Expand Down
4 changes: 2 additions & 2 deletions apps/backoffice/frontend/src/BackofficeFrontendKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use function dirname;

class BackofficeFrontendKernel extends Kernel
final class BackofficeFrontendKernel extends Kernel
rgomezcasas marked this conversation as resolved.
Show resolved Hide resolved
{
use MicroKernelTrait;

Expand All @@ -36,7 +36,7 @@ public function getProjectDir(): string
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
{
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
$container->setParameter('container.dumper.inline_class_loader', true);
$container->setParameter('.container.dumper.inline_class_loader', true);
$confDir = $this->getProjectDir() . '/config';

$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
use CodelyTv\Shared\Infrastructure\Bus\Event\MySql\MySqlDoctrineDomainEventsConsumer;
use CodelyTv\Shared\Infrastructure\Doctrine\DatabaseConnections;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function Lambdish\Phunctional\pipe;

#[AsCommand(name: 'codely:domain-events:mysql:consume', description: 'Consume domain events from MySql',)]
final class ConsumeMySqlDomainEventsCommand extends Command
{
protected static $defaultName = 'codelytv:domain-events:mysql:consume';

public function __construct(
private readonly MySqlDoctrineDomainEventsConsumer $consumer,
private readonly DatabaseConnections $connections,
Expand All @@ -29,9 +29,7 @@ public function __construct(

protected function configure(): void
{
$this
->setDescription('Consume domain events from MySql')
->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
$this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
}

protected function execute(InputInterface $input, OutputInterface $output): int
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace CodelyTv\Apps\Mooc\Backend\Command\DomainEvents;

use CodelyTv\Mooc\Courses\Infrastructure\Cdc\DatabaseMutationToCourseCreatedDomainEvent;
use CodelyTv\Shared\Domain\Bus\Event\EventBus;
use CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationAction;
use CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationToDomainEvent;
use Doctrine\ORM\EntityManager;
use RuntimeException;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
name: 'codely:domain-events:generate-from-mutations',
description: 'Publish domain events from mutations',
)]
final class PublishDomainEventsFromMutationsCommand extends Command
{
private array $transformers;

public function __construct(
private readonly EntityManager $entityManager,
private readonly EventBus $eventBus
) {
parent::__construct();

$this->transformers = [
'courses' => [
DatabaseMutationAction::INSERT->value => DatabaseMutationToCourseCreatedDomainEvent::class,
DatabaseMutationAction::UPDATE->value => null,
DatabaseMutationAction::DELETE->value => null,
],
];
}

protected function configure(): void
{
$this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of mutations to process');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$totalMutations = (int) $input->getArgument('quantity');

$this->entityManager->wrapInTransaction(function (EntityManager $entityManager) use ($totalMutations) {
$mutations = $entityManager->getConnection()
->executeQuery("SELECT * FROM mutations ORDER BY id ASC LIMIT $totalMutations FOR UPDATE")
->fetchAllAssociative();

foreach ($mutations as $mutation) {
$transformer = $this->findTransformer($mutation['table_name'], $mutation['operation']);

if ($transformer === null) {
echo sprintf("Ignoring %s %s\n", $mutation['table_name'], $mutation['operation']);
continue;
}

$domainEvents = $transformer->transform($mutation);

$this->eventBus->publish(...$domainEvents);
}

$entityManager->getConnection()->executeStatement(
sprintf('DELETE FROM mutations WHERE id IN (%s)', implode(',', array_column($mutations, 'id')))
);
});

return 0;
}

private function findTransformer(string $tableName, string $operation): ?DatabaseMutationToDomainEvent

Check failure on line 77 in apps/mooc/backend/src/Command/DomainEvents/PublishDomainEventsFromMutationsCommand.php

View workflow job for this annotation

GitHub Actions / build

MoreSpecificReturnType

apps/mooc/backend/src/Command/DomainEvents/PublishDomainEventsFromMutationsCommand.php:77:74: MoreSpecificReturnType: The declared return type 'CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationToDomainEvent|null' for CodelyTv\Apps\Mooc\Backend\Command\DomainEvents\PublishDomainEventsFromMutationsCommand::findTransformer is more specific than the inferred return type 'null|object' (see https://psalm.dev/070)
{
if (!array_key_exists($tableName, $this->transformers) && array_key_exists(
$operation,
$this->transformers[$tableName]
)) {
throw new RuntimeException("Transformer not found for table $tableName and operation $operation");
}

$class = $this->transformers[$tableName][$operation];

return $class ? new $class() : null;

Check failure on line 88 in apps/mooc/backend/src/Command/DomainEvents/PublishDomainEventsFromMutationsCommand.php

View workflow job for this annotation

GitHub Actions / build

LessSpecificReturnStatement

apps/mooc/backend/src/Command/DomainEvents/PublishDomainEventsFromMutationsCommand.php:88:10: LessSpecificReturnStatement: The type 'null|object' is more general than the declared return type 'CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationToDomainEvent|null' for CodelyTv\Apps\Mooc\Backend\Command\DomainEvents\PublishDomainEventsFromMutationsCommand::findTransformer (see https://psalm.dev/129)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
namespace CodelyTv\Apps\Mooc\Backend\Command\DomainEvents\RabbitMq;

use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqConfigurer;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Traversable;

#[AsCommand(
name: 'codely:domain-events:rabbitmq:configure',
description: 'Configure the RabbitMQ to allow publish & consume domain events',
)]
final class ConfigureRabbitMqCommand extends Command
{
protected static $defaultName = 'codelytv:domain-events:rabbitmq:configure';

public function __construct(
private readonly RabbitMqConfigurer $configurer,
private readonly string $exchangeName,
Expand All @@ -22,11 +25,6 @@ public function __construct(
parent::__construct();
}

protected function configure(): void
{
$this->setDescription('Configure the RabbitMQ to allow publish & consume domain events');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->configurer->configure($this->exchangeName, ...iterator_to_array($this->subscribers));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqDomainEventsConsumer;
use CodelyTv\Shared\Infrastructure\Doctrine\DatabaseConnections;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function Lambdish\Phunctional\repeat;

#[AsCommand(
name: 'codely:domain-events:rabbitmq:consume',
description: 'Consume domain events from the RabbitMQ',
)]
final class ConsumeRabbitMqDomainEventsCommand extends Command
{
protected static $defaultName = 'codelytv:domain-events:rabbitmq:consume';

public function __construct(
private readonly RabbitMqDomainEventsConsumer $consumer,
private readonly DatabaseConnections $connections,
Expand All @@ -29,7 +32,6 @@ public function __construct(
protected function configure(): void
{
$this
->setDescription('Consume domain events from the RabbitMQ')
->addArgument('queue', InputArgument::REQUIRED, 'Queue name')
->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@
use CodelyTv\Shared\Domain\Bus\Event\DomainEventSubscriber;
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqQueueNameFormatter;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function Lambdish\Phunctional\each;

#[AsCommand(
name: 'codely:domain-events:rabbitmq:generate-supervisor-files',
description: 'Generate the supervisor configuration for every RabbitMQ subscriber',
)]
final class GenerateSupervisorRabbitMqConsumerFilesCommand extends Command
{
private const EVENTS_TO_PROCESS_AT_TIME = 200;
private const NUMBERS_OF_PROCESSES_PER_SUBSCRIBER = 1;
private const SUPERVISOR_PATH = __DIR__ . '/../../../../build/supervisor';
protected static $defaultName = 'codelytv:domain-events:rabbitmq:generate-supervisor-files';

public function __construct(private readonly DomainEventSubscriberLocator $locator)
{
Expand All @@ -28,9 +32,7 @@ public function __construct(private readonly DomainEventSubscriberLocator $locat

protected function configure(): void
{
$this
->setDescription('Generate the supervisor configuration for every RabbitMQ subscriber')
->addArgument('command-path', InputArgument::OPTIONAL, 'Path on this is gonna be deployed', '/var/www');
$this->addArgument('command-path', InputArgument::OPTIONAL, 'Path on this is gonna be deployed', '/var/www');
}

protected function execute(InputInterface $input, OutputInterface $output): int
Expand Down Expand Up @@ -68,7 +70,7 @@ private function template(): string
{
return <<<EOF
[program:codelytv_{queue_name}]
command = {path}/apps/mooc/backend/bin/console codelytv:domain-events:rabbitmq:consume --env=prod {queue_name} {events_to_process}
command = {path}/apps/mooc/backend/bin/console codely:domain-events:rabbitmq:consume --env=prod {queue_name} {events_to_process}
process_name = %(program_name)s_%(process_num)02d
numprocs = {processes}
startsecs = 1
Expand Down
4 changes: 2 additions & 2 deletions apps/mooc/backend/src/MoocBackendKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use function dirname;

class MoocBackendKernel extends Kernel
final class MoocBackendKernel extends Kernel
rgomezcasas marked this conversation as resolved.
Show resolved Hide resolved
{
use MicroKernelTrait;

Expand All @@ -36,7 +36,7 @@ public function getProjectDir(): string
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
{
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
$container->setParameter('container.dumper.inline_class_loader', true);
$container->setParameter('.container.dumper.inline_class_loader', true);
$confDir = $this->getProjectDir() . '/config';

$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');
Expand Down
32 changes: 14 additions & 18 deletions ecs.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,20 @@
use Symplify\EasyCodingStandard\Config\ECSConfig;

return function (ECSConfig $ecsConfig): void {
$ecsConfig->paths([
__DIR__ . '/apps',
__DIR__ . '/src',
__DIR__ . '/tests',
]);
$ecsConfig->paths([__DIR__ . '/apps', __DIR__ . '/src', __DIR__ . '/tests', ]);

$ecsConfig->sets([CodingStyle::DEFAULT]);
$ecsConfig->sets([CodingStyle::DEFAULT]);

$ecsConfig->skip([
FinalClassFixer::class => [
__DIR__ . '/apps/backoffice/backend/src/BackofficeBackendKernel.php',
__DIR__ . '/apps/backoffice/frontend/src/BackofficeFrontendKernel.php',
__DIR__ . '/apps/mooc/backend/src/MoocBackendKernel.php',
__DIR__ . '/src/Shared/Infrastructure/Bus/Event/InMemory/InMemorySymfonyEventBus.php',
],
__DIR__ . '/apps/backoffice/backend/var',
__DIR__ . '/apps/backoffice/frontend/var',
__DIR__ . '/apps/mooc/backend/var',
__DIR__ . '/apps/mooc/frontend/var',
]);
$ecsConfig->skip([
FinalClassFixer::class => [
__DIR__ . '/apps/backoffice/backend/src/BackofficeBackendKernel.php',
__DIR__ . '/apps/backoffice/frontend/src/BackofficeFrontendKernel.php',
__DIR__ . '/apps/mooc/backend/src/MoocBackendKernel.php',
__DIR__ . '/src/Shared/Infrastructure/Bus/Event/InMemory/InMemorySymfonyEventBus.php',
],
__DIR__ . '/apps/backoffice/backend/var',
__DIR__ . '/apps/backoffice/frontend/var',
__DIR__ . '/apps/mooc/backend/var',
__DIR__ . '/apps/mooc/frontend/var',
]);
};
Loading
Loading