From 1ba843993433aa3611d8c9f669c2fa6f0c568201 Mon Sep 17 00:00:00 2001 From: Maksym Perepelytsya Date: Fri, 8 Jul 2016 18:04:15 +0300 Subject: [PATCH] BAP-10038: Email processing create duplicates: - flush every email with body - add lock handler to command(only one instance of command may run) --- .../EmailBundle/Cache/EmailCacheManager.php | 1 - .../Command/Cron/EmailBodySyncCommand.php | 11 +++++ .../Sync/EmailBodySynchronizer.php | 45 ++++++++++++------- .../Unit/Cache/EmailCacheManagerTest.php | 3 -- .../Unit/Sync/EmailBodySynchronizerTest.php | 12 ++--- 5 files changed, 46 insertions(+), 26 deletions(-) diff --git a/src/Oro/Bundle/EmailBundle/Cache/EmailCacheManager.php b/src/Oro/Bundle/EmailBundle/Cache/EmailCacheManager.php index be31ecbf2b5..503abeb6c70 100644 --- a/src/Oro/Bundle/EmailBundle/Cache/EmailCacheManager.php +++ b/src/Oro/Bundle/EmailBundle/Cache/EmailCacheManager.php @@ -46,7 +46,6 @@ public function ensureEmailBodyCached(Email $email) { if ($email->getEmailBody() === null) { $this->emailBodySynchronizer->syncOneEmailBody($email); - $this->em->flush(); } $this->eventDispatcher->dispatch(EmailBodyLoaded::NAME, new EmailBodyLoaded($email)); diff --git a/src/Oro/Bundle/EmailBundle/Command/Cron/EmailBodySyncCommand.php b/src/Oro/Bundle/EmailBundle/Command/Cron/EmailBodySyncCommand.php index fc21f89ba41..c09e1cee2bd 100644 --- a/src/Oro/Bundle/EmailBundle/Command/Cron/EmailBodySyncCommand.php +++ b/src/Oro/Bundle/EmailBundle/Command/Cron/EmailBodySyncCommand.php @@ -6,6 +6,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Filesystem\LockHandler; use Oro\Bundle\CronBundle\Command\CronCommandInterface; use Oro\Bundle\EmailBundle\Sync\EmailBodySynchronizer; @@ -60,9 +61,19 @@ protected function configure() */ protected function execute(InputInterface $input, OutputInterface $output) { + $lock = new LockHandler('oro:cron:email-body-sync'); + if (!$lock->lock()) { + $output->writeln('The command is already running in another process.'); + + return 0; + } /** @var EmailBodySynchronizer $synchronizer */ $synchronizer = $this->getContainer()->get('oro_email.email_body_synchronizer'); $synchronizer->setLogger(new OutputLogger($output)); $synchronizer->sync((int)$input->getOption('max-exec-time'), (int)$input->getOption('batch-size')); + + $lock->release(); + + return 0; } } diff --git a/src/Oro/Bundle/EmailBundle/Sync/EmailBodySynchronizer.php b/src/Oro/Bundle/EmailBundle/Sync/EmailBodySynchronizer.php index cb80d48c0f9..662a4f64565 100644 --- a/src/Oro/Bundle/EmailBundle/Sync/EmailBodySynchronizer.php +++ b/src/Oro/Bundle/EmailBundle/Sync/EmailBodySynchronizer.php @@ -2,7 +2,7 @@ namespace Oro\Bundle\EmailBundle\Sync; -use Doctrine\Common\Persistence\ObjectManager; +use Doctrine\ORM\EntityManager; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; @@ -34,7 +34,7 @@ class EmailBodySynchronizer implements LoggerAwareInterface /** @var array */ protected $emailBodyLoaders = []; - /** @var ObjectManager */ + /** @var EntityManager */ protected $manager = null; /** @@ -61,7 +61,7 @@ public function __construct( */ public function syncOneEmailBody(Email $email) { - if ($email->isBodySynced() !== true && $email->getEmailBody() === null) { + if ($this->isBodyNotLoaded($email)) { // body loader can load email from any folder // todo: refactor to use correct emailuser and origin // to use active origin and get correct folder from this origin @@ -72,12 +72,20 @@ public function syncOneEmailBody(Email $email) throw new LoadEmailBodyFailedException($email); } - $loader = $this->getBodyLoader($origin); - $email->setBodySynced(true); - $emailBody = null; + $loader = $this->getBodyLoader($origin); + $bodyLoaded = false; try { - $emailBody = $loader->loadEmailBody($folder, $email, $this->getManager()); - $email->setEmailBody($emailBody); + $em = $this->getManager(); + $emailBody = $loader->loadEmailBody($folder, $email, $em); + $em->refresh($email); + // double check + if ($this->isBodyNotLoaded($email)) { + $email->setEmailBody($emailBody); + $email->setBodySynced(true); + $bodyLoaded = true; + $em->persist($email); + $em->flush($email); + } } catch (LoadEmailBodyException $loadEx) { $this->logger->notice( sprintf('Load email body failed. Email id: %d. Error: %s', $email->getId(), $loadEx->getMessage()), @@ -91,10 +99,7 @@ public function syncOneEmailBody(Email $email) ); throw new LoadEmailBodyFailedException($email, $ex); } - - $this->getManager()->persist($email); - - if ($emailBody) { + if ($bodyLoaded) { $event = new EmailBodyAdded($email); $this->eventDispatcher->dispatch(EmailBodyAdded::NAME, $event); } @@ -127,7 +132,7 @@ public function sync($maxExecTimeInMin = -1, $batchSize = 10) $emails = $repo->getEmailsWithoutBody($batchSize); if (count($emails) === 0) { - $this->logger->notice('All emails was processed'); + $this->logger->info('All emails was processed'); break; } @@ -146,8 +151,6 @@ public function sync($maxExecTimeInMin = -1, $batchSize = 10) continue; } } - - $this->getManager()->flush(); $this->getManager()->clear(); $currentTime = new \DateTime('now', new \DateTimeZone('UTC')); @@ -157,7 +160,7 @@ public function sync($maxExecTimeInMin = -1, $batchSize = 10) } /** - * @return ObjectManager + * @return EntityManager */ protected function getManager() { @@ -182,4 +185,14 @@ protected function getBodyLoader(EmailOrigin $origin) return $this->emailBodyLoaders[$originId]; } + + /** + * @param Email $email + * + * @return bool + */ + protected function isBodyNotLoaded(Email $email) + { + return $email->isBodySynced() !== true && $email->getEmailBody() === null; + } } diff --git a/src/Oro/Bundle/EmailBundle/Tests/Unit/Cache/EmailCacheManagerTest.php b/src/Oro/Bundle/EmailBundle/Tests/Unit/Cache/EmailCacheManagerTest.php index d62692811d6..23179034879 100644 --- a/src/Oro/Bundle/EmailBundle/Tests/Unit/Cache/EmailCacheManagerTest.php +++ b/src/Oro/Bundle/EmailBundle/Tests/Unit/Cache/EmailCacheManagerTest.php @@ -62,9 +62,6 @@ public function testEnsureEmailBodyCached() $this->emailBodySynchronizer ->expects($this->once()) ->method('syncOneEmailBody'); - $this->em - ->expects($this->once()) - ->method('flush'); $this->manager->ensureEmailBodyCached($email); } diff --git a/src/Oro/Bundle/EmailBundle/Tests/Unit/Sync/EmailBodySynchronizerTest.php b/src/Oro/Bundle/EmailBundle/Tests/Unit/Sync/EmailBodySynchronizerTest.php index 678b26d2fc8..c61d36a1d5c 100644 --- a/src/Oro/Bundle/EmailBundle/Tests/Unit/Sync/EmailBodySynchronizerTest.php +++ b/src/Oro/Bundle/EmailBundle/Tests/Unit/Sync/EmailBodySynchronizerTest.php @@ -96,7 +96,7 @@ public function testSyncOneEmailBody() ->will($this->returnValue($emailBody)); $this->em->expects($this->once()) - ->method('persist') + ->method('flush') ->with($this->identicalTo($email)); $this->logger->expects($this->never()) @@ -218,7 +218,7 @@ public function testSyncOnEmptyData() $repo->expects($this->once())->method('getEmailsWithoutBody') ->willReturn([]); $this->logger->expects($this->once()) - ->method('notice') + ->method('info') ->with( 'All emails was processed' ); @@ -270,14 +270,14 @@ function () use (&$runCount, $email) { ->will($this->returnValue($emailBody)); $this->em->expects($this->once()) - ->method('persist') + ->method('flush') ->with($this->identicalTo($email)); - $this->em->expects($this->once()) - ->method('flush'); $this->em->expects($this->once()) ->method('clear'); - $this->logger->expects($this->exactly(2)) + $this->logger->expects($this->once()) ->method('notice'); + $this->logger->expects($this->exactly(2)) + ->method('info'); $this->logger->expects($this->never()) ->method('warning');