Skip to content

Commit 829d9d5

Browse files
authored
Merge pull request #127 from devoption/codex/feat-28-surreal-queue
feat: add a surreal queue driver
2 parents 270f491 + 884657b commit 829d9d5

6 files changed

Lines changed: 420 additions & 2 deletions

File tree

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,24 @@ php artisan migrate --database=surreal --path=database/migrations/0001_01_01_000
133133
- Session read, write, update, and expiry behavior are covered in the test suite against a real Surreal runtime.
134134
- This driver intentionally follows Laravel's normal database-session lifecycle, so expiry cleanup still relies on Laravel's standard session lottery / pruning behavior instead of Surreal-native TTL features.
135135

136+
### Surreal-Backed Queues
137+
138+
Katra now also exposes a first-class `surreal` Laravel queue connection for teams that want jobs to live in SurrealDB alongside the rest of the application state.
139+
140+
- Set `QUEUE_CONNECTION=surreal` to use the Surreal-backed queue connector.
141+
- The connector defaults to the `surreal` database connection, but you can override the queue connection, table, queue name, and retry window with `SURREAL_QUEUE_CONNECTION`, `SURREAL_QUEUE_TABLE`, `SURREAL_QUEUE`, and `SURREAL_QUEUE_RETRY_AFTER`.
142+
- If you also want failed job records in SurrealDB, set `QUEUE_FAILED_DRIVER=database-uuids` and `QUEUE_FAILED_DATABASE=surreal`.
143+
- Make sure the queue tables exist on the Surreal connection before you start a worker. Katra's queue migration also creates `job_batches` and `failed_jobs` alongside `jobs`:
144+
145+
```bash
146+
php artisan migrate --database=surreal --path=database/migrations/0001_01_01_000002_create_jobs_table.php
147+
```
148+
149+
- Start a worker against the Surreal connection with `php artisan queue:work surreal`.
150+
- Queue enqueue, reserve, complete, retry, and failed-job behavior are covered in the test suite against a real Surreal runtime.
151+
- This connector currently uses optimistic job reservation instead of SQL row locks and database transactions, so it is best suited to Katra's current low-contention worker model rather than high-volume multi-worker contention scenarios.
152+
- `after_commit` is not supported on the Surreal queue connection because the Surreal database connection does not expose SQL transaction semantics.
153+
136154
## Planning Docs
137155

138156
- [Katra v2 Overview](docs/v2-overview.md)

app/Providers/AppServiceProvider.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
namespace App\Providers;
44

55
use App\Services\Surreal\Migrations\SurrealMigrationRepository;
6+
use App\Services\Surreal\Queue\SurrealQueueConnector;
67
use App\Services\Surreal\Schema\SurrealSchemaConnection;
78
use App\Services\Surreal\SurrealConnection;
89
use App\Services\Surreal\SurrealDocumentStore;
910
use App\Services\Surreal\SurrealHttpClient;
1011
use App\Services\Surreal\SurrealRuntimeManager;
1112
use Illuminate\Database\DatabaseManager;
13+
use Illuminate\Queue\QueueManager;
1214
use Illuminate\Session\DatabaseSessionHandler;
1315
use Illuminate\Support\ServiceProvider;
1416

