diff --git a/composer.json b/composer.json index e84bd3c..f144772 100644 --- a/composer.json +++ b/composer.json @@ -18,7 +18,7 @@ "processing" ], "require": { - "php": ">=5.6", + "php": ">=5.4", "predis/predis": "^1.0" }, "require-dev": { diff --git a/examples/basic.php b/examples/basic.php index cc7b7e4..2eda0b7 100644 --- a/examples/basic.php +++ b/examples/basic.php @@ -1,5 +1,8 @@ push('ProcessImage', $args); +$id = $client->push('ProcessImage', $args, true); +_print($id, true); -var_dump(sprintf('Pushed job with id %s', $id)); \ No newline at end of file +$id = $client->push('ProcessImage', $args, false); +_print($id, false); diff --git a/examples/bulk.php b/examples/bulk.php index f9258f0..9926044 100644 --- a/examples/bulk.php +++ b/examples/bulk.php @@ -11,7 +11,7 @@ $jobs = [ ['class' => 'ProcessImage', 'args' => [['url' => 'http://i.imgur.com/hlAsa4k.jpg'], true, 12]], ['class' => 'ProcessImage', 'args' => [['url' => 'http://i.imgur.com/hlAsa4k.jpg'], true, null]], - ['class' => 'ProcessImage', 'args' => [['url' => 'http://i.imgur.com/hlAsa4k.jpg'], false, 45]] + ['class' => 'ProcessImage', 'args' => [['url' => 'http://i.imgur.com/hlAsa4k.jpg']], 'at' => microtime(true)+60*6, 'retry' => false] ]; // push jobs to a different queue diff --git a/lib/Client.php b/lib/Client.php index 0493707..f0ff0cc 100644 --- a/lib/Client.php +++ b/lib/Client.php @@ -45,12 +45,13 @@ public function __construct(\Predis\Client $redis, $namespace = null, $serialize * @param string $class * @param array $args * @param string $queue + * @param bool $retry * @return string */ - public function push($class, $args = [], $queue = self::QUEUE) + public function push($class, $args = [], $retry = true, $queue = self::QUEUE) { $jobId = $this->idGenerator->generate(); - $this->atomicPush($jobId, $class, $args, $queue); + $this->atomicPush($jobId, $class, $args, $queue, $retry); return $jobId; } @@ -62,12 +63,13 @@ public function push($class, $args = [], $queue = self::QUEUE) * @param string $class * @param array $args * @param string $queue + * @param bool $retry * @return string */ - public function schedule($doAt, $class, $args = [], $queue = self::QUEUE) + public function schedule($doAt, $class, $args = [], $retry = true, $queue = self::QUEUE) { $jobId = $this->idGenerator->generate(); - $this->atomicPush($jobId, $class, $args, $queue, $doAt); + $this->atomicPush($jobId, $class, $args, $queue, $retry, $doAt); return $jobId; } @@ -79,7 +81,9 @@ public function schedule($doAt, $class, $args = [], $queue = self::QUEUE) * $jobs = [ * [ * 'class' => 'SomeClass', - * 'args' => array() + * 'args' => array(), + * 'retry' => false, + * 'at' => microtime(true) * ] * ]; * @@ -99,9 +103,12 @@ public function pushBulk($jobs = [], $queue = self::QUEUE) throw new Exception('pushBulk: each job needs args'); } + $retry = isset($job['retry']) ? $job['retry'] : true; + $doAt = isset($job['at']) ? $job['at'] : null; + $jobId = $this->idGenerator->generate(); array_push($ids, $jobId); - $this->atomicPush($jobId, $job['class'], $job['args'], $queue); + $this->atomicPush($jobId, $job['class'], $job['args'], $queue, $retry, $doAt); } return $ids; @@ -114,10 +121,11 @@ public function pushBulk($jobs = [], $queue = self::QUEUE) * @param string $class * @param array $args * @param string $queue + * @param bool $retry * @param float|null $doAt * @throws exception Exception */ - private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $doAt = null) + private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $retry = true, $doAt = null) { if (array_values($args) !== $args) { throw new Exception('Associative arrays in job args are not allowed'); @@ -127,7 +135,7 @@ private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $d throw new Exception('at argument needs to be in a unix epoch format. Use microtime(true).'); } - $job = $this->serializer->serialize($jobId, $class, $args); + $job = $this->serializer->serialize($jobId, $class, $args, $retry); if ($doAt === null) { $this->redis->sadd($this->name('queues'), $queue); @@ -141,7 +149,7 @@ private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $d * @param string ...$key * @return string */ - private function name(...$key) + private function name() { return implode(':', array_filter(array_merge([$this->namespace], func_get_args()), 'strlen')); } diff --git a/lib/Serializer.php b/lib/Serializer.php index 4029439..45aaec4 100644 --- a/lib/Serializer.php +++ b/lib/Serializer.php @@ -17,20 +17,14 @@ class Serializer * @return string * @throws exception Exception */ - public function serialize($jobId, $class, $args = []) + public function serialize($jobId, $class, $args = [], $retry = true) { $class = is_object($class) ? get_class($class) : $class; - $retry = (isset($args['retry']) && is_bool($args['retry'])) ? $args['retry'] : true; - $createdAt = isset($args['created_at']) ? $args['created_at'] : microtime(true); - - if (!is_float($createdAt) && is_string($createdAt)) { - throw new Exception('created_at argument needs to be in a unix epoch format. Use microtime(true).'); - } $data = [ 'class' => $class, 'jid' => $jobId, - 'created_at' => $createdAt, + 'created_at' => microtime(true), 'enqueued_at' => microtime(true), 'args' => $args, 'retry' => $retry,