Skip to content

Commit

Permalink
Merge pull request #72 from matt-in-a-hat/fix-reindex-all-job-batching-logic
Browse files Browse the repository at this point in the history

fix AlgoliaReindexAllJob to batch into steps
  • Loading branch information
wilr authored Jul 31, 2024
2 parents c584e2a + ef1a478 commit 8d32123
Showing 1 changed file with 86 additions and 72 deletions.
158 changes: 86 additions & 72 deletions src/Jobs/AlgoliaReindexAllJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

namespace Wilr\Silverstripe\Algolia\Jobs;

use Exception;
use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\ORM\DataObject;
use stdClass;
use Symbiote\QueuedJobs\Services\AbstractQueuedJob;
use Symbiote\QueuedJobs\Services\QueuedJob;
use Throwable;
Expand All @@ -20,8 +20,6 @@ class AlgoliaReindexAllJob extends AbstractQueuedJob implements QueuedJob
{
use Configurable;

public $indexData = [];

/**
* An optional array of default filters to apply when doing the reindex,
* e.g. when indexing Page subclasses you may wish to exclude expired pages.
Expand Down Expand Up @@ -59,104 +57,120 @@ public function setup()
$this->totalSteps = 0;
$this->currentStep = 0;

$indexData = [];

$filters = $this->config()->get('reindexing_default_filters');
$batchSize = $task->config()->get('batch_size');
$batching = $this->config()->get('use_batching');

// Find all classes we have to index and add them to the index data as a
// list of steps, each covering at most one batch (or a single ID when
// batching is off). This setup operation does the heavy lifting; process()
// simply handles one step at a time.
foreach ($algoliaService->indexes as $index) {
foreach ($algoliaService->indexes as $indexName => $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : null;
$indexFilters = (isset($index['includeFilters'])) ? $index['includeFilters'] : null;

if ($classes) {
foreach ($classes as $candidate) {
$filter = (isset($filters[$candidate])) ? $filters[$candidate] : '';
$count = 0;

foreach ($task->getItems($candidate, $filter, $indexFilters)->column('ID') as $id) {
$count++;

if (!isset($this->indexData[$candidate])) {
$this->indexData[$candidate] = [];
foreach ($classes as $class) {
$filter = (isset($filters[$class])) ? $filters[$class] : '';
$ids = $task->getItems($class, $filter, $indexFilters)->column('ID');
if (count($ids)) {
if ($batching && $batchSize > 1) {
foreach (array_chunk($ids, $batchSize) as $chunk) {
$indexData[] = [
'indexName' => $indexName,
'class' => $class,
'ids' => $chunk,
];
}
} else {
foreach ($ids as $id) {
$indexData[] = [
'indexName' => $indexName,
'class' => $class,
'id' => $id,
];
}
}

$this->indexData[$candidate][] = $id;
$this->totalSteps++;
$this->addMessage('[' . $indexName . '] Indexing ' . count($ids) . ' ' . $class . ' instances with filters: ' . ($filter ?: '(none)'));
} else {
$this->addMessage('[' . $indexName . '] 0 ' . $class . ' instances to index with filters: ' . ($filter ?: '(none) - skipping.'));
}

$this->addMessage('Indexing ' . $count . ' ' . $candidate . ' instances with filters ' . $filter);
}
}
}
$this->totalSteps += count($indexData);
// Store in jobData to get written to the job descriptor in DB
if (!$this->jobData) {
$this->jobData = new stdClass();
}
$this->jobData->IndexData = $indexData;
}

/**
* Index data is in groups of 20.
* Index data is an array of steps to process. With batching, each step looks like this:
* [
* 'indexName' => string,
* 'class' => string,
* 'ids' => array of int,
* ]
* or this without batching:
* [
* 'indexName' => string,
* 'class' => string,
* 'id' => int,
* ]
* We process one step / batch / id per call.
*/
public function process()
{
$remainingChildren = $this->indexData;

if (!$remainingChildren || empty($remainingChildren)) {
if ($this->currentStep >= $this->totalSteps) {
$this->isComplete = true;
$this->addMessage('Done!');

return;
}
$indexData = isset($this->jobData->IndexData) ? $this->jobData->IndexData : null;
if (!isset($indexData[$this->currentStep])) {
$this->isComplete = true;
$this->addMessage('Somehow we ran out of job data before all steps were processed. So we will assume we are done!');
$this->addMessage('Dumping out the jop data for debug purposes: ' . json_encode($indexData));
return;
}

$algoliaService = Injector::inst()->create(AlgoliaService::class);
$task = new AlgoliaReindex();

$batchSize = $task->config()->get('batch_size');
$batching = $this->config()->get('use_batching');

foreach ($remainingChildren as $class => $ids) {
foreach ($algoliaService->indexes as $indexName => $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : [];

if (!in_array($class, $classes)) {
continue;
}

$take = array_slice($ids, 0, $batchSize);
$this->indexData[$class] = array_slice($ids, $batchSize);

if (!empty($take)) {
$this->currentStep += count($take);
$errors = [];

try {
if ($batching) {
if ($task->indexItems($indexName, DataObject::get($class)->filter('ID', $take), false)) {
$this->addMessage('Successfully indexing ' . $class . ' [' . implode(', ', $take) . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . implode(', ', $take) . ']');
}
} else {
$items = DataObject::get($class)->filter('ID', $take);

foreach ($items as $item) {
if ($task->indexItem($item)) {
$this->addMessage('Successfully indexed ' . $class . ' [' . $item->ID . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . $item->ID . ']');
}
}
}

$errors = $task->getErrors();
} catch (Throwable $e) {
$errors[] = $e->getMessage();
}

if (!empty($errors)) {
$this->addMessage(implode(', ', $errors));
$task->clearErrors();
$stepData = $indexData[$this->currentStep];
$class = $stepData['class'];

try {
$task = new AlgoliaReindex();

if (isset($stepData['ids'])) {
$summary = $task->indexItems($stepData['indexName'], DataObject::get($class)->filter('ID', $stepData['ids']), false);
$this->addMessage($summary);
} else {
$item = DataObject::get($class)->byID($stepData['id']);
if ($item) {
if (min($item->invokeWithExtensions('canIndexInAlgolia')) === false) {
$this->addMessage('Skipped indexing ' . $class . ' ' . $item->ID);
} else if ($task->indexItem($item)) {
$this->addMessage('Successfully indexed ' . $class . ' ' . $item->ID);
} else {
$this->addMessage('Error indexing ' . $class . ' ' . $item->ID);
}
} else {
unset($this->indexData[$class]);
$this->addMessage('Error indexing ' . $class . ' ' . $stepData['id'] . ' - failed to load item from DB');
}
}

$errors = $task->getErrors();
} catch (Throwable $e) {
$errors[] = $e->getMessage();
}

if (!empty($errors)) {
$this->addMessage(implode(', ', $errors));
$task->clearErrors();
}

$this->currentStep++;
}
}

0 comments on commit 8d32123

Please sign in to comment.