It’s hard to believe, but the --all option for Symfony’s messenger:consume command was unavailable until version 7.1. On lower versions, the list of transports must be passed explicitly to the command. But if you need it, it’s really easy to port this feature to 6.4!

Background

At GetResponse we’re currently preparing a new approach for AMQP workers, because the one we have is no longer suitable. After doing some research, we decided to use Symfony Messenger with KEDA auto-scaling. In a regular Symfony application with control over the AMQP instance it’s really easy to set everything up, but the hard part in our case is that we need to fit into existing infrastructure, and our application does not own the AMQP server (it is only one of the consumers). That means we can’t use auto-setup, dead letter queues or delayed messages out of the box, and we need to introduce transports with full backward compatibility for message payloads. A lot of customisation had to be done, but we’re almost there!

Anyway, since our AMQP server uses multiple exchanges, Symfony Messenger has to be configured with multiple transports. That leads us to one little inconvenience: we need to either specify the list of transports when executing the messenger:consume command, or leave it empty and interactively choose a transport from the list printed in the CLI.

But we want to consume all the transports, at least locally or on test instances (on production we will group transports and queues, so they can be scaled). How can we achieve this?

Schrödinger’s --all option

Fortunately, a few months ago a pull request was merged that added the --all option to the messenger:consume command. But there are two problems with it: because of the feature-freeze period in 7.0, it had to be moved to the 7.1 milestone (which is not released at the time of writing), and even if it had landed a little earlier, our app still uses Symfony 6.4 🤷‍♂️. Upgrading is something we need to work on at some point, but it is not going to happen now.

So in the end this is a Schrödinger’s option for us: it exists, but at the same time it’s not available.

Porting --all to Symfony 6.4

I did a proof of concept today, and it looks like it’s possible to backport the --all option to Symfony 6.4 pretty easily. To introduce this option in the messenger:consume command, we need to extend Symfony’s built-in command:

<?php

declare(strict_types=1);

namespace Codito\App\Cli;

use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand as SymfonyConsumeMessagesCommand;

/**
 * This is only a proxy to the original `messenger:consume` command, with the `--all` option ported from Symfony 7.1.
 * The command's class is overridden dynamically in a compiler pass ({@see OverrideMessengerConsumeCommandCompilerPass}).
 *
 * @see https://github.com/symfony/symfony/pull/52411 @TODO Remove this workaround when Symfony 7.1 is used
 */
#[AsCommand(name: 'messenger:consume', description: 'Wrapper for Symfony\'s command that adds `--all` option')]
class ConsumeMessagesCommand extends SymfonyConsumeMessagesCommand
{
    /** @var list<string> */
    private array $transportNames = [];

    /**
     * @param list<string> $transportNames
     */
    public function setTransportNames(array $transportNames): void
    {
        $this->transportNames = $transportNames;
    }

    protected function configure(): void
    {
        parent::configure();

        $this->addOption('all', 'a', InputOption::VALUE_NONE, 'Consume messages from all receivers');
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        if ($input->getOption('all')) {
            if ([] === $this->transportNames) {
                throw new \RuntimeException('There are no transports configured');
            }

            $input->setArgument('receivers', $this->transportNames);
        }

        return parent::execute($input, $output);
    }

    protected function interact(InputInterface $input, OutputInterface $output): void
    {
        if ($input->getOption('all')) {
            return;
        }

        parent::interact($input, $output);
    }
}

OK, but this command is already registered in the Symfony application, because the framework bundle provides it out of the box when the Messenger component is installed. How do we tell Symfony to use our customised version of the command? Compiler pass to the rescue!

<?php

declare(strict_types=1);

namespace Codito\App\DependencyInjection\Compiler;

use Codito\App\Cli\ConsumeMessagesCommand;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

/**
 * This compiler pass is responsible for registering enhanced version of `messenger:consume` command.
 *
 * @see https://github.com/symfony/symfony/pull/52411 @TODO Remove this workaround when Symfony 7.1 is used
 */
class OverrideMessengerConsumeCommandCompilerPass implements CompilerPassInterface
{
    public function process(ContainerBuilder $container): void
    {
        $symfonyConsumeCommandDefinition = $container->getDefinition('console.command.messenger_consume_messages');
        $symfonyConsumeCommandDefinition->setClass(ConsumeMessagesCommand::class);

        // Argument 4 of the original definition holds the list of configured transport names.
        /** @uses ConsumeMessagesCommand::setTransportNames() */
        $symfonyConsumeCommandDefinition->addMethodCall(
            'setTransportNames',
            [$symfonyConsumeCommandDefinition->getArgument(4)]
        );
    }
}

This compiler pass does two things:

  • Overrides the class in the console.command.messenger_consume_messages service definition, under which the messenger:consume command is registered in the DI container. That means that when the command is executed and the related service is fetched from the container, Symfony will instantiate it using the extended class.
  • Registers a method call on our custom Codito\App\Cli\ConsumeMessagesCommand service that injects the list of configured transports, so we can use it automatically when the --all option is present. The value passed to the command is taken from the original command’s definition (and is calculated in yet another compiler pass).
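If the same pass might also run in a setup where Messenger is disabled, the service definition will be missing and getDefinition() will throw. A minimal sketch of a more defensive variant could look like the one below. The class name GuardedOverrideMessengerConsumeCommandCompilerPass and the SERVICE_ID constant are hypothetical, introduced only for this illustration, and Composer's autoloader is assumed to be loaded already.

```php
<?php

declare(strict_types=1);

namespace Codito\App\DependencyInjection\Compiler;

use Codito\App\Cli\ConsumeMessagesCommand;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

/**
 * Hypothetical variant of the pass above: it silently does nothing
 * when the `messenger:consume` command is not registered at all.
 */
final class GuardedOverrideMessengerConsumeCommandCompilerPass implements CompilerPassInterface
{
    private const SERVICE_ID = 'console.command.messenger_consume_messages';

    public function process(ContainerBuilder $container): void
    {
        // Without this guard, getDefinition() throws for an unknown service ID.
        if (!$container->hasDefinition(self::SERVICE_ID)) {
            return;
        }

        $definition = $container->getDefinition(self::SERVICE_ID);
        $definition->setClass(ConsumeMessagesCommand::class);

        // Same index as in the pass above: the list of transport names.
        $definition->addMethodCall('setTransportNames', [$definition->getArgument(4)]);
    }
}
```

The behaviour is the same as before when the command exists, so this variant only matters for applications or environments where the Messenger integration can be turned off.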

To make it work, we of course need to register the compiler pass in our kernel:

<?php

declare(strict_types=1);

namespace Codito\App;

use Codito\App\DependencyInjection\Compiler\OverrideMessengerConsumeCommandCompilerPass;
use Symfony\Bundle\FrameworkBundle\Kernel\MicroKernelTrait;
use Symfony\Component\Config\Loader\LoaderInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Kernel as SymfonyKernel;

class Kernel extends SymfonyKernel
{
    use MicroKernelTrait;

    /**
     * Used in {@see MicroKernelTrait::registerContainerConfiguration()}
     */
    protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
    {
        $loader->load($this->getConfigDir() . '/services.yml');

        $container->addCompilerPass(new OverrideMessengerConsumeCommandCompilerPass());
    }
}

After that, when running bin/console messenger:consume --all, the complete list of transports is provided automatically and we instantly consume messages from all of them:


 [OK] Consuming messages from transports "poc1, poc2, poc3".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.       

 // Quit the worker with CONTROL-C.                                                                                     

 // Re-run the command with a -vv option to see logs about consumed messages

Summary

Symfony’s dependency injection container is incredibly flexible. The fact that you work with definitions, not with concrete service instances, allows you to dynamically hook into the compilation process and do whatever you want. The only limit is your imagination!

Please keep in mind that the DI container’s compilation process has multiple steps, so be careful how you register your compiler passes. Ordering matters, both in terms of steps and priorities within each step.
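To illustrate how priorities affect ordering within a single step, here is a small standalone sketch. RecordingPass is a hypothetical class that only records when it ran, and the priorities 10 and -10 are arbitrary values picked for the example. It assumes the symfony/dependency-injection package is installed and Composer's autoloader is already loaded.

```php
<?php

declare(strict_types=1);

use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
use Symfony\Component\DependencyInjection\ContainerBuilder;

// Hypothetical pass: it does not touch the container, it only logs its label.
final class RecordingPass implements CompilerPassInterface
{
    /** @var list<string> */
    public static array $order = [];

    public function __construct(private readonly string $label)
    {
    }

    public function process(ContainerBuilder $container): void
    {
        self::$order[] = $this->label;
    }
}

$container = new ContainerBuilder();

// Default step (before optimization) and default priority (0).
$container->addCompilerPass(new RecordingPass('default'));

// Same step, but explicit priorities: higher values run earlier.
$container->addCompilerPass(new RecordingPass('late'), PassConfig::TYPE_BEFORE_OPTIMIZATION, -10);
$container->addCompilerPass(new RecordingPass('early'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 10);

$container->compile();

// RecordingPass::$order is now ['early', 'default', 'late'].
```

For the override pass from this article, the practical point is that it has to run after the pass that fills in the transport names, otherwise argument 4 would not contain the final list yet.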