Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix AlgoliaReindexAllJob to batch into steps #72

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 86 additions & 72 deletions src/Jobs/AlgoliaReindexAllJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

namespace Wilr\Silverstripe\Algolia\Jobs;

use Exception;
use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\ORM\DataObject;
use stdClass;
use Symbiote\QueuedJobs\Services\AbstractQueuedJob;
use Symbiote\QueuedJobs\Services\QueuedJob;
use Throwable;
Expand All @@ -20,8 +20,6 @@ class AlgoliaReindexAllJob extends AbstractQueuedJob implements QueuedJob
{
use Configurable;

public $indexData = [];

/**
* An optional array of default filters to apply when doing the reindex
* i.e for indexing Page subclasses you may wish to exclude expired pages.
Expand Down Expand Up @@ -59,104 +57,120 @@ public function setup()
$this->totalSteps = 0;
$this->currentStep = 0;

$indexData = [];

$filters = $this->config()->get('reindexing_default_filters');
$batchSize = $task->config()->get('batch_size');
$batching = $this->config()->get('use_batching');

// find all classes we have to index and add them to the indexData map
// in groups of batch size, this setup operation does the heavy lifting
// and process simply handles one batch at a time.
foreach ($algoliaService->indexes as $index) {
foreach ($algoliaService->indexes as $indexName => $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : null;
$indexFilters = (isset($index['includeFilters'])) ? $index['includeFilters'] : null;

if ($classes) {
foreach ($classes as $candidate) {
$filter = (isset($filters[$candidate])) ? $filters[$candidate] : '';
$count = 0;

foreach ($task->getItems($candidate, $filter, $indexFilters)->column('ID') as $id) {
$count++;

if (!isset($this->indexData[$candidate])) {
$this->indexData[$candidate] = [];
foreach ($classes as $class) {
$filter = (isset($filters[$class])) ? $filters[$class] : '';
$ids = $task->getItems($class, $filter, $indexFilters)->column('ID');
if (count($ids)) {
if ($batching && $batchSize > 1) {
foreach (array_chunk($ids, $batchSize) as $chunk) {
$indexData[] = [
'indexName' => $indexName,
'class' => $class,
'ids' => $chunk,
];
}
} else {
foreach ($ids as $id) {
$indexData[] = [
'indexName' => $indexName,
'class' => $class,
'id' => $id,
];
}
}

$this->indexData[$candidate][] = $id;
$this->totalSteps++;
$this->addMessage('[' . $indexName . '] Indexing ' . count($ids) . ' ' . $class . ' instances with filters: ' . ($filter ?: '(none)'));
} else {
$this->addMessage('[' . $indexName . '] 0 ' . $class . ' instances to index with filters: ' . ($filter ?: '(none) - skipping.'));
}

$this->addMessage('Indexing ' . $count . ' ' . $candidate . ' instances with filters ' . $filter);
}
}
}
$this->totalSteps += count($indexData);
// Store in jobData to get written to the job descriptor in DB
if (!$this->jobData) {
$this->jobData = new stdClass();
}
$this->jobData->IndexData = $indexData;
}

/**
* Index data is in groups of 20.
* Index data is an array of steps to process, each step either looks like this with batching:
* [
* 'indexName' => string,
* 'class' => string,
* 'ids' => array of int,
* ]
* or this without batching:
* [
* 'indexName' => string,
* 'class' => string,
* 'id' => int,
* ]
* We process one step / batch / id per call.
*/
public function process()
{
$remainingChildren = $this->indexData;

if (!$remainingChildren || empty($remainingChildren)) {
if ($this->currentStep >= $this->totalSteps) {
$this->isComplete = true;
$this->addMessage('Done!');

return;
}
$indexData = isset($this->jobData->IndexData) ? $this->jobData->IndexData : null;
if (!isset($indexData[$this->currentStep])) {
$this->isComplete = true;
$this->addMessage('Somehow we ran out of job data before all steps were processed. So we will assume we are done!');
$this->addMessage('Dumping out the jop data for debug purposes: ' . json_encode($indexData));
return;
}

$algoliaService = Injector::inst()->create(AlgoliaService::class);
$task = new AlgoliaReindex();

$batchSize = $task->config()->get('batch_size');
$batching = $this->config()->get('use_batching');

foreach ($remainingChildren as $class => $ids) {
foreach ($algoliaService->indexes as $indexName => $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : [];

if (!in_array($class, $classes)) {
continue;
}

$take = array_slice($ids, 0, $batchSize);
$this->indexData[$class] = array_slice($ids, $batchSize);

if (!empty($take)) {
$this->currentStep += count($take);
$errors = [];

try {
if ($batching) {
if ($task->indexItems($indexName, DataObject::get($class)->filter('ID', $take), false)) {
$this->addMessage('Successfully indexing ' . $class . ' [' . implode(', ', $take) . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . implode(', ', $take) . ']');
}
} else {
$items = DataObject::get($class)->filter('ID', $take);

foreach ($items as $item) {
if ($task->indexItem($item)) {
$this->addMessage('Successfully indexed ' . $class . ' [' . $item->ID . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . $item->ID . ']');
}
}
}

$errors = $task->getErrors();
} catch (Throwable $e) {
$errors[] = $e->getMessage();
}

if (!empty($errors)) {
$this->addMessage(implode(', ', $errors));
$task->clearErrors();
$stepData = $indexData[$this->currentStep];
$class = $stepData['class'];

try {
$task = new AlgoliaReindex();

if (isset($stepData['ids'])) {
$summary = $task->indexItems($stepData['indexName'], DataObject::get($class)->filter('ID', $stepData['ids']), false);
$this->addMessage($summary);
} else {
$item = DataObject::get($class)->byID($stepData['id']);
if ($item) {
if (min($item->invokeWithExtensions('canIndexInAlgolia')) === false) {
$this->addMessage('Skipped indexing ' . $class . ' ' . $item->ID);
} else if ($task->indexItem($item)) {
$this->addMessage('Successfully indexed ' . $class . ' ' . $item->ID);
} else {
$this->addMessage('Error indexing ' . $class . ' ' . $item->ID);
}
} else {
unset($this->indexData[$class]);
$this->addMessage('Error indexing ' . $class . ' ' . $stepData['id'] . ' - failed to load item from DB');
}
}

$errors = $task->getErrors();
} catch (Throwable $e) {
$errors[] = $e->getMessage();
}

if (!empty($errors)) {
$this->addMessage(implode(', ', $errors));
$task->clearErrors();
}

$this->currentStep++;
}
}
Loading