Channel connection is closed when consume #13

Open
alesima opened this issue Jun 26, 2024 · 4 comments

Comments

alesima commented Jun 26, 2024

I'm getting a "Channel connection is closed" error when consuming a queue.

<?php

namespace App\Abstracts;

use Anik\Amqp\ConsumableMessage;
use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Qos\Qos;
use Anik\Amqp\Queues\Queue;
use Anik\Laravel\Amqp\Facades\Amqp;
use App\Traits\LoggerTrait;
use App\Utils\Utils;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Validator;

abstract class AbstractRabbitCommand extends Command
{
    use LoggerTrait;

    public function handle()
    {
        $this->configureQueues();
    }

    abstract protected function configureQueues();

    protected function consumeQueue(string $queueName, string $method, Exchange $exchange, ?string $routingKey = null, int $prefetchCount = 30)
    {
        $queue = new Queue($queueName);
        $qos = new Qos(0, $prefetchCount);

        try {
            Amqp::consume([$this, $method], $routingKey ?? $queueName, $exchange, $queue, $qos);
        } catch (\Exception $e) {
            Log::error($queueName, [
                'message' => $e->getMessage(),
                'queue' => $queueName,
            ]);

            $this->error(sprintf("ERROR_CONSUME_QUEUE[%s]: error[%s]", $queueName, $e->getMessage()));
        }
    }

    protected function processMessage(ConsumableMessage $message, array $rules, callable $callback, string $queueName)
    {
        try {
            if (!Utils::checkIfIsJson($message->getMessageBody())) {
                throw new \InvalidArgumentException('Message is not a valid JSON');
            }

            $data = json_decode($message->getMessageBody(), true);
            $validated = Validator::make($data, $rules);

            if ($validated->fails()) {
                throw new \InvalidArgumentException($validated->errors()->toJson());
            }

            $callback($data);
        } catch (\Exception $e) {
            $this->logError($data ?? [], $e, debug_backtrace()[1]['function'], $queueName);
            $message->nack();
        }
    }

    protected function logError(array $data, \Exception $e, string $context, string $queueName)
    {
        Log::error($context, [
            'message' => $e->getMessage(),
            'queue' => $queueName,
            'data' => $data,
        ]);

        $this->error(sprintf("ERROR[%s]: %s", strtoupper($context), $e->getMessage()));
    }
}

<?php

namespace App\Console\Commands;

use Anik\Amqp\ConsumableMessage;
use Anik\Amqp\Exchanges\Exchange;
use Anik\Laravel\Amqp\Facades\Amqp;
use App\Abstracts\AbstractRabbitCommand;
use App\Helpers\Pubsub;
use App\Interfaces\Service\AlocacaoService;
use App\Interfaces\Service\SolicitacaoService;
use Carbon\Carbon;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Wire\AMQPTable;

class RabbitSolicitacaoLoopCommand extends AbstractRabbitCommand
{

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbit:solicitacao:loop:subscribe';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Subscribe to RabbitMQ to listen pending call driver and loop call driver';

    /**
     * @var \App\Interfaces\Service\AlocacaoService
     */
    private $alocacaoService;

    /**
     * @var \App\Interfaces\Service\SolicitacaoService
     */
    private $solicitacaoService;

    /**
     * @var \Anik\Amqp\Exchanges\Exchange
     */
    private $exchange;

    public function __construct(AlocacaoService $alocacaoService, SolicitacaoService $solicitacaoService)
    {
        parent::__construct();

        $this->alocacaoService = $alocacaoService;
        $this->solicitacaoService = $solicitacaoService;
    }

    protected function configureQueues()
    {
        $this->exchange = new Exchange('solicitation-loop', 'x-delayed-message');

        $this->consumeQueue('pending-call-driver', 'pendingCallDriver', $this->exchange);
        $this->consumeQueue('loop-call-driver', 'loopCallDriver', $this->exchange);
        $this->consumeQueue('restart-call-driver', 'restartCallDriver', $this->exchange);
    }

    public function pendingCallDriver(ConsumableMessage $message)
    {
        $this->processMessage($message, [
            'solicitacao' => 'required|array',
            'expiresAt'   => 'required|numeric',
            'startedAt'   => 'required|numeric',
            'counter'     => 'required|numeric',
        ], function ($data) use ($message) {
            $this->info(sprintf("PENDING_CALL_DRIVER: solicitacao[%s]", $data['solicitacao']['uid_solicitacao']));
            $options = ['application_headers' => new AMQPTable(['x-delay' => 0])];
            Amqp::publish(json_encode($data), 'loop-call-driver', $this->exchange, $options);
            $message->ack();
        }, 'pending-call-driver');
    }

    public function loopCallDriver(ConsumableMessage $message)
    {
        $this->processMessage($message, [
            'solicitacao' => 'required|array',
            'expiresAt'   => 'required|numeric',
            'startedAt'   => 'required|numeric',
            'counter'     => 'required|numeric',
        ], function ($data) use ($message) {
            $data['counter']++;
            $result = $this->alocacaoService->iniciarCall($data);

            if (!$result->isSuccess()) {
                throw new \Exception($result->getError()->toJson());
            }

            $status = $result->getData()['status'];
            $this->info(sprintf("CALL_DRIVER: loop[%d/%d] solicitacao[%s] status[%s]", $data['counter'], intval(env('MAX_CALLS', 7)), $data['solicitacao']['uid_solicitacao'], $status));

            if ($status == 'CONTINUE_CALL' && $data['counter'] <= intval(env('MAX_CALLS', 7))) {
                $options = ['application_headers' => new AMQPTable(['x-delay' => 18000])];
                Amqp::publish(json_encode($data), 'loop-call-driver', $this->exchange, $options);
            }

            $message->ack();
        }, 'loop-call-driver');
    }

    public function restartCallDriver(ConsumableMessage $message)
    {
        $this->processMessage($message, [
            'uid'                         => 'required|uuid',
            'uid_motorista'               => 'required|uuid',
            'uid_passageiro'              => 'required|uuid',
            'distancia_estimada'          => 'required|numeric',
            'duracao_estimada'            => 'required|numeric',
            'endereco_partida'            => 'required|string',
            'lat_destino'                 => 'required|numeric',
            'lat_partida'                 => 'required|numeric',
            'endereco_destino'            => 'required|string',
            'lng_destino'                 => 'required|numeric',
            'lng_partida'                 => 'required|numeric',
            'ponto_referencia'            => 'nullable|string',
            'tipo_usuario'                => 'required|numeric',
            'tipo_corrida'                => 'required|numeric',
            'tipo_pagamento'              => 'required|numeric',
            'uid_servicos_especializados' => 'nullable|string',
            'favoritos'                   => 'nullable|array',
            'primeira_corrida'            => 'required|boolean',
            'sexo'                        => 'required|string',
            'info_passageiro'             => 'required|array',
            'stripe_charge_id'            => 'nullable|string',
            'calcula_estudante'           => 'nullable|numeric',
            'promocode'                   => 'nullable|array',
        ], function ($data) use ($message) {
            $result = $this->solicitacaoService->reiniciar($data);

            if (!$result->isSuccess()) {
                $this->handleServiceError($data, $result->getError(), 'restart-call-driver');
                return;
            }

            ['status' => $status, 'data' => $solicitacao] = (array) $result->getData();
            $this->info(sprintf("RESTART_CALL_DRIVER: solicitacao[%s] status[%s]", $data['uid'], $status));

            if ($status != 'SUCCESS') {
                Pubsub::publish('stopCallDriver', json_encode(['uid' => $data['uid'], 'to' => $data['uid_passageiro']]));
                $message->ack();
                return;
            }

            Pubsub::publish('restartCallDriver', json_encode([
                'uid' => $solicitacao['uid_solicitacao'],
                'expiresIn' => intval(env('EXPIRES_IN', 300) * 1000),
                'origin' => ['lat' => $data['lat_partida'] ?? null, 'lng' => $data['lng_partida'] ?? null],
                'to' => $data['uid_passageiro'],
            ]));

            $payload = [
                'solicitacao' => $solicitacao,
                'expiresAt' => Carbon::now()->timestamp + intval(env('EXPIRES_IN', 300)),
                'startedAt' => Carbon::now()->timestamp,
                'counter' => 0,
            ];

            Amqp::publish(json_encode($payload), 'pending-call-driver', $this->exchange);
            $message->ack();
        }, 'restart-call-driver');
    }

    private function handleServiceError(array $data, $error, string $context)
    {
        Log::error($context, [
            'message' => $error->toJson(),
            'queue' => $context,
            'exchange' => $this->exchange->getName(),
            'data' => $data,
        ]);

        $this->error(sprintf("ERROR_%s: solicitacao[%s] error[%s]", strtoupper($context), $data['uid'], $error->toJson()));
        Pubsub::publish('stopCallDriver', json_encode(['uid' => $data['uid'], 'to' => $data['uid_passageiro']]));
    }
}

Any thoughts?

alesima commented Jun 26, 2024

Here's my amqp.php config file

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$amqp = parse_url(env('CLOUDAMQP_URL'));

return [
    'default' => env('AMQP_CONNECTION', 'rabbitmq'),

    'connections' => [
        'rabbitmq' => [
            'connection' => [
                'class' => AMQPStreamConnection::class,
                'hosts' => [
                    [
                        'host' => $amqp['host'],
                        'port' => array_key_exists("port", $amqp) ? $amqp['port'] : 5672,
                        'user' => $amqp['user'],
                        'password' => $amqp['pass'],
                        'vhost' => ltrim($amqp['path'], '/') ?: '/',
                    ]
                ],
                'options' => [],
            ],

            'message' => [
                'content_type' => env('AMQP_MESSAGE_CONTENT_TYPE', 'text/plain'),
                'delivery_mode' => env('AMQP_MESSAGE_DELIVERY_MODE', AMQPMessage::DELIVERY_MODE_PERSISTENT),
                'content_encoding' => env('AMQP_MESSAGE_CONTENT_ENCODING', 'UTF-8'),
            ],

            'exchange' => [
                'name' => env('AMQP_EXCHANGE_NAME', 'amq.direct'),
                'declare' => env('AMQP_EXCHANGE_DECLARE', false),
                'type' => env('AMQP_EXCHANGE_TYPE', 'direct'),
                'passive' => env('AMQP_EXCHANGE_PASSIVE', false),
                'durable' => env('AMQP_EXCHANGE_DURABLE', true),
                'auto_delete' => env('AMQP_EXCHANGE_AUTO_DELETE', false),
                'internal' => env('AMQP_EXCHANGE_INTERNAL', false),
                'no_wait' => env('AMQP_EXCHANGE_NOWAIT', false),
                'arguments' => [],
                'ticket' => env('AMQP_EXCHANGE_TICKET'),
            ],

            'queue' => [
                'name' => env('AMQP_QUEUE_NAME', 'amqp.laravel.queue'),
                'declare' => env('AMQP_QUEUE_DECLARE', false),
                'passive' => env('AMQP_QUEUE_PASSIVE', false),
                'durable' => env('AMQP_QUEUE_DURABLE', true),
                'exclusive' => env('AMQP_QUEUE_EXCLUSIVE', false),
                'auto_delete' => env('AMQP_QUEUE_AUTO_DELETE', false),
                'no_wait' => env('AMQP_QUEUE_NOWAIT', false),
                'arguments' => [],
                'ticket' => env('AMQP_QUEUE_TICKET'),
            ],

            'consumer' => [
                'tag' => env('AMQP_CONSUMER_TAG', ''),
                'no_local' => env('AMQP_CONSUMER_NO_LOCAL', false),
                'no_ack' => env('AMQP_CONSUMER_NO_ACK', false),
                'exclusive' => env('AMQP_CONSUMER_EXCLUSIVE', false),
                'no_wait' => env('AMQP_CONSUMER_NOWAIT', false),
                'arguments' => [],
                'ticket' => env('AMQP_CONSUMER_TICKET'),
            ],

            'qos' => [
                'enabled' => env('AMQP_QOS_ENABLED', false),
                'prefetch_size' => env('AMQP_QOS_PREFETCH_SIZE', 0),
                'prefetch_count' => env('AMQP_QOS_PREFETCH_COUNT', 1),
                'global' => env('AMQP_QOS_GLOBAL', false),
            ],

            'publish' => [
                'mandatory' => false,
                'immediate' => false,
                'ticket' => null,
                'batch_count' => 500,
            ],

            'bind' => [
                'no_wait' => false,
                'arguments' => [],
                'ticket' => null,
            ],

            'consume' => [
                'allowed_methods' => null,
                'non_blocking' => false,
                'timeout' => 0,
            ],
        ],
    ],
];

ssi-anik (Owner) commented

Hello @alesima
I am extremely sorry for the late reply.

Can you check the solution from this Stack Overflow question and confirm whether it works? https://stackoverflow.com/questions/8839094/why-do-my-rabbitmq-channels-keep-closing

tof06 commented Jul 31, 2024

Hi,
I ran into exactly the same problem, and I think I found the cause.

When a message is published inside a receiver callback, the publisher uses the same Connection as the consumer (because the AmqpManager caches connections).
But when the publisher has finished its job, its destructor closes the connection (and all of its channels), so the receiver can no longer ack the message.
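
To make the sequence above concrete, here is a minimal, self-contained simulation (PHP 8.0+ assumed). FakeConnection, FakeManager, FakePublisher and FakeConsumer are hypothetical stand-ins written for this sketch; they are not the classes from anik/amqp or php-amqplib, and the sketch only mirrors the behaviour described in this comment rather than the library's actual internals.

```php
<?php

// Hypothetical stand-ins for illustration only; NOT the real library classes.
final class FakeConnection
{
    public bool $open = true;

    public function close(): void
    {
        $this->open = false;
    }
}

final class FakeManager
{
    private ?FakeConnection $cached = null;

    // Hands out the same connection object on every call (the caching described above).
    public function connection(): FakeConnection
    {
        return $this->cached ??= new FakeConnection();
    }
}

final class FakePublisher
{
    public function __construct(private FakeConnection $connection) {}

    public function publish(string $body): void
    {
        if (!$this->connection->open) {
            throw new RuntimeException('Channel connection is closed');
        }
    }

    // Closing here also closes the connection for everyone else who shares it.
    public function __destruct()
    {
        $this->connection->close();
    }
}

final class FakeConsumer
{
    public function __construct(private FakeConnection $connection) {}

    public function ack(): string
    {
        if (!$this->connection->open) {
            throw new RuntimeException('Channel connection is closed');
        }

        return 'acked';
    }
}

function simulate(): string
{
    $manager = new FakeManager();
    $consumer = new FakeConsumer($manager->connection());

    // Publish "inside the callback" over the shared, cached connection.
    $publisher = new FakePublisher($manager->connection());
    $publisher->publish('{"counter":1}');
    unset($publisher); // last reference gone, so the destructor runs here

    try {
        return $consumer->ack();
    } catch (RuntimeException $e) {
        return $e->getMessage();
    }
}

echo simulate(), PHP_EOL; // prints "Channel connection is closed"
```

In this sketch the ack fails only because both objects were handed the same FakeConnection; giving the publisher its own connection object makes ack() return 'acked'.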

Quoting https://www.cloudamqp.com/blog/part4-rabbitmq-13-common-errors.html :

 Don’t use too many connections or channels.

Try to keep the connection/channel count low. Use separate connections to publish and consume. 
Ideally, you should have one connection per process, and then use one channel per thread in your application.

    Reuse connections
    1 connection for publishing
    1 connection for consuming

I don't really know what the best way to fix this would be...

Maybe we could create 2 connections (with the same config) when we instantiate AMQP, so that getProducer and getConsumer each use their own.
Or, maybe simpler: the destructor of Anik\Amqp\Connection shouldn't close the connection at all, and it would be closed when the process exits...
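
Until something like that exists in the package, a possible user-side workaround might be to register the same settings under two connection names and publish through the second one. The sketch below is untested against the library: the name rabbitmq_publisher is hypothetical, the host values are placeholders, the remaining keys (message, exchange, queue, ...) are omitted for brevity, and it assumes that the manager caches connections per name and that the facade's connection($name) selector is available in the installed version.

```php
<?php

// Settings shared by both connections. Placeholder values; in the amqp.php
// posted above they come from parse_url(env('CLOUDAMQP_URL')).
$base = [
    'connection' => [
        'hosts' => [[
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/',
        ]],
        'options' => [],
    ],
];

$config = [
    'default' => 'rabbitmq',
    'connections' => [
        'rabbitmq' => $base,           // used by the long-running consumer
        'rabbitmq_publisher' => $base, // hypothetical name, used only for publishing
    ],
];

// Inside a consumer callback the publish would then go through the second name
// (assumption: connection() selects a separately cached connection):
// Amqp::connection('rabbitmq_publisher')->publish(json_encode($data), 'loop-call-driver', $exchange, $options);
```

Whether this actually prevents the consumer's channel from being closed depends on how the manager caches and tears down connections, so it would need to be confirmed against a running broker.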

ssi-anik (Owner) commented Aug 8, 2024

@tof06 I am sorry again for the late reply, which was due to unavoidable circumstances. Can you share a working code sample that reproduces the closed connection?

Also, publishing is mostly done from web requests, and can also be done from jobs. As soon as the request or job is done, its connection is closed, whereas consumption should be done in a long-running command, where the connection is kept alive.

So, publishing and consuming use two different connections. Am I missing anything here?
