Symfony Messenger AMQP transport that uses consume instead of get.
the-consoomer is a custom AMQP transport for the Symfony Messenger component. Unlike the default AMQP transport, which polls the broker with the get method, this package uses the consume method, so the broker pushes messages to the consumer as they arrive. This can lower latency and improve throughput compared with polling, and it suits workloads where messages should be processed as soon as they are published.
Requirements: PHP 8.2+ with the amqp extension installed.
- Language: PHP
- Framework: Symfony
- License: MIT
- Status: Public, actively maintained
- Custom AMQP transport for Symfony Messenger
- Uses basic_consume for push-based message processing
- Lower latency and better throughput than polling-based basic_get
    composer require crazy-goat/the-consoomer

- Register the transport factory in your Symfony services configuration.
- Use it in your Messenger transport configuration.
    services:
        CrazyGoat\TheConsoomer\AmqpTransport:
            tags:
                - { name: 'messenger.transport_factory' }

    framework:
        messenger:
            transports:
                consoomer:
                    dsn: 'amqp-consoomer://guest:guest@localhost:5672/%2f/?queue=my_queue'

The DSN has the following form:

    amqp-consoomer://<user>:<password>@<host>:<port>/<vhost>/<exchange>/?queue=<queue_name>

Example: amqp-consoomer://guest:guest@localhost:5672/%2f/my_exchange/?queue=test
| Option | Description | Default |
|---|---|---|
| queue | Queue name to consume from | (required) |
| max_unacked_messages | Prefetch count / batch size | 100 |
| timeout | Consumer timeout in seconds | 0.1 |
| heartbeat | Connection heartbeat interval in seconds (0 = disabled) | 0 |
| routing_key | Consumer-side: binding key used when declaring/binding the queue | '' |
| default_publish_routing_key | Sender-side: default routing key used when publishing messages | '' |
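
To make the DSN layout and the option defaults concrete, here is a minimal sketch that splits a DSN into its parts using only PHP built-ins. The function name parseConsoomerDsn() and the shape of the returned array are assumptions for illustration; the package's own parsing may differ in naming, validation, and type casting.

```php
<?php

declare(strict_types=1);

/**
 * Illustrative only: splits an amqp-consoomer DSN into its parts.
 * parseConsoomerDsn() is a hypothetical helper, not part of the package.
 *
 * @return array<string, mixed>
 */
function parseConsoomerDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || ($parts['scheme'] ?? '') !== 'amqp-consoomer') {
        throw new InvalidArgumentException('Not an amqp-consoomer DSN: ' . $dsn);
    }

    // The path is /<vhost>/<exchange>/ and each segment is URL-encoded (%2f is "/").
    $segments = array_map('urldecode', explode('/', trim($parts['path'] ?? '', '/')));

    // Query-string options override the defaults listed in the options table.
    // Values coming from the query string stay strings in this sketch.
    parse_str($parts['query'] ?? '', $query);
    $options = array_merge([
        'max_unacked_messages' => 100,
        'timeout' => 0.1,
        'heartbeat' => 0,
        'routing_key' => '',
        'default_publish_routing_key' => '',
    ], $query);

    // The queue option has no default and must be provided.
    if (!isset($options['queue']) || $options['queue'] === '') {
        throw new InvalidArgumentException('The "queue" option is required.');
    }

    return [
        'user' => $parts['user'] ?? null,
        'password' => $parts['pass'] ?? null,
        'host' => $parts['host'] ?? null,
        'port' => $parts['port'] ?? null,
        'vhost' => $segments[0] ?? '',
        'exchange' => $segments[1] ?? '',
        'options' => $options,
    ];
}
```

For the example DSN above, this sketch yields the vhost "/", the exchange "my_exchange", and the queue "test", with the remaining options at their defaults.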
The transport uses separate routing keys for consuming and sending:
- Consumer (Receiver): Uses routing_key as the binding key when declaring/binding the queue to an exchange. This determines which messages are routed to the queue.
- Sender: Uses default_publish_routing_key as the default routing key when publishing messages. This determines how messages are routed through the exchange.
When sending a message, the routing key precedence is:
1. AmqpStamp::getRoutingKey(): message-specific routing key (highest priority)
2. default_publish_routing_key: configured default for publishing
3. '': empty string (no routing key)
This separation prevents unintended coupling: setting routing_key for consumer binding does not affect how messages are published.
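
The precedence described above can be sketched as a small fallback chain. resolveRoutingKey() is a hypothetical helper written for illustration, not the package's actual code, and it assumes that "not set" is represented by null for both the stamp's routing key and the configured default.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper mirroring the documented precedence:
 * stamp routing key, then default_publish_routing_key, then ''.
 * Assumes null means "not set".
 */
function resolveRoutingKey(?string $stampRoutingKey, ?string $defaultPublishRoutingKey): string
{
    return $stampRoutingKey ?? $defaultPublishRoutingKey ?? '';
}
```

Note that the consumer-side routing_key never enters this chain, which matches the separation between binding and publishing.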
The heartbeat feature keeps connections alive and detects dead connections. When enabled, the connection will automatically reconnect if no activity is detected for twice the heartbeat interval.
    framework:
        messenger:
            transports:
                consoomer:
                    dsn: 'amqp-consoomer://guest:guest@localhost:5672/%2f/?queue=my_queue&heartbeat=60'

With heartbeat enabled:
- Connection is checked before each operation (send, get, ack, reject)
- If stale (elapsed > 2 * heartbeat), automatic reconnect occurs
- Activity is updated after each operation
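
The staleness rule in the list above can be sketched as follows. HeartbeatTracker is a hypothetical class used only to illustrate the check; the package's internals may be organized differently. Timestamps are passed in explicitly so the logic is easy to follow and test; real code would typically use microtime(true).

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical illustration of the documented rule:
 * a connection is stale when elapsed > 2 * heartbeat.
 */
final class HeartbeatTracker
{
    private float $lastActivity;

    public function __construct(private readonly int $heartbeat, float $now)
    {
        $this->lastActivity = $now;
    }

    /** Checked before each operation (send, get, ack, reject). */
    public function isStale(float $now): bool
    {
        // A heartbeat of 0 disables the check entirely.
        if ($this->heartbeat <= 0) {
            return false;
        }

        return ($now - $this->lastActivity) > 2 * $this->heartbeat;
    }

    /** Called after each operation to record activity. */
    public function touch(float $now): void
    {
        $this->lastActivity = $now;
    }
}
```

With heartbeat=60, a connection that has been idle for more than 120 seconds would be treated as stale and reconnected before the next operation.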
    # All tests
    composer test
    # Unit tests only
    composer test-unit
    # E2E tests (requires RabbitMQ)
    composer test-e2e-full

E2E tests require RabbitMQ. The test-e2e-full script automatically:
- Starts RabbitMQ via Docker
- Waits for RabbitMQ to be ready
- Runs E2E tests
- Stops RabbitMQ
    # Run rector + php-cs-fixer
    composer lint

Contributions are welcome! Please open an issue or submit a pull request.
This project is licensed under the MIT License.
