Skip to content

feat(chain): add conditional execution and sync points #32

@maycon

Description

@maycon

Summary

Extend request chaining with:

  1. Conditional execution: Skip steps based on conditions
  2. Sync points: Control where threads synchronize within the chain
  3. Error handling: Configure behavior on step failure

Parent Issue:

  • Part of #XX (Request Chaining Support)
  • Depends on #XX (Basic Request Chaining)

Configuration

race_attack:
  race:
    threads: 20
    sync_mechanism: barrier
    
    chain:
      - name: check_balance
        request: "GET /api/balance"
        extract:
          balance: "json.amount"
      
      - name: transfer
        request: "POST /api/transfer"
        # Only execute if balance > 0
        condition: "{{ chain.check_balance.balance | float > 100 }}"
        # What to do if condition is false
        on_skip: continue  # continue | abort
      
      - name: verify
        request: "GET /api/balance"
        # What to do if this step fails
        on_error: continue  # continue | abort | retry
        retry:
          max_attempts: 3
          delay_ms: 100
    
    # All threads sync AFTER check_balance, BEFORE transfer
    sync_after: check_balance
    
    # Alternative: sync BEFORE specific step
    sync_before: transfer

Sync Point Behavior

Without sync point (sync at start):
├─ Thread 1: ──[SYNC]──[check]──[transfer]──[verify]──►
├─ Thread 2: ──[SYNC]──[check]──[transfer]──[verify]──►
└─ Thread 3: ──[SYNC]──[check]──[transfer]──[verify]──►
                 ↑
            All sync here, then diverge

With sync_after: check_balance
├─ Thread 1: ──[check]──[SYNC]──[transfer]──[verify]──►
├─ Thread 2: ──[check]──[SYNC]──[transfer]──[verify]──►
└─ Thread 3: ──[check]──[SYNC]──[transfer]──[verify]──►
                          ↑
                All threads have fresh balance,
                then race the transfer together

Implementation

1. Update models/config.py

from enum import Enum

class OnSkipBehavior(Enum):
    CONTINUE = "continue"  # Continue to next step
    ABORT = "abort"        # Stop chain execution

class OnErrorBehavior(Enum):
    CONTINUE = "continue"  # Continue to next step
    ABORT = "abort"        # Stop chain execution
    RETRY = "retry"        # Retry the step

@dataclass
class RetryConfig:
    max_attempts: int = 3
    delay_ms: int = 100
    backoff_factor: float = 2.0

@dataclass
class ChainStep:
    name: str
    request: str
    extract: Optional[Dict[str, ExtractPattern]] = None
    description: Optional[str] = None
    
    # Conditional execution
    condition: Optional[str] = None
    on_skip: OnSkipBehavior = OnSkipBehavior.CONTINUE
    
    # Error handling
    on_error: OnErrorBehavior = OnErrorBehavior.ABORT
    retry: Optional[RetryConfig] = None

@dataclass
class ChainConfig:
    steps: List[ChainStep] = field(default_factory=list)
    
    # Sync points
    sync_after: Optional[str] = None   # Step name to sync after
    sync_before: Optional[str] = None  # Step name to sync before

2. Update chain/executor.py

import time
import logging
from typing import Optional, Callable

logger = logging.getLogger(__name__)

