From b036bac563ed0f67ac40496c7312abbc6faea0b0 Mon Sep 17 00:00:00 2001 From: Serhii Zhuravel <1804871+dxops@users.noreply.github.com> Date: Fri, 30 Dec 2022 14:04:54 +0200 Subject: [PATCH] Products being disabled with since N days filters (#80) --- .../DataConverter/ProductDataConverter.php | 27 ------ .../Processor/ProductVariantProcessor.php | 34 ++++++-- .../Strategy/ProductImportStrategy.php | 32 +++++++ ImportExport/Writer/AsyncWriter.php | 21 ++++- .../Writer/ConfigurableAsyncWriter.php | 83 ++++++++++++++++--- Integration/AkeneoTransport.php | 41 +++++---- Integration/AkeneoTransportInterface.php | 4 +- .../ConfigurableProductConnector.php | 40 ++++++--- .../Iterator/ConfigurableProductIterator.php | 1 - Resources/config/batch_jobs.yml | 4 +- Resources/config/importexport.yml | 6 ++ Resources/config/services.yml | 5 ++ 12 files changed, 213 insertions(+), 85 deletions(-) diff --git a/ImportExport/DataConverter/ProductDataConverter.php b/ImportExport/DataConverter/ProductDataConverter.php index a16a50ce..ddf929f7 100644 --- a/ImportExport/DataConverter/ProductDataConverter.php +++ b/ImportExport/DataConverter/ProductDataConverter.php @@ -100,18 +100,6 @@ private function setStatus(array &$importedRecord) { $importedRecord['status'] = empty($importedRecord['enabled']) ? Product::STATUS_DISABLED : Product::STATUS_ENABLED; - - if (!empty($importedRecord['family_variant'])) { - $importedRecord['status'] = Product::STATUS_DISABLED; - - return; - } - - if (!empty($importedRecord['parent'])) { - $importedRecord['status'] = Product::STATUS_DISABLED; - - return; - } } private function setPrimaryUnitPrecision(array &$importedRecord): void @@ -182,22 +170,7 @@ private function setFamilyVariant(array &$importedRecord) return; } - $importedRecord['status'] = Product::STATUS_ENABLED; $importedRecord['variantFields'] = implode(',', $variantFields); - - if ($isTwoLevelFamilyVariant) { - $allowSecondProductOnly = $this->getTransport()->getAkeneoVariantLevels() === - AkeneoSettings::TWO_LEVEL_FAMILY_VARIANT_SECOND_ONLY; - if ($isFirstLevelProduct && $allowSecondProductOnly) { - $importedRecord['status'] = Product::STATUS_DISABLED; - } - - $allowFirstProductOnly = $this->getTransport()->getAkeneoVariantLevels() === - AkeneoSettings::TWO_LEVEL_FAMILY_VARIANT_FIRST_ONLY; - if ($isSecondLevelProduct && $allowFirstProductOnly) { - $importedRecord['status'] = Product::STATUS_DISABLED; - } - } } private function processValues(array &$importedRecord) diff --git a/ImportExport/Processor/ProductVariantProcessor.php b/ImportExport/Processor/ProductVariantProcessor.php index 0f68da10..3d82c2b7 100644 --- a/ImportExport/Processor/ProductVariantProcessor.php +++ b/ImportExport/Processor/ProductVariantProcessor.php @@ -97,7 +97,7 @@ function ($variantSku) { $variantSkus ); - $variantSkusUppercase = array_combine($variantSkusUppercase, $items); + $variantSkusUppercase = array_combine($variantSkusUppercase, $variantSkusUppercase); foreach ($parentProduct->getVariantLinks() as $variantLink) { $variantProduct = $variantLink->getProduct(); if (!$variantSkusUppercase) { @@ -118,14 +118,10 @@ function ($variantSku) { continue; } - $variantItem = $variantSkusUppercase[$variantProduct->getSkuUppercase()]; - $status = empty($variantItem['enabled']) ? Product::STATUS_DISABLED : Product::STATUS_ENABLED; - $variantProduct->setStatus($status); - unset($variantSkusUppercase[$variantProduct->getSkuUppercase()]); } - foreach ($variantSkusUppercase as $variantSku => $variantItem) { + foreach ($variantSkusUppercase as $variantSku) { $variantProduct = $productRepository->findOneBySku($variantSku); if (!$variantProduct instanceof Product) { $context->incrementErrorEntriesCount(); @@ -156,9 +152,6 @@ function ($variantSku) { $variantProduct->addParentVariantLink($variantLink); $parentProduct->addVariantLink($variantLink); - $status = empty($variantItem['enabled']) ? Product::STATUS_DISABLED : Product::STATUS_ENABLED; - $variantProduct->setStatus($status); - $context->incrementAddCount(); $objectManager->persist($variantLink); @@ -190,6 +183,18 @@ function ($variantSku) { } $parentProduct->setStatus(Product::STATUS_DISABLED); + foreach ($parentProduct->getVariantLinks() as $variantLink) { + $variantProduct = $variantLink->getProduct(); + if ($variantProduct instanceof Product) { + $variantProduct->setStatus(Product::STATUS_DISABLED); + } + } + foreach ($variantSkusUppercase as $variantSku) { + $variantProduct = $productRepository->findOneBySku($variantSku); + if ($variantProduct instanceof Product) { + $variantProduct->setStatus(Product::STATUS_DISABLED); + } + } return $parentProduct; } @@ -214,9 +219,20 @@ function ($variantSku) { ] ) ); + + return $parentProduct; } $context->incrementUpdateCount(); + $parentProduct->setStatus(Product::STATUS_ENABLED); + + foreach ($items as $item) { + if (!empty($item['parent_disabled'])) { + $parentProduct->setStatus(Product::STATUS_DISABLED); + } + + break; + } return $parentProduct; } diff --git a/ImportExport/Strategy/ProductImportStrategy.php b/ImportExport/Strategy/ProductImportStrategy.php index b43b3e60..3d517994 100644 --- a/ImportExport/Strategy/ProductImportStrategy.php +++ b/ImportExport/Strategy/ProductImportStrategy.php @@ -36,6 +36,20 @@ public function close() parent::close(); } + protected function beforeProcessEntity($entity) + { + /** @var Product $entity */ + if ($entity->isConfigurable()) { + /** @var Product $existingProduct */ + $existingProduct = $this->findExistingEntity($entity); + if ($existingProduct) { + $entity->setStatus($existingProduct->getStatus()); + } + } + + return parent::beforeProcessEntity($entity); + } + protected function afterProcessEntity($entity) { if ($entity instanceof Product && $entity->getCategory() && !$entity->getCategory()->getId()) { @@ -74,6 +88,24 @@ protected function afterProcessEntity($entity) return parent::afterProcessEntity($entity); } + protected function validateAndUpdateContext($entity) + { + $validationErrors = $this->strategyHelper->validateEntity($entity); + if ($validationErrors) { + $this->processValidationErrors($entity, $validationErrors); + + /** @var Product $entity */ + $entity = $this->findExistingEntity($entity); + $entity->setStatus(Product::STATUS_DISABLED); + + return $entity; + } + + $this->updateContextCounters($entity); + + return $entity; + } + protected function populateOwner(Product $entity) { } diff --git a/ImportExport/Writer/AsyncWriter.php b/ImportExport/Writer/AsyncWriter.php index 28b52450..2fec098f 100644 --- a/ImportExport/Writer/AsyncWriter.php +++ b/ImportExport/Writer/AsyncWriter.php @@ -6,9 +6,9 @@ use Doctrine\DBAL\Types\Types; use Oro\Bundle\AkeneoBundle\Async\Topics; use Oro\Bundle\AkeneoBundle\EventListener\AdditionalOptionalListenerManager; +use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait; use Oro\Bundle\BatchBundle\Entity\StepExecution; use Oro\Bundle\BatchBundle\Item\ItemWriterInterface; -use Oro\Bundle\BatchBundle\Item\Support\ClosableInterface; use Oro\Bundle\BatchBundle\Step\StepExecutionAwareInterface; use Oro\Bundle\EntityBundle\ORM\DoctrineHelper; use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges; @@ -21,9 +21,10 @@ class AsyncWriter implements ItemWriterInterface, - ClosableInterface, StepExecutionAwareInterface { + use CacheProviderTrait; + /** @var MessageProducerInterface * */ private $messageProducer; @@ -42,6 +43,8 @@ class AsyncWriter implements /** @var AdditionalOptionalListenerManager */ private $additionalOptionalListenerManager; + private $configurable = []; + public function __construct( MessageProducerInterface $messageProducer, DoctrineHelper $doctrineHelper, @@ -76,6 +79,13 @@ public function write(array $items) $this->size = $newSize; $this->stepExecution->setWriteCount($this->size); + foreach ($items as $item) { + $this->configurable[$item['identifier'] ?? $item['code']] = true; + if (!empty($item['parent'])) { + $this->configurable[$item['parent']] = true; + } + } + $jobId = $this->insertJob($jobName); if ($jobId && $this->createFieldsChanges($jobId, $items, 'items')) { $this->sendMessage($channelId, $jobId, true); @@ -136,8 +146,13 @@ private function getRootJob(): ?int return (int)$rootJobId; } - public function close() + public function flush() { + if ($this->configurable) { + $this->cacheProvider->save('akeneo_configurable', $this->configurable); + } + + $this->configurable = []; $this->size = 0; $this->optionalListenerManager->enableListeners($this->optionalListenerManager->getListeners()); diff --git a/ImportExport/Writer/ConfigurableAsyncWriter.php b/ImportExport/Writer/ConfigurableAsyncWriter.php index 25532b8c..10363b04 100644 --- a/ImportExport/Writer/ConfigurableAsyncWriter.php +++ b/ImportExport/Writer/ConfigurableAsyncWriter.php @@ -5,7 +5,9 @@ use Doctrine\DBAL\Platforms\MySqlPlatform; use Doctrine\DBAL\Types\Types; use Oro\Bundle\AkeneoBundle\Async\Topics; +use Oro\Bundle\AkeneoBundle\Entity\AkeneoSettings; use Oro\Bundle\AkeneoBundle\EventListener\AdditionalOptionalListenerManager; +use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait; use Oro\Bundle\BatchBundle\Entity\StepExecution; use Oro\Bundle\BatchBundle\Item\ItemWriterInterface; use Oro\Bundle\BatchBundle\Step\StepExecutionAwareInterface; @@ -17,11 +19,14 @@ use Oro\Component\MessageQueue\Client\Message; use Oro\Component\MessageQueue\Client\MessagePriority; use Oro\Component\MessageQueue\Client\MessageProducerInterface; +use Symfony\Contracts\Cache\CacheInterface; class ConfigurableAsyncWriter implements ItemWriterInterface, StepExecutionAwareInterface { + use CacheProviderTrait; + private const VARIANTS_BATCH_SIZE = 25; /** @var MessageProducerInterface * */ @@ -45,6 +50,16 @@ class ConfigurableAsyncWriter implements private $models = []; + private $configurable = []; + + /** @var CacheInterface */ + private $cache; + + public function setCache(CacheInterface $cache): void + { + $this->cache = $cache; + } + public function __construct( MessageProducerInterface $messageProducer, DoctrineHelper $doctrineHelper, @@ -59,16 +74,30 @@ public function __construct( public function initialize() { - $this->variants = []; - $this->origins = []; - $this->models = []; - $this->additionalOptionalListenerManager->disableListeners(); $this->optionalListenerManager->disableListeners($this->optionalListenerManager->getListeners()); + + $this->configurable = $this->cacheProvider->fetch('akeneo_configurable') ?: []; } public function write(array $items) { + if (!$this->variants) { + $this->variants = $this->cache->get('variants', function () { + return []; + }); + } + if (!$this->origins) { + $this->origins = $this->cache->get('origins', function () { + return []; + }); + } + if (!$this->models) { + $this->models = $this->cache->get('models', function () { + return []; + }); + } + foreach ($items as $item) { $origin = $item['origin']; $sku = $item['sku']; @@ -95,28 +124,52 @@ public function write(array $items) $this->variants[$parent][$origin] = [ 'parent' => $this->origins[$parent] ?? $parent, 'variant' => $sku, - 'enabled' => $item['enabled'] ?? false, ]; } } - public function close() + public function flush() { - $this->variants = []; - $this->origins = []; - $this->models = []; - $this->optionalListenerManager->enableListeners($this->optionalListenerManager->getListeners()); $this->additionalOptionalListenerManager->enableListeners(); - } - public function flush() - { + if (!$this->variants) { + return; + } + + if ($this->variants) { + $variants = $this->cache->getItem('variants'); + $variants->set($this->variants); + $this->cache->save($variants); + } + + if ($this->origins) { + $origins = $this->cache->getItem('origins'); + $origins->set($this->origins); + $this->cache->save($origins); + } + + if ($this->models) { + $models = $this->cache->getItem('models'); + $models->set($this->models); + $this->cache->save($models); + } + + $updated = $this->cache->getItem('time'); + $updated->set($this->cacheProvider->fetch('time')); + $this->cache->save($updated); + + $this->variants = array_intersect_key($this->variants, $this->configurable); + foreach ($this->models as $levelTwo => $levelOne) { if (array_key_exists($levelTwo, $this->variants)) { foreach ($this->variants[$levelTwo] as $sku => $item) { $item['parent'] = $this->origins[$levelOne] ?? $levelOne; $this->variants[$levelOne][$sku] = $item; + + $akeneoVariantLevels = $this->cacheProvider->fetch('akeneo_variant_levels'); + $this->variants[$levelOne][$sku]['parent_disabled'] = $akeneoVariantLevels === AkeneoSettings::TWO_LEVEL_FAMILY_VARIANT_SECOND_ONLY; + $this->variants[$levelTwo][$sku]['parent_disabled'] = $akeneoVariantLevels === AkeneoSettings::TWO_LEVEL_FAMILY_VARIANT_FIRST_ONLY; } } } @@ -138,6 +191,10 @@ public function flush() $this->sendMessage($channelId, $jobId); } } + + $this->variants = []; + $this->origins = []; + $this->models = []; } private function createFieldsChanges(int $jobId, array &$data, string $key): bool diff --git a/Integration/AkeneoTransport.php b/Integration/AkeneoTransport.php index fae743b4..22dbaf9d 100644 --- a/Integration/AkeneoTransport.php +++ b/Integration/AkeneoTransport.php @@ -238,17 +238,21 @@ public function getProducts(int $pageSize) ); } - public function getProductsList(int $pageSize, ?string $family = null): iterable + public function getProductsList(int $pageSize, int $sinceLastNDays = null): iterable { - $attributeMapping = $this->getAttributeMapping(); + $this->initAttributesList(); + + $filters = [ + 'parent' => [['operator' => 'NOT EMPTY']], + 'family' => [['operator' => 'NOT EMPTY']], + ]; + if ($sinceLastNDays) { + $filters['updated'] = [['operator' => 'SINCE LAST N DAYS', 'value' => $sinceLastNDays]]; + } $queryParams = [ 'scope' => $this->transportEntity->getAkeneoActiveChannel(), - 'search' => $this->akeneoSearchBuilder->getFilters( - $family ? - json_encode(['family' => [['operator' => 'IN', 'value' => [$family]]]]) : - $this->transportEntity->getProductFilter() - ), - 'attributes' => $attributeMapping['sku'] ?? reset($attributeMapping), + 'search' => $this->akeneoSearchBuilder->getFilters(json_encode($filters)), + 'attributes' => array_key_first($this->attributes), ]; if ($this->transportEntity->getSyncProducts() === SyncProductsDataProvider::PUBLISHED) { @@ -256,7 +260,7 @@ public function getProductsList(int $pageSize, ?string $family = null): iterable $this->client->getPublishedProductApi()->all($pageSize, $queryParams), $this->client, $this->logger, - $attributeMapping + $this->getAttributeMapping() ); } @@ -264,7 +268,7 @@ public function getProductsList(int $pageSize, ?string $family = null): iterable $this->client->getProductApi()->all($pageSize, $queryParams), $this->client, $this->logger, - $attributeMapping + $this->getAttributeMapping() ); } @@ -293,20 +297,27 @@ public function getProductModels(int $pageSize) ); } - public function getProductModelsList(int $pageSize): iterable + public function getProductModelsList(int $pageSize, int $sinceLastNDays = null): iterable { - $attributeMapping = $this->getAttributeMapping(); + $this->initAttributesList(); + + $filters = [ + 'family' => [['operator' => 'NOT EMPTY']], + ]; + if ($sinceLastNDays) { + $filters['updated'] = [['operator' => 'SINCE LAST N DAYS', 'value' => $sinceLastNDays]]; + } $queryParams = [ 'scope' => $this->transportEntity->getAkeneoActiveChannel(), - 'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getConfigurableProductFilter()), - 'attributes' => $attributeMapping['sku'] ?? reset($attributeMapping), + 'search' => $this->akeneoSearchBuilder->getFilters(json_encode($filters)), + 'attributes' => array_key_first($this->attributes), ]; return new ConfigurableProductIterator( $this->client->getProductModelApi()->all($pageSize, $queryParams), $this->client, $this->logger, - $attributeMapping + $this->getAttributeMapping() ); } diff --git a/Integration/AkeneoTransportInterface.php b/Integration/AkeneoTransportInterface.php index efee9c45..c0adc2e9 100644 --- a/Integration/AkeneoTransportInterface.php +++ b/Integration/AkeneoTransportInterface.php @@ -51,9 +51,9 @@ public function getProducts(int $pageSize); */ public function getProductModels(int $pageSize); - public function getProductsList(int $pageSize, ?string $family = null): iterable; + public function getProductsList(int $pageSize, int $sinceLastNDays = null): iterable; - public function getProductModelsList(int $pageSize): iterable; + public function getProductModelsList(int $pageSize, int $sinceLastNDays = null): iterable; /** * @return \Iterator diff --git a/Integration/Connector/ConfigurableProductConnector.php b/Integration/Connector/ConfigurableProductConnector.php index 90f4e382..f8e1bd31 100644 --- a/Integration/Connector/ConfigurableProductConnector.php +++ b/Integration/Connector/ConfigurableProductConnector.php @@ -10,6 +10,7 @@ use Oro\Bundle\IntegrationBundle\Provider\AllowedConnectorInterface; use Oro\Bundle\IntegrationBundle\Provider\ConnectorInterface; use Oro\Bundle\ProductBundle\Entity\Product; +use Symfony\Contracts\Cache\CacheInterface; /** * Integration configurable product connector. @@ -26,6 +27,14 @@ class ConfigurableProductConnector extends AbstractConnector implements Connecto /** @var SchemaUpdateFilter */ protected $schemaUpdateFilter; + /** @var CacheInterface */ + private $cache; + + public function setCache(CacheInterface $cache): void + { + $this->cache = $cache; + } + public function getLabel(): string { return 'oro.akeneo.connector.configurable_product.label'; @@ -63,21 +72,26 @@ protected function getConnectorSource() return new \ArrayIterator(); } - $iterator = new \AppendIterator(); - $iterator->append($this->transport->getProductModelsList(self::PAGE_SIZE)); + $this->cacheProvider->save('akeneo_variant_levels', $this->channel->getTransport()->getAkeneoVariantLevels()); + + $now = time(); + $this->cacheProvider->save('time', $now); - $processed = []; + $sinceLastNDays = 0; + $time = $this->cache->getItem('time')->get() ?: null; + if ($time) { + $nowDate = new \DateTime(); + $nowDate->setTimestamp($now); + $lastDate = new \DateTime(); + $lastDate->setTimestamp($time); + $interval = $lastDate->diff($nowDate); + $sinceLastNDays = (int)$interval->format('%a') ?: 1; + } - return new \CallbackFilterIterator( - $iterator, - function ($current, $key, $iterator) use (&$processed) { - if (isset($current['family_variant'], $current['family']) && empty($processed[$current['family']])) { - $iterator->append($this->transport->getProductsList(self::PAGE_SIZE, $current['family'])); - $processed[$current['family']] = true; - } + $iterator = new \AppendIterator(); + $iterator->append($this->transport->getProductModelsList(self::PAGE_SIZE, $sinceLastNDays)); + $iterator->append($this->transport->getProductsList(self::PAGE_SIZE, $sinceLastNDays)); - return true; - } - ); + return $iterator; } } diff --git a/Integration/Iterator/ConfigurableProductIterator.php b/Integration/Iterator/ConfigurableProductIterator.php index 7c214e43..92529e95 100644 --- a/Integration/Iterator/ConfigurableProductIterator.php +++ b/Integration/Iterator/ConfigurableProductIterator.php @@ -39,7 +39,6 @@ public function doCurrent() 'parent' => $item['parent'] ?? null, 'family_variant' => $item['family_variant'] ?? null, 'family' => $item['family'] ?? null, - 'enabled' => $item['enabled'] ?? false, ]; } } diff --git a/Resources/config/batch_jobs.yml b/Resources/config/batch_jobs.yml index d6505d30..2322f9ac 100644 --- a/Resources/config/batch_jobs.yml +++ b/Resources/config/batch_jobs.yml @@ -70,7 +70,7 @@ connector: processor: oro_akeneo.importexport.processor.async writer: oro_akeneo.importexport.writer.async_product parameters: - batch_size: 25 + batch_size: 100 import: title: import class: Oro\Bundle\AkeneoBundle\ImportExport\Step\ItemStep @@ -105,7 +105,7 @@ connector: processor: oro_akeneo.importexport.processor.async writer: oro_akeneo.importexport.writer.configurable_async_product parameters: - batch_size: 25 + batch_size: 100 import_variants: title: import class: Oro\Bundle\AkeneoBundle\ImportExport\Step\ItemStep diff --git a/Resources/config/importexport.yml b/Resources/config/importexport.yml index 0b852ec5..43e0e9c8 100644 --- a/Resources/config/importexport.yml +++ b/Resources/config/importexport.yml @@ -224,6 +224,7 @@ services: calls: - [ setSchemaUpdateFilter, [ '@oro_akeneo.placeholder.schema_update_filter' ] ] - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ] + - [ setCache, [ '@oro_akeneo.configurable_cache' ] ] tags: - { name: oro_integration.connector, type: configurable_product, channel_type: oro_akeneo } @@ -315,6 +316,8 @@ services: - '@oro_entity.doctrine_helper' - '@oro_platform.optional_listeners.manager' - '@oro_akeneo.event_listener.additional_optional_listeners_manager' + calls: + - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ] oro_akeneo.importexport.writer.configurable_async_product: class: 'Oro\Bundle\AkeneoBundle\ImportExport\Writer\ConfigurableAsyncWriter' @@ -323,6 +326,9 @@ services: - '@oro_entity.doctrine_helper' - '@oro_platform.optional_listeners.manager' - '@oro_akeneo.event_listener.additional_optional_listeners_manager' + calls: + - [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ] + - [ setCache, [ '@oro_akeneo.configurable_cache' ] ] oro_akeneo.importexport.cache: parent: oro_cache.array_cache diff --git a/Resources/config/services.yml b/Resources/config/services.yml index 49593a94..93d39339 100644 --- a/Resources/config/services.yml +++ b/Resources/config/services.yml @@ -391,3 +391,8 @@ services: class: Oro\Bundle\AkeneoBundle\EntityConfig\ImportexportFieldConfiguration tags: - oro_entity_config.validation.entity_config + + oro_akeneo.configurable_cache: + parent: oro.data.cache + tags: + - { name: 'cache.pool', namespace: 'oro_akeneo_configurable_cache' }