diff --git a/src/DataShipper.php b/src/DataShipper.php index 7f5aa6f..388f971 100644 --- a/src/DataShipper.php +++ b/src/DataShipper.php @@ -42,6 +42,8 @@ public function pushMany(string $className, array|Collection $changes, $identifi /** @var Model $class */ $class = new $className(); + // Get absolute class name + $className = (new \ReflectionClass(new $className()))->getName(); $identifierKey = $identifierKey ?? $class->getKeyName(); $ids = array_column($changes, $identifierKey); @@ -80,6 +82,9 @@ public function push(string $className, array|Collection $changes, $identifier, throw new \Exception("Provided model does not use the HasDataSubscribers trait"); } + // Get absolute class name + $className = (new \ReflectionClass(new $className()))->getName(); + if ($changes instanceof Collection) { $changes = $changes->toArray(); } diff --git a/src/Jobs/NotifySubscriberOfShipment.php b/src/Jobs/NotifySubscriberOfShipment.php index 55535b5..f40ba33 100644 --- a/src/Jobs/NotifySubscriberOfShipment.php +++ b/src/Jobs/NotifySubscriberOfShipment.php @@ -36,20 +36,20 @@ public function handle() } $limitHit = false; - $lastShipment = Redis::connection('data-shipper')->zscore("data-shipper-records", $this->shipment); + $lastShipment = Redis::connection('data-shipper')->zscore("records", $this->shipment); if (!$lastShipment || Carbon::createFromTimestamp($lastShipment)->isBefore(now()->startOfMinute())) { // New throttle setup $timestamp = now()->timestamp; Redis::connection('data-shipper')->pipeline(function ($pipe) use($timestamp) { - $pipe->zadd("data-shipper-records", $timestamp, $this->shipment); - $pipe->expire("data-shipper-records", 60); - $pipe->del("data-shipper-{$this->shipment}-per-minute"); - $pipe->incr("data-shipper-{$this->shipment}-per-minute"); - $pipe->expire("data-shipper-{$this->shipment}-per-minute", 60); + $pipe->zadd("records", $timestamp, $this->shipment); + $pipe->expire("records", 60); + $pipe->del("{$this->shipment}-per-minute"); + $pipe->incr("{$this->shipment}-per-minute"); + $pipe->expire("{$this->shipment}-per-minute", 60); }); } else { - $handledThisMinute = Redis::connection('data-shipper')->incr("data-shipper-{$this->shipment}-per-minute"); + $handledThisMinute = Redis::connection('data-shipper')->incr("{$this->shipment}-per-minute"); if ($handledThisMinute > $this->maxShipmentsPerMinute) { return; } @@ -61,8 +61,10 @@ public function handle() $packageIds = DataShipper::getPackagesForShipment($this->shipment, true); $jobs = []; - foreach ($subscribers as $subscriberName => $subscriber) { - $jobs[] = new DispatchShipmentToSubscriber($this->shipment, $packageIds, $subscriberName); + if (!empty($packageIds)) { + foreach ($subscribers as $subscriberName => $subscriber) { + $jobs[] = new DispatchShipmentToSubscriber($this->shipment, $packageIds, $subscriberName); + } } $jobs[] = new ClearPackagesFromShipment($this->shipment, count($packageIds), $limitHit); diff --git a/src/ShipmentRepository.php b/src/ShipmentRepository.php index ca485ed..0ebd13f 100644 --- a/src/ShipmentRepository.php +++ b/src/ShipmentRepository.php @@ -116,25 +116,26 @@ public function flushPackagesForShipment(string $key, int $length): bool { $lock->block(10); - $packageIds = $this->connection()->zrange($key, 0, $length - 1); + if ($length > 0) { + $packageIds = $this->connection()->zrange($key, 0, $length - 1); - $results = $this->connection()->pipeline(function ($pipe) use ($key, $length, $packageIds) { - foreach ($packageIds as $id) { - $pipe->hdel($id); - } + $results = $this->connection()->pipeline(function ($pipe) use ($key, $length, $packageIds) { + foreach ($packageIds as $id) { + $pipe->hdel($id); + } - $pipe->zrem($key, ...$packageIds); - $pipe->decrby("{$key}-shipment-length", count($packageIds)); - }); + $pipe->zrem($key, ...$packageIds); + $pipe->decrby("{$key}-shipment-length", count($packageIds)); + }); - $count = array_pop($results); + $count = array_pop($results); + } else { + $count = $this->connection()->get("{$key}-shipment-length") ?? 0; + } // If there are no shipments left, we can remove this shipment record, so we don't keep checking to push changes if ($count < 1) { - $this->connection()->pipeline(function ($pipe) use ($key) { - $pipe->del($key); - $pipe->del("{$key}-shipment-length"); - }); + $this->wipeShipmentRecord($key); } $lock->release(); @@ -142,6 +143,20 @@ public function flushPackagesForShipment(string $key, int $length): bool { return $count >= $this->maxShipmentLength; } + /** + * Remove the shipment from the list of shipments + * + * @param $key + * @return void + */ + public function wipeShipmentRecord($key) + { + $this->connection()->pipeline(function ($pipe) use ($key) { + $pipe->zrem('shipments', $key); + $pipe->del("{$key}-shipment-length"); + }); + } + /** * Get a list of shipments that are ready to notify subscribers *