class ChainExecutor:
    
    def __init__(
        self,
        config: ChainConfig,
        client: httpx.Client,
        template_engine: TemplateEngine,
        base_context: Dict[str, Any],
        sync_callback: Optional[Callable[[int, str], None]] = None,
    ):
        self.config = config
        self.client = client
        self.template = template_engine
        self.base_context = base_context
        self.sync_callback = sync_callback  # Called at sync points
    
    def execute(self, thread_id: int) -> ChainContext:
        chain_ctx = ChainContext(thread_id)
        
        for step in self.config.steps:
            # Check for sync_before
            if self.config.sync_before == step.name:
                self._do_sync(thread_id, f"before:{step.name}")
            
            # Evaluate condition
            if step.condition:
                should_execute = self._evaluate_condition(step.condition, chain_ctx)
                if not should_execute:
                    logger.info(f"[Thread {thread_id}] Skipping '{step.name}': condition not met")
                    
                    if step.on_skip == OnSkipBehavior.ABORT:
                        break
                    continue
            
            # Execute step with retry logic
            result = self._execute_with_retry(step, chain_ctx, thread_id)
            chain_ctx.add_result(result)
            
            # Handle errors
            if not result.success:
                if step.on_error == OnErrorBehavior.ABORT:
                    break
                # CONTINUE: just proceed to next step
            
            # Check for sync_after
            if self.config.sync_after == step.name:
                self._do_sync(thread_id, f"after:{step.name}")
        
        return chain_ctx
    
    def _evaluate_condition(self, condition: str, chain_ctx: ChainContext) -> bool:
        """Evaluate a Jinja2 condition."""
        context = {
            **self.base_context,
            'chain': chain_ctx.to_template_context(),
        }
        
        try:
            # Render as template and evaluate
            rendered = self.template.render(f"{{{{ {condition} }}}}", context)
            return rendered.lower() in ('true', '1', 'yes')
        except Exception as e:
            logger.warning(f"Condition evaluation failed: {e}")
            return False
    
    def _execute_with_retry(
        self, 
        step: ChainStep, 
        chain_ctx: ChainContext,
        thread_id: int,
    ) -> StepResult:
        """Execute step with optional retry logic."""
        
        max_attempts = 1
        delay_ms = 0
        backoff = 1.0
        
        if step.retry and step.on_error == OnErrorBehavior.RETRY:
            max_attempts = step.retry.max_attempts
            delay_ms = step.retry.delay_ms
            backoff = step.retry.backoff_factor
        
        last_result = None
        
        for attempt in range(max_attempts):
            result = self._execute_step(step, chain_ctx)
            last_result = result
            
            if result.success:
                return result
            
            if attempt < max_attempts - 1:
                sleep_time = delay_ms * (backoff ** attempt) / 1000
                logger.debug(f"[Thread {thread_id}] Retry {attempt + 1}/{max_attempts} for '{step.name}' in {sleep_time:.2f}s")
                time.sleep(sleep_time)
        
        return last_result
    
    def _do_sync(self, thread_id: int, sync_point: str) -> None:
        """Execute sync callback if provided."""
        if self.sync_callback:
            logger.debug(f"[Thread {thread_id}] Sync point: {sync_point}")
            self.sync_callback(thread_id, sync_point)

3. Update orchestrator/coordinator.py

def race_worker(self, thread_id: int, state: StateConfig, ...):
    
    chain_config = state.race.get_chain_config()
    
    if chain_config:
        # Create sync callback
        def sync_callback(tid: int, point: str):
            sync.wait(tid)
        
        chain_executor = ChainExecutor(
            config=chain_config,
            client=client,
            template_engine=self.template,
            base_context=context.to_dict(),
            sync_callback=sync_callback,
        )
        
        # If no explicit sync point, sync at start
        if not chain_config.sync_after and not chain_config.sync_before:
            sync.wait(thread_id)
        
        chain_ctx = chain_executor.execute(thread_id)
        # ...

Acceptance Criteria

  • Parse condition field in chain steps
  • Evaluate Jinja2 conditions with chain context
  • Implement on_skip behavior (continue/abort)
  • Implement on_error behavior (continue/abort/retry)
  • Implement retry logic with backoff
  • Support sync_after configuration
  • Support sync_before configuration
  • Sync callback integration with race sync mechanisms
  • Unit tests for conditional execution
  • Unit tests for sync points
  • Unit tests for retry logic
  • Documentation with examples

Example: Double-Spend with Sync Point

race_double_spend:
  race:
    threads: 20
    
    chain:
      - name: check_balance
        request: "GET /api/balance"
        extract:
          balance: "json.available"
      
      - name: withdraw
        request: |
          POST /api/withdraw
          {"amount": {{ chain.check_balance.balance }}}
        condition: "{{ chain.check_balance.balance | float > 0 }}"
        on_error: continue
    
    # Critical: all threads check balance, then race withdraw
    sync_after: check_balance

Example: Resilient Chain with Retry

chain:
  - name: get_token
    request: "POST /oauth/token"
    extract:
      token: "json.access_token"
    on_error: retry
    retry:
      max_attempts: 3
      delay_ms: 500
      backoff_factor: 2.0
  
  - name: call_api
    request: |
      GET /api/data
      Authorization: Bearer {{ chain.get_token.token }}
    condition: "{{ chain.get_token.token is defined }}"
    on_skip: abort

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions