Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add first two change handler implementations #14

Merged
merged 11 commits into from
Dec 17, 2024
2 changes: 1 addition & 1 deletion CRM/Share/Form/ShareNode.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

use CRM_Share_ExtensionUtil as E;
use \Civi\Api4\ShareNode;
use Civi\Api4\ShareNode;

/**
* Form controller class
Expand Down
2 changes: 1 addition & 1 deletion CRM/Share/Form/ShareNodePeering.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

use CRM_Share_ExtensionUtil as E;
use \Civi\Api4\ShareNodePeering;
use Civi\Api4\ShareNodePeering;

/**
* Form controller class
Expand Down
5 changes: 5 additions & 0 deletions Civi/Api4/ShareChange.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Civi\Api4;

use Civi\Share\Api4\Action\ShareChange\ProcessAction;
use Civi\Share\Permissions;

/**
Expand All @@ -12,6 +13,10 @@
*/
class ShareChange extends Generic\DAOEntity {

public static function process(bool $checkPermissions = TRUE): ProcessAction {
return (new ProcessAction())->setCheckPermissions($checkPermissions);
}

/**
* @inheritDoc
*/
Expand Down
10 changes: 5 additions & 5 deletions Civi/Api4/ShareChangeMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

use Civi\Api4\Generic\AbstractEntity;
use Civi\Share\Api4\Action\ShareChangeMessage\GetFieldsAction;
use Civi\Share\Api4\Action\ShareChangeMessage\ReceiveAction;
use Civi\Share\Api4\Action\ShareChangeMessage\SendAction;
use Civi\Share\Api4\Action\ShareChangeMessage\ReceiveAction;
use Civi\Share\Permissions;

class ShareChangeMessage extends Generic\AbstractEntity {
Expand All @@ -17,14 +17,14 @@ public static function getFields(bool $checkPermissions = TRUE) {
return (new GetFieldsAction())->setCheckPermissions($checkPermissions);
}

public static function receive(bool $checkPermissions = TRUE) {
return (new ReceiveAction())->setCheckPermissions($checkPermissions);
}

public static function send(bool $checkPermissions = TRUE) {
return (new SendAction())->setCheckPermissions($checkPermissions);
}

public static function receive(bool $checkPermissions = TRUE) {
return (new ReceiveAction())->setCheckPermissions($checkPermissions);
}

/**
* @inheritDoc
*/
Expand Down
54 changes: 54 additions & 0 deletions Civi/Share/Api4/Action/ShareChange/ProcessAction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

namespace Civi\Share\Api4\Action\ShareChange;

use Civi\Api4\Generic\AbstractAction;
use Civi\Api4\Generic\Result;
use Civi\Api4\ShareChange;
use Civi\Share\Change;
use Civi\Share\ChangeProcessingEvent;

/**
* @method int getLocalNodeId()
* @method $this setLocalNodeId(int $localNodeId)
* @method int getId()
* @method $this setId(int $id)
*/
class ProcessAction extends AbstractAction {

/**
* @var int
* The ID of the CiviShare Node on which to process the change.
* @required
*/
protected ?int $localNodeId = NULL;

/**
* @var int
* The ID of the CiviShare Change to process.
*/
protected ?int $id = NULL;

public function __construct() {
parent::__construct(ShareChange::getEntityName(), 'process');
}

/**
* @inheritDoc
*/
public function _run(Result $result): void {
$query = ShareChange::get()
->addWhere('status', '=', Change::STATUS_PENDING);
if (isset($this->id)) {
$query
->addWhere('id', '=', $this->id);
}
$shareChanges = $query->execute();

foreach ($shareChanges as $shareChange) {
$change = Change::createFromApiResultArray($shareChange);
$change->process($this->localNodeId);
}
}

}
80 changes: 67 additions & 13 deletions Civi/Share/Change.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ public static function createFromExisting(int $id): self {
->addWhere('id', '=', $id)
->execute()
->single();
return self::createFromApiResultArray($shareChange);
}

public static function createFromApiResultArray(array $shareChange): self {
return new self(
$shareChange['change_type'],
$shareChange['local_contact_id'],
Expand All @@ -157,7 +161,7 @@ public static function createFromExisting(int $id): self {
\DateTime::createFromFormat(Utils::CIVICRM_DATE_FORMAT, $shareChange['change_date']),
\DateTime::createFromFormat(Utils::CIVICRM_DATE_FORMAT, $shareChange['received_date']),
$shareChange['status'],
$id
$shareChange['id']
);
}

Expand Down Expand Up @@ -189,21 +193,71 @@ public function getSourceNodeId(): int {
return $this->sourceNodeId;
}

public function setStatus(string $status): void {
// TODO Validate.
$this->status = $status;
}

public function process(int $localNodeId): void {
$lock = \Civi::lockManager()
// TODO: Is 'data' the right type?
->acquire('data.civishare.changes');
if (!$lock->isAcquired()) {
throw new \RuntimeException('CiviShare: Could not acquire lock for processing changes.');
}

if (!$this->isPersisted()) {
throw new \RuntimeException('CiviShare: cannot process unpersisted changes.');
}
// TODO: Replace $change with instance of $this in ChangeProcessingEvent.
$change = \Civi\Api4\ShareChange::get(TRUE)
->addWhere('id', '=', $this->id)
->setLimit(1)
->execute()
->first();
$changeProcessingEvent = new \Civi\Share\ChangeProcessingEvent($this->id, $localNodeId, $change);
try {
\Civi::dispatcher()
->dispatch(ChangeProcessingEvent::NAME, $changeProcessingEvent);
if ($changeProcessingEvent->isProcessed()) {
$this->setStatus($changeProcessingEvent->getNewStatus() ?? self::STATUS_PROCESSED);
}
else {
$this->setStatus($changeProcessingEvent->getNewStatus() ?? self::STATUS_ERROR);
\Civi::log()
->warning("Change [{$this->id}] could not be processed - no processor found.");
}
$this->persist();
}
catch (\Exception $exception) {
\Civi::log()
->warning("Change [{$this->id}] failed processing, exception was " . $exception->getMessage());
$this->setStatus(self::STATUS_ERROR);
$changeProcessingEvent->setFailed($exception->getMessage());
}

$lock->release();
}

public function persist(): void {
$shareChangeQuery = ShareChange::create()
->addValue('change_type', $this->type)
->addValue('local_contact_id', $this->localContactId)
->addValue('data_before', $this->getDataBefore())
->addValue('data_after', $this->getDataAfter())
->addValue('change_date', $this->changedDate->format(Utils::DATE_FORMAT))
->addValue('received_date', $this->receivedDate->format(Utils::DATE_FORMAT))
->addValue('status', $this->status)
->addValue('source_node_id', $this->sourceNodeId);
$record = [
'change_type' => $this->type,
'local_contact_id' => $this->localContactId,
'data_before' => $this->getDataBefore(),
'data_after' => $this->getDataAfter(),
'change_date' => $this->changedDate->format(Utils::DATE_FORMAT),
'received_date' => $this->receivedDate->format(Utils::DATE_FORMAT),
'status' => $this->status,
'source_node_id' => $this->sourceNodeId,
];
if ($this->isPersisted()) {
$shareChangeQuery
->addValue('id', $this->id);
$record['id'] = $this->id;
}
$shareChange = $shareChangeQuery->execute();
$shareChange = ShareChange::save()
->addRecord($record)
->setMatch(['id'])
->execute();

$this->id = $shareChange['id'];
}

Expand Down
Loading