Fork/Join Primitives Implementation #7

@maycon


Summary

Implement fork/join primitives that enable parallel execution of multiple states with configurable synchronization strategies, leveraging Python 3.14t's GIL-free threading for true parallelism.

Motivation

Security testing often requires testing multiple attack vectors simultaneously:

  • Scan multiple endpoints concurrently
  • Test different user roles in parallel
  • Execute distributed race conditions
  • Compare timing across parallel requests

TRECO currently supports only sequential state execution. Parallel testing therefore requires running multiple instances of the tool, which:

  • Cannot share state between parallel branches
  • Have no way to synchronize or aggregate results
  • Lack coordinated timing for complex race conditions
  • Require manual result correlation

Proposed Syntax

states:
  parallel_vulnerability_scan:
    description: "Test multiple endpoints simultaneously"
    
    fork:
      # Define parallel branches
      branches:
        - state: test_admin_panel
          input:
            endpoint: "/admin"
            token: "{{ auth_token }}"
        
        - state: test_api_endpoints
          input:
            endpoint: "/api"
            token: "{{ auth_token }}"
        
        - state: test_webhooks
          input:
            endpoint: "/webhooks"
            token: "{{ auth_token }}"
      
      # Synchronization strategy
      join:
        strategy: all  # Wait for all branches
        timeout_seconds: 30
    
    # Process results after join
    on_join:
      extract:
        admin_vulnerable:
          from_branch: test_admin_panel
          path: "result.vulnerable"
        
        api_vulnerable:
          from_branch: test_api_endpoints
          path: "result.vulnerable"
    
    next:
      - when:
          - condition: "{{ admin_vulnerable or api_vulnerable }}"
        goto: exploit_found
      - otherwise:
        goto: no_vulnerabilities

  # Branch state definition
  test_admin_panel:
    request: |
      GET {{ endpoint }}
      Authorization: Bearer {{ token }}
    
    # Return values to parent
    returns:
      vulnerable: "{{ status == 200 and 'admin' in body }}"
      response_time: "{{ response_time_ms }}"

Join Strategies

1. all - Wait for All Branches (Default)

Wait for all branches to complete before proceeding.

join:
  strategy: all
  timeout_seconds: 60

Use case: Comprehensive testing where all results are needed.

Behavior:

  • Blocks until all branches complete or the timeout expires
  • Returns all branch results; a branch that raises is recorded as an error instead of failing the join
  • Fails if any branch times out (unless fail_fast: false)
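
A minimal sketch of the all strategy, including one possible reading of fail_fast: false (late branches are recorded as errors instead of failing the whole join). The join_all helper, the Future-to-branch-name mapping and the error format are illustrative assumptions, not TRECO's API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def join_all(futures, timeout, fail_fast=True):
    """Wait for every branch. futures maps Future -> branch name (hypothetical shape)."""
    done, not_done = wait(futures, timeout=timeout)
    if not_done and fail_fast:
        raise TimeoutError(f"{len(not_done)} branch(es) still running after {timeout}s")

    results = {}
    for future in done:
        try:
            results[futures[future]] = future.result()
        except Exception as exc:  # a failing branch does not sink the others
            results[futures[future]] = {'error': str(exc)}
    for future in not_done:  # only reached when fail_fast is False
        results[futures[future]] = {'error': 'timed out'}
    return results


release = threading.Event()

def fast_branch():
    return {'vulnerable': False}

def failing_branch():
    raise ConnectionError("connection refused")

def slow_branch():
    release.wait(timeout=5)  # stands in for a request that hangs
    return {'vulnerable': True}

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {
        pool.submit(fast_branch): 'test_admin_panel',
        pool.submit(failing_branch): 'test_api_endpoints',
        pool.submit(slow_branch): 'test_webhooks',
    }
    results = join_all(futures, timeout=0.5, fail_fast=False)
    release.set()  # let the slow branch exit so the pool can shut down

print(results['test_api_endpoints'])  # {'error': 'connection refused'}
print(results['test_webhooks'])       # {'error': 'timed out'}
```

With fail_fast left at True, the same call raises TimeoutError because test_webhooks is still running when the timeout expires.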

2. any - First to Complete

Proceed as soon as any branch completes.

join:
  strategy: any
  timeout_seconds: 30

Use case: Finding the first vulnerability or the fastest endpoint.

Behavior:

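  • Returns as soon as the first branch completes (a branch that fails also counts as completed)
  • Cancels the remaining branches unless cancel_others: false; only branches that have not started yet can be cancelled, while running ones finish in the background
  • Returns a single result from the fastest branch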
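
The cancellation limit is worth making concrete: Python's Future.cancel() succeeds only for work that has not started, so a branch that is already sending its request cannot be stopped this way. A small demonstration with a single worker, one branch running and one still queued (the branch functions are placeholders):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def running_branch():
    started.set()            # signal that this branch now occupies the worker
    release.wait(timeout=5)  # simulate an in-flight request
    return "running branch finished"

def queued_branch():
    return "queued branch finished"

with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(running_branch)
    queued = pool.submit(queued_branch)  # waits for the single worker
    started.wait(timeout=5)              # make sure the first branch is executing

    running_cancelled = running.cancel()  # False: already running
    queued_cancelled = queued.cancel()    # True: never started
    release.set()                         # let the running branch complete

print(running_cancelled, queued_cancelled)  # False True
print(running.result())                     # running branch finished
```

This is why cancel_others can reduce wasted work but cannot guarantee that slower branches never hit the target.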

3. majority - More Than Half

Proceed when >50% of branches complete.

join:
  strategy: majority
  timeout_seconds: 45

Use case: Consensus-based decisions, fault tolerance.

Behavior:

  • Waits for floor(N/2) + 1 branches (3 of 4, 3 of 5)
  • Returns the results of the branches completed at that point; the rest are not awaited
  • Fails if fewer than a majority complete within timeout_seconds
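
A sketch of the majority join, assuming a mapping from each Future to its branch name; it counts completions rather than distinct result keys. For N = 4, floor(N/2) + 1 = 3 branches are required, whereas ceil(N/2) = 2 would be exactly half. The helper names are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError


def majority_required(n):
    """Smallest count strictly greater than n / 2."""
    return n // 2 + 1


def join_majority(futures, timeout):
    required = majority_required(len(futures))
    results, finished = {}, 0
    try:
        for future in as_completed(futures, timeout=timeout):
            try:
                results[futures[future]] = future.result()
            except Exception as exc:
                results[futures[future]] = {'error': str(exc)}
            finished += 1
            if finished >= required:
                return results  # remaining branches are not awaited
    except FuturesTimeoutError:
        pass
    raise TimeoutError(f"Only {finished}/{required} required branches completed")


release = threading.Event()

def quick(region):
    return {'region': region}

def stuck():
    release.wait(timeout=5)  # a branch that has not answered yet
    return {'region': 'stuck'}

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(quick, r): r for r in ('us', 'eu', 'ap')}
    futures[pool.submit(stuck)] = 'sa'
    results = join_majority(futures, timeout=5)
    release.set()

print(majority_required(4), sorted(results))  # 3 ['ap', 'eu', 'us']
```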

4. custom - Custom Completion Logic

Define a custom completion condition, re-evaluated every check_interval_ms milliseconds.

join:
  strategy: custom
  condition: "{{ successful_branches >= 2 or elapsed_time > 10 }}"
  check_interval_ms: 100

Use case: Complex synchronization requirements.
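
Because custom joins poll, a sketch only needs a loop that recomputes the condition's inputs every check_interval_ms. Here the condition is a plain Python callable rather than a {{ }} template, purely to keep the example self-contained; the variable names successful_branches and elapsed_time come from the example above, and join_custom is a hypothetical helper.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def _succeeded(future):
    return future.done() and not future.cancelled() and future.exception() is None


def join_custom(futures, condition, check_interval_ms=100, timeout_seconds=60):
    """Poll until condition(variables) holds or every branch has finished."""
    start = time.monotonic()
    while True:
        done = [f for f in futures if f.done()]
        variables = {
            'successful_branches': sum(1 for f in done if _succeeded(f)),
            'elapsed_time': time.monotonic() - start,
        }
        if condition(variables) or len(done) == len(futures):
            # Only successful branches are returned in this sketch
            return {futures[f]: f.result() for f in done if _succeeded(f)}
        if variables['elapsed_time'] > timeout_seconds:
            raise TimeoutError("custom join condition not met in time")
        time.sleep(check_interval_ms / 1000)


release = threading.Event()

def probe(name):
    return f"{name} scanned"

def blocked():
    release.wait(timeout=5)
    return "late"

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {
        pool.submit(probe, 'admin'): 'admin',
        pool.submit(probe, 'api'): 'api',
        pool.submit(blocked): 'webhooks',
    }
    # Equivalent of "{{ successful_branches >= 2 or elapsed_time > 10 }}"
    results = join_custom(
        futures,
        lambda v: v['successful_branches'] >= 2 or v['elapsed_time'] > 10,
        check_interval_ms=10,
    )
    release.set()

print(sorted(results))  # ['admin', 'api']
```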

Implementation Details

Fork Executor

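from concurrent.futures import (
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    as_completed,
    wait,
)
from concurrent.futures import TimeoutError as FuturesTimeoutError
import time

class ForkJoinExecutor:
    def __init__(self, max_workers=10):
        # Nested forks that submit into this same bounded pool can starve it:
        # parent branches block while their children wait for a free worker.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def execute_fork(self, fork_config, context):
        """Execute fork/join pattern."""
        branches = fork_config['branches']
        join_config = fork_config.get('join', {'strategy': 'all'})

        # Create one future per branch; branch input overrides the parent context.
        # execute_branch is defined under "Branch State Execution".
        futures = {}
        for branch in branches:
            future = self.executor.submit(
                self.execute_branch,
                branch['state'],
                {**context, **branch.get('input', {})}
            )
            futures[future] = branch

        # Wait according to join strategy
        return self.wait_for_join(futures, join_config)

    def wait_for_join(self, futures, join_config):
        """Wait for branches according to strategy."""
        strategy = join_config.get('strategy', 'all')
        timeout = join_config.get('timeout_seconds', 60)

        if strategy == 'all':
            return self._join_all(futures, timeout)
        elif strategy == 'any':
            return self._join_any(futures, timeout,
                                  join_config.get('cancel_others', True))
        elif strategy == 'majority':
            return self._join_majority(futures, timeout)
        elif strategy == 'custom':
            return self._join_custom(futures, join_config)
        raise ValueError(f"Unknown join strategy: {strategy!r}")

    @staticmethod
    def _collect(future, branch, results):
        """Store a finished branch's result, or its error, under its state name.

        Branches that reuse one state (e.g. user_redeem three times) share a
        key and overwrite each other; a per-branch id would avoid that.
        """
        try:
            results[branch['state']] = future.result()
        except Exception as e:
            results[branch['state']] = {'error': str(e)}

    def _join_all(self, futures, timeout):
        """Wait for all branches; a branch error is recorded, a timeout raises."""
        done, not_done = wait(futures, timeout=timeout)
        if not_done:
            raise TimeoutError(
                f"Fork join timeout: {len(not_done)} branch(es) still running"
            )

        results = {}
        for future in done:
            self._collect(future, futures[future], results)
        return results

    def _join_any(self, futures, timeout, cancel_others=True):
        """Return the first completed branch."""
        completed, pending = wait(
            futures,
            timeout=timeout,
            return_when=FIRST_COMPLETED
        )

        if not completed:
            raise TimeoutError("No branch completed in time")

        # Future.cancel() only stops branches that have not started yet;
        # branches that are already running finish in the background.
        if cancel_others:
            for future in pending:
                future.cancel()

        # Return the first result (or its error)
        first_future = next(iter(completed))
        results = {}
        self._collect(first_future, futures[first_future], results)
        return results

    def _join_majority(self, futures, timeout):
        """Wait until more than half of the branches have completed."""
        required = len(futures) // 2 + 1
        results = {}
        finished = 0  # counted separately because result keys can collide

        try:
            for future in as_completed(futures, timeout=timeout):
                self._collect(future, futures[future], results)
                finished += 1
                if finished >= required:
                    return results
        except FuturesTimeoutError:
            pass  # fall through to the error below

        raise TimeoutError(f"Only {finished}/{required} required branches completed")

    def _join_custom(self, futures, join_config):
        """Poll until the custom condition holds or every branch has finished."""
        interval = join_config.get('check_interval_ms', 100) / 1000
        timeout = join_config.get('timeout_seconds', 60)
        start = time.monotonic()

        while True:
            done = [f for f in futures if f.done()]
            variables = {
                'successful_branches': sum(
                    1 for f in done if not f.cancelled() and f.exception() is None
                ),
                'elapsed_time': time.monotonic() - start,
            }
            # evaluate_expression is the evaluator execute_branch also uses;
            # it is assumed to return a boolean for a condition template
            if (self.evaluate_expression(join_config['condition'], variables)
                    or len(done) == len(futures)):
                results = {}
                for future in done:
                    self._collect(future, futures[future], results)
                return results
            if variables['elapsed_time'] > timeout:
                raise TimeoutError("Custom join condition not met in time")
            time.sleep(interval)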

Branch State Execution

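def execute_branch(self, state_name, branch_context):
    """Execute a single branch state."""
    state_config = self.states[state_name]

    # Execute the state (assumes a requests-style Response is returned)
    response = self.execute_state(state_config, branch_context)

    # Variables the `returns` templates reference, e.g.
    # "{{ status == 200 and 'admin' in body }}" and "{{ response_time_ms }}"
    template_context = {
        **branch_context,
        'response': response,
        'status': response.status_code,
        'body': response.text,
        'response_time_ms': response.elapsed.total_seconds() * 1000,
    }

    # Process returns: evaluate each expression against the response context
    result = {
        key: self.evaluate_expression(expression, template_context)
        for key, expression in state_config.get('returns', {}).items()
    }

    return {
        'state': state_name,
        'result': result,
        'response': response,
        'success': response.ok,
        'status': response.status_code
    }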

Result Extraction from Branches

def extract_branch_results(self, on_join_config, fork_results):
    """Extract values from branch results."""
    extracted = {}

    for var_name, extract_config in on_join_config.get('extract', {}).items():
        from_branch = extract_config['from_branch']
        path = extract_config['path']

        if from_branch not in fork_results:
            raise ValueError(f"Branch '{from_branch}' not found in results")

        # Navigate a dotted path (e.g. "result.vulnerable"); a failed branch
        # holds {'error': ...} instead of a 'result' key
        value = fork_results[from_branch]
        for key in path.split('.'):
            if not isinstance(value, dict) or key not in value:
                raise ValueError(f"Path '{path}' not found in branch '{from_branch}'")
            value = value[key]

        extracted[var_name] = value

    return extracted

Advanced Features

1. Nested Forks

states:
  multi_level_test:
    fork:
      branches:
        - state: region_us_tests
        - state: region_eu_tests
    
      join:
        strategy: all

  region_us_tests:
    fork:
      branches:
        - state: us_east_test
        - state: us_west_test
      join:
        strategy: all

2. Branch Dependencies

fork:
  branches:
    - state: setup
      wait_for: []
    
    - state: attack_1
      wait_for: [setup]
    
    - state: attack_2
      wait_for: [setup]
    
    - state: cleanup
      wait_for: [attack_1, attack_2]
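
wait_for turns the branch list into a dependency graph. One way to schedule it, sketched with the standard library's graphlib.TopologicalSorter: start every branch whose dependencies have finished, and release its dependents as it completes. run_with_dependencies and the run_branch callback are hypothetical stand-ins for branch execution.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_with_dependencies(branches, run_branch, max_workers=4):
    """Run each branch once every state in its wait_for list has finished."""
    graph = {b['state']: set(b.get('wait_for', [])) for b in branches}
    unknown = set().union(*graph.values()) - set(graph)
    if unknown:
        raise ValueError(f"wait_for references unknown branches: {sorted(unknown)}")

    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError for circular wait_for lists
    results, finish_order = {}, []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        running = {}
        while sorter.is_active():
            for state in sorter.get_ready():
                running[pool.submit(run_branch, state)] = state
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                state = running.pop(future)
                results[state] = future.result()  # a failing branch propagates here
                finish_order.append(state)
                sorter.done(state)  # unblocks branches waiting on this one
    return results, finish_order


branches = [
    {'state': 'setup', 'wait_for': []},
    {'state': 'attack_1', 'wait_for': ['setup']},
    {'state': 'attack_2', 'wait_for': ['setup']},
    {'state': 'cleanup', 'wait_for': ['attack_1', 'attack_2']},
]
results, order = run_with_dependencies(branches, lambda state: f"{state} ok")
print(order[0], order[-1])  # setup cleanup
```

attack_1 and attack_2 run concurrently once setup finishes, and cleanup starts only after both are done.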

3. Resource Limits

fork:
  branches: [...]
  
  limits:
    max_concurrent: 5  # Max parallel branches
    memory_mb: 512     # Per-branch limit
    timeout_per_branch: 10
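
max_concurrent maps naturally onto a semaphore (or simply the pool size). The other two keys need care in a thread-based design: OS resource limits apply to the whole process, so memory_mb cannot be enforced per branch that way, and Python cannot forcibly stop a running thread, so timeout_per_branch can only bound how long the join waits. A sketch of the concurrency cap with a hypothetical run_limited helper, on a pool deliberately larger than the cap:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_limited(tasks, max_concurrent):
    """Run callables while allowing at most max_concurrent of them at once."""
    gate = threading.BoundedSemaphore(max_concurrent)
    lock = threading.Lock()
    active = peak = 0

    def guarded(task):
        nonlocal active, peak
        with gate:  # blocks while max_concurrent branches are already running
            with lock:
                active += 1
                peak = max(peak, active)
            try:
                return task()
            finally:
                with lock:
                    active -= 1

    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        results = list(pool.map(guarded, tasks))
    return results, peak


# Each task sleeps briefly (a stand-in for a request) and returns its index
tasks = [lambda i=i: time.sleep(0.01) or i for i in range(10)]
results, peak = run_limited(tasks, max_concurrent=3)
print(results, peak <= 3)
```

A semaphore rather than a smaller pool lets one shared pool serve several forks, each with its own cap.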

4. Shared State

fork:
  branches: [...]
  
  shared_state:
    counter: 0
    results: []
  
  synchronization:
    lock: true  # Thread-safe access
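
With lock: true, every read-modify-write on shared_state must happen under one lock: unsynchronized updates such as counter += 1 from parallel branches can lose increments, and on a free-threaded build more threads genuinely run at the same time. A minimal sketch in which SharedState and its update method are hypothetical names:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SharedState:
    """Dictionary shared by branches; every access goes through one lock."""

    def __init__(self, **initial):
        self._data = dict(initial)
        self._lock = threading.Lock()

    def update(self, fn):
        """Apply fn(data) atomically with respect to other branches."""
        with self._lock:
            return fn(self._data)

    def snapshot(self):
        """Return a copy that is safe to read after the join."""
        with self._lock:
            return {k: list(v) if isinstance(v, list) else v
                    for k, v in self._data.items()}


state = SharedState(counter=0, results=[])

def increment(data):
    data['counter'] += 1

def branch(branch_id):
    for _ in range(1000):
        state.update(increment)
    state.update(lambda data: data['results'].append(branch_id))

with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(branch, range(8)))

snap = state.snapshot()
print(snap['counter'], sorted(snap['results']))  # 8000 [0, 1, 2, 3, 4, 5, 6, 7]
```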

Use Cases

1. Multi-Endpoint Vulnerability Scan

states:
  comprehensive_scan:
    fork:
      branches:
        - {state: scan_admin, input: {path: "/admin"}}
        - {state: scan_api, input: {path: "/api"}}
        - {state: scan_graphql, input: {path: "/graphql"}}
        - {state: scan_websocket, input: {path: "/ws"}}
    
      join:
        strategy: all
        timeout_seconds: 120

2. Multi-User Race Condition

states:
  distributed_race:
    fork:
      branches:
        - {state: user_redeem, input: {user: "alice"}}
        - {state: user_redeem, input: {user: "bob"}}
        - {state: user_redeem, input: {user: "charlie"}}
      
      # Synchronize all branches before sending requests
      synchronization:
        barrier: true
    
      join:
        strategy: all
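
barrier: true can be built on threading.Barrier: each branch prepares its request, then blocks until every branch has arrived, so the requests leave as close together as the scheduler allows. The party count must equal the number of branches and the pool needs at least that many workers; otherwise the barrier never fills and wait() raises BrokenBarrierError after its timeout. In this sketch, redeem is a hypothetical stand-in for the user_redeem state.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

users = ["alice", "bob", "charlie"]
barrier = threading.Barrier(len(users))

def redeem(user):
    # Setup work (building the request for this user) would happen here
    barrier.wait(timeout=5)         # every branch is released together
    fired_at = time.perf_counter()  # the request would be sent at this point
    return user, fired_at

with ThreadPoolExecutor(max_workers=len(users)) as pool:
    results = list(pool.map(redeem, users))

spread_ms = (max(t for _, t in results) - min(t for _, t in results)) * 1000
print([u for u, _ in results], f"spread {spread_ms:.3f} ms")
```

The printed spread is the gap between the first and last branch leaving the barrier, a rough indicator of how tight the race window is.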

3. A/B Testing Attack Variants

states:
  test_payloads:
    fork:
      branches:
        - {state: sql_injection, input: {payload: "' OR 1=1--"}}
        - {state: sql_injection, input: {payload: "'; DROP TABLE--"}}
        - {state: sql_injection, input: {payload: "1' UNION SELECT--"}}
    
      join:
        strategy: any  # Proceeds on the first branch to finish, which may not be a successful exploit

4. Load Distribution Testing

states:
  load_test:
    fork:
      branches:
        # Generate 50 parallel branches
        - state: make_request
          repeat: 50
    
      join:
        strategy: all
    
    on_join:
      extract:
        avg_response_time:
          aggregate: mean
          from_all_branches: "result.response_time_ms"
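
This use case relies on an aggregate form of extract (aggregate plus from_all_branches) that extract_branch_results does not handle. A sketch of it with the statistics module; the aggregate_branches name, the set of supported aggregate names and the rule of skipping failed branches are assumptions. branch_results can be any iterable of branch results, e.g. fork_results.values().

```python
import statistics

AGGREGATES = {
    'mean': statistics.mean,
    'median': statistics.median,
    'min': min,
    'max': max,
    'count': len,
}

def aggregate_branches(branch_results, path, aggregate):
    """Collect `path` from every branch that has it and reduce with `aggregate`."""
    values = []
    for result in branch_results:
        value = result
        for key in path.split('.'):
            if not isinstance(value, dict) or key not in value:
                value = None  # failed branch ({'error': ...}) or missing field
                break
            value = value[key]
        if value is not None:
            values.append(value)
    if not values:
        raise ValueError(f"No branch produced '{path}'")
    return AGGREGATES[aggregate](values)


branch_results = [
    {'result': {'response_time_ms': 120}},
    {'result': {'response_time_ms': 80}},
    {'error': 'timed out'},  # skipped by the aggregation
]
print(aggregate_branches(branch_results, 'result.response_time_ms', 'mean'))
```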

Testing Requirements

Unit Tests

  • Fork creates correct number of threads
  • All join strategies work correctly
  • Timeout handling for each strategy
  • Branch result aggregation
  • Error handling in branches
  • Branch cancellation with strategy: any

Integration Tests

  • Complex fork/join scenarios
  • Nested forks work correctly
  • Shared state synchronization
  • Resource limits enforced
  • Real HTTP requests in parallel

Performance Tests

  • 10 parallel branches complete efficiently
  • 100 parallel branches (stress test)
  • Memory usage with parallel execution
  • Free-threaded (3.14t) vs default GIL build performance

Concurrency Tests

  • No race conditions in result aggregation
  • Thread-safe shared state access
  • Deadlock detection
  • Proper cleanup on timeout/error

Documentation Updates

  • "Parallel Execution" section in README
  • Join strategy decision guide
  • Performance benchmarks (with/without GIL)
  • 5+ practical examples
  • Debugging parallel states guide
  • Best practices for fork/join

Schema Updates

{
  "fork": {
    "type": "object",
    "required": ["branches"],
    "properties": {
      "branches": {
        "type": "array",
        "minItems": 1,
        "items": {
          "type": "object",
          "required": ["state"],
          "properties": {
            "state": {"type": "string"},
            "input": {"type": "object"},
            "wait_for": {"type": "array", "items": {"type": "string"}},
            "repeat": {"type": "integer", "minimum": 1}
          }
        }
      },
      "join": {
        "type": "object",
        "properties": {
          "strategy": {
            "type": "string",
            "enum": ["all", "any", "majority", "custom"]
          },
          "timeout_seconds": {"type": "integer", "minimum": 1},
          "cancel_others": {"type": "boolean"},
          "fail_fast": {"type": "boolean"},
          "condition": {"type": "string"},
          "check_interval_ms": {"type": "integer", "minimum": 1}
        }
      },
      "limits": {"type": "object"},
      "shared_state": {"type": "object"},
      "synchronization": {"type": "object"}
    }
  },
  "on_join": {
    "type": "object",
    "properties": {
      "extract": {"type": "object"}
    }
  },
  "returns": {
    "type": "object",
    "additionalProperties": {"type": "string"}
  }
}

Python 3.14t Optimization

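This feature benefits from Python 3.14t's free-threaded (GIL-free) build. Under the GIL, threads already overlap while waiting on network I/O, so the gain comes mainly from the CPU-bound work inside each branch (template rendering, response parsing, result evaluation), which can now run on several cores at once: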

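# Performance comparison (target figures, to be confirmed by the benchmarks listed above)
# Python 3.13 (default build, with GIL): ~100ms for 10 parallel requests
# Python 3.14t (free-threaded build):    ~15ms for 10 parallel requests
from concurrent.futures import ThreadPoolExecutor

def send_request():
    ...  # placeholder: send one HTTP request and process the response

# Worker threads run concurrently; without the GIL, their CPU-bound parts
# (parsing, template evaluation) can also use several cores at the same time
with ThreadPoolExecutor(max_workers=50) as executor:
    futures = [executor.submit(send_request) for _ in range(50)]
    results = [future.result() for future in futures]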
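
Since the speedup depends on actually running without the GIL, a startup check can confirm it. A sketch using sysconfig.get_config_var("Py_GIL_DISABLED") for the build and sys._is_gil_enabled() (added in Python 3.13; underscore-prefixed, so CPython-specific) for the runtime state, because a free-threaded build can still re-enable the GIL, for example with PYTHON_GIL=1. gil_status is a hypothetical helper name.

```python
import sys
import sysconfig


def gil_status():
    """Return (free_threaded_build, gil_enabled_now)."""
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    is_enabled = getattr(sys, "_is_gil_enabled", None)
    # Interpreters older than 3.13 lack the check and always run with the GIL
    gil_enabled = is_enabled() if is_enabled is not None else True
    return free_threaded_build, gil_enabled


build, enabled = gil_status()
if enabled:
    print("GIL is enabled: branches overlap on I/O but not on CPU-bound work")
```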

Acceptance Criteria

  • All 4 join strategies implemented and tested
  • Branch execution is truly parallel (GIL-free)
  • Timeout handling works for all strategies
  • Results correctly aggregated and extracted
  • Nested forks work correctly
  • Error in one branch doesn't crash all branches
  • Performance: 10 parallel branches ~10x faster than sequential
  • Documentation with 5+ examples
  • All tests pass

Related Issues

  • Depends on: #XX Basic Loop Implementation
  • Blocks: #XX Nested Race Conditions Support
  • Related to: Python 3.14t free-threaded build requirement
