Summary
Implement basic request chaining where each race thread executes a sequence of requests, with the ability to extract and pass data between steps.
Configuration
race_attack:
  race:
    threads: 10
    sync_mechanism: barrier
    chain:
      - name: step1
        request: |
          GET /api/data HTTP/1.1
          Host: {{ config.host }}
        extract:
          value:
            type: jpath
            pattern: "$.data.id"
      - name: step2
        request: |
          POST /api/action HTTP/1.1
          Host: {{ config.host }}
          Content-Type: application/json

          {"id": "{{ chain.step1.value }}"}
Implementation
1. Update models/config.py
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, field
@dataclass
class ChainStep:
    """Single step in a request chain."""
    # Identifier of the step; results are stored under this name, so templates
    # refer to a step's data as ``{{ chain.<name>.<variable> }}``.
    name: str
    # Raw HTTP request template for this step (rendered before sending).
    request: str
    # Optional mapping of variable name -> extraction rule.
    # NOTE(review): annotated with ExtractPattern, but the chain parsing code,
    # the executor and the example test in this document all use plain dicts
    # with 'type' / 'pattern' keys — confirm which representation is intended.
    extract: Optional[Dict[str, ExtractPattern]] = None
    # Optional free-form description of the step.
    description: Optional[str] = None
@dataclass
class ChainConfig:
    """Configuration for request chaining."""
    # Steps of the chain in the order they are listed; each instance gets its
    # own fresh empty list by default.
    steps: List[ChainStep] = field(default_factory=list)
@dataclass
class RaceConfig:
    """Race condition configuration."""
    threads: int = 10
    sync_mechanism: str = "barrier"
    connection_strategy: str = "preconnect"
    thread_propagation: str = "single"
    chain: Optional[List[Dict[str, Any]]] = None  # ← Add this

    def get_chain_config(self) -> Optional[ChainConfig]:
        """Build a ChainConfig from the raw ``chain`` entries.

        Returns:
            None when ``chain`` is unset or empty, otherwise a ChainConfig
            holding one ChainStep per entry, in order.

        Raises:
            KeyError: if an entry has no 'request' key.
        """
        raw_steps = self.chain
        if not raw_steps:
            return None
        # An entry without a 'name' falls back to ``step_<position>``.
        parsed = [
            ChainStep(
                name=entry.get('name', f'step_{position}'),
                request=entry['request'],
                extract=entry.get('extract'),
                description=entry.get('description'),
            )
            for position, entry in enumerate(raw_steps)
        ]
        return ChainConfig(steps=parsed)
2. Create chain/context.py
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
import httpx
@dataclass
class StepResult:
    """Result of a single chain step."""
    # Name of the chain step that produced this result.
    name: str
    # HTTP response of the step. The executor's error path stores None here,
    # so consumers must handle a missing response.
    response: Optional[httpx.Response]
    # Variables extracted from the response, keyed by variable name.
    extracted: Dict[str, Any] = field(default_factory=dict)
    # Duration of the step in milliseconds.
    duration_ms: float = 0.0
    # False when the step raised while being executed.
    success: bool = True
    # String form of the exception for a failed step, otherwise None.
    error: Optional[str] = None
class ChainContext:
    """
    Context for a request chain execution.

    Each thread gets its own ChainContext to maintain isolation.
    Provides access to previous step results via
    `chain.<step_name>.<variable>`.
    """

    def __init__(self, thread_id: int):
        """Create an empty context for the thread identified by ``thread_id``."""
        self.thread_id = thread_id
        # Step name -> result; a later result with the same name replaces
        # the earlier one.
        self._steps: Dict[str, "StepResult"] = {}

    def add_result(self, result: "StepResult") -> None:
        """Add a step result to the context, keyed by ``result.name``."""
        self._steps[result.name] = result

    def get_step(self, name: str) -> Optional["StepResult"]:
        """Get result of a specific step, or None if no such step was recorded."""
        return self._steps.get(name)

    def get_extracted(self, step_name: str, var_name: str) -> Optional[Any]:
        """Get an extracted variable from a step.

        Returns None when the step is unknown or did not extract ``var_name``.
        """
        step = self._steps.get(step_name)
        if step is None:
            return None
        return step.extracted.get(var_name)

    def to_template_context(self) -> Dict[str, Any]:
        """
        Convert to context dict for Jinja2 templates.

        Allows templates to use: {{ chain.step1.value }}

        Each step maps to its extracted variables plus the fixed keys
        'response', 'status_code', 'text' and 'success'; the fixed keys win
        over an extracted variable of the same name. A step without a
        response (a failed step) exposes None for 'response', 'status_code'
        and 'text' instead of raising.
        """
        context: Dict[str, Any] = {}
        for name, result in self._steps.items():
            response = result.response
            # Failed steps are recorded with response=None; guard the
            # attribute access so the context stays renderable.
            has_response = response is not None
            context[name] = {
                **result.extracted,
                'response': response,
                'status_code': response.status_code if has_response else None,
                'text': response.text if has_response else None,
                'success': result.success,
            }
        return context

    @property
    def all_successful(self) -> bool:
        """Check if all recorded steps succeeded (True when no step was recorded)."""
        return all(step.success for step in self._steps.values())

    @property
    def total_duration_ms(self) -> float:
        """Total duration of all recorded steps, in milliseconds."""
        return sum(step.duration_ms for step in self._steps.values())
