diff --git a/apps/backoffice/backend/src/BackofficeBackendKernel.php b/apps/backoffice/backend/src/BackofficeBackendKernel.php index 793ede8c0..6ed60b1be 100644 --- a/apps/backoffice/backend/src/BackofficeBackendKernel.php +++ b/apps/backoffice/backend/src/BackofficeBackendKernel.php @@ -36,7 +36,7 @@ public function getProjectDir(): string protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void { $container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php')); - $container->setParameter('container.dumper.inline_class_loader', true); + $container->setParameter('.container.dumper.inline_class_loader', true); $confDir = $this->getProjectDir() . '/config'; $loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob'); diff --git a/apps/backoffice/frontend/src/BackofficeFrontendKernel.php b/apps/backoffice/frontend/src/BackofficeFrontendKernel.php index 28852a38a..dd793f7ab 100644 --- a/apps/backoffice/frontend/src/BackofficeFrontendKernel.php +++ b/apps/backoffice/frontend/src/BackofficeFrontendKernel.php @@ -36,7 +36,7 @@ public function getProjectDir(): string protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void { $container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php')); - $container->setParameter('container.dumper.inline_class_loader', true); + $container->setParameter('.container.dumper.inline_class_loader', true); $confDir = $this->getProjectDir() . '/config'; $loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob'); diff --git a/apps/mooc/backend/src/Command/DomainEvents/MySql/ConsumeMySqlDomainEventsCommand.php b/apps/mooc/backend/src/Command/DomainEvents/MySql/ConsumeMySqlDomainEventsCommand.php index 359264a69..16d96c2a6 100644 --- a/apps/mooc/backend/src/Command/DomainEvents/MySql/ConsumeMySqlDomainEventsCommand.php +++ b/apps/mooc/backend/src/Command/DomainEvents/MySql/ConsumeMySqlDomainEventsCommand.php @@ -8,6 +8,7 @@ use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator; use CodelyTv\Shared\Infrastructure\Bus\Event\MySql\MySqlDoctrineDomainEventsConsumer; use CodelyTv\Shared\Infrastructure\Doctrine\DatabaseConnections; +use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; @@ -15,10 +16,9 @@ use function Lambdish\Phunctional\pipe; +#[AsCommand(name: 'codely:domain-events:mysql:consume', description: 'Consume domain events from MySql',)] final class ConsumeMySqlDomainEventsCommand extends Command { - protected static $defaultName = 'codelytv:domain-events:mysql:consume'; - public function __construct( private readonly MySqlDoctrineDomainEventsConsumer $consumer, private readonly DatabaseConnections $connections, @@ -29,9 +29,7 @@ public function __construct( protected function configure(): void { - $this - ->setDescription('Consume domain events from MySql') - ->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process'); + $this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process'); } protected function execute(InputInterface $input, OutputInterface $output): int diff --git a/apps/mooc/backend/src/Command/DomainEvents/PublishDomainEventsFromMutationsCommand.php b/apps/mooc/backend/src/Command/DomainEvents/PublishDomainEventsFromMutationsCommand.php new file mode 100644 index 000000000..945784311 --- /dev/null +++ b/apps/mooc/backend/src/Command/DomainEvents/PublishDomainEventsFromMutationsCommand.php @@ -0,0 +1,91 @@ +transformers = [ + 'courses' => [ + DatabaseMutationAction::INSERT->value => DatabaseMutationToCourseCreatedDomainEvent::class, + DatabaseMutationAction::UPDATE->value => null, + DatabaseMutationAction::DELETE->value => null, + ], + ]; + } + + protected function configure(): void + { + $this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of mutations to process'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $totalMutations = (int) $input->getArgument('quantity'); + + $this->entityManager->wrapInTransaction(function (EntityManager $entityManager) use ($totalMutations) { + $mutations = $entityManager->getConnection() + ->executeQuery("SELECT * FROM mutations ORDER BY id ASC LIMIT $totalMutations FOR UPDATE") + ->fetchAllAssociative(); + + foreach ($mutations as $mutation) { + $transformer = $this->findTransformer($mutation['table_name'], $mutation['operation']); + + if ($transformer === null) { + echo sprintf("Ignoring %s %s\n", $mutation['table_name'], $mutation['operation']); + continue; + } + + $domainEvents = $transformer->transform($mutation); + + $this->eventBus->publish(...$domainEvents); + } + + $entityManager->getConnection()->executeStatement( + sprintf('DELETE FROM mutations WHERE id IN (%s)', implode(',', array_column($mutations, 'id'))) + ); + }); + + return 0; + } + + private function findTransformer(string $tableName, string $operation): ?DatabaseMutationToDomainEvent + { + if (!array_key_exists($tableName, $this->transformers) && array_key_exists( + $operation, + $this->transformers[$tableName] + )) { + throw new RuntimeException("Transformer not found for table $tableName and operation $operation"); + } + + /** @var class-string|null $class */ + $class = $this->transformers[$tableName][$operation]; + + return $class ? new $class() : null; + } +} diff --git a/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConfigureRabbitMqCommand.php b/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConfigureRabbitMqCommand.php index e42c35ba6..72801af71 100644 --- a/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConfigureRabbitMqCommand.php +++ b/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConfigureRabbitMqCommand.php @@ -5,15 +5,18 @@ namespace CodelyTv\Apps\Mooc\Backend\Command\DomainEvents\RabbitMq; use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqConfigurer; +use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Traversable; +#[AsCommand( + name: 'codely:domain-events:rabbitmq:configure', + description: 'Configure the RabbitMQ to allow publish & consume domain events', +)] final class ConfigureRabbitMqCommand extends Command { - protected static $defaultName = 'codelytv:domain-events:rabbitmq:configure'; - public function __construct( private readonly RabbitMqConfigurer $configurer, private readonly string $exchangeName, @@ -22,11 +25,6 @@ public function __construct( parent::__construct(); } - protected function configure(): void - { - $this->setDescription('Configure the RabbitMQ to allow publish & consume domain events'); - } - protected function execute(InputInterface $input, OutputInterface $output): int { $this->configurer->configure($this->exchangeName, ...iterator_to_array($this->subscribers)); diff --git a/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConsumeRabbitMqDomainEventsCommand.php b/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConsumeRabbitMqDomainEventsCommand.php index 856f40b04..c29c056d8 100644 --- a/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConsumeRabbitMqDomainEventsCommand.php +++ b/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConsumeRabbitMqDomainEventsCommand.php @@ -7,6 +7,7 @@ use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator; use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqDomainEventsConsumer; use CodelyTv\Shared\Infrastructure\Doctrine\DatabaseConnections; +use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; @@ -14,10 +15,12 @@ use function Lambdish\Phunctional\repeat; +#[AsCommand( + name: 'codely:domain-events:rabbitmq:consume', + description: 'Consume domain events from the RabbitMQ', +)] final class ConsumeRabbitMqDomainEventsCommand extends Command { - protected static $defaultName = 'codelytv:domain-events:rabbitmq:consume'; - public function __construct( private readonly RabbitMqDomainEventsConsumer $consumer, private readonly DatabaseConnections $connections, @@ -29,7 +32,6 @@ public function __construct( protected function configure(): void { $this - ->setDescription('Consume domain events from the RabbitMQ') ->addArgument('queue', InputArgument::REQUIRED, 'Queue name') ->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process'); } diff --git a/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/GenerateSupervisorRabbitMqConsumerFilesCommand.php b/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/GenerateSupervisorRabbitMqConsumerFilesCommand.php index f806d57da..646392bf3 100644 --- a/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/GenerateSupervisorRabbitMqConsumerFilesCommand.php +++ b/apps/mooc/backend/src/Command/DomainEvents/RabbitMq/GenerateSupervisorRabbitMqConsumerFilesCommand.php @@ -7,6 +7,7 @@ use CodelyTv\Shared\Domain\Bus\Event\DomainEventSubscriber; use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator; use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqQueueNameFormatter; +use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; @@ -14,12 +15,15 @@ use function Lambdish\Phunctional\each; +#[AsCommand( + name: 'codely:domain-events:rabbitmq:generate-supervisor-files', + description: 'Generate the supervisor configuration for every RabbitMQ subscriber', +)] final class GenerateSupervisorRabbitMqConsumerFilesCommand extends Command { private const EVENTS_TO_PROCESS_AT_TIME = 200; private const NUMBERS_OF_PROCESSES_PER_SUBSCRIBER = 1; private const SUPERVISOR_PATH = __DIR__ . '/../../../../build/supervisor'; - protected static $defaultName = 'codelytv:domain-events:rabbitmq:generate-supervisor-files'; public function __construct(private readonly DomainEventSubscriberLocator $locator) { @@ -28,9 +32,7 @@ public function __construct(private readonly DomainEventSubscriberLocator $locat protected function configure(): void { - $this - ->setDescription('Generate the supervisor configuration for every RabbitMQ subscriber') - ->addArgument('command-path', InputArgument::OPTIONAL, 'Path on this is gonna be deployed', '/var/www'); + $this->addArgument('command-path', InputArgument::OPTIONAL, 'Path on this is gonna be deployed', '/var/www'); } protected function execute(InputInterface $input, OutputInterface $output): int @@ -68,7 +70,7 @@ private function template(): string { return <<addResource(new FileResource($this->getProjectDir() . '/config/bundles.php')); - $container->setParameter('container.dumper.inline_class_loader', true); + $container->setParameter('.container.dumper.inline_class_loader', true); $confDir = $this->getProjectDir() . '/config'; $loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob'); diff --git a/ecs.php b/ecs.php index 3e87e64ce..e3ed961ba 100644 --- a/ecs.php +++ b/ecs.php @@ -7,24 +7,20 @@ use Symplify\EasyCodingStandard\Config\ECSConfig; return function (ECSConfig $ecsConfig): void { - $ecsConfig->paths([ - __DIR__ . '/apps', - __DIR__ . '/src', - __DIR__ . '/tests', - ]); + $ecsConfig->paths([__DIR__ . '/apps', __DIR__ . '/src', __DIR__ . '/tests', ]); - $ecsConfig->sets([CodingStyle::DEFAULT]); + $ecsConfig->sets([CodingStyle::DEFAULT]); - $ecsConfig->skip([ - FinalClassFixer::class => [ - __DIR__ . '/apps/backoffice/backend/src/BackofficeBackendKernel.php', - __DIR__ . '/apps/backoffice/frontend/src/BackofficeFrontendKernel.php', - __DIR__ . '/apps/mooc/backend/src/MoocBackendKernel.php', - __DIR__ . '/src/Shared/Infrastructure/Bus/Event/InMemory/InMemorySymfonyEventBus.php', - ], - __DIR__ . '/apps/backoffice/backend/var', - __DIR__ . '/apps/backoffice/frontend/var', - __DIR__ . '/apps/mooc/backend/var', - __DIR__ . '/apps/mooc/frontend/var', - ]); + $ecsConfig->skip([ + FinalClassFixer::class => [ + __DIR__ . '/apps/backoffice/backend/src/BackofficeBackendKernel.php', + __DIR__ . '/apps/backoffice/frontend/src/BackofficeFrontendKernel.php', + __DIR__ . '/apps/mooc/backend/src/MoocBackendKernel.php', + __DIR__ . '/src/Shared/Infrastructure/Bus/Event/InMemory/InMemorySymfonyEventBus.php', + ], + __DIR__ . '/apps/backoffice/backend/var', + __DIR__ . '/apps/backoffice/frontend/var', + __DIR__ . '/apps/mooc/backend/var', + __DIR__ . '/apps/mooc/frontend/var', + ]); }; diff --git a/etc/databases/mooc.sql b/etc/databases/mooc.sql index e236cbc46..a999ef78e 100644 --- a/etc/databases/mooc.sql +++ b/etc/databases/mooc.sql @@ -1,31 +1,95 @@ +/* ------------------------- + MOOC CONTEXT +---------------------------- */ + +-- Generic tables + +CREATE TABLE `mutations` ( + `id` BIGINT AUTO_INCREMENT PRIMARY KEY, + `table_name` VARCHAR(255) NOT NULL, + `operation` ENUM ('INSERT', 'UPDATE', 'DELETE') NOT NULL, + `old_value` JSON NULL, + `new_value` JSON NULL, + `mutation_timestamp` TIMESTAMP DEFAULT CURRENT_TIMESTAMP +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; + +CREATE TABLE `domain_events` ( + `id` CHAR(36) NOT NULL, + `aggregate_id` CHAR(36) NOT NULL, + `name` VARCHAR(255) NOT NULL, + `body` JSON NOT NULL, + `occurred_on` TIMESTAMP NOT NULL, + PRIMARY KEY (`id`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; + +-- Aggregates tables + CREATE TABLE `courses` ( - `id` CHAR(36) NOT NULL, - `name` VARCHAR(255) NOT NULL, - `duration` VARCHAR(255) NOT NULL, - PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + `id` CHAR(36) NOT NULL, + `name` VARCHAR(255) NOT NULL, + `duration` VARCHAR(255) NOT NULL, + PRIMARY KEY (`id`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; + +CREATE TRIGGER after_courses_insert + AFTER INSERT + ON courses + FOR EACH ROW +BEGIN + INSERT INTO mutations (table_name, operation, new_value, mutation_timestamp) + VALUES ('courses', 'INSERT', JSON_OBJECT('id', new.id, 'name', new.name, 'duration', new.duration), NOW()); +END; + +CREATE TRIGGER after_courses_update + AFTER UPDATE + ON courses + FOR EACH ROW +BEGIN + INSERT INTO mutations (table_name, operation, old_value, new_value, mutation_timestamp) + VALUES ('courses', + 'UPDATE', + JSON_OBJECT('id', old.id, 'name', old.name, 'duration', old.duration), + JSON_OBJECT('id', new.id, 'name', new.name, 'duration', new.duration), + NOW()); +END; + +CREATE TRIGGER after_courses_delete + AFTER DELETE + ON courses + FOR EACH ROW +BEGIN + INSERT INTO mutations (table_name, operation, old_value, mutation_timestamp) + VALUES ('courses', 'DELETE', JSON_OBJECT('id', old.id, 'name', old.name, 'duration', old.duration), NOW()); +END; CREATE TABLE `courses_counter` ( - `id` CHAR(36) NOT NULL, - `total` INT NOT NULL, - `existing_courses` JSON NOT NULL, - PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + `id` CHAR(36) NOT NULL, + `total` INT NOT NULL, + `existing_courses` JSON NOT NULL, + PRIMARY KEY (`id`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; -INSERT INTO `courses_counter` VALUES ("cdf26d7d-3deb-4e8c-9f73-4ac085a8d6f3", 0, "[]"); +INSERT INTO `courses_counter` +VALUES ("cdf26d7d-3deb-4e8c-9f73-4ac085a8d6f3", 0, "[]"); -CREATE TABLE `domain_events` ( - `id` CHAR(36) NOT NULL, - `aggregate_id` CHAR(36) NOT NULL, - `name` VARCHAR(255) NOT NULL, - `body` JSON NOT NULL, - `occurred_on` timestamp NOT NULL, - PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +/* ------------------------- + BACKOFFICE CONTEXT +---------------------------- */ CREATE TABLE `backoffice_courses` ( - `id` CHAR(36) NOT NULL, - `name` VARCHAR(255) NOT NULL, - `duration` VARCHAR(255) NOT NULL, - PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + `id` CHAR(36) NOT NULL, + `name` VARCHAR(255) NOT NULL, + `duration` VARCHAR(255) NOT NULL, + PRIMARY KEY (`id`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; diff --git a/src/Mooc/Courses/Infrastructure/Cdc/DatabaseMutationToCourseCreatedDomainEvent.php b/src/Mooc/Courses/Infrastructure/Cdc/DatabaseMutationToCourseCreatedDomainEvent.php new file mode 100644 index 000000000..59f78fe54 --- /dev/null +++ b/src/Mooc/Courses/Infrastructure/Cdc/DatabaseMutationToCourseCreatedDomainEvent.php @@ -0,0 +1,40 @@ + $id->value(), $value), $platform); } - public function convertToPHPValue($value, AbstractPlatform $platform) + public function convertToPHPValue($value, AbstractPlatform $platform): array { $scalars = parent::convertToPHPValue($value, $platform); diff --git a/src/Shared/Infrastructure/Cdc/DatabaseMutationAction.php b/src/Shared/Infrastructure/Cdc/DatabaseMutationAction.php new file mode 100644 index 000000000..fdad67057 --- /dev/null +++ b/src/Shared/Infrastructure/Cdc/DatabaseMutationAction.php @@ -0,0 +1,12 @@ +setMetadataDriverImpl(new SimplifiedXmlDriver(array_merge(self::$sharedPrefixes, $contextPrefixes))); + $config->setSchemaManagerFactory(new DefaultSchemaManagerFactory()); return $config; } diff --git a/src/Shared/Infrastructure/Persistence/Doctrine/UuidType.php b/src/Shared/Infrastructure/Persistence/Doctrine/UuidType.php index f90ebbe32..1ef8213bc 100644 --- a/src/Shared/Infrastructure/Persistence/Doctrine/UuidType.php +++ b/src/Shared/Infrastructure/Persistence/Doctrine/UuidType.php @@ -26,14 +26,14 @@ final public function getName(): string return self::customTypeName(); } - final public function convertToPHPValue($value, AbstractPlatform $platform) + final public function convertToPHPValue($value, AbstractPlatform $platform): mixed { $className = $this->typeClassName(); return new $className($value); } - final public function convertToDatabaseValue($value, AbstractPlatform $platform) + final public function convertToDatabaseValue($value, AbstractPlatform $platform): string { /** @var Uuid $value */ return $value->value();