Skip to content

Commit 3481382

Browse files
committed
Add consume strategy. Add handle signals.
1 parent 74ca1b5 commit 3481382

37 files changed

Lines changed: 915 additions & 217 deletions

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
CHANGELOG
22
=========
33

4+
Next release
5+
------------
6+
7+
* Set minimum Symfony version to 6.4
8+
* Remove `--loop` option. Use `LoopConsumer` instead.
9+
* Add consume strategy (`Default` and `Loop`).
10+
411
v2.1.3
512
------
613

@@ -15,7 +22,7 @@ v2.1.1
1522
------
1623

1724
* Add queue name to `ReceivedMessage`.
18-
* Add method `ReceivedMessage::isDirectPublisher` for check, is message direct published to queue (via default exchange).
25+
* Add method `ReceivedMessage::isDirectPublished` for check, is message direct published to queue (via default exchange).
1926

2027
v2.1.0
2128
------

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
FROM php:8.4-cli
22

3-
MAINTAINER Vitalii Zhuk <v.zhuk@fivelab.org>
3+
LABEL org.opencontainers.image.authors="Vitalii Zhuk <v.zhuk@fivelab.org>"
44

55
ARG XDEBUG_REMOTE_HOST='host.docker.internal'
66
ARG XDEBUG_REMOTE_PORT=9000
@@ -19,7 +19,7 @@ RUN \
1919
librabbitmq-dev && \
2020
printf '\n' | pecl install amqp && \
2121
yes | pecl install xdebug && \
22-
docker-php-ext-install sockets && \
22+
docker-php-ext-install sockets pcntl && \
2323
docker-php-ext-enable amqp xdebug
2424

2525
# Configure XDebug

