From ef1a478f520e9a37ae5fdcb9efbacccb37aef5ac Mon Sep 17 00:00:00 2001 From: Matt Lang Date: Wed, 31 Jul 2024 12:32:16 +1200 Subject: [PATCH] fix AlgoliaReindexAllJob to batch into steps Fix to use jobData so that it is possible to run over multiple steps & if the job dies or runs out of memory etc. Complete overhaul of this class to actually do what it says in the setup description - do the heavy lifting / batching at setup and then step through them 1 chunk at a time using QueuedJob's currentStep. --- src/Jobs/AlgoliaReindexAllJob.php | 158 ++++++++++++++++-------------- 1 file changed, 86 insertions(+), 72 deletions(-) diff --git a/src/Jobs/AlgoliaReindexAllJob.php b/src/Jobs/AlgoliaReindexAllJob.php index 0de985b..e9afe25 100644 --- a/src/Jobs/AlgoliaReindexAllJob.php +++ b/src/Jobs/AlgoliaReindexAllJob.php @@ -2,10 +2,10 @@ namespace Wilr\Silverstripe\Algolia\Jobs; -use Exception; use SilverStripe\Core\Config\Configurable; use SilverStripe\Core\Injector\Injector; use SilverStripe\ORM\DataObject; +use stdClass; use Symbiote\QueuedJobs\Services\AbstractQueuedJob; use Symbiote\QueuedJobs\Services\QueuedJob; use Throwable; @@ -20,8 +20,6 @@ class AlgoliaReindexAllJob extends AbstractQueuedJob implements QueuedJob { use Configurable; - public $indexData = []; - /** * An optional array of default filters to apply when doing the reindex * i.e for indexing Page subclasses you may wish to exclude expired pages. @@ -59,104 +57,120 @@ public function setup() $this->totalSteps = 0; $this->currentStep = 0; + $indexData = []; + $filters = $this->config()->get('reindexing_default_filters'); + $batchSize = $task->config()->get('batch_size'); + $batching = $this->config()->get('use_batching'); // find all classes we have to index and add them to the indexData map // in groups of batch size, this setup operation does the heavy lifting // and process simply handles one batch at a time. - foreach ($algoliaService->indexes as $index) { + foreach ($algoliaService->indexes as $indexName => $index) { $classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : null; $indexFilters = (isset($index['includeFilters'])) ? $index['includeFilters'] : null; if ($classes) { - foreach ($classes as $candidate) { - $filter = (isset($filters[$candidate])) ? $filters[$candidate] : ''; - $count = 0; - - foreach ($task->getItems($candidate, $filter, $indexFilters)->column('ID') as $id) { - $count++; - - if (!isset($this->indexData[$candidate])) { - $this->indexData[$candidate] = []; + foreach ($classes as $class) { + $filter = (isset($filters[$class])) ? $filters[$class] : ''; + $ids = $task->getItems($class, $filter, $indexFilters)->column('ID'); + if (count($ids)) { + if ($batching && $batchSize > 1) { + foreach (array_chunk($ids, $batchSize) as $chunk) { + $indexData[] = [ + 'indexName' => $indexName, + 'class' => $class, + 'ids' => $chunk, + ]; + } + } else { + foreach ($ids as $id) { + $indexData[] = [ + 'indexName' => $indexName, + 'class' => $class, + 'id' => $id, + ]; + } } - - $this->indexData[$candidate][] = $id; - $this->totalSteps++; + $this->addMessage('[' . $indexName . '] Indexing ' . count($ids) . ' ' . $class . ' instances with filters: ' . ($filter ?: '(none)')); + } else { + $this->addMessage('[' . $indexName . '] 0 ' . $class . ' instances to index with filters: ' . ($filter ?: '(none) - skipping.')); } - - $this->addMessage('Indexing ' . $count . ' ' . $candidate . ' instances with filters ' . $filter); } } } + $this->totalSteps += count($indexData); + // Store in jobData to get written to the job descriptor in DB + if (!$this->jobData) { + $this->jobData = new stdClass(); + } + $this->jobData->IndexData = $indexData; } /** - * Index data is in groups of 20. + * Index data is an array of steps to process, each step either looks like this with batching: + * [ + * 'indexName' => string, + * 'class' => string, + * 'ids' => array of int, + * ] + * or this without batching: + * [ + * 'indexName' => string, + * 'class' => string, + * 'id' => int, + * ] + * We process one step / batch / id per call. */ public function process() { - $remainingChildren = $this->indexData; - - if (!$remainingChildren || empty($remainingChildren)) { + if ($this->currentStep >= $this->totalSteps) { $this->isComplete = true; $this->addMessage('Done!'); - + return; + } + $indexData = isset($this->jobData->IndexData) ? $this->jobData->IndexData : null; + if (!isset($indexData[$this->currentStep])) { + $this->isComplete = true; + $this->addMessage('Somehow we ran out of job data before all steps were processed. So we will assume we are done!'); + $this->addMessage('Dumping out the jop data for debug purposes: ' . json_encode($indexData)); return; } - $algoliaService = Injector::inst()->create(AlgoliaService::class); - $task = new AlgoliaReindex(); - - $batchSize = $task->config()->get('batch_size'); - $batching = $this->config()->get('use_batching'); - - foreach ($remainingChildren as $class => $ids) { - foreach ($algoliaService->indexes as $indexName => $index) { - $classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : []; - - if (!in_array($class, $classes)) { - continue; - } - - $take = array_slice($ids, 0, $batchSize); - $this->indexData[$class] = array_slice($ids, $batchSize); - - if (!empty($take)) { - $this->currentStep += count($take); - $errors = []; - - try { - if ($batching) { - if ($task->indexItems($indexName, DataObject::get($class)->filter('ID', $take), false)) { - $this->addMessage('Successfully indexing ' . $class . ' [' . implode(', ', $take) . ']'); - } else { - $this->addMessage('Error indexing ' . $class . ' [' . implode(', ', $take) . ']'); - } - } else { - $items = DataObject::get($class)->filter('ID', $take); - - foreach ($items as $item) { - if ($task->indexItem($item)) { - $this->addMessage('Successfully indexed ' . $class . ' [' . $item->ID . ']'); - } else { - $this->addMessage('Error indexing ' . $class . ' [' . $item->ID . ']'); - } - } - } - - $errors = $task->getErrors(); - } catch (Throwable $e) { - $errors[] = $e->getMessage(); - } - - if (!empty($errors)) { - $this->addMessage(implode(', ', $errors)); - $task->clearErrors(); + $stepData = $indexData[$this->currentStep]; + $class = $stepData['class']; + + try { + $task = new AlgoliaReindex(); + + if (isset($stepData['ids'])) { + $summary = $task->indexItems($stepData['indexName'], DataObject::get($class)->filter('ID', $stepData['ids']), false); + $this->addMessage($summary); + } else { + $item = DataObject::get($class)->byID($stepData['id']); + if ($item) { + if (min($item->invokeWithExtensions('canIndexInAlgolia')) === false) { + $this->addMessage('Skipped indexing ' . $class . ' ' . $item->ID); + } else if ($task->indexItem($item)) { + $this->addMessage('Successfully indexed ' . $class . ' ' . $item->ID); + } else { + $this->addMessage('Error indexing ' . $class . ' ' . $item->ID); } } else { - unset($this->indexData[$class]); + $this->addMessage('Error indexing ' . $class . ' ' . $stepData['id'] . ' - failed to load item from DB'); } } + + $errors = $task->getErrors(); + } catch (Throwable $e) { + $errors[] = $e->getMessage(); + } + + if (!empty($errors)) { + $this->addMessage(implode(', ', $errors)); + $task->clearErrors(); } + + $this->currentStep++; } }