diff --git a/src/cache/src/Limiters/ConcurrencyLimiter.php b/src/cache/src/Limiters/ConcurrencyLimiter.php new file mode 100644 index 000000000..e87b1fd76 --- /dev/null +++ b/src/cache/src/Limiters/ConcurrencyLimiter.php @@ -0,0 +1,99 @@ + + */ + protected array $slots; + + /** + * Create a new concurrency limiter instance. + * + * @param LockProvider $store the cache store instance + * @param string $name the name of the limiter + * @param int $maxLocks the allowed number of concurrent locks + * @param int $releaseAfter the number of seconds a slot should be maintained + */ + public function __construct( + protected LockProvider $store, + protected string $name, + protected int $maxLocks, + protected int $releaseAfter, + ) { + $this->slots = $maxLocks < 1 + ? [] + : array_map(fn (int $i): string => $name . $i, range(1, $maxLocks)); + } + + /** + * Attempt to acquire the lock for the given number of seconds. + * + * @throws LimiterTimeoutException + * @throws Throwable + */ + public function block(int $timeout, ?callable $callback = null, int $sleep = 250): mixed + { + $starting = time(); + + $id = Str::random(20); + + while (! $slot = $this->acquire($id)) { + if (time() - $timeout >= $starting) { + throw new LimiterTimeoutException; + } + + Sleep::usleep($sleep * 1000); + } + + if (is_callable($callback)) { + try { + return tap($callback(), function () use ($slot): void { + $this->release($slot); + }); + } catch (Throwable $exception) { + $this->release($slot); + + throw $exception; + } + } + + return true; + } + + /** + * Attempt to acquire a slot lock. + */ + protected function acquire(string $id): bool|Lock + { + foreach ($this->slots as $slotName) { + $lock = $this->store->lock($slotName, $this->releaseAfter, $id); + + if ($lock->acquire()) { + return $lock; + } + } + + return false; + } + + /** + * Release the slot lock. + */ + protected function release(Lock $lock): void + { + $lock->release(); + } +} diff --git a/src/cache/src/Limiters/ConcurrencyLimiterBuilder.php b/src/cache/src/Limiters/ConcurrencyLimiterBuilder.php new file mode 100644 index 000000000..4d59789a3 --- /dev/null +++ b/src/cache/src/Limiters/ConcurrencyLimiterBuilder.php @@ -0,0 +1,125 @@ +maxLocks = $maxLocks; + + return $this; + } + + /** + * Set the number of seconds until the lock will be released. + */ + public function releaseAfter(DateInterval|DateTimeInterface|int $releaseAfter): static + { + $this->releaseAfter = $this->secondsUntil($releaseAfter); + + return $this; + } + + /** + * Set the number of seconds to block until a lock is available. + */ + public function block(int $timeout): static + { + $this->timeout = $timeout; + + return $this; + } + + /** + * The number of milliseconds to wait between lock acquisition attempts. + */ + public function sleep(int $sleep): static + { + $this->sleep = $sleep; + + return $this; + } + + /** + * Execute the given callback if a lock is obtained, otherwise call the failure callback. + * + * @throws LimiterTimeoutException + */ + public function then(callable $callback, ?callable $failure = null): mixed + { + try { + return $this->createLimiter()->block($this->timeout, $callback, $this->sleep); + } catch (LimiterTimeoutException $e) { + if ($failure !== null) { + return $failure($e); + } + + throw $e; + } + } + + /** + * Create the concurrency limiter instance. + */ + protected function createLimiter(): ConcurrencyLimiter + { + // Type is guaranteed by the LockProvider check in Repository::funnel(), + // but Repository::getStore() returns Store so phpstan needs help narrowing. + /** @var LockProvider&Store $store */ + $store = $this->connection->getStore(); + + if ($store instanceof RedisStore) { + return new RedisConcurrencyLimiter($store, $this->name, $this->maxLocks, $this->releaseAfter); + } + + return new ConcurrencyLimiter($store, $this->name, $this->maxLocks, $this->releaseAfter); + } +} diff --git a/src/cache/src/Limiters/LimiterTimeoutException.php b/src/cache/src/Limiters/LimiterTimeoutException.php new file mode 100644 index 000000000..470b8af80 --- /dev/null +++ b/src/cache/src/Limiters/LimiterTimeoutException.php @@ -0,0 +1,11 @@ + + */ + protected array $prefixedSlots; + + /** + * Create a new Redis-optimized concurrency limiter instance. + */ + public function __construct( + RedisStore $store, + string $name, + int $maxLocks, + int $releaseAfter, + ) { + parent::__construct($store, $name, $maxLocks, $releaseAfter); + + $prefix = $store->getPrefix(); + $this->prefixedSlots = array_map( + fn (string $slot): string => $prefix . $slot, + $this->slots, + ); + } + + /** + * Atomically claim a free slot via a single Lua script. + * + * Two correctness invariants: + * + * 1. The Lua script writes the prefixed slot key to Redis and returns the + * UNPREFIXED slot name (e.g. "my-funnel1") so RedisStore::restoreLock() + * prepends the prefix exactly once when constructing the Lock object. + * + * 2. The owner ID must be pre-packed via $connection->pack() before being + * passed into Lua. phpredis does NOT auto-serialize eval() ARGV (regular + * commands like set() do). RedisLock::release() later packs $this->owner + * before its owner-check Lua, so the value Redis stores at acquire time + * must already be in packed form. If we passed raw $id here, Redis would + * store the raw string, but release would compare against a packed value + * — silent mismatch, slot leaks until TTL. We pass the RAW $id to + * restoreLock() so the returned RedisLock's owner field is raw; release + * will pack it consistently with what we stored. + * + * Using withConnection() also keeps both pack() and eval() on the same + * checked-out pool connection, avoiding two pool roundtrips per attempt. + */ + protected function acquire(string $id): bool|Lock + { + // Without slots there's nothing to claim. Calling eval with zero KEYS + // would error inside Lua via unpack({}) → redis.call('mget') with no args. + if ($this->prefixedSlots === []) { + return false; + } + + /** @var RedisStore $store */ + $store = $this->store; + + return $store->lockConnection()->withConnection(function (RedisConnection $connection) use ($id, $store): bool|Lock { + $packedOwner = $connection->pack([$id])[0]; + + $result = $connection->eval(...array_merge( + [LuaScripts::acquireConcurrencySlot(), count($this->prefixedSlots)], + $this->prefixedSlots, + [$this->name, $this->releaseAfter, $packedOwner], + )); + + return is_string($result) ? $store->restoreLock($result, $id) : false; + }); + } +} diff --git a/src/cache/src/LuaScripts.php b/src/cache/src/LuaScripts.php deleted file mode 100644 index 05c9f66bf..000000000 --- a/src/cache/src/LuaScripts.php +++ /dev/null @@ -1,43 +0,0 @@ -store->lock(enum_value($key), $lockFor, $owner)->block($waitFor, $callback); // @phpstan-ignore method.notFound (lock() is on LockProvider, not Store contract) } + /** + * Funnel a callback for a maximum number of simultaneous executions. + */ + public function funnel(UnitEnum|string $name): ConcurrencyLimiterBuilder + { + if (! $this->store instanceof LockProvider) { + throw new BadMethodCallException('This cache store does not support locks.'); + } + + return new ConcurrencyLimiterBuilder($this, enum_value($name)); + } + /** * Remove an item from the cache. */ diff --git a/src/redis/src/Limiters/ConcurrencyLimiter.php b/src/redis/src/Limiters/ConcurrencyLimiter.php index 439e25422..77ad98541 100644 --- a/src/redis/src/Limiters/ConcurrencyLimiter.php +++ b/src/redis/src/Limiters/ConcurrencyLimiter.php @@ -5,6 +5,7 @@ namespace Hypervel\Redis\Limiters; use Hypervel\Contracts\Redis\LimiterTimeoutException; +use Hypervel\Redis\LuaScripts; use Hypervel\Redis\RedisProxy; use Hypervel\Support\Sleep; use Hypervel\Support\Str; @@ -12,6 +13,13 @@ class ConcurrencyLimiter { + /** + * Precomputed slot names. Built once in the constructor. + * + * @var list + */ + protected array $slots; + /** * Create a new concurrency limiter instance. * @@ -26,6 +34,9 @@ public function __construct( protected int $maxLocks, protected int $releaseAfter ) { + $this->slots = $maxLocks < 1 + ? [] + : array_map(fn (int $i): string => $name . $i, range(1, $maxLocks)); } /** @@ -50,7 +61,7 @@ public function block(int $timeout, ?callable $callback = null, int $sleep = 250 if (is_callable($callback)) { try { - return tap($callback(), function () use ($slot, $id) { + return tap($callback(), function () use ($slot, $id): void { $this->release($slot, $id); }); } catch (Throwable $exception) { @@ -70,59 +81,24 @@ public function block(int $timeout, ?callable $callback = null, int $sleep = 250 */ protected function acquire(string $id): mixed { - $slots = array_map(function ($i) { - return $this->name . $i; - }, range(1, $this->maxLocks)); + // Without slots there's nothing to claim. Calling eval with zero KEYS + // would error inside Lua via unpack({}) → redis.call('mget') with no args. + if ($this->slots === []) { + return false; + } return $this->redis->eval(...array_merge( - [$this->lockScript(), count($slots)], - array_merge($slots, [$this->name, $this->releaseAfter, $id]) + [LuaScripts::acquireConcurrencySlot(), count($this->slots)], + $this->slots, + [$this->name, $this->releaseAfter, $id], )); } - /** - * Get the Lua script for acquiring a lock. - * - * KEYS - The keys that represent available slots - * ARGV[1] - The limiter name - * ARGV[2] - The number of seconds the slot should be reserved - * ARGV[3] - The unique identifier for this lock - */ - protected function lockScript(): string - { - return <<<'LUA' -for index, value in pairs(redis.call('mget', unpack(KEYS))) do - if not value then - redis.call('set', KEYS[index], ARGV[3], "EX", ARGV[2]) - return ARGV[1]..index - end -end -LUA; - } - /** * Release the lock. */ protected function release(string $key, string $id): void { - $this->redis->eval($this->releaseScript(), 1, $key, $id); - } - - /** - * Get the Lua script to atomically release a lock. - * - * KEYS[1] - The name of the lock - * ARGV[1] - The unique identifier for this lock - */ - protected function releaseScript(): string - { - return <<<'LUA' -if redis.call('get', KEYS[1]) == ARGV[1] -then - return redis.call('del', KEYS[1]) -else - return 0 -end -LUA; + $this->redis->eval(LuaScripts::releaseLock(), 1, $key, $id); } } diff --git a/src/redis/src/LuaScripts.php b/src/redis/src/LuaScripts.php new file mode 100644 index 000000000..3830faf26 --- /dev/null +++ b/src/redis/src/LuaScripts.php @@ -0,0 +1,78 @@ + 0 then + redis.call('set', KEYS[index], ARGV[3], "EX", ARGV[2]) + else + redis.call('set', KEYS[index], ARGV[3]) + end + return ARGV[1]..index + end +end +LUA; + } +} diff --git a/src/support/src/Facades/Cache.php b/src/support/src/Facades/Cache.php index 33f4907c9..fc4529488 100644 --- a/src/support/src/Facades/Cache.php +++ b/src/support/src/Facades/Cache.php @@ -60,6 +60,7 @@ * @method static mixed flexible(\UnitEnum|string $key, array $ttl, callable $callback, null|array $lock = null, bool $alwaysDefer = false) * @method static mixed flexibleNullable(\UnitEnum|string $key, array $ttl, callable $callback, null|array $lock = null, bool $alwaysDefer = false) * @method static mixed withoutOverlapping(\UnitEnum|string $key, callable $callback, int $lockFor = 0, int $waitFor = 10, string|null $owner = null) + * @method static \Hypervel\Cache\Limiters\ConcurrencyLimiterBuilder funnel(\UnitEnum|string $name) * @method static bool flushLocks() * @method static bool supportsTags() * @method static bool supportsFlushingLocks() diff --git a/tests/Cache/ConcurrencyLimiterTest.php b/tests/Cache/ConcurrencyLimiterTest.php new file mode 100644 index 000000000..068954106 --- /dev/null +++ b/tests/Cache/ConcurrencyLimiterTest.php @@ -0,0 +1,382 @@ +repository = new Repository(new ArrayStore); + } + + public function testItLocksTasksWhenNoSlotAvailable() + { + $store = []; + + foreach (range(1, 2) as $i) { + (new ConcurrencyLimiterMockThatDoesntRelease($this->repository->getStore(), 'key', 2, 5))->block(2, function () use (&$store, $i) { + $store[] = $i; + }); + } + + try { + (new ConcurrencyLimiterMockThatDoesntRelease($this->repository->getStore(), 'key', 2, 5))->block(0, function () use (&$store) { + $store[] = 3; + }); + } catch (Throwable $e) { + $this->assertInstanceOf(LimiterTimeoutException::class, $e); + } + + (new ConcurrencyLimiterMockThatDoesntRelease($this->repository->getStore(), 'other_key', 2, 5))->block(2, function () use (&$store) { + $store[] = 4; + }); + + $this->assertEquals([1, 2, 4], $store); + } + + public function testItReleasesLockAfterTaskFinishes() + { + $store = []; + + foreach (range(1, 4) as $i) { + (new ConcurrencyLimiter($this->repository->getStore(), 'key', 2, 5))->block(2, function () use (&$store, $i) { + $store[] = $i; + }); + } + + $this->assertEquals([1, 2, 3, 4], $store); + } + + public function testItReleasesLockIfTaskTookTooLong() + { + $store = []; + + $lock = new ConcurrencyLimiterMockThatDoesntRelease($this->repository->getStore(), 'key', 1, 1); + + $lock->block(2, function () use (&$store) { + $store[] = 1; + }); + + try { + $lock->block(0, function () use (&$store) { + $store[] = 2; + }); + } catch (Throwable $e) { + $this->assertInstanceOf(LimiterTimeoutException::class, $e); + } + + usleep(1_200_000); + + $lock->block(0, function () use (&$store) { + $store[] = 3; + }); + + $this->assertEquals([1, 3], $store); + } + + public function testItFailsImmediatelyOrRetriesForAWhileBasedOnAGivenTimeout() + { + $store = []; + + $lock = new ConcurrencyLimiterMockThatDoesntRelease($this->repository->getStore(), 'key', 1, 2); + + $lock->block(2, function () use (&$store) { + $store[] = 1; + }); + + try { + $lock->block(0, function () use (&$store) { + $store[] = 2; + }); + } catch (Throwable $e) { + $this->assertInstanceOf(LimiterTimeoutException::class, $e); + } + + $lock->block(3, function () use (&$store) { + $store[] = 3; + }); + + $this->assertEquals([1, 3], $store); + } + + public function testItFailsAfterRetryTimeout() + { + $store = []; + + $lock = new ConcurrencyLimiterMockThatDoesntRelease($this->repository->getStore(), 'key', 1, 10); + + $lock->block(2, function () use (&$store) { + $store[] = 1; + }); + + try { + $lock->block(2, function () use (&$store) { + $store[] = 2; + }); + } catch (Throwable $e) { + $this->assertInstanceOf(LimiterTimeoutException::class, $e); + } + + $this->assertEquals([1], $store); + } + + public function testItReleasesIfErrorIsThrown() + { + $store = []; + + $lock = new ConcurrencyLimiter($this->repository->getStore(), 'key', 1, 5); + + try { + $lock->block(1, function () { + throw new Error; + }); + } catch (Error) { + } + + $lock = new ConcurrencyLimiter($this->repository->getStore(), 'key', 1, 5); + $lock->block(1, function () use (&$store) { + $store[] = 1; + }); + + $this->assertEquals([1], $store); + } + + public function testFunnelMethodOnRepository() + { + $store = []; + + $result = $this->repository->funnel('test-funnel') + ->limit(2) + ->releaseAfter(5) + ->block(2) + ->then(function () use (&$store) { + $store[] = 1; + + return 'ok'; + }); + + $this->assertEquals([1], $store); + $this->assertSame('ok', $result); + } + + public function testFunnelMethodAcceptsBackedEnum() + { + $store = []; + + $result = $this->repository->funnel(ConcurrencyLimiterBackedEnum::TestFunnel) + ->limit(2) + ->releaseAfter(5) + ->block(2) + ->then(function () use (&$store) { + $store[] = 1; + + return 'ok'; + }); + + $this->assertEquals([1], $store); + $this->assertSame('ok', $result); + } + + public function testFunnelMethodAcceptsUnitEnum() + { + $store = []; + + $result = $this->repository->funnel(ConcurrencyLimiterUnitEnum::TestFunnel) + ->limit(2) + ->releaseAfter(5) + ->block(2) + ->then(function () use (&$store) { + $store[] = 1; + + return 'ok'; + }); + + $this->assertEquals([1], $store); + $this->assertSame('ok', $result); + } + + public function testFunnelBackedEnumSharesKeyWithStringEquivalent() + { + // Fill all slots using the backed enum's string value + foreach (range(1, 2) as $i) { + (new ConcurrencyLimiterMockThatDoesntRelease($this->repository->getStore(), 'test-funnel', 2, 5))->block(2, function () { + }); + } + + // Try to acquire via the BackedEnum — should conflict with the string key + $result = $this->repository->funnel(ConcurrencyLimiterBackedEnum::TestFunnel) + ->limit(2) + ->releaseAfter(5) + ->block(0) + ->then( + function () { + return 'success'; + }, + function () { + return 'failed'; + } + ); + + $this->assertSame('failed', $result); + } + + public function testFunnelThrowsExceptionWhenStoreDoesNotSupportLocks() + { + $store = $this->createStub(Store::class); + $repository = new Repository($store); + + $this->assertNotInstanceOf(LockProvider::class, $store); + + $this->expectException(BadMethodCallException::class); + $this->expectExceptionMessage('This cache store does not support locks.'); + + $repository->funnel('test'); + } + + public function testFunnelWithFailureCallback() + { + $store = []; + + // Fill all slots without releasing + foreach (range(1, 2) as $i) { + (new ConcurrencyLimiterMockThatDoesntRelease($this->repository->getStore(), 'funnel-key', 2, 5))->block(2, function () use (&$store, $i) { + $store[] = $i; + }); + } + + // Try to acquire when all slots are full + $result = $this->repository->funnel('funnel-key') + ->limit(2) + ->releaseAfter(5) + ->block(0) + ->then( + function () use (&$store) { + $store[] = 'success'; + }, + function ($e) use (&$store) { + $this->assertInstanceOf(LimiterTimeoutException::class, $e); + $store[] = 'failed'; + + return 'failure-result'; + } + ); + + $this->assertEquals([1, 2, 'failed'], $store); + $this->assertSame('failure-result', $result); + } + + public function testFunnelWithZeroLimitDoesNotRunCallback() + { + $called = false; + + $result = $this->repository->funnel('zero') + ->limit(0) + ->releaseAfter(5) + ->block(0) + ->then( + function () use (&$called) { + $called = true; + + return 'should-not-run'; + }, + fn () => 'failed', + ); + + $this->assertFalse($called); + $this->assertSame('failed', $result); + } + + public function testFunnelWithNegativeLimitDoesNotRunCallback() + { + $called = false; + + $result = $this->repository->funnel('neg') + ->limit(-1) + ->releaseAfter(5) + ->block(0) + ->then( + function () use (&$called) { + $called = true; + + return 'should-not-run'; + }, + fn () => 'failed', + ); + + $this->assertFalse($called); + $this->assertSame('failed', $result); + } + + public function testReleaseAfterAcceptsDateInterval() + { + $store = []; + + $result = $this->repository->funnel('test') + ->limit(2) + ->releaseAfter(new DateInterval('PT5S')) + ->block(2) + ->then(function () use (&$store) { + $store[] = 1; + + return 'ok'; + }); + + $this->assertEquals([1], $store); + $this->assertSame('ok', $result); + } + + public function testReleaseAfterAcceptsDateTime() + { + $store = []; + + $result = $this->repository->funnel('test') + ->limit(2) + ->releaseAfter((new DateTimeImmutable)->modify('+5 seconds')) + ->block(2) + ->then(function () use (&$store) { + $store[] = 1; + + return 'ok'; + }); + + $this->assertEquals([1], $store); + $this->assertSame('ok', $result); + } +} + +class ConcurrencyLimiterMockThatDoesntRelease extends ConcurrencyLimiter +{ + protected function release(Lock $lock): void + { + } +} + +enum ConcurrencyLimiterBackedEnum: string +{ + case TestFunnel = 'test-funnel'; +} + +enum ConcurrencyLimiterUnitEnum +{ + case TestFunnel; +} diff --git a/tests/Cache/FunnelUnsupportedStoresTest.php b/tests/Cache/FunnelUnsupportedStoresTest.php new file mode 100644 index 000000000..dfcd2772b --- /dev/null +++ b/tests/Cache/FunnelUnsupportedStoresTest.php @@ -0,0 +1,29 @@ +assertFalse(is_subclass_of(SwooleStore::class, LockProvider::class)); + } + + public function testStackStoreDoesNotImplementLockProvider() + { + $this->assertFalse(is_subclass_of(StackStore::class, LockProvider::class)); + } + + public function testSessionStoreDoesNotImplementLockProvider() + { + $this->assertFalse(is_subclass_of(SessionStore::class, LockProvider::class)); + } +} diff --git a/tests/Integration/Cache/ArrayCacheFunnelTest.php b/tests/Integration/Cache/ArrayCacheFunnelTest.php new file mode 100644 index 000000000..39b0f66fb --- /dev/null +++ b/tests/Integration/Cache/ArrayCacheFunnelTest.php @@ -0,0 +1,18 @@ +releaseFunnelLocks(); + } catch (Throwable) { + } + } + + public function testFunnelBasicHappyPath() + { + $result = $this->cache()->funnel('test') + ->limit(2) + ->releaseAfter(60) + ->block(0) + ->then(fn () => 'hello'); + + $this->assertSame('hello', $result); + } + + public function testFunnelReleasesLockAfterCallback() + { + for ($i = 0; $i < 5; ++$i) { + $result = $this->cache()->funnel('test') + ->limit(1) + ->releaseAfter(60) + ->block(0) + ->then(fn () => 'ok'); + + $this->assertSame('ok', $result); + } + } + + public function testFunnelLockReleasedOnException() + { + try { + $this->cache()->funnel('test') + ->limit(1) + ->releaseAfter(60) + ->block(0) + ->then(function () { + throw new Exception('fail'); + }); + } catch (Exception) { + } + + $result = $this->cache()->funnel('test') + ->limit(1) + ->releaseAfter(60) + ->block(0) + ->then(fn () => 'recovered'); + + $this->assertSame('recovered', $result); + } + + public function testFunnelTimeoutExceptionWithoutFailureCallback() + { + $this->cache()->lock('test1', 60)->get(); + $this->cache()->lock('test2', 60)->get(); + + $this->expectException(LimiterTimeoutException::class); + + $this->cache()->funnel('test') + ->limit(2) + ->releaseAfter(60) + ->block(0) + ->then(fn () => 'should not run'); + } + + public function testFunnelFailureCallbackReceivesException() + { + $this->cache()->lock('test1', 60)->get(); + $this->cache()->lock('test2', 60)->get(); + + $result = $this->cache()->funnel('test') + ->limit(2) + ->releaseAfter(60) + ->block(0) + ->then( + fn () => 'should not run', + function ($e) { + $this->assertInstanceOf(LimiterTimeoutException::class, $e); + + return 'failed'; + } + ); + + $this->assertSame('failed', $result); + } + + public function testFunnelIndependentKeys() + { + $this->cache()->lock('key-a1', 60)->get(); + + $result = $this->cache()->funnel('key-b') + ->limit(1) + ->releaseAfter(60) + ->block(0) + ->then(fn () => 'key-b-ok'); + + $this->assertSame('key-b-ok', $result); + } + + protected function releaseFunnelLocks(): void + { + $this->cache()->lock('test1')->forceRelease(); + $this->cache()->lock('test2')->forceRelease(); + $this->cache()->lock('key-a1')->forceRelease(); + $this->cache()->lock('key-b1')->forceRelease(); + } + + protected function tearDown(): void + { + try { + $this->releaseFunnelLocks(); + } catch (Throwable) { + } + + parent::tearDown(); + } +} diff --git a/tests/Integration/Cache/DatabaseCacheFunnelTest.php b/tests/Integration/Cache/DatabaseCacheFunnelTest.php new file mode 100644 index 000000000..80793d2a3 --- /dev/null +++ b/tests/Integration/Cache/DatabaseCacheFunnelTest.php @@ -0,0 +1,21 @@ +configureLockConnection([ + 'serializer' => Redis::SERIALIZER_NONE, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + public function testFunnelReleasesSlotWithPhpSerialization() + { + $this->configureLockConnection([ + 'serializer' => Redis::SERIALIZER_PHP, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + public function testFunnelReleasesSlotWithJsonSerialization() + { + $this->configureLockConnection([ + 'serializer' => Redis::SERIALIZER_JSON, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + public function testFunnelReleasesSlotWithIgbinarySerialization() + { + if (! defined('Redis::SERIALIZER_IGBINARY')) { + $this->markTestSkipped('Redis extension is not configured to support the igbinary serializer.'); + } + + $this->configureLockConnection([ + 'serializer' => Redis::SERIALIZER_IGBINARY, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + public function testFunnelReleasesSlotWithMsgpackSerialization() + { + if (! defined('Redis::SERIALIZER_MSGPACK')) { + $this->markTestSkipped('Redis extension is not configured to support the msgpack serializer.'); + } + + $this->configureLockConnection([ + 'serializer' => Redis::SERIALIZER_MSGPACK, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + public function testFunnelReleasesSlotWithLzfCompression() + { + if (! defined('Redis::COMPRESSION_LZF')) { + $this->markTestSkipped('Redis extension is not configured to support the lzf compression.'); + } + + $this->configureLockConnection([ + 'serializer' => Redis::SERIALIZER_NONE, + 'compression' => Redis::COMPRESSION_LZF, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + public function testFunnelReleasesSlotWithZstdCompression() + { + if (! defined('Redis::COMPRESSION_ZSTD')) { + $this->markTestSkipped('Redis extension is not configured to support the zstd compression.'); + } + + $this->configureLockConnection([ + 'serializer' => Redis::SERIALIZER_NONE, + 'compression' => Redis::COMPRESSION_ZSTD, + 'compression_level' => Redis::COMPRESSION_ZSTD_DEFAULT, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + public function testFunnelReleasesSlotWithLz4Compression() + { + if (! defined('Redis::COMPRESSION_LZ4')) { + $this->markTestSkipped('Redis extension is not configured to support the lz4 compression.'); + } + + $this->configureLockConnection([ + 'serializer' => Redis::SERIALIZER_NONE, + 'compression' => Redis::COMPRESSION_LZ4, + 'compression_level' => 1, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + public function testFunnelReleasesSlotWithSerializationAndCompression() + { + if (! defined('Redis::COMPRESSION_LZF')) { + $this->markTestSkipped('Redis extension is not configured to support the lzf compression.'); + } + + $this->configureLockConnection([ + 'serializer' => Redis::SERIALIZER_PHP, + 'compression' => Redis::COMPRESSION_LZF, + ]); + + $this->assertFunnelAcquiresAndReleases(); + } + + /** + * Configure a dedicated Redis connection for funnel testing with the given options. + * + * Creates a 'lock-test' Redis connection with the specified serializer/compression + * options, points the cache store's lock_connection to it, and purges the cache + * store so it picks up the new configuration. Identical to PhpRedisCacheLockTest's + * helper since the funnel uses the same lock_connection path. + */ + protected function configureLockConnection(array $options): void + { + $baseConfig = $this->app['config']->get('database.redis.default'); + + $this->app['config']->set('database.redis.lock-test', array_merge($baseConfig, [ + 'options' => $options, + ])); + + $this->app['config']->set('cache.stores.redis.connection', 'default'); + $this->app['config']->set('cache.stores.redis.lock_connection', 'lock-test'); + + Cache::forgetDriver('redis'); + } + + /** + * Assert that Cache::funnel() acquires a slot, runs the callback, and releases + * the slot cleanly — verified by a second consecutive call under block(0) + * succeeding immediately. + * + * If the owner-packing fix is missing (raw $id stored in Lua, packed compared + * in RedisLock::release()), the second call throws LimiterTimeoutException + * because the slot from the first call was never actually released. + */ + protected function assertFunnelAcquiresAndReleases(): void + { + $repository = Cache::store('redis'); + $repository->lock('test1')->forceRelease(); + + $first = $repository->funnel('test') + ->limit(1) + ->releaseAfter(60) + ->block(0) + ->then(fn () => 'first'); + + $this->assertSame('first', $first); + + $second = $repository->funnel('test') + ->limit(1) + ->releaseAfter(60) + ->block(0) + ->then(fn () => 'second'); + + $this->assertSame('second', $second); + + $repository->lock('test1')->forceRelease(); + } +} diff --git a/tests/Integration/Cache/RedisCacheFunnelTest.php b/tests/Integration/Cache/RedisCacheFunnelTest.php new file mode 100644 index 000000000..d962ee880 --- /dev/null +++ b/tests/Integration/Cache/RedisCacheFunnelTest.php @@ -0,0 +1,113 @@ +cache(); + + $lock1 = $cache->lock('test1', 60); + $lock1->get(); + $lock2 = $cache->lock('test2', 60); + $lock2->get(); + + $parallel = new Parallel(5); + for ($i = 0; $i < 10; ++$i) { + $parallel->add( + static fn () => $cache->funnel('test') + ->limit(2)->releaseAfter(60)->block(0) + ->then(fn () => 'success', fn () => 'failed') + ); + } + + $this->assertSame(array_fill(0, 10, 'failed'), $parallel->wait()); + + $lock1->forceRelease(); + $lock2->forceRelease(); + } + + public function testCoroutineConcurrencyLimitMatchesCount() + { + $cache = $this->cache(); + + $parallel = new Parallel(5); + for ($i = 0; $i < 5; ++$i) { + $parallel->add( + static fn () => $cache->funnel('test') + ->limit(5)->releaseAfter(60)->block(2) + ->then(fn () => 'ok') + ); + } + + $results = $parallel->wait(); + $this->assertCount(5, $results); + $this->assertNotContains(null, $results); + foreach ($results as $result) { + $this->assertSame('ok', $result); + } + } + + public function testFunnelWithZeroReleaseAfterAcquiresAndReleasesPermanentSlot() + { + // releaseAfter(0) means no TTL — RedisLock semantic for "permanent". + // The Lua acquire path must SET without EX in that case (EX 0 errors). + // Slot still releases via the explicit RedisLock release after the callback. + $cache = $this->cache(); + $cache->lock('perm1')->forceRelease(); + + $first = $cache->funnel('perm') + ->limit(1) + ->releaseAfter(0) + ->block(0) + ->then(fn () => 'first'); + $this->assertSame('first', $first); + + $second = $cache->funnel('perm') + ->limit(1) + ->releaseAfter(0) + ->block(0) + ->then(fn () => 'second'); + $this->assertSame('second', $second); + + $cache->lock('perm1')->forceRelease(); + } + + public function testFunnelWithZeroLimitOnRedisDoesNotRunCallback() + { + // limit(0) results in zero precomputed slots — the limiter must short-circuit + // before calling Lua eval, otherwise unpack({}) → redis.call('mget') errors. + $called = false; + + $result = $this->cache()->funnel('zero') + ->limit(0) + ->releaseAfter(5) + ->block(0) + ->then( + function () use (&$called) { + $called = true; + + return 'should-not-run'; + }, + fn () => 'failed', + ); + + $this->assertFalse($called); + $this->assertSame('failed', $result); + } +} diff --git a/tests/Redis/ConcurrencyLimiterTest.php b/tests/Redis/ConcurrencyLimiterTest.php index 68b9b1883..ca0048ead 100644 --- a/tests/Redis/ConcurrencyLimiterTest.php +++ b/tests/Redis/ConcurrencyLimiterTest.php @@ -140,6 +140,30 @@ public function testAcquirePassesCorrectKeysToLuaScript() $limiter->block(5); } + public function testBlockWithZeroLimitDoesNotCallEvalAndTimesOut() + { + $redis = $this->mockRedis(); + + // limit(0) means no slots — acquire must short-circuit before calling eval, + // otherwise Lua hits redis.call('mget') with no args and errors. + $redis->shouldNotReceive('eval'); + + $this->expectException(LimiterTimeoutException::class); + + (new ConcurrencyLimiter($redis, 'zero', 0, 5))->block(0); + } + + public function testBlockWithNegativeLimitDoesNotCallEvalAndTimesOut() + { + $redis = $this->mockRedis(); + + $redis->shouldNotReceive('eval'); + + $this->expectException(LimiterTimeoutException::class); + + (new ConcurrencyLimiter($redis, 'neg', -1, 5))->block(0); + } + /** * Create a mock RedisProxy. */