Skip to content

Commit

Permalink
Limit shipments per minute and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Chawla committed Sep 19, 2023
1 parent b8369da commit 0f963b1
Show file tree
Hide file tree
Showing 17 changed files with 303 additions and 88 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ yarn-error.log
.idea/
phpstan-baseline.neon
phpstan.neon.dist
build/

# Laravel 4 specific
bootstrap/compiled.php
Expand Down
54 changes: 50 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ composer require autoklose/laravel-data-shipper
You can publish and run the migrations with:

```bash
php artisan vendor:publish --tag="data_shipper-migrations"
php artisan vendor:publish --tag="data-shipper-migrations"
php artisan migrate
```

You can publish the config file with:

```bash
php artisan vendor:publish --tag=":data_shipper-config"
php artisan vendor:publish --tag="data-shipper-config"
```

You can customize how often data shipments happen by changing the following values in the config file:
Expand All @@ -30,12 +30,20 @@ You can customize how often data shipments happen by changing the following valu
### max_wait_minutes
- How many minutes should Data Shipper wait until shipping changes regardless of not yet reaching the max queue size.

### max_shipments_per_minute
- How many times a shipment can be handled per a minute.

### max_retries
- How many times a failed shipment can be retried before no longer being handled.

```php
return [
'subscribers' => ['elasticsearch'],
'shipments' => [
'max_size' => 10,
'max_wait_minutes' => 5
'max_wait_minutes' => 5,
'max_shipments_per_minute' => 10,
'max_retries' => 3
]
];
```
Expand All @@ -47,7 +55,7 @@ protected function schedule(Schedule $schedule)
{
// ...

$shedule->command('data-shipper:ship-it')->everyMinute();
$schedule->command('data-shipper:ship-it')->everyMinute();
}
```

Expand Down Expand Up @@ -155,6 +163,44 @@ class Record extends Model
}
```

## Available Data Subscribers

### Elasticsearch
Elasticsearch is available as the default data subscriber. In order to work with this subscriber, each class that is passed through Data Shipper must have an `elasticsearch_index` field that is publically available.

It can be as simple as a public string property on your class.

```php
use Illuminate\Database\Eloquent\Model;
use Autoklose\DataShipper\Traits\HasDataSubscribers;

class Record extends Model
{
use HasDataSubscribers;

public string $elasticsearch_index = 'record_index';
}
```

Alternatively if you need to retrieve the index name programmatically we recommend making use of an attribute. Please note that you should not rely on class properties when creating the index attribute as the class will not have any data loaded.

```php
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\Casts\Attribute;
use Autoklose\DataShipper\Traits\HasDataSubscribers;

class Record extends Model
{
use HasDataSubscribers;

public function elasticsearchIndex(): Attribute
{
return Attribute::make(get: fn() => config('elasticsearch.recordIndex'));
}
}
```


## Changelog

