Skip to content

Commit

Permalink
Feature feature changes (#85)
Browse files Browse the repository at this point in the history
* Search by ID.
* Search by mainFileId, authorIds and createdByIds
* Increased mime type DB size.
* Elastic updates
* Job processor updates.
* Changed IdentityTrait to IdentityIntTrait, fixed locking async processing.
* Resource locking update.
* String helper
  • Loading branch information
TomasHermanek authored Oct 7, 2024
1 parent 0b22c59 commit 939e059
Show file tree
Hide file tree
Showing 24 changed files with 285 additions and 77 deletions.
31 changes: 16 additions & 15 deletions src/Domain/AssetFile/AbstractAssetFileStatusFacade.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use AnzuSystems\CoreDamBundle\Exception\AssetFileProcessFailed;
use AnzuSystems\CoreDamBundle\Exception\DuplicateAssetFileException;
use AnzuSystems\CoreDamBundle\Exception\ForbiddenOperationException;
use AnzuSystems\CoreDamBundle\Exception\RuntimeException;
use AnzuSystems\CoreDamBundle\Logger\DamLogger;
use AnzuSystems\CoreDamBundle\Messenger\Message\AssetRefreshPropertiesMessage;
use AnzuSystems\CoreDamBundle\Model\Dto\Asset\AssetAdmFinishDto;
Expand All @@ -37,6 +36,7 @@
use Doctrine\ORM\NonUniqueResultException;
use League\Flysystem\FilesystemException;
use Psr\Cache\InvalidArgumentException;
use RuntimeException;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\Service\Attribute\Required;
use Throwable;
Expand Down Expand Up @@ -196,18 +196,27 @@ public function finishUpload(AssetAdmFinishDto $assetFinishDto, AssetFile $asset
*/
public function storeAndProcess(AssetFile $assetFile, ?AdapterFile $file = null, bool $dispatchPropertyRefresh = true): AssetFile
{
$lockName = $assetFile->getAssetType()->value . '_' . $assetFile->getLicence()->getId();

try {
if ($assetFile->getAssetAttributes()->getStatus()->is(AssetFileProcessStatus::Uploaded)) {
$file = $this->store($assetFile, $file);
$file = $file ?: $this->createFile($assetFile);
$this->fileAttributesPostProcessor->processAttributes($assetFile, $file);
$this->fileAttributesPostProcessor->processChecksum($assetFile, $file);
// we need to lock process due to duplicity checks
$this->resourceLocker->lock($lockName);
$this->store($assetFile, $file);
}
if (null === $file) {
throw new RuntimeException(sprintf('AssetFile (%s) cant be processed without file', $assetFile->getId()));
}
if ($assetFile->getAssetAttributes()->getStatus()->is(AssetFileProcessStatus::Stored)) {
$this->chunkFileManager->clearChunks($assetFile);
$this->process($assetFile, $file, $dispatchPropertyRefresh);
$this->resourceLocker->unLock($lockName);
}
} catch (DuplicateAssetFileException $duplicateAssetFileException) {
$this->resourceLocker->unLock($lockName);
$assetFile->getAssetAttributes()->setOriginAssetId(
(string) $duplicateAssetFileException->getOldAsset()->getId()
);
Expand All @@ -218,19 +227,23 @@ public function storeAndProcess(AssetFile $assetFile, ?AdapterFile $file = null,
$this->assetStatusManager->toDuplicate($assetFile);
$this->assetFileEventDispatcher->dispatchAssetFileChanged($assetFile);
} catch (AssetFileProcessFailed $assetFileProcessFailed) {
$this->resourceLocker->unLock($lockName);
$this->assetStatusManager->toFailed(
$assetFile,
$assetFileProcessFailed->getAssetFileFailedType(),
$assetFileProcessFailed
);
$this->assetFileEventDispatcher->dispatchAssetFileChanged($assetFile);
} catch (Throwable $exception) {
$this->resourceLocker->unLock($lockName);
$this->assetStatusManager->toFailed(
$assetFile,
AssetFileFailedType::Unknown,
$exception
);
$this->assetFileEventDispatcher->dispatchAssetFileChanged($assetFile);
} finally {
$this->resourceLocker->unLock($lockName);
}

return $assetFile;
Expand All @@ -244,20 +257,10 @@ public function storeAndProcess(AssetFile $assetFile, ?AdapterFile $file = null,
* @throws TransportExceptionInterface
* @throws Throwable
*/
public function store(AssetFile $assetFile, ?AdapterFile $file = null): AdapterFile
public function store(AssetFile $assetFile, AdapterFile $file): AdapterFile
{
$file = $file ?: $this->createFile($assetFile);

$this->fileAttributesPostProcessor->processAttributes($assetFile, $file);
$this->fileAttributesPostProcessor->processChecksum($assetFile, $file);

$lockName = $assetFile->getAssetType()->value . '_' . $assetFile->getLicence()->getId();
$this->resourceLocker->lock($lockName);

$originAssetFile = $this->checkDuplicate($assetFile);
if ($originAssetFile) {
$this->resourceLocker->unLock($lockName);

throw new DuplicateAssetFileException(
oldAsset: $originAssetFile,
newAsset: $assetFile
Expand All @@ -269,9 +272,7 @@ public function store(AssetFile $assetFile, ?AdapterFile $file = null): AdapterF
$this->assetFileStorageOperator->save($assetFile, $file);
$this->assetStatusManager->toStored($assetFile);
$this->assetManager->commit();
$this->resourceLocker->unLock($lockName);
} catch (Throwable $exception) {
$this->resourceLocker->unLock($lockName);
$this->assetManager->rollback();

throw $exception;
Expand Down
2 changes: 1 addition & 1 deletion src/Domain/AssetFile/AssetFileStatusInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#[AutoconfigureTag]
interface AssetFileStatusInterface
{
public function store(AssetFile $assetFile, ?AdapterFile $file): AdapterFile;
public function store(AssetFile $assetFile, AdapterFile $file): AdapterFile;

public function process(AssetFile $assetFile, AdapterFile $file, bool $dispatchPropertyRefresh): AssetFile;

Expand Down
2 changes: 1 addition & 1 deletion src/Domain/AssetMetadata/AssetMetadataProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ private function provideCommonMetadata(array $rawMetadata, array $allowedMetadat

private function parseValue(string $value): string
{
return htmlspecialchars(strip_tags($value));
return strip_tags($value);
}
}
29 changes: 25 additions & 4 deletions src/Domain/Job/Processor/JobPodcastSynchronizerProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use AnzuSystems\CoreDamBundle\Model\ValueObject\PodcastSynchronizerPointer;
use AnzuSystems\CoreDamBundle\Repository\PodcastRepository;
use AnzuSystems\SerializerBundle\Exception\SerializerException;
use DateTimeImmutable;
use DateTimeInterface;
use Generator;

Expand All @@ -28,9 +29,17 @@ public function __construct(
private readonly PodcastImportIterator $importIterator,
private readonly PodcastRepository $podcastRepository,
private int $bulkSize = self::BULK_SIZE,
private ?DateTimeImmutable $minImportFrom = null
) {
}

public function setMinImportFrom(?DateTimeImmutable $minImportFrom): self
{
$this->minImportFrom = $minImportFrom;

return $this;
}

public function setBulkSize(int $bulkSize): self
{
$this->bulkSize = $bulkSize;
Expand All @@ -45,15 +54,26 @@ public static function getSupportedJob(): string

/**
* @param JobPodcastSynchronizer $job
*
* @throws SerializerException
*/
public function process(JobInterface $job): void
{
$this->start($job);
$this->processPodcasts($job);
}

/**
* @throws SerializerException
*/
private function processPodcasts(JobPodcastSynchronizer $job): void
{
if ($job->isFullSync()) {
$this->importFull(
job: $job,
generator: $this->importIterator->iterate(PodcastSynchronizerPointer::fromString($job->getLastBatchProcessedRecord()))
generator: $this->importIterator->iterate(
pointer: PodcastSynchronizerPointer::fromString($job->getLastBatchProcessedRecord()),
minImportFrom: $this->minImportFrom
)
);

return;
Expand All @@ -72,8 +92,9 @@ public function process(JobInterface $job): void
job: $job,
generator: $this->importIterator->iteratePodcast(
pointer: PodcastSynchronizerPointer::fromString($job->getLastBatchProcessedRecord()),
podcastToImport: $podcast
)
podcastToImport: $podcast,
minImportFrom: $this->minImportFrom
),
);
}
}
Expand Down
33 changes: 17 additions & 16 deletions src/Domain/Podcast/PodcastImportIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace AnzuSystems\CoreDamBundle\Domain\Podcast;

use AnzuSystems\CoreDamBundle\App;
use AnzuSystems\CoreDamBundle\Entity\Podcast;
use AnzuSystems\CoreDamBundle\Exception\InvalidArgumentException;
use AnzuSystems\CoreDamBundle\HttpClient\RssClient;
Expand All @@ -16,13 +17,15 @@
use DateTimeImmutable;
use Generator;

final readonly class PodcastImportIterator
final class PodcastImportIterator
{
private const string MIN_IMPORT_FROM_MODIFIER = '- 3 months';

public function __construct(
private RssClient $client,
private PodcastRssReader $reader,
private PodcastRepository $podcastRepository,
private DamLogger $damLogger,
private readonly RssClient $client,
private readonly PodcastRssReader $reader,
private readonly PodcastRepository $podcastRepository,
private readonly DamLogger $damLogger,
) {
}

Expand All @@ -31,15 +34,15 @@ public function __construct(
*
* @throws SerializerException
*/
public function iterate(PodcastSynchronizerPointer $pointer): Generator
public function iterate(PodcastSynchronizerPointer $pointer, ?DateTimeImmutable $minImportFrom = null): Generator
{
$podcastToImport = $this->getPodcastToImport($pointer);
if (null === $podcastToImport) {
return;
}

while ($podcastToImport) {
foreach ($this->iteratePodcast($pointer, $podcastToImport) as $item) {
foreach ($this->iteratePodcast($pointer, $podcastToImport, $minImportFrom) as $item) {
yield $item;
}

Expand All @@ -58,7 +61,7 @@ public function iterate(PodcastSynchronizerPointer $pointer): Generator
*
* @throws SerializerException
*/
public function iteratePodcast(PodcastSynchronizerPointer $pointer, Podcast $podcastToImport): Generator
public function iteratePodcast(PodcastSynchronizerPointer $pointer, Podcast $podcastToImport, ?DateTimeImmutable $minImportFrom = null): Generator
{
try {
$this->reader->initReader($this->client->readPodcastRss($podcastToImport));
Expand All @@ -74,7 +77,7 @@ public function iteratePodcast(PodcastSynchronizerPointer $pointer, Podcast $pod

return;
}
$startFromDate = $this->getImportFrom($pointer, $podcastToImport);
$startFromDate = $this->getImportFrom($pointer, $minImportFrom);

foreach ($this->reader->readItems($startFromDate) as $podcastItem) {
yield new PodcastImportIteratorDto(
Expand All @@ -85,18 +88,16 @@ public function iteratePodcast(PodcastSynchronizerPointer $pointer, Podcast $pod
}
}

private function getImportFrom(PodcastSynchronizerPointer $pointer, Podcast $podcast): ?DateTimeImmutable
private function getImportFrom(PodcastSynchronizerPointer $pointer, ?DateTimeImmutable $minImportFrom): ?DateTimeImmutable
{
if (null === $podcast->getDates()->getImportFrom()) {
return $pointer->getPubDate();
}
$minImportFrom = $minImportFrom ?? App::getAppDate()->modify(self::MIN_IMPORT_FROM_MODIFIER);

if (null === $pointer->getPubDate()) {
return $podcast->getDates()->getImportFrom();
return $minImportFrom;
}

return $podcast->getDates()->getImportFrom() > $pointer->getPubDate()
? $podcast->getDates()->getImportFrom()
return $minImportFrom > $pointer->getPubDate()
? $minImportFrom
: $pointer->getPubDate();
}

Expand Down
4 changes: 3 additions & 1 deletion src/Domain/Podcast/PodcastRssReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

final class PodcastRssReader
{
public const string RSS_DATE_FORMAT = 'D, d M Y H:i:s T';
private const string ITUNES_KEY_KEY = 'itunes';

private SimpleXMLElement $body;
Expand Down Expand Up @@ -91,6 +92,7 @@ public function readItems(?DateTimeImmutable $from = null): Generator
{
foreach (array_reverse($this->body->channel?->xpath('item') ?? []) as $item) {
$item = $this->readItem($item);

if ($item->getPubDate() && $from && $from > $item->getPubDate()) {
continue;
}
Expand Down Expand Up @@ -145,7 +147,7 @@ private function getPublicationDate(SimpleXMLElement $element): ?DateTimeImmutab
{
$publicationDateString = (string) $element->pubDate;
$publicationDate = DateTimeImmutable::createFromFormat(
'D, d M Y H:i:s T',
self::RSS_DATE_FORMAT,
$publicationDateString,
);

Expand Down
9 changes: 4 additions & 5 deletions src/Elasticsearch/IndexFactory/AssetIndexFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use AnzuSystems\CoreDamBundle\Entity\DocumentFile;
use AnzuSystems\CoreDamBundle\Entity\ImageFile;
use AnzuSystems\CoreDamBundle\Entity\Interfaces\ExtSystemIndexableInterface;
use AnzuSystems\CoreDamBundle\Entity\Keyword;
use AnzuSystems\CoreDamBundle\Entity\PodcastEpisode;
use AnzuSystems\CoreDamBundle\Entity\VideoFile;
use AnzuSystems\CoreDamBundle\Helper\CollectionHelper;
Expand Down Expand Up @@ -48,14 +47,13 @@ public function buildFromEntity(ExtSystemIndexableInterface $entity): array

return [
'id' => $entity->getId(),
'mainFileId' => $entity->getMainFile()?->getId(),
'fileIds' => array_values(CollectionHelper::traversableToIds(
$entity->getSlots(),
fn (AssetSlot $slot): string => (string) $slot->getAssetFile()->getId()
)),
'keywordIds' => array_values(CollectionHelper::traversableToIds(
$entity->getKeywords(),
fn (Keyword $keyword): string => (string) $keyword->getId()
)),
'keywordIds' => array_values(CollectionHelper::traversableToIds($entity->getKeywords())),
'authorIds' => array_values(CollectionHelper::traversableToIds($entity->getAuthors())),
'type' => $entity->getAttributes()->getAssetType()->toString(),
'status' => $entity->getAttributes()->getStatus(),
'described' => $entity->getAssetFlags()->isDescribed(),
Expand All @@ -64,6 +62,7 @@ public function buildFromEntity(ExtSystemIndexableInterface $entity): array
'generatedBySystem' => $entity->getAssetFlags()->isGeneratedBySystem(),
'modifiedAt' => $entity->getModifiedAt()->getTimestamp(),
'createdAt' => $entity->getCreatedAt()->getTimestamp(),
'createdById' => $entity->getCreatedBy()->getId(),
'licence' => $entity->getLicence()->getId(),
'distributedInServices' => array_values($entity->getAssetFileProperties()->getDistributesInServices()),
'slotNames' => array_values($entity->getAssetFileProperties()->getSlotNames()),
Expand Down
Loading

0 comments on commit 939e059

Please sign in to comment.