Skip to content

Commit cba3241

Browse files
committed
Remove middleware layer (use event-dispatcher). Add eventable tick handler for loop strategy.
1 parent 981386f commit cba3241

File tree

65 files changed

+1004
-1997
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1004
-1997
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
CHANGELOG
22
=========
33

4+
Next release
5+
------------
6+
7+
* Remove consumer middleware layer. Use `symfony/event-dispatcher` instead of.
8+
* Remove `RamseyUuidMessageIdGenerator` (use `SymfonyUuidMessageIdGenerator` instead).
9+
410
v2.2.0
511
------
612

README.md

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,10 @@ Development
5151

5252
For easy development you can use the `Docker`.
5353

54-
> **Note:** We use internal network for link the our library with rabbitmq for testing
55-
and development.
56-
5754
```bash
58-
docker network create --driver bridge amqp
59-
docker run -d --rm \
60-
--network amqp \
61-
--name amqp-rabbitmq \
62-
rabbitmq:management
63-
docker build -t amqp .
64-
docker run -it --rm \
65-
--name amqp \
66-
-v $(pwd):/code \
67-
--network amqp \
68-
-e "RABBITMQ_HOST=amqp-rabbitmq" \
69-
amqp bash
70-
71-
```
72-
73-
> **Note** for debugging you can expose 15672 port for access to management plugin.
55+
docker compose up
56+
docker compose exec amqp bash
57+
```
7458

7559
After success run and attach to container you must install vendors:
7660

composer.json

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
"guzzlehttp/guzzle": "~7.0",
3434
"symfony/console": "~6.4 | ~7.0",
3535
"symfony/process": "~6.4 | ~7.0",
36-
"ramsey/uuid": "~4.7",
36+
"symfony/uid": "~6.4 | ~7.0",
37+
"symfony/event-dispatcher": "~6.4 | ~7.0",
3738
"fivelab/transactional": "~2.0",
3839
"fivelab/ci-rules": "dev-master",
3940
"psr/log": "*",
@@ -44,10 +45,11 @@
4445

4546
"suggest": {
4647
"ext-amqp": "For use amqp extension.",
47-
"ext-sockets": "For php-amqplib",
48-
"php-amqplib/php-amqplib": "pure PHP driver",
48+
"ext-sockets": "For php-amqplib.",
49+
"ext-pcntl": "For catch signals and use alarm.",
50+
"php-amqplib/php-amqplib": "pure PHP driver.",
4951
"symfony/console": "For configure console commands for run consumer.",
50-
"ramsey/uuid": "For use UUID message id generator",
52+
"symfony/uid": "For use UUID message id generator",
5153
"fivelab/transactional": "For use transactional layer with this wrapper."
5254
},
5355

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,4 @@ services:
2323
networks:
2424
amqp:
2525
name: fivelab-amqp
26-
driver: bridge
26+
driver: bridge

docs/basic-usage.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,11 @@ And you can create and run consumer:
102102
<?php
103103

104104
use FiveLab\Component\Amqp\Consumer\SingleConsumer;
105-
use FiveLab\Component\Amqp\Consumer\Middleware\ConsumerMiddlewares;
106105
use FiveLab\Component\Amqp\Consumer\ConsumerConfiguration;
107106

108107
$consumer = new SingleConsumer(
109108
$queueFactory,
110109
new MyMessageHandler(),
111-
new ConsumerMiddlewares(),
112110
new ConsumerConfiguration()
113111
);
114112

src/Adapter/AmqpLib/Queue/AmqpQueue.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public function consume(\Closure $handler, string $tag = ''): void
4242
$amqplibChannel = $this->channel->getChannel();
4343

4444
$queueName = $this->getName();
45+
$stopConsuming = false;
4546

4647
try {
4748
$amqplibChannel->basic_consume(
@@ -51,15 +52,19 @@ public function consume(\Closure $handler, string $tag = ''): void
5152
false,
5253
$this->definition->exclusive,
5354
false,
54-
function (AMQPMessage $message) use ($handler, $queueName) {
55+
function (AMQPMessage $message) use ($handler, $queueName, &$stopConsuming): void {
5556
$receivedMessage = new AmqpReceivedMessage($message, $queueName);
5657

57-
$handler($receivedMessage);
58+
$result = $handler($receivedMessage);
59+
60+
if (false === $result) {
61+
$stopConsuming = true;
62+
}
5863
}
5964
);
6065

6166
// Loop as long as the channel has callbacks registered
62-
while ($amqplibChannel->is_consuming()) {
67+
while (!$stopConsuming && $amqplibChannel->is_consuming()) {
6368
$amqplibChannel->wait(null, false, $this->getChannel()->getConnection()->getReadTimeout());
6469
}
6570
} catch (\Throwable $e) {

src/Command/OutputEventHandler.php

Lines changed: 0 additions & 39 deletions
This file was deleted.

src/Command/RunConsumerCommand.php

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,18 @@
1717
use FiveLab\Component\Amqp\Consumer\Checker\RunConsumerCheckerRegistryInterface;
1818
use FiveLab\Component\Amqp\Consumer\ConsumerInterface;
1919
use FiveLab\Component\Amqp\Consumer\EventableConsumerInterface;
20-
use FiveLab\Component\Amqp\Consumer\Middleware\StopAfterNExecutesMiddleware;
21-
use FiveLab\Component\Amqp\Consumer\MiddlewareAwareInterface;
2220
use FiveLab\Component\Amqp\Consumer\Registry\ConsumerRegistryInterface;
21+
use FiveLab\Component\Amqp\Event\ProcessedMessageEvent;
2322
use FiveLab\Component\Amqp\Exception\RunConsumerCheckerNotFoundException;
23+
use FiveLab\Component\Amqp\Listener\StopAfterNExecutesListener;
2424
use Symfony\Component\Console\Attribute\AsCommand;
2525
use Symfony\Component\Console\Command\Command;
2626
use Symfony\Component\Console\Command\SignalableCommandInterface;
2727
use Symfony\Component\Console\Input\InputArgument;
2828
use Symfony\Component\Console\Input\InputInterface;
2929
use Symfony\Component\Console\Input\InputOption;
3030
use Symfony\Component\Console\Output\OutputInterface;
31+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
3132

3233
#[AsCommand(name: 'event-broker:consumer:run', description: 'Run consumer.')]
3334
class RunConsumerCommand extends Command implements SignalableCommandInterface
@@ -37,7 +38,8 @@ class RunConsumerCommand extends Command implements SignalableCommandInterface
3738

3839
public function __construct(
3940
private readonly ConsumerRegistryInterface $consumerRegistry,
40-
?RunConsumerCheckerRegistryInterface $runCheckerRegistry = null
41+
?RunConsumerCheckerRegistryInterface $runCheckerRegistry = null,
42+
private readonly ?EventDispatcherInterface $eventDispatcher = null
4143
) {
4244
parent::__construct();
4345

@@ -95,21 +97,24 @@ protected function execute(InputInterface $input, OutputInterface $output): int
9597
$this->consumer = $this->consumerRegistry->get($consumerKey);
9698

9799
if ($this->consumer instanceof EventableConsumerInterface) {
98-
$closure = (new OutputEventHandler($output))(...);
99-
100-
$this->consumer->addEventHandler($closure);
100+
$this->consumer->setEventDispatcher($this->eventDispatcher);
101101
}
102102

103103
if ($input->getOption('messages')) {
104-
if (!$this->consumer instanceof MiddlewareAwareInterface) {
104+
if (!$this->consumer instanceof EventableConsumerInterface) {
105105
throw new \InvalidArgumentException(\sprintf(
106106
'For set number of messages customer must implement "%s", but "%s" given.',
107-
MiddlewareAwareInterface::class,
107+
EventableConsumerInterface::class,
108108
\get_class($this->consumer)
109109
));
110110
}
111111

112-
$this->consumer->pushMiddleware(new StopAfterNExecutesMiddleware((int) $input->getOption('messages')));
112+
if (!$this->eventDispatcher) {
113+
throw new \RuntimeException('A message limit can\'t be applied, since the command has no access to the event dispatcher.');
114+
}
115+
116+
$listener = new StopAfterNExecutesListener($this->eventDispatcher, (int) $input->getOption('messages'));
117+
$this->consumer->getEventDispatcher()?->addListener(ProcessedMessageEvent::class, $listener->onProcessedMessage(...));
113118
}
114119

115120
if ($input->getOption('read-timeout')) {

src/Command/RunRoundRobinConsumerCommand.php

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313

1414
namespace FiveLab\Component\Amqp\Command;
1515

16-
use FiveLab\Component\Amqp\Consumer\ConsumerInterface;
17-
use FiveLab\Component\Amqp\Consumer\Event;
1816
use FiveLab\Component\Amqp\Consumer\RoundRobin\RoundRobinConsumer;
1917
use Symfony\Component\Console\Attribute\AsCommand;
2018
use Symfony\Component\Console\Command\Command;
@@ -41,10 +39,7 @@ public function getSubscribedSignals(): array
4139
return [];
4240
}
4341

44-
return [
45-
\SIGINT,
46-
\SIGTERM,
47-
];
42+
return [\SIGINT, \SIGTERM];
4843
}
4944

5045
public function handleSignal(int $signal, false|int $previousExitCode = 0): int|false
@@ -54,21 +49,6 @@ public function handleSignal(int $signal, false|int $previousExitCode = 0): int|
5449
return false;
5550
}
5651

57-
protected function initialize(InputInterface $input, OutputInterface $output): void
58-
{
59-
$this->consumer->addEventHandler(static function (Event $event, mixed ...$args) use ($output) {
60-
if (Event::ChangeConsumer === $event) {
61-
/** @var ConsumerInterface $consumer */
62-
$consumer = $args[0];
63-
64-
$output->writeln(\sprintf(
65-
'Select next consumer with queue <comment>%s</comment>.',
66-
$consumer->getQueue()->getName()
67-
), OutputInterface::VERBOSITY_VERBOSE);
68-
}
69-
});
70-
}
71-
7252
protected function execute(InputInterface $input, OutputInterface $output): int
7353
{
7454
$this->consumer->run();

src/Connection/SpoolConnection.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ public function connect(): void
7878
}
7979
}
8080

81-
// @phpstan-ignore-next-line
8281
throw $firstException;
8382
}
8483

0 commit comments

Comments
 (0)