Skip to content

Commit

Permalink
Fix for #3 and other changes
Browse files Browse the repository at this point in the history
  • Loading branch information
spinx committed Aug 21, 2015
1 parent 669fdaa commit 921a119
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 21 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"processing"
],
"require": {
"php": ">=5.6",
"php": ">=5.4",
"predis/predis": "^1.0"
},
"require-dev": {
Expand Down
9 changes: 7 additions & 2 deletions examples/basic.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<?php
include __DIR__.'/../vendor/autoload.php';
function _print($id, $retry){
var_dump(sprintf('Pushed job with id %s and retry:%d', $id, $retry));
}

// connect to database 0 on 127.0.0.1
$redis = new Predis\Client('tcp://127.0.0.1:6379/0');
Expand All @@ -14,6 +17,8 @@
70
];

$id = $client->push('ProcessImage', $args);
$id = $client->push('ProcessImage', $args, true);
_print($id, true);

var_dump(sprintf('Pushed job with id %s', $id));
$id = $client->push('ProcessImage', $args, false);
_print($id, false);
2 changes: 1 addition & 1 deletion examples/bulk.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
$jobs = [
['class' => 'ProcessImage', 'args' => [['url' => 'http://i.imgur.com/hlAsa4k.jpg'], true, 12]],
['class' => 'ProcessImage', 'args' => [['url' => 'http://i.imgur.com/hlAsa4k.jpg'], true, null]],
['class' => 'ProcessImage', 'args' => [['url' => 'http://i.imgur.com/hlAsa4k.jpg'], false, 45]]
['class' => 'ProcessImage', 'args' => [['url' => 'http://i.imgur.com/hlAsa4k.jpg']], 'at' => microtime(true)+60*6, 'retry' => false]
];

// push jobs to a different queue
Expand Down
26 changes: 17 additions & 9 deletions lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ public function __construct(\Predis\Client $redis, $namespace = null, $serialize
* @param string $class
* @param array $args
* @param string $queue
* @param bool $retry
* @return string
*/
public function push($class, $args = [], $queue = self::QUEUE)
public function push($class, $args = [], $retry = true, $queue = self::QUEUE)
{
$jobId = $this->idGenerator->generate();
$this->atomicPush($jobId, $class, $args, $queue);
$this->atomicPush($jobId, $class, $args, $queue, $retry);

return $jobId;
}
Expand All @@ -62,12 +63,13 @@ public function push($class, $args = [], $queue = self::QUEUE)
* @param string $class
* @param array $args
* @param string $queue
* @param bool $retry
* @return string
*/
public function schedule($doAt, $class, $args = [], $queue = self::QUEUE)
public function schedule($doAt, $class, $args = [], $retry = true, $queue = self::QUEUE)
{
$jobId = $this->idGenerator->generate();
$this->atomicPush($jobId, $class, $args, $queue, $doAt);
$this->atomicPush($jobId, $class, $args, $queue, $retry, $doAt);

return $jobId;
}
Expand All @@ -79,7 +81,9 @@ public function schedule($doAt, $class, $args = [], $queue = self::QUEUE)
* $jobs = [
* [
* 'class' => 'SomeClass',
* 'args' => array()
* 'args' => array(),
* 'retry' => false,
* 'at' => microtime(true)
* ]
* ];
*
Expand All @@ -99,9 +103,12 @@ public function pushBulk($jobs = [], $queue = self::QUEUE)
throw new Exception('pushBulk: each job needs args');
}

$retry = isset($job['retry']) ? $job['retry'] : true;
$doAt = isset($job['at']) ? $job['at'] : null;

$jobId = $this->idGenerator->generate();
array_push($ids, $jobId);
$this->atomicPush($jobId, $job['class'], $job['args'], $queue);
$this->atomicPush($jobId, $job['class'], $job['args'], $queue, $retry, $doAt);
}

return $ids;
Expand All @@ -114,10 +121,11 @@ public function pushBulk($jobs = [], $queue = self::QUEUE)
* @param string $class
* @param array $args
* @param string $queue
* @param bool $retry
* @param float|null $doAt
* @throws exception Exception
*/
private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $doAt = null)
private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $retry = true, $doAt = null)
{
if (array_values($args) !== $args) {
throw new Exception('Associative arrays in job args are not allowed');
Expand All @@ -127,7 +135,7 @@ private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $d
throw new Exception('at argument needs to be in a unix epoch format. Use microtime(true).');
}

$job = $this->serializer->serialize($jobId, $class, $args);
$job = $this->serializer->serialize($jobId, $class, $args, $retry);

if ($doAt === null) {
$this->redis->sadd($this->name('queues'), $queue);
Expand All @@ -141,7 +149,7 @@ private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $d
* @param string ...$key
* @return string
*/
private function name(...$key)
private function name()
{
return implode(':', array_filter(array_merge([$this->namespace], func_get_args()), 'strlen'));
}
Expand Down
10 changes: 2 additions & 8 deletions lib/Serializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,14 @@ class Serializer
* @return string
* @throws exception Exception
*/
public function serialize($jobId, $class, $args = [])
public function serialize($jobId, $class, $args = [], $retry = true)
{
$class = is_object($class) ? get_class($class) : $class;
$retry = (isset($args['retry']) && is_bool($args['retry'])) ? $args['retry'] : true;
$createdAt = isset($args['created_at']) ? $args['created_at'] : microtime(true);

if (!is_float($createdAt) && is_string($createdAt)) {
throw new Exception('created_at argument needs to be in a unix epoch format. Use microtime(true).');
}

$data = [
'class' => $class,
'jid' => $jobId,
'created_at' => $createdAt,
'created_at' => microtime(true),
'enqueued_at' => microtime(true),
'args' => $args,
'retry' => $retry,
Expand Down

0 comments on commit 921a119

Please sign in to comment.