3. Create chain/executor.py
import time
import logging
from typing import Optional, Dict, Any
import httpx
from treco.chain.context import ChainContext, StepResult
from treco.models.config import ChainConfig, ChainStep
from treco.template import TemplateEngine
from treco.http.parser import HTTPParser
from treco.http.extractor import ExtractorRegistry
logger = logging.getLogger(__name__)
class ChainExecutor:
    """Runs the steps of a request chain, one after another, in one thread."""

    def __init__(
        self,
        config: ChainConfig,
        client: httpx.Client,
        template_engine: TemplateEngine,
        base_context: Dict[str, Any],
    ):
        """Store the collaborators and create the HTTP request parser."""
        self.config, self.client = config, client
        self.template, self.base_context = template_engine, base_context
        self.parser = HTTPParser()

    def execute(self, thread_id: int) -> ChainContext:
        """
        Run the configured steps in order, stopping at the first failure.

        Args:
            thread_id: ID of the current thread

        Returns:
            ChainContext holding the result of every step that was run,
            including the failed one if the chain was cut short.
        """
        chain_ctx = ChainContext(thread_id)
        for step in self.config.steps:
            result = self._execute_step(step, chain_ctx)
            chain_ctx.add_result(result)
            if result.success:
                continue
            # First failure ends the chain; later steps are not attempted.
            logger.warning(f"[Thread {thread_id}] Chain step '{step.name}' failed: {result.error}")
            break
        return chain_ctx

    def _execute_step(self, step: ChainStep, chain_ctx: ChainContext) -> StepResult:
        """Render, send and post-process one step.

        Any exception raised along the way is converted into a failed
        StepResult (response=None, error=str(exception)).
        """
        started = time.perf_counter()

        def elapsed_ms() -> float:
            # Milliseconds since this step started.
            return (time.perf_counter() - started) * 1000

        try:
            # Base variables plus the results of the steps run so far.
            template_vars = {
                **self.base_context,
                'chain': chain_ctx.to_template_context(),
            }
            raw_request = self.template.render(step.request, template_vars)
            http_method, target, header_map, payload = self.parser.parse(raw_request)
            reply = self.client.request(
                method=http_method,
                url=target,
                headers=header_map,
                content=payload or None,
            )
            captured = self._extract_data(reply, step.extract) if step.extract else {}
            return StepResult(
                name=step.name,
                response=reply,
                extracted=captured,
                duration_ms=elapsed_ms(),
                success=True,
            )
        except Exception as exc:
            return StepResult(
                name=step.name,
                response=None,
                duration_ms=elapsed_ms(),
                success=False,
                error=str(exc),
            )

    def _extract_data(
        self,
        response: httpx.Response,
        patterns: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Apply each configured extractor to the response.

        A rule's 'type' defaults to 'regex' and its 'pattern' to ''.
        Variables whose extractor returns None are left out of the result.
        """
        found: Dict[str, Any] = {}
        for var_name, rule in patterns.items():
            kind = rule.get('type', 'regex')
            expression = rule.get('pattern', '')
            value = ExtractorRegistry.get_instance(kind).extract(response, expression)
            if value is None:
                continue
            found[var_name] = value
        return found
4. Update orchestrator/coordinator.py
# In race_worker method
def race_worker(
    self,
    thread_id: int,
    state: StateConfig,
    context: ExecutionContext,
    # ... (remaining parameters elided in this sketch)
) -> RaceResult:
    # Sketch of the chain branch to add to the existing worker.
    # NOTE(review): `client` and `sync` are not defined in this excerpt —
    # presumably they come from the elided parameters/setup; confirm.
    race_config = state.race
    chain_config = race_config.get_chain_config()
    if chain_config:
        # The executor is constructed before waiting at the sync point.
        chain_executor = ChainExecutor(
            config=chain_config,
            client=client,
            template_engine=self.template,
            base_context=context.to_dict(),
        )
        # Wait at sync point
        sync.wait(thread_id)
        # Run the chain's steps for this thread.
        chain_ctx = chain_executor.execute(thread_id)
        # Return last response
        # NOTE(review): this reads the private `_steps` dict of ChainContext,
        # and `last_step.response` is None when the last recorded step failed.
        last_step = list(chain_ctx._steps.values())[-1]
        return RaceResult(
            thread_id=thread_id,
            response=last_step.response,
            duration_ms=chain_ctx.total_duration_ms,
            chain_context=chain_ctx,
        )
    else:
        # Original single-request logic
        # ...
Acceptance Criteria
Example Test Case
def test_basic_chain_execution():
    """A two-step chain runs to completion and exposes the first step's token."""
    token_step = ChainStep(
        name="get_token",
        request="GET /api/token",
        extract={"token": {"type": "jpath", "pattern": "$.token"}},
    )
    # The second step consumes the value extracted by the first one.
    protected_step = ChainStep(
        name="use_token",
        request="GET /api/protected\nAuthorization: Bearer {{ chain.get_token.token }}",
    )
    chain = ChainConfig(steps=[token_step, protected_step])

    ctx = ChainExecutor(chain, client, template, {}).execute(thread_id=0)

    assert ctx.all_successful
    assert ctx.get_extracted("get_token", "token") is not None
References
- Current race execution: orchestrator/coordinator.py
- Template engine: template/engine.py
Summary
Implement basic request chaining where each race thread executes a sequence of requests, with the ability to extract and pass data between steps.
Configuration
Implementation
1. Update models/config.py
2. Create chain/context.py
3. Create chain/executor.py
4. Update orchestrator/coordinator.py
Acceptance Criteria
- `chain` config from YAML
- `ChainStep` and `ChainConfig` models
- `ChainContext` for thread-isolated state
- `ChainExecutor` to run steps sequentially
- Template access to chain data (`{{ chain.step.var }}`)
Example Test Case
References
- Current race execution: orchestrator/coordinator.py
- Template engine: template/engine.py