diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..f2c8682 --- /dev/null +++ b/conftest.py @@ -0,0 +1,9 @@ +"""Pytest configuration ensuring the src package is importable.""" + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parent +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) diff --git a/src/randomize_evolve/evaluators/__init__.py b/src/randomize_evolve/evaluators/__init__.py index e5c550c..8481920 100644 --- a/src/randomize_evolve/evaluators/__init__.py +++ b/src/randomize_evolve/evaluators/__init__.py @@ -5,5 +5,23 @@ Evaluator, EvaluatorConfig, ) +from randomize_evolve.evaluators.packet_switching import ( + PacketSwitchingEvaluation, + PacketSwitchingEvaluator, + PacketSwitchingEvaluatorConfig, + ScenarioConfig, + ScenarioResult, + default_scenarios, +) -__all__ = ["Evaluator", "EvaluatorConfig", "EvaluationResult"] +__all__ = [ + "Evaluator", + "EvaluatorConfig", + "EvaluationResult", + "PacketSwitchingEvaluator", + "PacketSwitchingEvaluatorConfig", + "PacketSwitchingEvaluation", + "ScenarioConfig", + "ScenarioResult", + "default_scenarios", +] diff --git a/src/randomize_evolve/evaluators/packet_switching.py b/src/randomize_evolve/evaluators/packet_switching.py new file mode 100644 index 0000000..30cff1c --- /dev/null +++ b/src/randomize_evolve/evaluators/packet_switching.py @@ -0,0 +1,183 @@ +"""Evaluator for packet switching scheduling strategies.""" + +from dataclasses import dataclass, field +from typing import Callable, List, Optional, Sequence + +from randomize_evolve.packet_switching import RoundRobinScheduler, SwitchScheduler +from randomize_evolve.traffic import ( + SimulationResult, + SwitchTrafficSimulator, + TrafficPatternConfig, + TrafficPatternType, + build_pattern, +) + + +SchedulerFactory = Callable[[int], SwitchScheduler] + + +@dataclass +class ScenarioConfig: + """Configures a single traffic scenario for evaluation.""" + + name: str + pattern: 
TrafficPatternConfig + time_slots: int = 1500 + warmup_slots: int = 200 + queue_limit: Optional[int] = None + throughput_weight: float = 0.6 + fairness_weight: float = 0.3 + flow_fairness_weight: float = 0.1 + drop_weight: float = 0.4 + seed_offset: int = 0 + + +@dataclass +class ScenarioResult: + """Stores the outcome of running a scheduler in a scenario.""" + + config: ScenarioConfig + metrics: SimulationResult + score: float + + +@dataclass +class PacketSwitchingEvaluation: + """Aggregated evaluation result across scenarios.""" + + score: float + scenario_results: List[ScenarioResult] + success: bool + + +@dataclass +class PacketSwitchingEvaluatorConfig: + """High level configuration for the packet switching evaluator.""" + + ports: int = 8 + scenarios: Sequence[ScenarioConfig] = field(default_factory=list) + seed: int = 7 + + def __post_init__(self) -> None: + if not self.scenarios: + self.scenarios = default_scenarios() + + +def default_scenarios() -> List[ScenarioConfig]: + """Return a curated set of default traffic scenarios.""" + + return [ + ScenarioConfig( + name="uniform-medium", + pattern=TrafficPatternConfig( + pattern_type=TrafficPatternType.UNIFORM, + offered_load=0.6, + ), + throughput_weight=0.7, + fairness_weight=0.25, + flow_fairness_weight=0.05, + drop_weight=0.3, + ), + ScenarioConfig( + name="bursty", + pattern=TrafficPatternConfig( + pattern_type=TrafficPatternType.BURSTY, + offered_load=0.4, + burst_rate=5, + burst_length=6, + burst_probability=0.12, + ), + throughput_weight=0.65, + fairness_weight=0.25, + flow_fairness_weight=0.1, + drop_weight=0.35, + ), + ScenarioConfig( + name="hotspot-heavy", + pattern=TrafficPatternConfig( + pattern_type=TrafficPatternType.HOTSPOT, + offered_load=0.75, + hotspot_probability=0.65, + ), + throughput_weight=0.55, + fairness_weight=0.35, + flow_fairness_weight=0.1, + drop_weight=0.45, + ), + ScenarioConfig( + name="heavy-cycle", + pattern=TrafficPatternConfig( + pattern_type=TrafficPatternType.HEAVY_LOAD, 
+ heavy_load=0.98, + light_load=0.35, + heavy_duration=80, + light_duration=40, + ), + throughput_weight=0.6, + fairness_weight=0.3, + flow_fairness_weight=0.1, + drop_weight=0.5, + ), + ] + + +class PacketSwitchingEvaluator: + """Callable evaluator compatible with the OpenEvolve workflow.""" + + def __init__(self, config: Optional[PacketSwitchingEvaluatorConfig] = None) -> None: + self.config = config or PacketSwitchingEvaluatorConfig() + + def __call__(self, factory: Optional[SchedulerFactory] = None) -> PacketSwitchingEvaluation: + factory = factory or (lambda ports: RoundRobinScheduler(ports, ports)) + scenario_results: List[ScenarioResult] = [] + total_weight = 0.0 + total_score = 0.0 + + for index, scenario in enumerate(self.config.scenarios): + scheduler = factory(self.config.ports) + pattern = build_pattern(scenario.pattern) + simulator = SwitchTrafficSimulator( + pattern, + num_inputs=self.config.ports, + num_outputs=self.config.ports, + time_slots=scenario.time_slots, + warmup_slots=scenario.warmup_slots, + queue_limit=scenario.queue_limit, + seed=self.config.seed + scenario.seed_offset + index, + ) + metrics = simulator.run(scheduler) + scenario_score, scenario_weight = self._score(metrics, scenario) + total_score += scenario_score + total_weight += scenario_weight + scenario_results.append( + ScenarioResult( + config=scenario, + metrics=metrics, + score=scenario_score / scenario_weight if scenario_weight else 0.0, + ) + ) + + aggregate_score = total_score / total_weight if total_weight else float("inf") + success = bool(scenario_results) + return PacketSwitchingEvaluation( + score=aggregate_score, + scenario_results=scenario_results, + success=success, + ) + + def _score(self, metrics: SimulationResult, scenario: ScenarioConfig) -> tuple[float, float]: + weights = ( + scenario.throughput_weight, + scenario.fairness_weight, + scenario.flow_fairness_weight, + scenario.drop_weight, + ) + total_weight = sum(weights) + if total_weight <= 0: + raise 
ValueError("Scenario weight configuration must be positive") + throughput_term = scenario.throughput_weight * (1.0 - metrics.throughput) + fairness_term = scenario.fairness_weight * (1.0 - metrics.fairness_inputs) + flow_fairness_term = scenario.flow_fairness_weight * (1.0 - metrics.fairness_flows) + drop_term = scenario.drop_weight * metrics.drop_rate + scenario_score = throughput_term + fairness_term + flow_fairness_term + drop_term + return scenario_score, total_weight diff --git a/src/randomize_evolve/packet_switching/__init__.py b/src/randomize_evolve/packet_switching/__init__.py new file mode 100644 index 0000000..3f484bb --- /dev/null +++ b/src/randomize_evolve/packet_switching/__init__.py @@ -0,0 +1,8 @@ +"""Packet switching scheduling strategies and helpers.""" + +from randomize_evolve.packet_switching.schedulers import ( + RoundRobinScheduler, + SwitchScheduler, +) + +__all__ = ["RoundRobinScheduler", "SwitchScheduler"] diff --git a/src/randomize_evolve/packet_switching/schedulers.py b/src/randomize_evolve/packet_switching/schedulers.py new file mode 100644 index 0000000..19d2819 --- /dev/null +++ b/src/randomize_evolve/packet_switching/schedulers.py @@ -0,0 +1,65 @@ +"""Scheduling strategies for the packet switching simulator.""" + +from dataclasses import dataclass +from typing import Dict, List, MutableMapping, Protocol, Sequence + + +class SwitchScheduler(Protocol): + """Protocol for algorithms that compute input-output matchings.""" + + def select_matches( + self, + requests: Dict[int, List[int]], + time_slot: int, + queue_lengths: Sequence[int], + ) -> MutableMapping[int, int]: + """Return a mapping of input index to output index for the current slot.""" + + +@dataclass +class RoundRobinScheduler: + """A simple round-robin scheduler with queue length awareness.""" + + num_inputs: int + num_outputs: int + + def __post_init__(self) -> None: + if self.num_inputs <= 0 or self.num_outputs <= 0: + raise ValueError("Switch dimensions must be positive") + 
self._output_priority = 0 + self._output_pointers = [0 for _ in range(self.num_outputs)] + + def select_matches( + self, + requests: Dict[int, List[int]], + time_slot: int, + queue_lengths: Sequence[int], + ) -> MutableMapping[int, int]: + matches: Dict[int, int] = {} + used_inputs = set() + outputs_in_order = [ + (self._output_priority + offset) % self.num_outputs + for offset in range(self.num_outputs) + ] + self._output_priority = (self._output_priority + 1) % self.num_outputs + + for output_idx in outputs_in_order: + candidates = requests.get(output_idx) + if not candidates: + continue + pointer = self._output_pointers[output_idx] + sorted_candidates = sorted( + candidates, + key=lambda idx: ( + -queue_lengths[idx] if idx < len(queue_lengths) else 0, + (idx - pointer) % self.num_inputs, + ), + ) + for input_idx in sorted_candidates: + if input_idx in used_inputs: + continue + matches[input_idx] = output_idx + used_inputs.add(input_idx) + self._output_pointers[output_idx] = (input_idx + 1) % self.num_inputs + break + return matches diff --git a/src/randomize_evolve/traffic/__init__.py b/src/randomize_evolve/traffic/__init__.py new file mode 100644 index 0000000..b66e2f2 --- /dev/null +++ b/src/randomize_evolve/traffic/__init__.py @@ -0,0 +1,26 @@ +"""Traffic generation utilities for packet switching simulations.""" + +from randomize_evolve.traffic.patterns import ( + TrafficPattern, + TrafficPatternConfig, + TrafficPatternType, + UniformPattern, + BurstyPattern, + HotspotPattern, + HeavyLoadPattern, + build_pattern, +) +from randomize_evolve.traffic.simulator import SimulationResult, SwitchTrafficSimulator + +__all__ = [ + "TrafficPattern", + "TrafficPatternConfig", + "TrafficPatternType", + "UniformPattern", + "BurstyPattern", + "HotspotPattern", + "HeavyLoadPattern", + "build_pattern", + "SimulationResult", + "SwitchTrafficSimulator", +] diff --git a/src/randomize_evolve/traffic/patterns.py b/src/randomize_evolve/traffic/patterns.py new file mode 100644 index 
0000000..76d2064 --- /dev/null +++ b/src/randomize_evolve/traffic/patterns.py @@ -0,0 +1,168 @@ +"""Traffic pattern definitions for packet switching simulations.""" + +import math +from dataclasses import dataclass +from enum import Enum, auto +from typing import Dict, List, Protocol + + +class TrafficPatternType(Enum): + """Enumerates the built-in traffic patterns.""" + + UNIFORM = auto() + BURSTY = auto() + HOTSPOT = auto() + HEAVY_LOAD = auto() + + +class TrafficPattern(Protocol): + """Protocol implemented by traffic generators.""" + + def sample( + self, + rng, + time_slot: int, + num_inputs: int, + num_outputs: int, + ) -> List[List[int]]: + """Return the list of destination outputs for each input in a time slot.""" + + +@dataclass +class TrafficPatternConfig: + """Configuration shared by the pattern helpers.""" + + pattern_type: TrafficPatternType + offered_load: float = 0.5 + burst_rate: float = 4.0 + burst_length: int = 8 + burst_probability: float = 0.05 + hotspot_probability: float = 0.4 + hotspot_output: int | None = None + heavy_load: float = 0.95 + light_load: float = 0.4 + heavy_duration: int = 50 + light_duration: int = 50 + + +class _BasePattern: + def __init__(self, cfg: TrafficPatternConfig): + self.cfg = cfg + + @staticmethod + def _poisson_sample(rng, lam: float) -> int: + """Sample from a Poisson distribution using Knuth's algorithm.""" + + if lam <= 0: + return 0 + L = math.exp(-lam) + k = 0 + p = 1.0 + while p > L: + k += 1 + p *= rng.random() + return max(0, k - 1) + + +class UniformPattern(_BasePattern): + """Generates independent uniform traffic for each input.""" + + def sample(self, rng, time_slot: int, num_inputs: int, num_outputs: int) -> List[List[int]]: + destinations: List[List[int]] = [] + lam = self.cfg.offered_load + for _ in range(num_inputs): + packets = self._poisson_sample(rng, lam) + destinations.append([rng.randrange(num_outputs) for _ in range(packets)]) + return destinations + + +class BurstyPattern(_BasePattern): + 
"""Models inputs that occasionally generate bursts of traffic.""" + + def __init__(self, cfg: TrafficPatternConfig): + super().__init__(cfg) + self._burst_counters: Dict[int, int] = {} + + def sample(self, rng, time_slot: int, num_inputs: int, num_outputs: int) -> List[List[int]]: + lam = self.cfg.offered_load + burst_rate = max(1, int(round(self.cfg.burst_rate))) + result: List[List[int]] = [] + for i in range(num_inputs): + remaining = self._burst_counters.get(i, 0) + outputs: List[int] = [] + if remaining > 0: + outputs = [rng.randrange(num_outputs) for _ in range(burst_rate)] + self._burst_counters[i] = remaining - 1 + else: + if rng.random() < self.cfg.burst_probability: + self._burst_counters[i] = max(0, self.cfg.burst_length - 1) + outputs = [rng.randrange(num_outputs) for _ in range(burst_rate)] + else: + packets = self._poisson_sample(rng, lam) + outputs = [rng.randrange(num_outputs) for _ in range(packets)] + result.append(outputs) + return result + + +class HotspotPattern(_BasePattern): + """Generates traffic with a preferred hotspot output.""" + + def __init__(self, cfg: TrafficPatternConfig): + super().__init__(cfg) + self._hotspot_output = cfg.hotspot_output + + def sample(self, rng, time_slot: int, num_inputs: int, num_outputs: int) -> List[List[int]]: + if self._hotspot_output is None: + self._hotspot_output = rng.randrange(num_outputs) + lam = self.cfg.offered_load + hotspot = self._hotspot_output + bias = min(max(self.cfg.hotspot_probability, 0.0), 1.0) + destinations: List[List[int]] = [] + for _ in range(num_inputs): + packets = self._poisson_sample(rng, lam) + outputs: List[int] = [] + for _ in range(packets): + if rng.random() < bias: + outputs.append(hotspot) + else: + outputs.append(rng.randrange(num_outputs)) + destinations.append(outputs) + return destinations + + +class HeavyLoadPattern(_BasePattern): + """Alternates between heavy and light load phases.""" + + def __init__(self, cfg: TrafficPatternConfig): + super().__init__(cfg) + 
self._phase_duration = cfg.heavy_duration + self._light_duration = cfg.light_duration + self._cycle_length = max(1, self._phase_duration + self._light_duration) + + def sample(self, rng, time_slot: int, num_inputs: int, num_outputs: int) -> List[List[int]]: + phase_index = time_slot % self._cycle_length + in_heavy_phase = phase_index < self._phase_duration + lam = self.cfg.heavy_load if in_heavy_phase else self.cfg.light_load + destinations: List[List[int]] = [] + for _ in range(num_inputs): + packets = self._poisson_sample(rng, lam) + destinations.append([rng.randrange(num_outputs) for _ in range(packets)]) + return destinations + + +_PATTERN_MAP: Dict[TrafficPatternType, type[_BasePattern]] = { + TrafficPatternType.UNIFORM: UniformPattern, + TrafficPatternType.BURSTY: BurstyPattern, + TrafficPatternType.HOTSPOT: HotspotPattern, + TrafficPatternType.HEAVY_LOAD: HeavyLoadPattern, +} + + +def build_pattern(cfg: TrafficPatternConfig) -> TrafficPattern: + """Instantiate a traffic pattern implementation from configuration.""" + + try: + pattern_cls = _PATTERN_MAP[cfg.pattern_type] + except KeyError as exc: # pragma: no cover - defensive guard + raise ValueError(f"Unsupported traffic pattern: {cfg.pattern_type}") from exc + return pattern_cls(cfg) diff --git a/src/randomize_evolve/traffic/simulator.py b/src/randomize_evolve/traffic/simulator.py new file mode 100644 index 0000000..29dee9f --- /dev/null +++ b/src/randomize_evolve/traffic/simulator.py @@ -0,0 +1,163 @@ +"""Core traffic simulator for packet switching evaluations.""" + +from collections import Counter, deque +from dataclasses import dataclass +from random import Random +from typing import Deque, Dict, Iterable, List, Optional, Sequence, Tuple + +from randomize_evolve.packet_switching import SwitchScheduler +from randomize_evolve.traffic.patterns import TrafficPattern + + +@dataclass +class SimulationResult: + """Aggregated metrics collected from a simulation run.""" + + throughput: float + fairness_inputs: 
float + fairness_flows: float + utilization: float + drop_rate: float + average_queue: float + total_generated: int + total_served: int + total_dropped: int + + +class SwitchTrafficSimulator: + """Discrete-time simulator for an input-queued packet switch.""" + + def __init__( + self, + pattern: TrafficPattern, + *, + num_inputs: int, + num_outputs: int, + time_slots: int = 2000, + warmup_slots: int = 200, + queue_limit: Optional[int] = None, + seed: int = 0, + ) -> None: + if num_inputs <= 0 or num_outputs <= 0: + raise ValueError("The switch must have at least one input and output port.") + if time_slots <= 0: + raise ValueError("time_slots must be positive") + if warmup_slots < 0: + raise ValueError("warmup_slots must be non-negative") + self.pattern = pattern + self.num_inputs = num_inputs + self.num_outputs = num_outputs + self.time_slots = time_slots + self.warmup_slots = warmup_slots + self.queue_limit = queue_limit + self.seed = seed + + def run(self, scheduler: SwitchScheduler) -> SimulationResult: + """Execute the simulation for the configured number of time slots.""" + + rng = Random(self.seed) + queues: List[Deque[Tuple[int, bool]]] = [deque() for _ in range(self.num_inputs)] + total_generated = 0 + total_served = 0 + total_dropped = 0 + per_input_served = [0 for _ in range(self.num_inputs)] + per_input_generated = [0 for _ in range(self.num_inputs)] + flow_generated: Counter[tuple[int, int]] = Counter() + flow_served: Counter[tuple[int, int]] = Counter() + queue_length_sum = 0 + measured_slots = 0 + + for slot in range(self.warmup_slots + self.time_slots): + arrivals = self.pattern.sample(rng, slot, self.num_inputs, self.num_outputs) + if len(arrivals) != self.num_inputs: + raise ValueError("Traffic pattern produced invalid arrival vector") + + # Step 1: enqueue arrivals + for input_idx, destinations in enumerate(arrivals): + for output_idx in destinations: + if not 0 <= output_idx < self.num_outputs: + continue # ignore malformed destinations + queue = 
queues[input_idx] + if self.queue_limit is not None and len(queue) >= self.queue_limit: + if slot >= self.warmup_slots: + total_dropped += 1 + continue + active = slot >= self.warmup_slots + queue.append((output_idx, active)) + if slot >= self.warmup_slots: + total_generated += 1 + per_input_generated[input_idx] += 1 + flow_generated[(input_idx, output_idx)] += 1 + + # Step 2: compute requests for scheduling + requests: Dict[int, List[int]] = {} + for input_idx, queue in enumerate(queues): + if not queue: + continue + output_idx, _ = queue[0] + requests.setdefault(output_idx, []).append(input_idx) + + queue_lengths_snapshot = [len(queue) for queue in queues] + + try: + matches = scheduler.select_matches(requests, slot, queue_lengths_snapshot) + except Exception as exc: # pragma: no cover - defensive guard + raise RuntimeError("Scheduler failed to compute a matching") from exc + + used_outputs = set() + for input_idx, output_idx in matches.items(): + if not 0 <= input_idx < self.num_inputs: + continue + if not 0 <= output_idx < self.num_outputs: + continue + if output_idx in used_outputs: + continue + queue = queues[input_idx] + if not queue: + continue + head_output, is_active = queue[0] + if head_output != output_idx: + continue + queue.popleft() + used_outputs.add(output_idx) + if slot >= self.warmup_slots and is_active: + total_served += 1 + per_input_served[input_idx] += 1 + flow_served[(input_idx, output_idx)] += 1 + + if slot >= self.warmup_slots: + queue_length_sum += sum(queue_lengths_snapshot) + measured_slots += 1 + + throughput = total_served / total_generated if total_generated else 0.0 + utilization = total_served / (self.time_slots * self.num_outputs) + drop_rate = total_dropped / total_generated if total_generated else 0.0 + average_queue = queue_length_sum / measured_slots if measured_slots else 0.0 + + active_input_values = [per_input_served[i] for i, generated in enumerate(per_input_generated) if generated > 0] + fairness_inputs = 
self._jain_index(active_input_values) + flow_values = [flow_served[flow] for flow in flow_generated] + fairness_flows = self._jain_index(flow_values) + + return SimulationResult( + throughput=throughput, + fairness_inputs=fairness_inputs, + fairness_flows=fairness_flows, + utilization=utilization, + drop_rate=drop_rate, + average_queue=average_queue, + total_generated=total_generated, + total_served=total_served, + total_dropped=total_dropped, + ) + + @staticmethod + def _jain_index(values: Iterable[int]) -> float: + vector = [float(v) for v in values] + if not vector: + return 1.0 + numerator = sum(vector) ** 2 + denominator = len(vector) * sum(v * v for v in vector) + if denominator == 0: + return 1.0 if numerator > 0 else 0.0 + return numerator / denominator diff --git a/test_distributions.py b/test_distributions.py index 93576d5..31c35ff 100644 --- a/test_distributions.py +++ b/test_distributions.py @@ -1,76 +1,56 @@ from loguru import logger +import pytest -from evaluator import DEFAULT_CONFIG, Distribution, Evaluator, EvaluatorConfig +from evaluator import DEFAULT_CONFIG, Distribution, Evaluator from initial_program import candidate_factory -def test_distribution(name: str, config: EvaluatorConfig) -> None: - """Test a single distribution.""" - delimiter = "=" * 60 - logger.info("\n{}", delimiter) - logger.info("Testing: {}", name) - logger.info("{}", delimiter) - +SCENARIOS = [ + ( + "UNIFORM - Random across keyspace", + DEFAULT_CONFIG.model_copy(update={"distribution": Distribution.UNIFORM}), + ), + ( + "CLUSTERED - 10 clusters, radius 1000", + DEFAULT_CONFIG.model_copy( + update={ + "distribution": Distribution.CLUSTERED, + "num_clusters": 10, + "cluster_radius": 1000, + } + ), + ), + ( + "SEQUENTIAL - Contiguous ID range", + DEFAULT_CONFIG.model_copy(update={"distribution": Distribution.SEQUENTIAL}), + ), + ( + "POWER LAW - Zipf with exponent 1.5", + DEFAULT_CONFIG.model_copy( + update={ + "distribution": Distribution.POWER_LAW, + "power_law_exponent": 
1.5, + } + ), + ), + ( + "POWER LAW - Zipf with exponent 2.5 (more skewed)", + DEFAULT_CONFIG.model_copy( + update={ + "distribution": Distribution.POWER_LAW, + "power_law_exponent": 2.5, + } + ), + ), +] + + +@pytest.mark.parametrize(("name", "config"), SCENARIOS) +def test_distribution(name, config): evaluator = Evaluator(config) result = evaluator(candidate_factory) - logger.info("Success: {}", result.success) - logger.info("False Positive Rate: {:.4%}", result.false_positive_rate) - logger.info("False Negative Rate: {:.4%}", result.false_negative_rate) - logger.info("Mean Memory: {:,.0f} bytes", result.mean_peak_memory_bytes) - logger.info("Mean Build Time: {:.2f} ms", result.mean_build_time_ms) - logger.info("Mean Query Time: {:.2f} ms", result.mean_query_time_ms) - logger.info("Score: {:.2f}", result.score) - - -def main(): - """Run tests on all distribution types.""" - delimiter = "=" * 60 - logger.info("\n{}", delimiter) - logger.info("DISTRIBUTION COMPARISON TEST") - logger.info("Testing baseline program across different data patterns") - logger.info("{}", delimiter) - - # Test 1: Uniform Random (default) - uniform_config = DEFAULT_CONFIG.model_copy(update={"distribution": Distribution.UNIFORM}) - test_distribution("UNIFORM - Random across keyspace", uniform_config) - - # Test 2: Clustered - clustered_config = DEFAULT_CONFIG.model_copy( - update={ - "distribution": Distribution.CLUSTERED, - "num_clusters": 10, - "cluster_radius": 1000, - } + logger.info( + "Evaluated {} -> throughput score {:.4f}, fp_rate {:.4f}", name, result.score, result.false_positive_rate ) - test_distribution("CLUSTERED - 10 clusters, radius 1000", clustered_config) - - # Test 3: Sequential IDs - sequential_config = DEFAULT_CONFIG.model_copy(update={"distribution": Distribution.SEQUENTIAL}) - test_distribution("SEQUENTIAL - Contiguous ID range", sequential_config) - - # Test 4: Power-Law (Zipf) - power_law_config = DEFAULT_CONFIG.model_copy( - update={ - "distribution": 
Distribution.POWER_LAW, - "power_law_exponent": 1.5, - } - ) - test_distribution("POWER LAW - Zipf with exponent 1.5", power_law_config) - - # Test 5: Power-Law with higher skew - power_law_high_config = DEFAULT_CONFIG.model_copy( - update={ - "distribution": Distribution.POWER_LAW, - "power_law_exponent": 2.5, - } - ) - test_distribution("POWER LAW - Zipf with exponent 2.5 (more skewed)", power_law_high_config) - - logger.info("\n{}", delimiter) - logger.info("TEST COMPLETE") - logger.info("{}", delimiter) - - -if __name__ == "__main__": - main() + assert result.success diff --git a/tests/test_packet_switching.py b/tests/test_packet_switching.py new file mode 100644 index 0000000..bda0c64 --- /dev/null +++ b/tests/test_packet_switching.py @@ -0,0 +1,97 @@ +import math +from typing import Dict, List, MutableMapping, Sequence + +import pytest + +from randomize_evolve.packet_switching import RoundRobinScheduler +from randomize_evolve.traffic import ( + SwitchTrafficSimulator, + TrafficPatternConfig, + TrafficPatternType, + build_pattern, +) +from randomize_evolve.evaluators.packet_switching import ( + PacketSwitchingEvaluator, + PacketSwitchingEvaluatorConfig, + default_scenarios, +) + + +class GreedyScheduler: + """Scheduler that greedily serves the lowest-numbered input.""" + + def select_matches( + self, + requests: Dict[int, List[int]], + time_slot: int, + queue_lengths: Sequence[int], + ) -> MutableMapping[int, int]: + matches: Dict[int, int] = {} + used_outputs = set() + for output_idx, inputs in requests.items(): + if output_idx in used_outputs: + continue + matches[min(inputs)] = output_idx + used_outputs.add(output_idx) + return matches + + +def test_round_robin_handles_uniform_load(): + pattern = build_pattern( + TrafficPatternConfig( + pattern_type=TrafficPatternType.UNIFORM, + offered_load=0.5, + ) + ) + simulator = SwitchTrafficSimulator( + pattern, + num_inputs=4, + num_outputs=4, + time_slots=600, + warmup_slots=100, + seed=11, + ) + result = 
simulator.run(RoundRobinScheduler(4, 4)) + + assert result.total_generated > 0 + assert result.throughput > 0.9 + assert result.fairness_inputs > 0.95 + assert math.isclose(result.drop_rate, 0.0) + + +def test_simulator_flags_unfair_scheduler(): + pattern = build_pattern( + TrafficPatternConfig( + pattern_type=TrafficPatternType.HOTSPOT, + offered_load=0.8, + hotspot_probability=0.7, + ) + ) + simulator = SwitchTrafficSimulator( + pattern, + num_inputs=4, + num_outputs=4, + time_slots=600, + warmup_slots=100, + seed=9, + ) + result = simulator.run(GreedyScheduler()) + + assert result.fairness_inputs < 0.7 + assert result.throughput < 0.8 + + +@pytest.mark.parametrize("ports", [4]) +def test_packet_switching_evaluator_runs(ports: int): + scenarios = default_scenarios()[:2] + for scenario in scenarios: + scenario.time_slots = 500 + scenario.warmup_slots = 100 + config = PacketSwitchingEvaluatorConfig(ports=ports, scenarios=scenarios, seed=12) + evaluator = PacketSwitchingEvaluator(config) + result = evaluator(lambda p: RoundRobinScheduler(p, p)) + + assert result.success + assert len(result.scenario_results) == len(scenarios) + assert result.score >= 0.0 + assert any(r.metrics.throughput < 0.95 for r in result.scenario_results)