@@ -50,5 +52,11 @@ public function boot(): void
5052
$app,
5153
);
5254
});
55+
56+
$this->app->afterResolving('queue', function (QueueManager $manager): void {
57+
$manager->addConnector('surreal', function (): SurrealQueueConnector {
58+
return new SurrealQueueConnector($this->app['db']);
59+
});
60+
});
5361
}
5462
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
<?php
2+
3+
namespace App\Services\Surreal\Queue;
4+
5+
use Illuminate\Queue\DatabaseQueue;
6+
use Illuminate\Queue\Jobs\DatabaseJob;
7+
use Illuminate\Queue\Jobs\DatabaseJobRecord;
8+
use Illuminate\Support\Carbon;
9+
use stdClass;
10+
11+
class SurrealQueue extends DatabaseQueue
12+
{
13+
public function pop($queue = null): ?DatabaseJob
14+
{
15+
$queue = $this->getQueue($queue);
16+
17+
foreach ($this->nextAvailableJobs($queue) as $jobRecord) {
18+
$reservedJob = $this->attemptToReserve($jobRecord);
19+
20+
if ($reservedJob !== null) {
21+
return new DatabaseJob(
22+
$this->container,
23+
$this,
24+
$reservedJob,
25+
$this->connectionName,
26+
$queue,
27+
);
28+
}
29+
}
30+
31+
return null;
32+
}
33+
34+
public function deleteReserved($queue, $id): void
35+
{
36+
$this->database->table($this->table)
37+
->where('id', $id)
38+
->delete();
39+
}
40+
41+
public function deleteAndRelease($queue, $job, $delay): void
42+
{
43+
$this->deleteReserved($queue, $job->getJobId());
44+
45+
$this->release($queue, $job->getJobRecord(), $delay);
46+
}
47+
48+
/**
49+
* @return list<DatabaseJobRecord>
50+
*/
51+
private function nextAvailableJobs(string $queue): array
52+
{
53+
$expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
54+
55+
return $this->database->table($this->table)
56+
->where('queue', $queue)
57+
->where(function ($query) use ($expiration): void {
58+
$query->where(function ($query): void {
59+
$query->whereNull('reserved_at')
60+
->where('available_at', '<=', $this->currentTime());
61+
})->orWhere(function ($query) use ($expiration): void {
62+
$query->whereNotNull('reserved_at')
63+
->where('reserved_at', '<=', $expiration);
64+
});
65+
})
66+
->orderBy('id', 'asc')
67+
->limit(5)
68+
->get()
69+
->map(static function (stdClass $record): DatabaseJobRecord {
70+
$record->reserved_at ??= null;
71+
72+
return new DatabaseJobRecord($record);
73+
})
74+
->all();
75+
}
76+
77+
private function attemptToReserve(DatabaseJobRecord $job): ?DatabaseJobRecord
78+
{
79+
$reservedAt = $this->currentTime();
80+
$attempts = $job->attempts + 1;
81+
$existingReservedAt = $job->reserved_at ?? null;
82+
83+
$query = $this->database->table($this->table)
84+
->where('id', $job->id);
85+
86+
if ($existingReservedAt === null) {
87+
$query->whereNull('reserved_at')
88+
->where('available_at', '<=', $reservedAt);
89+
} else {
90+
$query->where('reserved_at', $existingReservedAt);
91+
}
92+
93+
$updated = $query->update([
94+
'reserved_at' => $reservedAt,
95+
'attempts' => $attempts,
96+
]);
97+
98+
if ($updated !== 1) {
99+
return null;
100+
}
101+
102+
return new DatabaseJobRecord((object) [
103+
'id' => $job->id,
104+
'queue' => $job->queue,
105+
'payload' => $job->payload,
106+
'attempts' => $attempts,
107+
'reserved_at' => $reservedAt,
108+
'available_at' => $job->available_at,
109+
'created_at' => $job->created_at,
110+
]);
111+
}
112+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
namespace App\Services\Surreal\Queue;
4+
5+
use Illuminate\Database\ConnectionResolverInterface;
6+
use Illuminate\Queue\Connectors\ConnectorInterface;
7+
8+
class SurrealQueueConnector implements ConnectorInterface
9+
{
10+
public function __construct(
11+
private readonly ConnectionResolverInterface $connections,
12+
) {}
13+
14+
public function connect(array $config): SurrealQueue
15+
{
16+
return new SurrealQueue(
17+
$this->connections->connection($config['connection'] ?? 'surreal'),
18+
$config['table'],
19+
$config['queue'],
20+
$config['retry_after'] ?? 60,
21+
$config['after_commit'] ?? false,
22+
);
23+
}
24+
}

config/queue.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
| used by your application. An example configuration is provided for
2525
| each backend supported by Laravel. You're also free to add more.
2626
|
27-
| Drivers: "sync", "database", "beanstalkd", "sqs", "redis",
27+
| Drivers: "sync", "database", "surreal", "beanstalkd", "sqs", "redis",
2828
| "deferred", "background", "failover", "null"
2929
|
3030
*/
@@ -44,6 +44,15 @@
4444
'after_commit' => false,
4545
],
4646

47+
'surreal' => [
48+
'driver' => 'surreal',
49+
'connection' => env('SURREAL_QUEUE_CONNECTION', 'surreal'),
50+
'table' => env('SURREAL_QUEUE_TABLE', env('DB_QUEUE_TABLE', 'jobs')),
51+
'queue' => env('SURREAL_QUEUE', env('DB_QUEUE', 'default')),
52+
'retry_after' => (int) env('SURREAL_QUEUE_RETRY_AFTER', env('DB_QUEUE_RETRY_AFTER', 90)),
53+
'after_commit' => false,
54+
],
55+
4756
'beanstalkd' => [
4857
'driver' => 'beanstalkd',
4958
'host' => env('BEANSTALKD_QUEUE_HOST', 'localhost'),
@@ -122,7 +131,7 @@
122131

123132
'failed' => [
124133
'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
125-
'database' => env('DB_CONNECTION', 'sqlite'),
134+
'database' => env('QUEUE_FAILED_DATABASE', env('DB_CONNECTION', 'sqlite')),
126135
'table' => 'failed_jobs',
127136
],
128137

0 commit comments

Comments
 (0)