ActiveMQ – reconnect consumer after restart

I am using ActiveMQ, specifically in the Symfony 4 stomp-php package. The problem is that everything works fine, as long as ActiveMQ is restarted on the server, then the consumer, although connected, no longer downloads the messages.
My code looks that:

      $messages = 0;
        $this->subscription = $this->getSubscription($input->getArgument('name'));
        $signalHandler = $this->registerSignalHandler();
        foreach ($this->subscription->consume() as $unit) {
            if (null !== $unit) {
                ++$messages;
                if ($io->isVerbose()) {
                    $io->writeln(sprintf('Message #%s %s', $messages, ($unit ? 'processed' : 'skipped')));
                }
                if ($maxMessages && $messages == $maxMessages) {
                    $this->subscription->stop();
                }
            } elseif ($io->isVeryVerbose()) {
                $io->warning('No message available yet!');
            }
    
            if ($signalHandler) {
                pcntl_signal_dispatch();
            }
    
        }
        return 0;
    }

 private function getSubscription(string $name): Subscription
    {
        $id = sprintf(CustomStompExtension::CONSUMER_ID, $name);
        $subscription = $this->container->get($id);

        if (!is_a($subscription, Subscription::class)) {
            throw new LogicException(sprintf('The service "%s" is not a "%s".', $id, Subscription::class));
        }

        return $subscription;
    }

 private function registerSignalHandler(): bool
    {
        if (extension_loaded('pcntl') && function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, [$this, 'stopSubscription']);
            pcntl_signal(SIGINT, [$this, 'stopSubscription']);
            return true;
        }
        return false;
    }

My ClientBuilder

<?php

use StompClient;
use StompNetworkConnection;
use StompNetworkObserverHeartbeatEmitter;
use StompStatefulStomp;

class ClientBuilder
{
    /**
     * @var string
     */
    private $host;
    /**
     * @var int
     */
    private $port;
    /**
     * @var string
     */
    private $user;
    /**
     * @var string
     */
    private $password;
    /**
     * @var string
     */
    private $vhost;
    /**
     * @var int
     */
    private $connectionTimeout;
    /**
     * @var int
     */
    private $writeTimeout;
    /**
     * @var int
     */
    private $readTimeoutMs;
    /**
     * @var int
     */
    private $heartbeatClientMs;
    /**
     * @var int
     */
    private $heartbeatServerMs;

    /**
     * @var string
     */
    private $brokerUri;

    /*
     * @var array
     */
    private $context = [];

    /**
     * Sets options for this builder.
     *
     * @param array $options
     */
    public function setOptions(array $options): void
    {

        $keys = array_keys(get_object_vars($this));
        foreach ($options as $k => $v) {
            $k = preg_replace_callback(
                '/_w/',
                function ($str) {
                    return strtoupper($str[0][1]);
                },
                $k
            );
            if (!in_array($k, $keys, true)) {
                throw new InvalidArgumentException(sprintf('The parameter "%s" is not supported.', $k));
            }
            $this->$k = $v;
        }
    }

    /**
     * Returns the connection defined by this builder.
     *
     * @return StatefulStomp
     *
     * @throws StompExceptionConnectionException
     */
    public function getClient(): StatefulStomp
    {
        $connection = $this->getConnection();

        if(!$connection->isConnected()) {
            $connection = $this->getConnection();
        }

        $client = new Client($connection);
        if ($this->vhost) {
            $client->setVhostname($this->vhost);
        }
        if ($this->user || $this->password) {
            $client->setLogin($this->user, $this->password);
        }

        $this->configureHeartBeat($client);

        return new StatefulStomp($client);
    }

    /**
     * @return Connection
     *
     * @throws StompExceptionConnectionException
     */
    private function getConnection(): Connection
    {
        $connection = new Connection($this->getConnectionString(), $this->getConnectTimeout(), $this->context);
        if ($this->writeTimeout) {
            $connection->setWriteTimeout($this->writeTimeout);
        }

        if ($this->readTimeoutMs) {
            $seconds = ($this->readTimeoutMs - ($this->readTimeoutMs % 1000)) / 1000;
            $microseconds = ($this->readTimeoutMs % 1000) * 1000;
            $connection->setReadTimeout($seconds, $microseconds);
        }

        return $connection;
    }

    private function getConnectionString(): string
    {
        if ($this->brokerUri) {
            return $this->brokerUri;
        }

        return sprintf('tcp://%s:%s', $this->host, $this->port);
    }

    private function getConnectTimeout(): int
    {
        return $this->connectionTimeout ?: 1;
    }

    private function configureHeartBeat(Client $client): void
    {
        if (!$this->heartbeatClientMs && !$this->heartbeatServerMs) {
            return;
        }
        $observer = new HeartbeatEmitter($client->getConnection());
        $client->setHeartbeat($this->heartbeatClientMs, $this->heartbeatServerMs);
        $client->getConnection()->getParser()->setObserver($observer);
    }
}

In env I got:

ACTIVEMQ_STOMP_BROKER_URI="failover://(tcp://127.0.0.1:61613/?initialReconnectDelay=100&maxReconnectAttempts=0)"

Any idea? How can I do this?

Source: Symfony Questions

Was this helpful?

0 / 0

Leave a Reply 0

Your email address will not be published. Required fields are marked *