Please see [CHANGELOG](CHANGELOG.md) for more information on what has changed recently.
Expand Down
4 changes: 3 additions & 1 deletion config/data-shipper.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
'subscribers' => ['elasticsearch'],
'shipments' => [
'max_size' => 10,
'max_wait_minutes' => 5
'max_wait_minutes' => 5,
'max_shipments_per_minute' => 10,
'max_retries' => 3
]
];
4 changes: 2 additions & 2 deletions database/migrations/create_failed_packages_table.php.stub
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ return new class extends Migration
{
public function up()
{
Schema::create('failed_shipments', function (Blueprint $table) {
Schema::create('failed_packages', function (Blueprint $table) {
$table->id();
$table->string('model_id');
$table->json('payload');

$table->unsignedBigInteger('failed_shipment_id');
$table->foreign('failed_shipment')->references('id')->on('failed_shipments');
$table->foreign('failed_shipment_id')->references('id')->on('failed_shipments')->cascadeOnDelete();

$table->timestamps();
});
Expand Down
3 changes: 3 additions & 0 deletions database/migrations/create_failed_shipments_table.php.stub
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ return new class extends Migration
$table->id();
$table->string('class_name');
$table->string('shipment');
$table->string('subscriber');
$table->timestamp('last_retried_at');
$table->unsignedInteger('retries')->default(0);

$table->timestamps();
});
Expand Down
5 changes: 2 additions & 3 deletions src/Commands/RetryFailedShipments.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ class RetryFailedShipments extends Command

public function handle()
{
$threshold = now()->subMinutes(15);
$maxRetries = config('data-shipper.shipments.max_retries');

FailedShipment::where(fn($query) => $query->whereNull('last_retried_at')->orWhere('last_retried_at', '<=', $threshold))
->where('retries', '<', 3)->chunk(250, function($failedShipments) {
FailedShipment::where('retries', '<', $maxRetries)->chunk(250, function($failedShipments) {
foreach ($failedShipments as $failedShipment) {
RetryFailedShipment::dispatch($failedShipment);
}
Expand Down
1 change: 1 addition & 0 deletions src/Commands/ShipIt.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Autoklose\DataShipper\Jobs\NotifySubscriberOfShipment;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Redis;

class ShipIt extends Command
{
Expand Down
2 changes: 1 addition & 1 deletion src/DataShipper.php
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected function setup(): void {

protected function resolveSubscriber($subscriber): DataSubscriberInterface {
if ($subscriber === 'elasticsearch') {
return new ElasticsearchSubscriber($this->app);
return app()->make(ElasticsearchSubscriber::class);
}

throw new InvalidArgumentException("No resolver exists for $subscriber data source.");
Expand Down
16 changes: 13 additions & 3 deletions src/Jobs/ClearPackagesFromShipment.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Cache;

class ClearPackagesFromShipment implements ShouldQueue
{
Expand All @@ -18,16 +19,25 @@ class ClearPackagesFromShipment implements ShouldQueue

protected int $length;

public function __construct(string $shipment, int $length)
protected bool $limitHit;

public function __construct(string $shipment, int $length, bool $limitHit)
{
$this->shipment = $shipment;
$this->length = $length;
$this->limitHit = $limitHit;
}

public function handle()
{
$repository = new ShipmentRepository(app()->make(Factory::class));
$repository = new ShipmentRepository(app()->make(Factory::class), config('data-shipper.shipments.max_wait_minutes'), config('data-shipper.shipments.max_size'));

$canShipAgain = $repository->flushPackagesForShipment($this->shipment, $this->length);

Cache::lock("data-shipper-{$this->shipment}-active-lock")->forceRelease();

$repository->flushPackagesForShipment($this->shipment, $this->length);
if ($canShipAgain) {
NotifySubscriberOfShipment::dispatch($this->shipment, true);
}
}
}
40 changes: 36 additions & 4 deletions src/Jobs/NotifySubscriberOfShipment.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,61 @@

namespace Autoklose\DataShipper\Jobs;

use Autoklose\DataShipper\Facades\DataShipper;
use Autoklose\DataShipper\ShipmentRepository;
use Carbon\Carbon;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Redis\Factory;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Redis;
use Autoklose\DataShipper\Facades\DataShipper;

class NotifySubscriberOfShipment implements ShouldQueue {

use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

protected string $shipment;

protected int $maxShipmentsPerMinute;

public function __construct(string $shipment)
{
$this->shipment = $shipment;
$this->maxShipmentsPerMinute = config('data-shipper.shipments.max_shipments_per_minute');
}

public function handle()
{
$lock = Cache::lock("data-shipper-{$this->shipment}-active-lock", 50);

if (!$lock->get()) {
return;
}

$limitHit = false;
$lastShipment = Redis::connection('data-shipper')->zscore("data-shipper-records", $this->shipment);

if (!$lastShipment || Carbon::createFromTimestamp($lastShipment)->isBefore(now()->startOfMinute())) {
// New throttle setup
$timestamp = now()->timestamp;
Redis::connection('data-shipper')->pipeline(function ($pipe) use($timestamp) {
$pipe->zadd("data-shipper-records", $timestamp, $this->shipment);
$pipe->expire("data-shipper-records", 60);
$pipe->del("data-shipper-{$this->shipment}-per-minute");
$pipe->incr("data-shipper-{$this->shipment}-per-minute");
$pipe->expire("data-shipper-{$this->shipment}-per-minute", 60);
});
} else {
$handledThisMinute = Redis::connection('data-shipper')->incr("data-shipper-{$this->shipment}-per-minute");
if ($handledThisMinute > $this->maxShipmentsPerMinute) {
return;
}

$limitHit = $this->maxShipmentsPerMinute === $handledThisMinute;
}

$subscribers = DataShipper::getSubscribers();
$packageIds = DataShipper::getPackagesForShipment($this->shipment, true);

Expand All @@ -33,7 +65,7 @@ public function handle()
$jobs[] = new DispatchShipmentToSubscriber($this->shipment, $packageIds, $subscriberName);
}

$jobs[] = new ClearPackagesFromShipment($this->shipment, count($packageIds));
$jobs[] = new ClearPackagesFromShipment($this->shipment, count($packageIds), $limitHit);

Bus::chain($jobs)->dispatch();
}
Expand Down
4 changes: 3 additions & 1 deletion src/Jobs/RetryFailedShipment.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function handle()
$this->failedShipment->increment('retries');
$this->failedShipment->update(['last_retried_at' => now()]);

if ($this->failedShipment->retries >= 3) {
if ($this->failedShipment->retries >= (int)config('data-shipper.shipments.max_retries')) {
return;
}

Expand All @@ -42,5 +42,7 @@ public function handle()
$subscriber = $subscribers[$this->failedShipment->subscriber];

$subscriber->ship($preparedPackages);

$this->failedShipment->delete();
}
}
4 changes: 4 additions & 0 deletions src/Models/FailedPackage.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@

class FailedPackage extends Model {
protected $guarded = [];

protected $casts = [
'payload' => 'array'
];
}
2 changes: 1 addition & 1 deletion src/Models/FailedShipment.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ class FailedShipment extends Model {

public function packages()
{
return $this->belongsToMany(FailedPackage::class);
return $this->hasMany(FailedPackage::class);
}
}
9 changes: 5 additions & 4 deletions src/ShipmentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function getPackagesByUuids(array $ids, string $key, $startingIndex = 0):

// Remove these jobs from the queue
$count = array_pop($packages) - count($ids);
if ($count >= 1 && $count < $this->maxShipmentLength) {
if ($count > 0 && $count < $this->maxShipmentLength) {
$this->connection()->pipeline(function ($pipe) use ($key) {
// Reset the timeout for the shipment
$this->updateShipmentManifest($pipe, $key);
Expand Down Expand Up @@ -117,14 +117,14 @@ public function flushPackagesForShipment(string $key, int $length): bool {

$lock->block(10);

$packageIds = $this->connection()->zrange($key, 0, $length);
$packageIds = $this->connection()->zrange($key, 0, $length - 1);

$results = $this->connection()->pipeline(function ($pipe) use ($key, $length, $packageIds) {
foreach ($packageIds as $id) {
$pipe->hdel($id);
}

$pipe->zrem(...$packageIds);
$pipe->zrem($key, ...$packageIds);
$pipe->decrby("{$key}-shipment-length", count($packageIds));
});

Expand All @@ -140,7 +140,7 @@ public function flushPackagesForShipment(string $key, int $length): bool {

$lock->release();

return $count === $this->maxShipmentLength;
return $count >= $this->maxShipmentLength;
}

/**
Expand Down Expand Up @@ -174,6 +174,7 @@ protected function addPackageToShipment($pipe, string $key, PackageInterface $pa
*/
public function updateShipmentManifest($pipe, string $key, bool $now = false): void {
$timestamp = !$now ? now()->addMinutes($this->minutesUntilShipment)->getPreciseTimestamp(4) : now()->getPreciseTimestamp(4);
$pipe->zrem("shipments", $key);
$pipe->zadd("shipments", $timestamp, $key);
}

Expand Down
7 changes: 2 additions & 5 deletions src/Subscribers/ElasticsearchSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
use Elasticsearch\Common\Exceptions\Conflict409Exception;

class ElasticsearchSubscriber implements DataSubscriberInterface {
protected $app;
protected $retries;

public function __construct($app)
public function __construct()
{
$this->app = $app;

$this->retries = config('data-shipper.subscribers.elasticsearch.retires', 3);
}

Expand Down Expand Up @@ -44,7 +41,7 @@ public function ship($packages) {
}

/** @var Client $client */
$client = $this->app->make(Client::class);
$client = app()->make(Client::class);

$retries = $this->retries;

Expand Down
Loading

0 comments on commit 0f963b1

Please sign in to comment.