Skip to content

Commit

Permalink
Correct package key on shipment wipe and get absolute class
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Chawla committed Oct 2, 2023
1 parent 48f74ff commit 48e5b23
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 22 deletions.
5 changes: 5 additions & 0 deletions src/DataShipper.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public function pushMany(string $className, array|Collection $changes, $identifi

/** @var Model $class */
$class = new $className();
// Get absolute class name
$className = (new \ReflectionClass(new $className()))->getName();

$identifierKey = $identifierKey ?? $class->getKeyName();
$ids = array_column($changes, $identifierKey);
Expand Down Expand Up @@ -80,6 +82,9 @@ public function push(string $className, array|Collection $changes, $identifier,
throw new \Exception("Provided model does not use the HasDataSubscribers trait");
}

// Get absolute class name
$className = (new \ReflectionClass(new $className()))->getName();

if ($changes instanceof Collection) {
$changes = $changes->toArray();
}
Expand Down
20 changes: 11 additions & 9 deletions src/Jobs/NotifySubscriberOfShipment.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ public function handle()
}

$limitHit = false;
$lastShipment = Redis::connection('data-shipper')->zscore("data-shipper-records", $this->shipment);
$lastShipment = Redis::connection('data-shipper')->zscore("records", $this->shipment);

if (!$lastShipment || Carbon::createFromTimestamp($lastShipment)->isBefore(now()->startOfMinute())) {
// New throttle setup
$timestamp = now()->timestamp;
Redis::connection('data-shipper')->pipeline(function ($pipe) use($timestamp) {
$pipe->zadd("data-shipper-records", $timestamp, $this->shipment);
$pipe->expire("data-shipper-records", 60);
$pipe->del("data-shipper-{$this->shipment}-per-minute");
$pipe->incr("data-shipper-{$this->shipment}-per-minute");
$pipe->expire("data-shipper-{$this->shipment}-per-minute", 60);
$pipe->zadd("records", $timestamp, $this->shipment);
$pipe->expire("records", 60);
$pipe->del("{$this->shipment}-per-minute");
$pipe->incr("{$this->shipment}-per-minute");
$pipe->expire("{$this->shipment}-per-minute", 60);
});
} else {
$handledThisMinute = Redis::connection('data-shipper')->incr("data-shipper-{$this->shipment}-per-minute");
$handledThisMinute = Redis::connection('data-shipper')->incr("{$this->shipment}-per-minute");
if ($handledThisMinute > $this->maxShipmentsPerMinute) {
return;
}
Expand All @@ -61,8 +61,10 @@ public function handle()
$packageIds = DataShipper::getPackagesForShipment($this->shipment, true);

$jobs = [];
foreach ($subscribers as $subscriberName => $subscriber) {
$jobs[] = new DispatchShipmentToSubscriber($this->shipment, $packageIds, $subscriberName);
if (!empty($packageIds)) {
foreach ($subscribers as $subscriberName => $subscriber) {
$jobs[] = new DispatchShipmentToSubscriber($this->shipment, $packageIds, $subscriberName);
}
}

$jobs[] = new ClearPackagesFromShipment($this->shipment, count($packageIds), $limitHit);
Expand Down
41 changes: 28 additions & 13 deletions src/ShipmentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,32 +116,47 @@ public function flushPackagesForShipment(string $key, int $length): bool {

$lock->block(10);

$packageIds = $this->connection()->zrange($key, 0, $length - 1);
if ($length > 0) {
$packageIds = $this->connection()->zrange($key, 0, $length - 1);

$results = $this->connection()->pipeline(function ($pipe) use ($key, $length, $packageIds) {
foreach ($packageIds as $id) {
$pipe->hdel($id);
}
$results = $this->connection()->pipeline(function ($pipe) use ($key, $length, $packageIds) {
foreach ($packageIds as $id) {
$pipe->hdel($id);
}

$pipe->zrem($key, ...$packageIds);
$pipe->decrby("{$key}-shipment-length", count($packageIds));
});
$pipe->zrem($key, ...$packageIds);
$pipe->decrby("{$key}-shipment-length", count($packageIds));
});

$count = array_pop($results);
$count = array_pop($results);
} else {
$count = $this->connection()->get("{$key}-shipment-length") ?? 0;
}

// If there are no shipments left, we can remove this shipment record, so we don't keep checking to push changes
if ($count < 1) {
$this->connection()->pipeline(function ($pipe) use ($key) {
$pipe->del($key);
$pipe->del("{$key}-shipment-length");
});
$this->wipeShipmentRecord($key);
}

$lock->release();

return $count >= $this->maxShipmentLength;
}

/**
* Remove the shipment from the list of shipments
*
* @param $key
* @return void
*/
public function wipeShipmentRecord($key)
{
$this->connection()->pipeline(function ($pipe) use ($key) {
$pipe->zrem('shipments', $key);
$pipe->del("{$key}-shipment-length");
});
}

/**
* Get a list of shipments that are ready to notify subscribers
*
Expand Down

0 comments on commit 48e5b23

Please sign in to comment.