Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
amirfaramarzi committed Jul 3, 2021
1 parent 474f98a commit 8867141
Showing 1 changed file with 19 additions and 28 deletions.
47 changes: 19 additions & 28 deletions src/JobWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,32 @@ public function workerRun()

protected function listen(): void
{
Timer::add(0.00001 , function (){
Timer::add(1 , function (){
$queue = Redis::getInstance()->lPop(env('APP_NAME' , 'tower') . 'Queue');
if ($queue){
$data = json_decode($queue);
try {
$class = new $this->jobs[$data->queue]();
if (method_exists($class ,'handle'))
call_user_func_array([$class , 'handle'] , [$data->data]);
}catch (Throwable $e)
{
$class = new $this->jobs[$data->queue]();
if ($data->attempts > 1){
if (method_exists($class ,'retry')){
call_user_func_array([$class , 'retry'] , [$data->data , $data->attempts]);
}else{
try {
throw new QueueException('retry' , $data->queue , (array) $data->data , $e->getMessage() , $e->getFile() , $e->getLine());
}catch (QueueException $e)
{
$e->handle();
}
if (isset($this->jobs[$data->queue])){
try {
$class = new $this->jobs[$data->queue]();
if (method_exists($class ,'handle'))
call_user_func_array([$class , 'handle'] , [$data->data]);
}catch (Throwable $e)
{
$class = new $this->jobs[$data->queue]();
if ($data->attempts > 1){
(new QueueException('retry' , $data->queue , (array) $data->data , $e->getMessage() , $e->getFile() , $e->getLine()))->handle();
(new Queue())->store($data->queue , (array) $data->data , $data->attempts - 1);
}
}else {
if (method_exists($class ,'failed')){
call_user_func_array([$class , 'failed'] , [$data->data]);
}else{
try {
throw new QueueException('failed' , $data->queue , (array) $data->data , $e->getMessage() , $e->getFile() , $e->getLine());
}catch (QueueException $e)
{
$e->handle();
}else {
if (method_exists($class ,'failed')){
call_user_func_array([$class , 'failed'] , [$data->data]);
}else{
(new QueueException('failed' , $data->queue , (array) $data->data , $e->getMessage() , $e->getFile() , $e->getLine()))->handle();
}
}
}
}else{
$values = json_encode($data->data);
(new Log())->channel('queue')->alert("not found handler [queue: $data->queue] [data: $values]");
}
}
});
Expand Down

0 comments on commit 8867141

Please sign in to comment.