What happened?
Problem
When running integration tests in parallel, each test creates unique workers (via `Uuid::new_v4()`) and waits for its jobs to finish by polling `lock_by IN (worker_a, worker_b)`. However, apalis dispatches jobs based on `job_type`, not `worker_id`. This means any worker registered with `worker_type = "family_cloud::jobs::DeleteJob"` can steal jobs from another test's workers.
Root Cause
From inspecting apalis.jobs and apalis.workers during a parallel test run:
- 8 workers were registered (4× `DeleteJob`, 4× `CopyJob`) across 6 parallel tests
- All 11 jobs were processed by only 2 worker UUIDs
- The remaining 6 worker UUIDs never picked up any jobs
- Tests waiting on those 6 UUIDs via `lock_by` looped until timeout
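The stealing behavior can be reduced to a small in-memory model. This is a hypothetical sketch, not apalis code: `fetch_next` stands in for apalis' fetch query, which matches on `job_type` only and records `lock_by` after the fact, so one test's worker can drain jobs enqueued by another test:

```rust
// Hypothetical model of type-based dispatch: which worker gets a job depends
// only on job_type; lock_by merely records who happened to fetch it.
#[derive(Debug)]
struct Job {
    id: u32,
    job_type: &'static str,
    lock_by: Option<String>,
}

// Stand-in for the storage fetch: grab the next unlocked job of this type.
fn fetch_next(queue: &mut Vec<Job>, worker_type: &str, worker_id: &str) -> Option<u32> {
    let job = queue
        .iter_mut()
        .find(|j| j.lock_by.is_none() && j.job_type == worker_type)?;
    job.lock_by = Some(worker_id.to_string());
    Some(job.id)
}

fn main() {
    // Two parallel tests each enqueue one DeleteJob and register their own worker.
    let mut queue = vec![
        Job { id: 1, job_type: "DeleteJob", lock_by: None }, // enqueued by test A
        Job { id: 2, job_type: "DeleteJob", lock_by: None }, // enqueued by test B
    ];

    // Test B's worker happens to poll first and drains both jobs.
    fetch_next(&mut queue, "DeleteJob", "worker-b");
    fetch_next(&mut queue, "DeleteJob", "worker-b");

    // Test A now polls lock_by = 'worker-a' until timeout: no row ever matches.
    let owned_by_a = queue
        .iter()
        .filter(|j| j.lock_by.as_deref() == Some("worker-a"))
        .count();
    println!("jobs locked by worker-a: {owned_by_a}");
}
```

This matches the observed data: a couple of fast workers lock everything, and the remaining worker UUIDs never appear in `lock_by`.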
Observed Behavior
- 2 out of 6 integration tests always fail with "worker did not finish in time"
- Increasing the timeout (up to 600 s) has no effect; the jobs were already processed by a different worker
Expected behavior
Each test should be able to reliably wait for its own enqueued jobs to finish.
Steps to reproduce
- Create multiple integration tests that each call `init_workers()` (generates unique worker UUIDs per test)
- Each test enqueues jobs via an API endpoint that dispatches `DeleteJob`/`CopyJob`
- Each test waits for its jobs by polling `apalis.jobs WHERE lock_by IN (worker_a_uuid, worker_b_uuid)`
- Run all tests in parallel: `cargo test`
Expected
Each test's jobs are processed by its own workers, and `lock_by` matches the test's worker UUIDs.
Actual
Apalis dispatches jobs by `job_type`: any worker with a matching `worker_type` picks up the job regardless of UUID. 2 out of 6 tests always time out because their enqueued jobs were processed by another test's workers, so `lock_by` never matches their UUIDs.
Reproducible: Yes, consistently 2/6 tests fail on every parallel run.
Minimal code example
```rust
use std::time::Duration;

use sqlx::PgPool;
use uuid::Uuid;

// WorkersName, Client, and init_apalis come from the application/test crate.
pub async fn init_workers(con: &PgPool, rfs: Client) -> anyhow::Result<WorkersName> {
    let n_worker = WorkersName {
        delete: Uuid::new_v4().to_string(),
        copy: Uuid::new_v4().to_string(),
    };
    init_apalis(con, rfs, n_worker.clone()).await?;
    Ok(n_worker)
}

async fn wait_job_until_finishes(con: &PgPool, workers: &WorkersName) -> anyhow::Result<()> {
    tokio::time::timeout(Duration::from_secs(25), async {
        loop {
            let value = sqlx::query!(
                r#"SELECT (
                    -- Phase 1: jobs must exist first
                    (SELECT COUNT(*) FROM apalis.jobs j
                     WHERE j.lock_by IN ($1,$2)) > 0
                    AND
                    -- Phase 2: all of them must be done
                    (SELECT COUNT(*) FROM apalis.jobs j
                     WHERE j.lock_by IN ($1,$2) AND j.done_at IS NULL) = 0
                ) AS "done!: bool""#,
                workers.delete,
                workers.copy,
            )
            .fetch_one(con)
            .await?;
            if value.done {
                break;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        Ok::<_, sqlx::Error>(())
    })
    .await
    .expect("worker did not finish in time")?;
    Ok(())
}
```
Version
1.0.0-rc.x
Environment
- OS: Linux rusty 6.19.2-2-cachyos V0.3 #1 SMP PREEMPT_DYNAMIC Mon, 16 Feb 2026 20:41:55 +0000 x86_64 GNU/Linux
- Rust version: rustc 1.90.0 (1159e78c4 2025-09-14)
- Cargo version: cargo 1.90.0 (840b83a10 2025-07-30)
Relevant log output
Possible Solutions
- Wait by `job_type` + `run_at >= enqueued_at` timestamp instead of `lock_by`
- Capture and track job IDs at enqueue time
- Use a unique apalis schema per test to fully isolate job queues
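The second option (tracking job IDs) can be sketched without a database. This is a hypothetical illustration, not the fix itself: `all_done` models the check a test would make by polling something like `SELECT COUNT(*) FROM apalis.jobs WHERE id = ANY($1) AND done_at IS NULL` against the ids captured at enqueue time (that exact query is an assumption, not verified against the apalis schema). The check is keyed on job ids, so it holds no matter which worker's UUID ends up in `lock_by`:

```rust
use std::collections::HashSet;

// Completion check keyed on job ids captured at enqueue time.
// `done` stands in for "rows in apalis.jobs with done_at IS NOT NULL".
// The empty-slice guard mirrors "Phase 1" of the original query: a test must
// not report success before its jobs have even been inserted.
fn all_done(enqueued: &[u32], done: &HashSet<u32>) -> bool {
    !enqueued.is_empty() && enqueued.iter().all(|id| done.contains(id))
}

fn main() {
    let my_jobs = vec![10, 11]; // ids this test recorded when enqueueing
    let mut done = HashSet::new();

    assert!(!all_done(&my_jobs, &done)); // nothing finished yet

    done.insert(10); // finished by *some* worker; ownership is irrelevant
    done.insert(11);
    assert!(all_done(&my_jobs, &done)); // the test can stop polling

    println!("all tracked jobs finished");
}
```

Waiting on ids rather than `lock_by` makes the tests immune to cross-test job stealing, without needing per-test schemas or serialized test runs.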
log tables:
jobs_table.xlsx
workers_table.xlsx