composer.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
"require-dev": {
2626
"ext-amqp": "*",
2727
"ext-sockets": "*",
28+
"ext-pcntl": "*",
2829
"phpunit/phpunit": "~11.5",
2930
"phpmetrics/phpmetrics": "~3.0",
3031
"phpstan/phpstan": "~2.1.6",
3132
"escapestudios/symfony2-coding-standard": "~3.5",
3233
"guzzlehttp/guzzle": "~7.0",
33-
"symfony/console": "~5.4 | ~6.0 | ~7.0",
34+
"symfony/console": "~6.4 | ~7.0",
35+
"symfony/process": "~6.4 | ~7.0",
3436
"ramsey/uuid": "~4.7",
3537
"fivelab/transactional": "~2.0",
3638
"fivelab/ci-rules": "dev-master",

docker-compose.yml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
services:
2+
rabbitmq:
3+
image: rabbitmq:management
4+
container_name: fivelab-amqp-rabbitmq
5+
ports:
6+
- '15672:15672'
7+
networks:
8+
- amqp
9+
10+
amqp:
11+
build:
12+
dockerfile: Dockerfile
13+
context: .
14+
container_name: fivelab-amqp-php
15+
tty: true
16+
volumes:
17+
- './:/code'
18+
environment:
19+
RABBITMQ_HOST: fivelab-amqp-rabbitmq
20+
networks:
21+
- amqp
22+
23+
networks:
24+
amqp:
25+
name: fivelab-amqp
26+
driver: bridge

phpstan.neon

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,3 @@ parameters:
22
level: 8
33
paths:
44
- src
5-
6-
ignoreErrors:
7-
-
8-
message: '#^Property FiveLab\\Component\\Amqp\\Command\\.+::\$(defaultName|defaultDescription) has no type specified.$#'
9-
path: src/

src/Adapter/Amqp/Queue/AmqpQueue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public function consume(\Closure $handler, string $tag = ''): void
4444
$this->queue->consume(function (\AMQPEnvelope $envelope) use ($handler) {
4545
$receivedMessage = new AmqpReceivedMessage($this->queue, $envelope);
4646

47-
$handler($receivedMessage);
47+
return $handler($receivedMessage);
4848
}, AMQP_NOPARAM, $tag);
4949
} catch (\AMQPQueueException $e) {
5050
if (false !== \stripos($e->getMessage(), 'consumer timeout exceed')) {

src/AmqpBuilder.php

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the FiveLab Amqp package
5+
*
6+
* (c) FiveLab
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code
10+
*/
11+
12+
declare(strict_types = 1);
13+
14+
namespace FiveLab\Component\Amqp;
15+
16+
use FiveLab\Component\Amqp\Adapter\Amqp\Channel\AmqpChannelFactory;
17+
use FiveLab\Component\Amqp\Adapter\Amqp\Connection\AmqpConnectionFactory;
18+
use FiveLab\Component\Amqp\Adapter\Amqp\Exchange\AmqpExchangeFactory;
19+
use FiveLab\Component\Amqp\Adapter\Amqp\Queue\AmqpQueueFactory;
20+
use FiveLab\Component\Amqp\Channel\ChannelFactoryInterface;
21+
use FiveLab\Component\Amqp\Channel\Definition\ChannelDefinition;
22+
use FiveLab\Component\Amqp\Connection\ConnectionFactoryInterface;
23+
use FiveLab\Component\Amqp\Connection\Dsn;
24+
use FiveLab\Component\Amqp\Exchange\Definition\ExchangeDefinition;
25+
use FiveLab\Component\Amqp\Exchange\ExchangeFactoryInterface;
26+
use FiveLab\Component\Amqp\Publisher\Middleware\PublisherMiddlewares;
27+
use FiveLab\Component\Amqp\Publisher\Publisher;
28+
use FiveLab\Component\Amqp\Publisher\PublisherInterface;
29+
use FiveLab\Component\Amqp\Queue\Definition\QueueDefinition;
30+
use FiveLab\Component\Amqp\Queue\QueueFactoryInterface;
31+
32+
class AmqpBuilder
33+
{
34+
private readonly Dsn $dsn;
35+
private ?ConnectionFactoryInterface $connectionFactory = null;
36+
private ?ChannelFactoryInterface $channelFactory = null;
37+
38+
public function __construct(Dsn|string $dsn)
39+
{
40+
if (\is_string($dsn)) {
41+
$dsn = Dsn::fromDsn($dsn);
42+
}
43+
44+
$this->dsn = $dsn;
45+
}
46+
47+
public function createConnection(): ConnectionFactoryInterface
48+
{
49+
if (!$this->connectionFactory) {
50+
$this->connectionFactory = new AmqpConnectionFactory($this->dsn);
51+
}
52+
53+
return $this->connectionFactory;
54+
}
55+
56+
public function createChannel(): ChannelFactoryInterface
57+
{
58+
if (!$this->channelFactory) {
59+
$this->channelFactory = new AmqpChannelFactory($this->createConnection(), new ChannelDefinition());
60+
}
61+
62+
return $this->channelFactory;
63+
}
64+
65+
public function createExchange(ExchangeDefinition $definition): ExchangeFactoryInterface
66+
{
67+
return new AmqpExchangeFactory($this->createChannel(), $definition);
68+
}
69+
70+
public function createPublisher(ExchangeDefinition $definition, ?PublisherMiddlewares $middlewares = null): PublisherInterface
71+
{
72+
$exchange = $this->createExchange($definition);
73+
74+
return new Publisher($exchange, $middlewares ?: new PublisherMiddlewares());
75+
}
76+
77+
public function createQueue(QueueDefinition $definition): QueueFactoryInterface
78+
{
79+
$channel = $this->createChannel();
80+
81+
return new AmqpQueueFactory($channel, $definition);
82+
}
83+
}

src/Command/InitializeExchangesCommand.php

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
#[AsCommand(name: 'event-broker:initialize:exchanges', description: 'Initialize exchanges.')]
2323
class InitializeExchangesCommand extends Command
2424
{
25-
protected static $defaultName = 'event-broker:initialize:exchanges';
26-
protected static $defaultDescription = 'Initialize exchanges.';
2725
private ExchangeFactoryRegistryInterface $registry;
2826

2927
/**
@@ -45,12 +43,6 @@ public function __construct(ExchangeFactoryRegistryInterface $registry, array $e
4543
$this->exchanges = $exchanges;
4644
}
4745

48-
protected function configure(): void
49-
{
50-
$this
51-
->setDescription(self::$defaultDescription);
52-
}
53-
5446
protected function execute(InputInterface $input, OutputInterface $output): int
5547
{
5648
foreach ($this->exchanges as $exchange) {

src/Command/InitializeQueuesCommand.php

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
#[AsCommand(name: 'event-broker:initialize:queues', description: 'Initialize queues.')]
2323
class InitializeQueuesCommand extends Command
2424
{
25-
protected static $defaultName = 'event-broker:initialize:queues';
26-
protected static $defaultDescription = 'Initialize queues.';
2725
private QueueFactoryRegistryInterface $registry;
2826

2927
/**
@@ -45,12 +43,6 @@ public function __construct(QueueFactoryRegistryInterface $registry, array $queu
4543
$this->queues = $queues;
4644
}
4745

48-
protected function configure(): void
49-
{
50-
$this
51-
->setDescription(self::$defaultDescription);
52-
}
53-
5446
protected function execute(InputInterface $input, OutputInterface $output): int
5547
{
5648
foreach ($this->queues as $queue) {

src/Command/ListConsumersCommand.php

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@
2121
#[AsCommand(name: 'event-broker:consumer:list', description: 'List of possible consumers.')]
2222
class ListConsumersCommand extends Command
2323
{
24-
protected static $defaultName = 'event-broker:consumer:list';
25-
protected static $defaultDescription = 'List of possible consumers.';
26-
2724
/**
2825
* @var array<string>
2926
*/
@@ -41,12 +38,6 @@ public function __construct(array $consumers)
4138
$this->consumers = $consumers;
4239
}
4340

44-
protected function configure(): void
45-
{
46-
$this
47-
->setDescription(self::$defaultDescription);
48-
}
49-
5041
protected function execute(InputInterface $input, OutputInterface $output): int
5142
{
5243
$output->writeln('Possible consumers are:');

0 commit comments

Comments
 (0)