Skip to content

Commit

Permalink
fix AlgoliaReindexAllJob to batch into steps
Browse files Browse the repository at this point in the history
Fix to use jobData so that the job can run over multiple steps and pick up where it left off if it dies, runs out of memory, etc.
Complete overhaul of this class so that it actually does what the setup description says: do the heavy lifting / batching in setup, then step through the batches one chunk at a time using QueuedJob's currentStep.
  • Loading branch information
matt-in-a-hat committed Jul 31, 2024
1 parent c584e2a commit ef1a478
Showing 1 changed file with 86 additions and 72 deletions.
158 changes: 86 additions & 72 deletions src/Jobs/AlgoliaReindexAllJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

namespace Wilr\Silverstripe\Algolia\Jobs;

use Exception;
use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\ORM\DataObject;
use stdClass;
use Symbiote\QueuedJobs\Services\AbstractQueuedJob;
use Symbiote\QueuedJobs\Services\QueuedJob;
use Throwable;
Expand All @@ -20,8 +20,6 @@ class AlgoliaReindexAllJob extends AbstractQueuedJob implements QueuedJob
{
use Configurable;

public $indexData = [];

/**
* An optional array of default filters to apply when doing the reindex
* i.e for indexing Page subclasses you may wish to exclude expired pages.
Expand Down Expand Up @@ -59,104 +57,120 @@ public function setup()
$this->totalSteps = 0;
$this->currentStep = 0;

$indexData = [];

$filters = $this->config()->get('reindexing_default_filters');
$batchSize = $task->config()->get('batch_size');
$batching = $this->config()->get('use_batching');

// find all classes we have to index and add them to the indexData map
// in groups of batch size, this setup operation does the heavy lifting
// and process simply handles one batch at a time.
foreach ($algoliaService->indexes as $index) {
foreach ($algoliaService->indexes as $indexName => $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : null;
$indexFilters = (isset($index['includeFilters'])) ? $index['includeFilters'] : null;

if ($classes) {
foreach ($classes as $candidate) {
$filter = (isset($filters[$candidate])) ? $filters[$candidate] : '';
$count = 0;

foreach ($task->getItems($candidate, $filter, $indexFilters)->column('ID') as $id) {
$count++;

if (!isset($this->indexData[$candidate])) {
$this->indexData[$candidate] = [];
foreach ($classes as $class) {
$filter = (isset($filters[$class])) ? $filters[$class] : '';
$ids = $task->getItems($class, $filter, $indexFilters)->column('ID');
if (count($ids)) {
if ($batching && $batchSize > 1) {
foreach (array_chunk($ids, $batchSize) as $chunk) {
$indexData[] = [
'indexName' => $indexName,
'class' => $class,
'ids' => $chunk,
];
}
} else {
foreach ($ids as $id) {
$indexData[] = [
'indexName' => $indexName,
'class' => $class,
'id' => $id,
];
}
}

$this->indexData[$candidate][] = $id;
$this->totalSteps++;
$this->addMessage('[' . $indexName . '] Indexing ' . count($ids) . ' ' . $class . ' instances with filters: ' . ($filter ?: '(none)'));
} else {
$this->addMessage('[' . $indexName . '] 0 ' . $class . ' instances to index with filters: ' . ($filter ?: '(none) - skipping.'));
}

$this->addMessage('Indexing ' . $count . ' ' . $candidate . ' instances with filters ' . $filter);
}
}
}
$this->totalSteps += count($indexData);
// Store in jobData to get written to the job descriptor in DB
if (!$this->jobData) {
$this->jobData = new stdClass();
}
$this->jobData->IndexData = $indexData;
}

/**
* Index data is in groups of 20.
* Index data is an array of steps to process, each step either looks like this with batching:
* [
* 'indexName' => string,
* 'class' => string,
* 'ids' => array of int,
* ]
* or this without batching:
* [
* 'indexName' => string,
* 'class' => string,
* 'id' => int,
* ]
* We process one step / batch / id per call.
*/
public function process()
{
$remainingChildren = $this->indexData;

if (!$remainingChildren || empty($remainingChildren)) {
if ($this->currentStep >= $this->totalSteps) {
$this->isComplete = true;
$this->addMessage('Done!');

return;
}
$indexData = isset($this->jobData->IndexData) ? $this->jobData->IndexData : null;
if (!isset($indexData[$this->currentStep])) {
$this->isComplete = true;
$this->addMessage('Somehow we ran out of job data before all steps were processed. So we will assume we are done!');
$this->addMessage('Dumping out the jop data for debug purposes: ' . json_encode($indexData));
return;
}

$algoliaService = Injector::inst()->create(AlgoliaService::class);
$task = new AlgoliaReindex();

$batchSize = $task->config()->get('batch_size');
$batching = $this->config()->get('use_batching');

foreach ($remainingChildren as $class => $ids) {
foreach ($algoliaService->indexes as $indexName => $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : [];

if (!in_array($class, $classes)) {
continue;
}

$take = array_slice($ids, 0, $batchSize);
$this->indexData[$class] = array_slice($ids, $batchSize);

if (!empty($take)) {
$this->currentStep += count($take);
$errors = [];

try {
if ($batching) {
if ($task->indexItems($indexName, DataObject::get($class)->filter('ID', $take), false)) {
$this->addMessage('Successfully indexing ' . $class . ' [' . implode(', ', $take) . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . implode(', ', $take) . ']');
}
} else {
$items = DataObject::get($class)->filter('ID', $take);

foreach ($items as $item) {
if ($task->indexItem($item)) {
$this->addMessage('Successfully indexed ' . $class . ' [' . $item->ID . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . $item->ID . ']');
}
}
}

$errors = $task->getErrors();
} catch (Throwable $e) {
$errors[] = $e->getMessage();
}

if (!empty($errors)) {
$this->addMessage(implode(', ', $errors));
$task->clearErrors();
$stepData = $indexData[$this->currentStep];
$class = $stepData['class'];

try {
$task = new AlgoliaReindex();

if (isset($stepData['ids'])) {
$summary = $task->indexItems($stepData['indexName'], DataObject::get($class)->filter('ID', $stepData['ids']), false);
$this->addMessage($summary);
} else {
$item = DataObject::get($class)->byID($stepData['id']);
if ($item) {
if (min($item->invokeWithExtensions('canIndexInAlgolia')) === false) {
$this->addMessage('Skipped indexing ' . $class . ' ' . $item->ID);
} else if ($task->indexItem($item)) {
$this->addMessage('Successfully indexed ' . $class . ' ' . $item->ID);
} else {
$this->addMessage('Error indexing ' . $class . ' ' . $item->ID);
}
} else {
unset($this->indexData[$class]);
$this->addMessage('Error indexing ' . $class . ' ' . $stepData['id'] . ' - failed to load item from DB');
}
}

$errors = $task->getErrors();
} catch (Throwable $e) {
$errors[] = $e->getMessage();
}

if (!empty($errors)) {
$this->addMessage(implode(', ', $errors));
$task->clearErrors();
}

$this->currentStep++;
}
}

0 comments on commit ef1a478

Please sign in to comment.