Skip to content

Conversation

@techmahedy
Copy link
Member

@techmahedy techmahedy commented Dec 13, 2025

Job Chaining allows us to run multiple jobs sequentially, where each job starts only after the previous one completes successfully. This is perfect for multi-step workflows like:

  • Video processing pipelines
  • Data import → Transform → Export
  • Order processing workflows
  • Multi-step notifications
  • Complex business processes

Basic Chain

use Doppar\Queue\Drain;

Drain::conduct([
    new DownloadJob($url),
    new ProcessJob($path),
    new UploadJob($file),
    new NotifyJob($userId),
])->dispatch();

What happens:

  1. DownloadJob executes
  2. ProcessJob executes (only if download succeeds)
  3. UploadJob executes (only if processing succeeds)
  4. NotifyJob executes (only if upload succeeds)

If any job fails, the chain stops and remaining jobs won't execute.

Usages Pattern

Pattern 1: Static Chain Creation

use Doppar\Queue\Drain;

$chainId = Drain::conduct([
    new Job1(),
    new Job2(),
    new Job3(),
])->dispatch();

Pattern 2: Instance Method Chaining

$chainId = (new Job1())->chain([
    new Job2(),
    new Job3(),
])->dispatch();

Pattern 3: With Configuration

Drain::conduct([new Job1(), new Job2()])
    ->onQueue('priority')
    ->delayFor(60)
    ->dispatch();

With Success Handler and With Error Handler

Drain::conduct([...])
    ->onQueue('priority')
    ->delayFor(60)
    ->then(fn() => echo "Done!")
    ->catch(fn($job, $ex, $index) => Log::error($ex))
    ->dispatch();

Synchronous Execution

Execute chain immediately without queueing:

Drain::conduct([new Job1(), new Job2()])
    ->dispatchSync(); // Blocks until all jobs complete

How It Works Internally

1. Drain::conduct([Job1, Job2, Job3])->dispatch()
   ↓
2. Chain ID generated: "chain_abc123"
   ↓
3. First job (Job1) prepared with chain context:
   - chainId = "chain_abc123"
   - chainJobs = [Job1, Job2, Job3]
   - chainIndex = 0
   - chainOnComplete = serialized closure
   - chainOnFailure = serialized closure
   ↓
4. ONLY Job1 pushed to queue
   ↓
   
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
WORKER PROCESSES JOB 1
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

5. Worker picks up Job1 from queue
   ↓
6. Job1->handle() executes
   ↓
7. Job deleted from queue
   ↓
8. Check: job->isChained() → TRUE
   ↓
9. Call: job->dispatchNextChainJob()
   ↓
10. Inside dispatchNextChainJob():
    - nextIndex = 0 + 1 = 1
    - Get Job2 from chainJobs[1]
    - Attach chain context to Job2:
      * chainId = "chain_abc123"
      * chainJobs = [Job1, Job2, Job3]
      * chainIndex = 1
      * chainOnComplete = serialized closure
      * chainOnFailure = serialized closure
      * queueName = "default"
    - Apply queueable attributes
    - Push ONLY Job2 to queue
   ↓

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
WORKER PROCESSES JOB 2
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

11. Worker picks up Job2 from queue
    ↓
12. Job2->handle() executes
    ↓
13. Job deleted from queue
    ↓
14. Check: job->isChained() → TRUE
    ↓
15. Call: job->dispatchNextChainJob()
    ↓
16. Inside dispatchNextChainJob():
    - nextIndex = 1 + 1 = 2
    - Get Job3 from chainJobs[2]
    - Attach chain context to Job3
    - Apply queueable attributes
    - Push ONLY Job3 to queue
    ↓

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
WORKER PROCESSES JOB 3
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

17. Worker picks up Job3 from queue
    ↓
18. Job3->handle() executes
    ↓
19. Job deleted from queue
    ↓
20. Check: job->isChained() → TRUE
    ↓
21. Call: job->dispatchNextChainJob()
    ↓
22. Inside dispatchNextChainJob():
    - nextIndex = 2 + 1 = 3
    - Check: 3 >= count([Job1, Job2, Job3]) → TRUE
    - Chain complete!
    - Unserialize chainOnComplete closure
    - Call completion callback
    ↓

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
CHAIN COMPLETED
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

23. Queue is empty
24. All jobs executed sequentially
25. Completion callback executed

Failure Scenario

Assume Job2 Fails:

1. Worker picks up Job2
   ↓
2. Job2->handle() throws exception
   ↓
3. handleJobException() called in QueueWorker
   ↓
4. Check: job->isChained() → TRUE
   ↓
5. Call: job->handleChainFailure($exception)
   ↓
6. Inside handleChainFailure():
   - Unserialize chainOnFailure closure
   - Call failure callback with:
     * $job = Job2 instance
     * $exception = thrown exception
     * $index = 1 (Job2's position)
   ↓
7. Job retried OR marked as failed (based on tries)
   ↓
8. Job3 is NEVER dispatched
   ↓
9. Chain stops at Job2

Chain Implementation Flow

┌─────────────────────────────────────────────────────────────┐
│ Drain::conduct([JobA, JobB, JobC])->dispatch()             │
└────────────────────────────┬────────────────────────────────┘
                             │
                             ▼
                    Generate chain_abc123
                             │
                             ▼
┌─────────────────────────────────────────────────────────────┐
│ Attach to JobA:                                             │
│ • chainId = "chain_abc123"                                  │
│ • chainJobs = [JobA, JobB, JobC]                            │
│ • chainIndex = 0                                            │
│ • chainOnComplete = serialized closure                      │
│ • chainOnFailure = serialized closure                       │
└────────────────────────────┬────────────────────────────────┘
                             │
                             ▼
                   Push ONLY JobA to queue
                             │
                             ▼
┌─────────────────────────────────────────────────────────────┐
│ QUEUE: [JobA]                                               │
└────────────────────────────┬────────────────────────────────┘
                             │
                             ▼
                    Worker processes JobA
                             │
                   ┌─────────┴─────────┐
                   │                   │
                SUCCESS ✅           FAIL ❌
                   │                   │
                   ▼                   ▼
          dispatchNextChainJob()  handleChainFailure()
                   │                   │
                   ▼                   ▼
          Push JobB to queue    Call catch() callback
                   │                   │
                   ▼                   ▼
          QUEUE: [JobB]         JobC never dispatched ⛔
                   │
                   ▼
          Worker processes JobB
                   │
                (repeat...)

@rrr63 rrr63 merged commit 33adb1d into doppar:1.x Dec 14, 2025
3 checks passed
@rrr63
Copy link
Member

rrr63 commented Dec 14, 2025

Thank you @techmahedy

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants