diff --git a/app/Events/Concerns/ShouldBeUniqueEvent.php b/app/Events/Concerns/ShouldBeUniqueEvent.php index 3930087f76..21299353d9 100644 --- a/app/Events/Concerns/ShouldBeUniqueEvent.php +++ b/app/Events/Concerns/ShouldBeUniqueEvent.php @@ -10,29 +10,35 @@ trait ShouldBeUniqueEvent { public const UNIQUE_KEY = 'webhooks:event'; + protected array $ids = [null]; + + protected ?array $broadcastChannels = null; + final public function broadcastWhen(): bool { - $lock = Cache::lock( - $this->uniqueKey(), - $this->uniqueTimeout() - ); + $channels = []; + + foreach ($this->ids as $id) { + $channelName = $this->channelName($id); + $lock = Cache::lock($this->uniqueKeyForChannel($channelName), $this->uniqueTimeout()); - if ($lock->get() === false) { - return false; + /** @var bool $hasAcquiredLock */ + $hasAcquiredLock = $lock->get(); + + if ($hasAcquiredLock) { + $channels[] = $channelName; + } } - return true; + $this->broadcastChannels = $channels; + + return count($channels) > 0; } abstract protected function uniqueTimeout(): int; - // @phpstan-ignore-next-line - protected function uniqueKey(): string + protected function uniqueKeyForChannel(string $channelName): string { - return sprintf( - '%s:%s', - static::UNIQUE_KEY, - $this->channelName() - ); + return sprintf('%s:%s', static::UNIQUE_KEY, $channelName); } } diff --git a/app/Events/Statistics/StatisticsUpdate.php b/app/Events/Statistics/StatisticsUpdate.php index 13d004f298..f0e68a84bb 100644 --- a/app/Events/Statistics/StatisticsUpdate.php +++ b/app/Events/Statistics/StatisticsUpdate.php @@ -15,12 +15,12 @@ final protected function uniqueTimeout(): int return config('arkscan.webhooks.statistics-update.ttl', 8); } - final protected function uniqueKey(): string + final protected function uniqueKeyForChannel(string $channelName): string { return sprintf( '%s:%s:%s', static::UNIQUE_KEY, - $this->channelName(), + $channelName, $this::class, ); } diff --git a/app/Events/WebsocketEvent.php b/app/Events/WebsocketEvent.php index 278d617bea..0b7d8a1f8b 100644 --- a/app/Events/WebsocketEvent.php +++ b/app/Events/WebsocketEvent.php @@ -5,7 +5,7 @@ namespace App\Events; use App\Events\Concerns\ShouldBeUniqueEvent; -use Illuminate\Broadcasting\Channel; +use Illuminate\Bus\Queueable; use Illuminate\Contracts\Broadcasting\ShouldBroadcast; use Illuminate\Foundation\Events\Dispatchable; @@ -13,31 +13,38 @@ abstract class WebsocketEvent implements ShouldBroadcast { use Dispatchable; use ShouldBeUniqueEvent; + use Queueable; public const CHANNEL = 'channel'; - public function __construct(protected ?string $id = null) + public function __construct(string|null ...$ids) { - // + $this->ids = count($ids) === 0 ? [null] : array_values($ids); + $this->onQueue('reverb'); } - final public function broadcastOn(): Channel + /** @return array */ + final public function broadcastOn(): array { - return new Channel($this->channelName()); + // broadcastWhen() populates this after acquiring per-channel locks. + // When it hasn't been called (e.g. Event::fake() in tests), fall back + // to all channels so assertions against the event object still work. + return $this->broadcastChannels + ?? array_map(fn ($id) => $this->channelName($id), $this->ids); } final public function getId(): ?string { - return $this->id; + return $this->ids[0] ?? null; } - final protected function channelName(): string + final protected function channelName(?string $id = null): string { - if ($this->id !== null) { + if ($id !== null) { return sprintf( '%s.%s', static::CHANNEL, - $this->id + $id ); } diff --git a/app/Http/Controllers/Concerns/HandlesBlockWebhooks.php b/app/Http/Controllers/Concerns/HandlesBlockWebhooks.php index 969ba33e32..a56b596171 100644 --- a/app/Http/Controllers/Concerns/HandlesBlockWebhooks.php +++ b/app/Http/Controllers/Concerns/HandlesBlockWebhooks.php @@ -11,14 +11,9 @@ trait HandlesBlockWebhooks { private function handleBlockApplied(): void { - NewBlock::dispatch(); + NewBlock::dispatch(null, request()->input('data.proposer')); // We'll run the job instead of duplicating the logic as this is the only purpose for the job. CacheBlocks::dispatch(); } - - private function handleGeneratorBlockApplied(): void - { - NewBlock::dispatch(request()->input('data.proposer')); - } } diff --git a/app/Http/Controllers/Concerns/HandlesTransactionWebhooks.php b/app/Http/Controllers/Concerns/HandlesTransactionWebhooks.php index b1799383ce..248a356654 100644 --- a/app/Http/Controllers/Concerns/HandlesTransactionWebhooks.php +++ b/app/Http/Controllers/Concerns/HandlesTransactionWebhooks.php @@ -12,18 +12,13 @@ trait HandlesTransactionWebhooks { private function handleTransactionApplied(): void { - NewTransaction::dispatch(); + NewTransaction::dispatch( + null, + request()->input('data.senderPublicKey'), + request()->input('data.to'), + ); + CheckLatestWallet::dispatch(); CheckLargestTransaction::dispatch(); } - - private function handleSenderTransactionApplied(): void - { - NewTransaction::dispatch(request()->input('data.senderPublicKey')); - } - - private function handleRecipientTransactionApplied(): void - { - NewTransaction::dispatch(request()->input('data.to')); - } } diff --git a/app/Http/Controllers/WebhooksController.php b/app/Http/Controllers/WebhooksController.php index c7dc935da5..51bb5dfc05 100644 --- a/app/Http/Controllers/WebhooksController.php +++ b/app/Http/Controllers/WebhooksController.php @@ -24,15 +24,12 @@ public function __invoke(): void if ($event === WebhookEvents::BlockApplied->value) { $this->handleBlockApplied(); - $this->handleGeneratorBlockApplied(); return; } if ($event === WebhookEvents::TransactionApplied->value) { $this->handleTransactionApplied(); - $this->handleSenderTransactionApplied(); - $this->handleRecipientTransactionApplied(); return; } diff --git a/config/horizon.php b/config/horizon.php index 7e36d71c08..f1f7761516 100644 --- a/config/horizon.php +++ b/config/horizon.php @@ -55,6 +55,16 @@ 'balanceCooldown' => 1, 'tries' => 1, ], + 'reverb' => [ + 'connection' => 'redis', + 'queue' => ['reverb'], + 'balance' => 'auto', + 'minProcesses' => 1, + 'maxProcesses' => 10, + 'balanceMaxShift' => 5, + 'balanceCooldown' => 1, + 'tries' => 1, + ], ]; return [ diff --git a/tests/Feature/Events/NewBlockTest.php b/tests/Feature/Events/NewBlockTest.php index f004d92211..1bde82fce1 100644 --- a/tests/Feature/Events/NewBlockTest.php +++ b/tests/Feature/Events/NewBlockTest.php @@ -11,7 +11,7 @@ NewBlock::dispatch(); Event::assertDispatched(NewBlock::class, function ($event) { - return $event->broadcastOn()->name === 'blocks'; + return in_array('blocks', $event->broadcastOn(), true); }); }); @@ -21,6 +21,6 @@ NewBlock::dispatch('channel-id'); Event::assertDispatched(NewBlock::class, function ($event) { - return $event->broadcastOn()->name === 'blocks.channel-id'; + return in_array('blocks.channel-id', $event->broadcastOn(), true); }); }); diff --git a/tests/Feature/Events/NewTransactionTest.php b/tests/Feature/Events/NewTransactionTest.php index 1f042eb886..96ba237dbd 100644 --- a/tests/Feature/Events/NewTransactionTest.php +++ b/tests/Feature/Events/NewTransactionTest.php @@ -11,7 +11,7 @@ NewTransaction::dispatch(); Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions'; + return in_array('transactions', $event->broadcastOn(), true); }); }); @@ -21,6 +21,6 @@ NewTransaction::dispatch('channel-id'); Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions.channel-id'; + return in_array('transactions.channel-id', $event->broadcastOn(), true); }); }); diff --git a/tests/Feature/Http/Controllers/WebhooksControllerTest.php b/tests/Feature/Http/Controllers/WebhooksControllerTest.php index 33dce443fb..3c813677cd 100644 --- a/tests/Feature/Http/Controllers/WebhooksControllerTest.php +++ b/tests/Feature/Http/Controllers/WebhooksControllerTest.php @@ -2,10 +2,6 @@ declare(strict_types=1); -use App\Events\NewBlock; -use App\Events\NewTransaction; -use App\Events\Statistics\TransactionDetails; -use App\Events\Statistics\UniqueAddresses; use App\Jobs\CacheBlocks; use App\Models\Block; use App\Models\Transaction; @@ -17,12 +13,11 @@ use Carbon\Carbon; use Illuminate\Broadcasting\BroadcastEvent; use Illuminate\Support\Facades\Config; -use Illuminate\Support\Facades\Event; use Illuminate\Support\Facades\Queue; use Illuminate\Support\Facades\URL; it('should not dispatch any event if insecure url', function () { - Event::fake(); + Queue::fake(); $event = [ 'event' => 'block.applied', @@ -35,12 +30,11 @@ ->post(route('webhooks'), $event) ->assertUnauthorized(); - Event::assertDispatchedTimes(NewBlock::class, 0); - Event::assertDispatchedTimes(NewTransaction::class, 0); + Queue::assertPushed(BroadcastEvent::class, 0); }); it('should not dispatch a random event on webhook', function () { - Event::fake(); + Queue::fake(); $secureUrl = URL::signedRoute('webhooks'); @@ -48,7 +42,7 @@ ->post($secureUrl, ['event' => 'random.event']) ->assertOk(); - Event::assertDispatchedTimes(NewBlock::class, 0); + Queue::assertPushed(BroadcastEvent::class, 0); }); describe('block', function () { @@ -62,7 +56,7 @@ }); it('should dispatch an event on webhook', function () { - Event::fake(); + Queue::fake(); $secureUrl = URL::signedRoute('webhooks'); @@ -70,14 +64,18 @@ ->post($secureUrl, $this->block) ->assertOk(); - Event::assertDispatchedTimes(NewBlock::class, 2); + Queue::assertPushed(BroadcastEvent::class, 1); - Event::assertDispatched(NewBlock::class, function ($event) { - return $event->broadcastOn()->name === 'blocks'; - }); + Queue::assertPushed(BroadcastEvent::class, function ($event) { + if ($event->event->queue !== 'reverb') { + return false; + } + + if (! in_array('blocks', $event->event->broadcastOn(), true)) { + return false; + } - Event::assertDispatched(NewBlock::class, function ($event) { - return $event->broadcastOn()->name === 'blocks.public-key'; + return in_array('blocks.public-key', $event->event->broadcastOn(), true); }); }); @@ -102,19 +100,18 @@ $this->post($secureUrl, $this->block) ->assertOk(); - Queue::assertPushed(BroadcastEvent::class, 4); + Queue::assertPushed(BroadcastEvent::class, 2); Queue::assertPushed(BroadcastEvent::class, function ($event) { - return $event->event->broadcastOn()->name === 'blocks'; - }); + if (! in_array('blocks', $event->event->broadcastOn(), true)) { + return false; + } - Queue::assertPushed(BroadcastEvent::class, function ($event) { - return $event->event->broadcastOn()->name === 'blocks.public-key'; + return in_array('blocks.public-key', $event->event->broadcastOn(), true); }); }); it('should dispatch statistics event if there is a change to block statistics', function () { - Event::fake(); Queue::fake(); $secureUrl = URL::signedRoute('webhooks'); @@ -123,15 +120,15 @@ ->post($secureUrl, $this->block) ->assertOk(); - Event::assertDispatchedTimes(NewBlock::class, 2); + Queue::assertPushed(BroadcastEvent::class, 1); Queue::assertPushed(CacheBlocks::class, 1); - Event::assertDispatched(NewBlock::class, function ($event) { - return $event->broadcastOn()->name === 'blocks'; - }); + Queue::assertPushed(BroadcastEvent::class, function ($event) { + if (! in_array('blocks', $event->event->broadcastOn(), true)) { + return false; + } - Event::assertDispatched(NewBlock::class, function ($event) { - return $event->broadcastOn()->name === 'blocks.public-key'; + return in_array('blocks.public-key', $event->event->broadcastOn(), true); }); $block = Block::factory()->create(); @@ -142,13 +139,21 @@ 'gas_price' => 5, ]); + Queue::fake(); + $secureUrl = URL::signedRoute('webhooks'); $this - ->post($secureUrl, $this->block) + ->post($secureUrl, [ + 'event' => 'block.applied', + 'data' => [ + 'proposer' => $block->proposer, + ], + ]) ->assertOk(); - Queue::assertPushed(CacheBlocks::class, 2); + Queue::assertPushed(BroadcastEvent::class, 1); + Queue::assertPushed(CacheBlocks::class, 1); }); }); @@ -164,7 +169,7 @@ }); it('should dispatch an event on webhook', function () { - Event::fake(); + Queue::fake(); $secureUrl = URL::signedRoute('webhooks'); @@ -172,18 +177,22 @@ ->post($secureUrl, $this->transaction) ->assertOk(); - Event::assertDispatchedTimes(NewTransaction::class, 3); + Queue::assertPushed(BroadcastEvent::class, 1); - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions'; - }); + Queue::assertPushed(BroadcastEvent::class, function ($event) { + if ($event->event->queue !== 'reverb') { + return false; + } - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions.public-key'; - }); + if (! in_array('transactions', $event->event->broadcastOn(), true)) { + return false; + } - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions.address'; + if (! in_array('transactions.public-key', $event->event->broadcastOn(), true)) { + return false; + } + + return in_array('transactions.address', $event->event->broadcastOn(), true); }); }); @@ -208,23 +217,23 @@ $this->post($secureUrl, $this->transaction) ->assertOk(); - Queue::assertPushed(BroadcastEvent::class, 6); + Queue::assertPushed(BroadcastEvent::class, 2); Queue::assertPushed(BroadcastEvent::class, function ($event) { - return $event->event->broadcastOn()->name === 'transactions'; - }); + if (! in_array('transactions', $event->event->broadcastOn(), true)) { + return false; + } - Queue::assertPushed(BroadcastEvent::class, function ($event) { - return $event->event->broadcastOn()->name === 'transactions.public-key'; - }); + if (! in_array('transactions.public-key', $event->event->broadcastOn(), true)) { + return false; + } - Queue::assertPushed(BroadcastEvent::class, function ($event) { - return $event->event->broadcastOn()->name === 'transactions.address'; + return in_array('transactions.address', $event->event->broadcastOn(), true); }); }); it('should dispatch statistics event if there is a new wallet', function () { - Event::fake(); + Queue::fake(); $this->travelTo('2024-04-19 00:15:44'); @@ -253,21 +262,28 @@ ->post($secureUrl, $this->transaction) ->assertOk(); - Event::assertDispatchedTimes(NewTransaction::class, 3); - Event::assertDispatchedTimes(UniqueAddresses::class, 0); + Queue::assertPushed(BroadcastEvent::class, 1); - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions'; - }); + Queue::assertPushed(BroadcastEvent::class, function ($event) { + if ($event->event->queue !== 'reverb') { + return false; + } - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions.public-key'; - }); + if (! in_array('transactions', $event->event->broadcastOn(), true)) { + return false; + } - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions.address'; + if (! in_array('transactions.public-key', $event->event->broadcastOn(), true)) { + return false; + } + + return in_array('transactions.address', $event->event->broadcastOn(), true); }); + $this->travelTo('2024-04-20 00:15:44'); + + Queue::fake(); + $walletB = Wallet::factory()->create(); $timestamp = Carbon::parse('2024-04-19 01:16:55')->getTimestampMs(); $transaction = Transaction::factory()->create([ @@ -278,14 +294,36 @@ $walletB->fill(['updated_at' => $timestamp])->save(); $this - ->post($secureUrl, $this->transaction) + ->post($secureUrl, [ + 'event' => 'transaction.applied', + 'data' => [ + 'to' => $transaction->to, + 'senderPublicKey' => $transaction->sender_public_key, + ], + ]) ->assertOk(); - Event::assertDispatchedTimes(UniqueAddresses::class, 1); + Queue::assertPushed(BroadcastEvent::class, 1); + + Queue::assertPushed(BroadcastEvent::class, function ($event) use ($transaction) { + if ($event->event->queue !== 'reverb') { + return false; + } + + if (! in_array('transactions', $event->event->broadcastOn(), true)) { + return false; + } + + if (! in_array('transactions.'.$transaction->to, $event->event->broadcastOn(), true)) { + return false; + } + + return in_array('transactions.'.$transaction->sender_public_key, $event->event->broadcastOn(), true); + }); }); it('should dispatch statistics event if there is a new largest transaction', function () { - Event::fake(); + Queue::fake(); $cache = new TransactionCache(); @@ -307,21 +345,28 @@ ->post($secureUrl, $this->transaction) ->assertOk(); - Event::assertDispatchedTimes(NewTransaction::class, 3); - Event::assertDispatchedTimes(TransactionDetails::class, 0); + Queue::assertPushed(BroadcastEvent::class, 1); - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions'; - }); + Queue::assertPushed(BroadcastEvent::class, function ($event) { + if ($event->event->queue !== 'reverb') { + return false; + } - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions.public-key'; - }); + if (! in_array('transactions', $event->event->broadcastOn(), true)) { + return false; + } + + if (! in_array('transactions.public-key', $event->event->broadcastOn(), true)) { + return false; + } - Event::assertDispatched(NewTransaction::class, function ($event) { - return $event->broadcastOn()->name === 'transactions.address'; + return in_array('transactions.address', $event->event->broadcastOn(), true); }); + $this->travelTo('2024-04-20 00:15:44'); + + Queue::fake(); + $transaction = Transaction::factory()->transfer()->create([ 'value' => 20 * 1e8, 'gas_price' => 6, @@ -329,9 +374,31 @@ ]); $this - ->post($secureUrl, $this->transaction) + ->post($secureUrl, [ + 'event' => 'transaction.applied', + 'data' => [ + 'to' => $transaction->to, + 'senderPublicKey' => $transaction->sender_public_key, + ], + ]) ->assertOk(); - Event::assertDispatchedTimes(TransactionDetails::class, 1); + Queue::assertPushed(BroadcastEvent::class, 1); + + Queue::assertPushed(BroadcastEvent::class, function ($event) use ($transaction) { + if ($event->event->queue !== 'reverb') { + return false; + } + + if (! in_array('transactions', $event->event->broadcastOn(), true)) { + return false; + } + + if (! in_array('transactions.'.$transaction->to, $event->event->broadcastOn(), true)) { + return false; + } + + return in_array('transactions.'.$transaction->sender_public_key, $event->event->broadcastOn(), true); + }); }); }); diff --git a/tests/Unit/Jobs/Webhooks/CheckLargestTransactionTest.php b/tests/Unit/Jobs/Webhooks/CheckLargestTransactionTest.php index 3565b00576..3714ccb078 100644 --- a/tests/Unit/Jobs/Webhooks/CheckLargestTransactionTest.php +++ b/tests/Unit/Jobs/Webhooks/CheckLargestTransactionTest.php @@ -10,6 +10,14 @@ use Carbon\Carbon; use Illuminate\Support\Facades\Event; +it('should not dispatch transaction details event if no transactions exist', function () { + Event::fake(); + + (new CheckLargestTransaction())->handle(); + + Event::assertDispatchedTimes(TransactionDetails::class, 0); +}); + it('should not dispatch transaction details event if no change', function () { Event::fake();