diff --git a/.dockerignore b/.dockerignore index 55da80f8..d10199c4 100644 --- a/.dockerignore +++ b/.dockerignore @@ -40,3 +40,26 @@ __pycache__/ # Docker docker-compose*.yml +# Large untracked directories +FTXUI/ +c-kzg-4844/ +data/ +genesis/ +genesis_64/ +genesis_128/ +genesis_256/ +hash-sig/ +hash-sig-cli/ +lean-quickstart/ +leanMultisig/ +leanSpec/ +leanp2p/ +qdrvm-crates/ +ream/ +shadow.yaml +sszpp/ +zeam/ +db/ +simulation_run_64/ +output/ +data/ diff --git a/.gitignore b/.gitignore index c68c7c1c..6f6e50e0 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,5 @@ /.vcpkg /.build* /vcpkg_installed + +# Generated simulation genesis (run-real.py / run-shadow.py --generate-genesis) diff --git a/Dockerfile.shadow b/Dockerfile.shadow new file mode 100644 index 00000000..f27aa492 --- /dev/null +++ b/Dockerfile.shadow @@ -0,0 +1,93 @@ +# Build qlean with Shadow support and package with Shadow simulator +# Prerequisites: +# make docker_build_dependencies (one-time, builds vcpkg deps) +# Build: +# docker build -f Dockerfile.shadow -t qlean-shadow . +# +# Build context must be the qlean-mini repo root. 
+ +# Stage 0: pre-built vcpkg dependencies +FROM qlean-mini-dependencies:latest AS dependencies + +# Stage 1: Build qlean for linux/arm64 +FROM ubuntu:24.04 AS builder + +ARG DEBIAN_FRONTEND=noninteractive +ARG QLEAN_ENABLE_SHADOW=ON +ENV DEBIAN_FRONTEND=${DEBIAN_FRONTEND} +ENV VCPKG_FORCE_SYSTEM_BINARIES=1 + +ENV PROJECT=/qlean-mini +ENV VENV=${PROJECT}/.venv +ENV BUILD=${PROJECT}/.build +ENV VCPKG_ROOT=${PROJECT}/.vcpkg +ENV PATH=${VENV}/bin:/root/.cargo/bin:${PATH} +ENV CARGO_HOME=/root/.cargo +ENV RUSTUP_HOME=/root/.rustup + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + build-essential \ + cmake \ + ninja-build \ + git \ + curl \ + ca-certificates \ + pkg-config \ + python3 \ + python3-venv \ + libstdc++6 \ + zip \ + unzip && \ + rm -rf /var/lib/apt/lists/* + +WORKDIR ${PROJECT} + +# Copy pre-built dependencies from deps image +COPY --from=dependencies ${VENV} ${VENV} +COPY --from=dependencies ${PROJECT}/.vcpkg ${PROJECT}/.vcpkg +COPY --from=dependencies ${BUILD}/vcpkg_installed ${BUILD}/vcpkg_installed +COPY --from=dependencies /root/.cargo /root/.cargo +COPY --from=dependencies /root/.rustup /root/.rustup + +# Copy project source code +COPY . 
# Compose definition for running the qlean Shadow simulation image.
# This file lives in the repository root, next to Dockerfile.shadow.
services:
  qlean-shadow:
    build:
      # Dockerfile.shadow requires the repo root as its build context
      # (it does `COPY . ${PROJECT}` and `COPY scripts/entrypoint.sh`).
      # The previous value `..` pointed at the parent of the repo root.
      context: .
      dockerfile: Dockerfile.shadow
    platform: linux/arm64
    security_opt:
      - seccomp=unconfined
    shm_size: "4g"
    environment:
      - STOP_TIME=${STOP_TIME:-120s}
      - UDP_PORT_BASE=${UDP_PORT_BASE:-10000}
      - METRICS_PORT_BASE=${METRICS_PORT_BASE:-9100}
      - MAX_BOOTNODES=${MAX_BOOTNODES:-25}
    volumes:
      # Host-side locations are configurable; container-side paths are fixed.
      - ${GENESIS_DIR:-./genesis_shadow}:/genesis:ro
      - ${OUTPUT_DIR:-./output}:/output
      - ${DATA_DIR:-./data}:/data
      - ${TOPOLOGY_DIR:-./topology}:/topology:ro
    # entrypoint.sh takes the genesis directory *inside the container* as $1.
    # GENESIS_DIR is a host path (see the bind mount above), so it must not be
    # forwarded here; the mount point is always /genesis.
    command: /genesis
+ +Usage: + python3 analyze_propagation.py [--round N] + +Outputs: + - Per-round stats (mean, median, p95, p99 coverage times) + - p99 across all (node, slot) pairs +""" + +import argparse +import json +import re +import sys +from collections import defaultdict +from pathlib import Path + + +def parse_shadow_timestamp(shadow_ts: str) -> float: + """Parse Shadow timestamp like '00.01.01 00:00:03.800841' to seconds.""" + # Format: YY.MM.DD HH:MM:SS.ffffff (Shadow's format) + # Return seconds from simulation start + m = re.match(r'\d+\.\d+\.\d+\s+(\d+):(\d+):(\d+)\.(\d+)', shadow_ts) + if m: + h, m_val, s, us = m.groups() + return int(h) * 3600 + int(m_val) * 60 + int(s) + int(us) / 1_000_000 + return 0.0 + + +def parse_attestation_event(line: str) -> dict | None: + """Parse a RECEIVE-ATTESTATION line from Shadow stdout. + + Format: + ["LEAN-INTEROP-TEST", 946684803800, "RECEIVE-ATTESTATION", + [validator_id, [source_slot, target_slot, slot, committee_index, "block_hash"]]] + + Returns dict with: sim_time_s, validator_id, slot, block_hash, node_name + """ + # Extract JSON-like array from the line + m = re.search(r'\["LEAN-INTEROP-TEST",\s*(\d+),\s*"RECEIVE-ATTESTATION",\s*\[(\d+),\s*\[([^\]]+)\]', line) + if not m: + return None + + ts_ms = int(m.group(1)) + validator_id = int(m.group(2)) + inner = m.group(3).strip() + + # Parse inner array: source_slot, target_slot, head_slot, slot, "block_hash" + parts = [p.strip().strip('"') for p in inner.split(',')] + if len(parts) < 5: + return None + + slot = int(parts[3]) # The slot being attested to + block_hash = parts[4] + + return { + 'ts_ms': ts_ms, + 'validator_id': validator_id, + 'slot': slot, + 'block_hash': block_hash, + } + + +def analyze(data_dir: str, target_round: int | None = None): + """Main analysis function.""" + data_path = Path(data_dir) + if not data_path.is_dir(): + print(f"ERROR: {data_dir} is not a directory") + sys.exit(1) + + # Collect all events: node_name -> [(ts_ms, validator_id, slot, block_hash)] 
+ node_events: dict[str, list[dict]] = defaultdict(list) + + # Host directories in shadow.data/hosts/ + hosts_dir = data_path / "hosts" + if not hosts_dir.is_dir(): + # Try shadow.data directly + hosts_dir = data_path / "shadow.data" / "hosts" + if not hosts_dir.is_dir(): + print(f"ERROR: hosts directory not found under {data_dir}") + sys.exit(1) + + node_count = 0 + for host_dir in sorted(hosts_dir.iterdir()): + if not host_dir.is_dir(): + continue + node_name = host_dir.name + node_count += 1 + + # Read all stdout files for this host + for stdout_file in sorted(host_dir.glob("*.stdout")): + with open(stdout_file) as f: + for line in f: + if "RECEIVE-ATTESTATION" in line: + evt = parse_attestation_event(line) + if evt: + evt['node'] = node_name + node_events[node_name].append(evt) + + print(f"Nodes: {node_count}") + total_events = sum(len(v) for v in node_events.values()) + print(f"Total RECEIVE-ATTESTATION events: {total_events}") + + if total_events == 0: + print("WARNING: No RECEIVE-ATTESTATION events found. 
Falling back to PUBLISH-ATTESTATION.") + # Fall back to PUBLISH-ATTESTATION as a proxy + for host_dir in sorted(hosts_dir.iterdir()): + if not host_dir.is_dir(): + continue + node_name = host_dir.name + for stdout_file in sorted(host_dir.glob("*.stdout")): + with open(stdout_file) as f: + for line in f: + if "PUBLISH-ATTESTATION" in line: + evt = parse_attestation_event(line.replace("PUBLISH-ATTESTATION", "RECEIVE-ATTESTATION")) + if evt: + evt['node'] = node_name + node_events[node_name].append(evt) + total_events = sum(len(v) for v in node_events.values()) + print(f"Total PUBLISH-ATTESTATION events (fallback): {total_events}") + + if total_events == 0: + print("ERROR: No attestation events found at all") + sys.exit(1) + + # Group events by slot + slot_publishers: dict[int, set[int]] = defaultdict(set) + for events in node_events.values(): + for evt in events: + slot_publishers[evt['slot']].add(evt['validator_id']) + + print(f"Slots with attestations: {len(slot_publishers)}") + if slot_publishers: + slots = sorted(slot_publishers.keys()) + print(f"Slot range: {slots[0]} - {slots[-1]}") + for s in slots[:5]: + print(f" Slot {s}: {len(slot_publishers[s])} unique publishers") + + # Determine genesis_time for time offset calculation + genesis_time_ms = None + for events in node_events.values(): + for evt in events: + if genesis_time_ms is None: + genesis_time_ms = evt['ts_ms'] + genesis_time_ms = min(genesis_time_ms, evt['ts_ms']) + + # Round genesis_time_ms down to nearest second boundary + if genesis_time_ms: + genesis_time_ms = (genesis_time_ms // 1_000_000) * 1_000_000 + print(f"Estimated genesis_time_ms: {genesis_time_ms}") + + # Per-slot analysis + coverage_times: list[float] = [] # All (node, slot) coverage times in ms + slot_stats: dict[int, dict] = {} # Per-slot aggregate stats + + for slot in sorted(slot_publishers.keys()): + publishers = slot_publishers[slot] + n_publishers = len(publishers) + if n_publishers < 3: + continue # Skip slots with too few 
publishers + + # For each receiving node, compute time to 95% coverage + node_coverage: dict[str, float] = {} + + for node_name, events in node_events.items(): + # Get receive times for attestations from each publisher in this slot + received: dict[int, float] = {} # publisher -> earliest receive time (ms) + for evt in events: + if evt['slot'] != slot: + continue + publisher = evt['validator_id'] + ts_offset = evt['ts_ms'] - genesis_time_ms if genesis_time_ms else evt['ts_ms'] + if publisher not in received or ts_offset < received[publisher]: + received[publisher] = ts_offset + + if len(received) == 0: + continue + + # Sort by receive time + sorted_times = sorted(received.values()) + # Time to 95% coverage: index at ceil(0.95 * n_publishers) - 1 + threshold_idx = min(len(sorted_times) - 1, + int(0.95 * n_publishers) - 1) + if threshold_idx < 0: + threshold_idx = 0 + + # Time relative to first receive for this node+slot + first_receive = sorted_times[0] + coverage_time = sorted_times[threshold_idx] - first_receive + node_coverage[node_name] = coverage_time + + if node_coverage: + times = list(node_coverage.values()) + coverage_times.extend(times) + + times_sorted = sorted(times) + n = len(times_sorted) + slot_stats[slot] = { + 'n_publishers': n_publishers, + 'n_nodes': n, + 'mean_ms': sum(times) / n, + 'median_ms': times_sorted[n // 2], + 'p95_ms': times_sorted[int(0.95 * n)] if n > 1 else times_sorted[0], + 'p99_ms': times_sorted[int(0.99 * n)] if n > 1 else times_sorted[0], + } + + # Filter rounds/slots if requested + if target_round is not None: + filtered_times = [] + for slot in sorted(slot_publishers.keys()): + if slot == target_round: + publishers = slot_publishers[slot] + n_publishers = len(publishers) + if n_publishers < 3: + continue + for node_name, events in node_events.items(): + received = {} + for evt in events: + if evt['slot'] != slot: + continue + publisher = evt['validator_id'] + ts_offset = evt['ts_ms'] - genesis_time_ms if genesis_time_ms else 
evt['ts_ms'] + if publisher not in received or ts_offset < received[publisher]: + received[publisher] = ts_offset + if len(received) == 0: + continue + sorted_times = sorted(received.values()) + threshold_idx = min(len(sorted_times) - 1, + int(0.95 * n_publishers) - 1) + if threshold_idx < 0: + threshold_idx = 0 + first_receive = sorted_times[0] + coverage_time = sorted_times[threshold_idx] - first_receive + filtered_times.append(coverage_time) + if filtered_times: + print(f"\nTarget round {target_round}: {len(filtered_times)} measurements") + fs = sorted(filtered_times) + n = len(fs) + print(f" Mean: {sum(fs)/n:.1f} ms") + print(f" P50: {fs[n//2]:.1f} ms") + print(f" P95: {fs[int(0.95*n)]:.1f} ms") + print(f" P99: {fs[int(0.99*n)]:.1f} ms") + else: + print(f"No data for target round {target_round}") + + # Print per-slot stats + print(f"\nPer-slot stats ({len(slot_stats)} slots):") + for slot in sorted(slot_stats.keys())[:10]: + s = slot_stats[slot] + print(f" Slot {slot:3d}: n={s['n_nodes']:3d} nodes, " + f"{s['n_publishers']:3d} pubs, " + f"mean={s['mean_ms']:7.1f}ms, " + f"median={s['median_ms']:7.1f}ms, " + f"p95={s['p95_ms']:7.1f}ms, " + f"p99={s['p99_ms']:7.1f}ms") + + if len(slot_stats) > 10: + print(f" ... 
and {len(slot_stats) - 10} more slots") + + # Aggregate p99 across all (node, slot) pairs + if coverage_times: + sorted_all = sorted(coverage_times) + n_all = len(sorted_all) + p99_idx = int(0.99 * n_all) + p99_idx = min(p99_idx, n_all - 1) + + print(f"\n{'='*60}") + print(f"AGGREGATE P99 PROPAGATION TIME") + print(f"{'='*60}") + print(f"Total (node, slot) measurements: {n_all}") + print(f"P50: {sorted_all[n_all//2]:.1f} ms") + print(f"P95: {sorted_all[int(0.95*n_all)]:.1f} ms") + print(f"P99: {sorted_all[p99_idx]:.1f} ms") + print(f"Max: {sorted_all[-1]:.1f} ms") + print(f"Mean: {sum(sorted_all)/n_all:.1f} ms") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Analyze attestation propagation time") + parser.add_argument("data_dir", help="Shadow data directory (containing hosts/)") + parser.add_argument("--round", type=int, help="Focus on a specific round/slot") + args = parser.parse_args() + analyze(args.data_dir, args.round) diff --git a/scripts/build-qlean.py b/scripts/build-qlean.py new file mode 100755 index 00000000..ec57ade7 --- /dev/null +++ b/scripts/build-qlean.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +"""Rebuild the qlean binary inside the Shadow Docker image. 
+ +Usage: + uv run scripts/build-qlean.py +""" + +import subprocess +import sys +from pathlib import Path + +PROJECT = Path(__file__).resolve().parent.parent +SIM_DOCKERFILE = PROJECT / "simulation" / "Dockerfile" +IMAGE = "qlean-shadow:latest" +DEPS_IMAGE = "qlean-mini-dependencies:latest" + + +def die(msg: str) -> None: + print(f"ERROR: {msg}", file=sys.stderr) + sys.exit(1) + + +def image_exists(name: str) -> bool: + result = subprocess.run( + ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}", name], + capture_output=True, text=True, + ) + return name in result.stdout + + +def main() -> None: + if not SIM_DOCKERFILE.exists(): + die(f"Dockerfile not found: {SIM_DOCKERFILE}") + + if not image_exists(DEPS_IMAGE): + die( + f"Dependencies image '{DEPS_IMAGE}' not found.\n" + f"Build it first: make docker_build_dependencies" + ) + + print(f"==> Building {IMAGE}...") + subprocess.run( + ["docker", "build", "-f", str(SIM_DOCKERFILE), "-t", IMAGE, str(PROJECT)], + check=True, + ) + print(f"==> Done: {IMAGE}") + + +if __name__ == "__main__": + main() diff --git a/scripts/entrypoint.sh b/scripts/entrypoint.sh new file mode 100755 index 00000000..de62dce3 --- /dev/null +++ b/scripts/entrypoint.sh @@ -0,0 +1,182 @@ +#!/bin/bash +set -euo pipefail + +QLEAN_BIN="${QLEAN_BIN:-/opt/qlean/bin/qlean}" +GENESIS_DIR="${1:-/genesis}" +OUTPUT_DIR="${OUTPUT_DIR:-/output}" +STOP_TIME="${STOP_TIME:-120s}" +TOPOLOGY_DIR="${TOPOLOGY_DIR:-/topology}" +UDP_PORT_BASE="${UDP_PORT_BASE:-10000}" +METRICS_PORT_BASE="${METRICS_PORT_BASE:-9100}" +MAX_BOOTNODES="${MAX_BOOTNODES:-25}" + +echo "==> Qlean Shadow Simulation" +echo "==> Genesis dir: ${GENESIS_DIR}" +echo "==> Output dir: ${OUTPUT_DIR}" +echo "==> Topology dir: ${TOPOLOGY_DIR}" +echo "==> Stop time: ${STOP_TIME}" +echo "==> Max bootnodes: ${MAX_BOOTNODES}" +echo "==> Qlean bin: ${QLEAN_BIN}" + +mkdir -p "${OUTPUT_DIR}" + +# Verify genesis directory +if [ ! 
-d "${GENESIS_DIR}" ]; then + echo "ERROR: Genesis directory not found: ${GENESIS_DIR}" + exit 1 +fi + +CONFIG_YAML="${GENESIS_DIR}/config.yaml" +NODES_YAML="${GENESIS_DIR}/nodes.yaml" +VALIDATORS_YAML="${GENESIS_DIR}/validators.yaml" + +for f in "${CONFIG_YAML}" "${NODES_YAML}" "${VALIDATORS_YAML}"; do + if [ ! -f "$f" ]; then + echo "ERROR: Required file missing: $f" + exit 1 + fi +done + +# Count nodes from node key files +NODE_COUNT=$(ls -1 "${GENESIS_DIR}"/node_*.key 2>/dev/null | wc -l) +if [ "$NODE_COUNT" -eq 0 ]; then + echo "ERROR: No node_*.key files found in ${GENESIS_DIR}" + exit 1 +fi +echo "==> Node count: ${NODE_COUNT}" + +# Genesis time (Shadow uses simulated time) +GENESIS_TIME=$(grep 'GENESIS_TIME:' "${CONFIG_YAML}" | awk -F': ' '{print $2}') +echo "==> GENESIS_TIME: ${GENESIS_TIME}" + +# Load bandwidth tiers if available +declare -A BANDWIDTHS +if [ -f "${TOPOLOGY_DIR}/bandwidths.json" ]; then + echo "==> Loading bandwidth tiers from ${TOPOLOGY_DIR}/bandwidths.json" + while IFS="=" read -r key value; do + BANDWIDTHS["$key"]="$value" + done < <(python3 -c " +import json +with open('${TOPOLOGY_DIR}/bandwidths.json') as f: + bw = json.load(f) +for k, v in bw.items(): + print(f'{k}={v}') +") +fi + +# Check for GML topology +GML_FILE="${TOPOLOGY_DIR}/topology.gml" +USE_GML=false +if [ -f "${GML_FILE}" ]; then + USE_GML=true + echo "==> Using GML topology: ${GML_FILE}" +else + echo "==> No GML file found, using 1_gbit_switch" +fi + +# Generate shadow.yaml +SHADOW_YAML="${OUTPUT_DIR}/shadow.yaml" +echo "==> Generating ${SHADOW_YAML}..." 
+ +if $USE_GML; then + cat > "${SHADOW_YAML}" << YAMLHEAD +general: + stop_time: ${STOP_TIME} + model_unblocked_syscall_latency: true +experimental: + native_preemption_enabled: true +network: + graph: + type: gml + file: + path: ${GML_FILE} + use_shortest_path: true +hosts: +YAMLHEAD +else + cat > "${SHADOW_YAML}" << YAMLHEAD +general: + stop_time: ${STOP_TIME} + model_unblocked_syscall_latency: true +experimental: + native_preemption_enabled: true +network: + graph: + type: 1_gbit_switch +hosts: +YAMLHEAD +fi + +for ((i=0; i> "${SHADOW_YAML}" << YAMLHOST + ${SHADOW_HOST}: + network_node_id: ${NET_NODE_ID} + ip_addr: 10.0.0.${IP_LAST} + bandwidth_up: "${BW}" + bandwidth_down: "${BW}" + processes: + - path: ${QLEAN_BIN} + args: "${ARGS_ESCAPED}" + expected_final_state: running + +YAMLHOST +done + +echo "==> Shadow config written (${NODE_COUNT} hosts)" + +# Clean old shadow data +rm -rf "${OUTPUT_DIR}/shadow.data" + +echo "==> Starting Shadow simulation..." +cd "${OUTPUT_DIR}" +shadow --progress true ${SHADOW_FLAGS:-} "${SHADOW_YAML}" + +echo "==> Simulation complete. Data in ${OUTPUT_DIR}/shadow.data/" diff --git a/scripts/gen_topology.py b/scripts/gen_topology.py new file mode 100644 index 00000000..d4dab134 --- /dev/null +++ b/scripts/gen_topology.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 +"""Generate Shadow topology with geo latencies and bandwidth tiers. 
def assign_regions(node_count: int, rng: random.Random) -> dict[str, str]:
    """Map ``node_0`` .. ``node_{node_count-1}`` to a region drawn from REGION_WEIGHTS.

    One weighted draw is taken from *rng* per node, in node order, so the
    result is reproducible for a given seeded generator.
    """
    region_names = list(REGION_WEIGHTS)
    raw_weights = [REGION_WEIGHTS[name] for name in region_names]
    # Scale the weights so they sum to one before sampling.
    weight_sum = sum(raw_weights)
    probabilities = [w / weight_sum for w in raw_weights]

    return {
        f"node_{idx}": rng.choices(region_names, weights=probabilities, k=1)[0]
        for idx in range(node_count)
    }
def generate_gml(node_count: int,
                 region_assignments: dict[str, str],
                 bandwidths: dict[str, str],
                 rng: random.Random) -> str:
    """Build the GML text of the simulated network graph.

    The graph contains, in this order:
      * one node per host (``id`` 0..node_count-1) carrying its bandwidth tier
        and region,
      * one extra "switch" node (``id`` == node_count) with 10 Gbit bandwidth,
      * a 1 ms self-loop on every node,
      * a host-to-switch edge per host, based on half the host region's
        intra-region REGIONS value,
      * a direct edge for every host pair, based on the REGIONS value for the
        two regions.
    Every non-self-loop latency gets +/- JITTER_RATIO random jitter and is
    rounded to a whole number of milliseconds, minimum 1 ms.

    NOTE(review): entrypoint.sh writes ``use_shortest_path: true``. With that
    setting a host-switch-host route (two short intra-region hops) may be
    shorter than the direct inter-region edge, which would bypass the geo
    latencies modelled here. Confirm against Shadow's routing documentation.
    NOTE(review): REGIONS is described as RTTs but is applied as per-edge
    latency; verify whether it should be halved.

    Args:
        node_count: Number of host nodes.
        region_assignments: ``node_<i>`` -> region name (a key of REGIONS).
        bandwidths: ``node_<i>`` -> bandwidth string such as ``"50 Mbit"``.
        rng: Random source for jitter; one draw per host-switch edge, then one
            per host pair, in index order.

    Returns:
        The GML document as a single string.
    """
    lines = ["graph [", "  directed 0"]

    def add_node(node_id: int, label: str, bandwidth: str, region: str | None = None) -> None:
        lines.append("  node [")
        lines.append(f"    id {node_id}")
        lines.append(f'    host_bandwidth_up "{bandwidth}"')
        lines.append(f'    host_bandwidth_down "{bandwidth}"')
        lines.append(f'    label "{label}"')
        if region is not None:
            lines.append(f'    region "{region}"')
        lines.append("  ]")

    def add_edge(source: int, target: int, latency_ms: int) -> None:
        lines.append("  edge [")
        lines.append(f"    source {source}")
        lines.append(f"    target {target}")
        lines.append(f'    latency "{latency_ms} ms"')
        lines.append(f"    packet_loss {LINK_PACKET_LOSS}")
        lines.append("  ]")

    def jittered(base_ms: float) -> int:
        # Uniform jitter in [-JITTER_RATIO, +JITTER_RATIO] of the base value,
        # rounded to whole milliseconds and never below 1 ms.
        jitter = base_ms * JITTER_RATIO * (rng.random() * 2 - 1)
        return max(1, round(base_ms + jitter))

    names = [f"node_{i}" for i in range(node_count)]
    regions = [region_assignments[name] for name in names]

    for i, name in enumerate(names):
        add_node(i, name, bandwidths[name], regions[i])

    switch_id = node_count
    add_node(switch_id, "switch", "10 Gbit")

    # Self-loops for every host and for the switch.
    for i in range(node_count):
        add_edge(i, i, 1)
    add_edge(switch_id, switch_id, 1)

    # Host -> switch edges: half of the host region's intra-region latency.
    for i in range(node_count):
        add_edge(i, switch_id, jittered(REGIONS[regions[i]][regions[i]] * 0.5))

    # Direct edges between every pair of hosts; 200 ms for an unknown pairing.
    for i in range(node_count):
        for j in range(i + 1, node_count):
            add_edge(i, j, jittered(REGIONS[regions[i]].get(regions[j], 200)))

    lines.append("]")
    return "\n".join(lines)
topology.gml + gml = generate_gml(args.node_count, regions, bandwidths, rng) + gml_path = out / "topology.gml" + gml_path.write_text(gml) + print(f"Wrote {gml_path} ({args.node_count} nodes, {gml_path.stat().st_size} bytes)") + + # Write bandwidths.json + bw_path = out / "bandwidths.json" + bw_path.write_text(json.dumps(bandwidths, indent=2)) + print(f"Wrote {bw_path}") + + # Write region assignments for reference + region_path = out / "regions.json" + region_path.write_text(json.dumps(regions, indent=2)) + print(f"Wrote {region_path}") + + # Print summary + n_super = sum(1 for b in bandwidths.values() if "Gbit" in b) + region_counts = {} + for r in regions.values(): + region_counts[r] = region_counts.get(r, 0) + 1 + print(f"Summary: {args.node_count} nodes, {n_super} supernodes, regions={region_counts}") + + +if __name__ == "__main__": + main() diff --git a/scripts/run-real.py b/scripts/run-real.py new file mode 100755 index 00000000..1899dd32 --- /dev/null +++ b/scripts/run-real.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +"""Run a Shadow simulation for qlean with real signatures and aggregation. + +Uses the qlean binary built with QLEAN_ENABLE_SHADOW=OFF so the real XMSS +provider is used for signing, verifying, aggregating, and verifying aggregated +signatures. The run still happens inside the Shadow-arm Docker container. 
+ +Usage: + uv run scripts/run-real.py # default: 64 nodes, 180s + uv run scripts/run-real.py 4 # 4 nodes + uv run scripts/run-real.py 128 300 # 128 nodes, 300s stop time + uv run scripts/run-real.py 64 --generate-genesis # generate real genesis + uv run scripts/run-real.py 64 --genesis-dir /path/to/genesis +""" + +import argparse +import os +import re +import shutil +import subprocess +import sys +import tempfile +from pathlib import Path + +PROJECT = Path(__file__).resolve().parent.parent +IMAGE = "qlean-real:latest" +STOP_TIME = "180s" +UDP_PORT_BASE = 10000 +METRICS_PORT_BASE = 9100 +SHM_SIZE = "4g" + +MAX_BOOTNODES = 50 +SUBNET_COUNT = 1 + + +def die(msg: str) -> None: + print(f"ERROR: {msg}", file=sys.stderr) + sys.exit(1) + + +def docker_image_exists(name: str) -> bool: + result = subprocess.run( + ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}", name], + capture_output=True, text=True + ) + return name in result.stdout + + +def build_image() -> None: + print("==> Building Docker image (this may take a few minutes)...") + subprocess.run( + ["docker", "build", "-f", str(PROJECT / "Dockerfile.shadow"), + "--build-arg", "QLEAN_ENABLE_SHADOW=OFF", + "-t", IMAGE, str(PROJECT)], + check=True + ) + + +def _run_qlean(genesis_dir: Path, n: int, subnet_count: int, shadow: bool) -> None: + """Run qlean generate-genesis inside the Docker image.""" + genesis_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + "docker", "run", "--rm", + "--platform", "linux/arm64", + "--entrypoint", "/opt/qlean/bin/qlean", + "-v", f"{genesis_dir}:/genesis", + IMAGE, + "generate-genesis", "/genesis", str(n), str(subnet_count), + ] + if shadow: + cmd.append("shadow") + print(f" genesis-cmd: {' '.join(cmd)}") + subprocess.run(cmd, check=True) + + +def _read_pubkeys_from_manifest(manifest: Path) -> list[str]: + """Read pubkey_hex values (with 0x prefix) from validator-keys-manifest.yaml. + + Returns list in manifest order (index 0, 1, 2, ...). 
def _patch_genesis_validators(path: Path, real_pubkeys: list[str]) -> None:
    """Replace the GENESIS_VALIDATORS list entries in *path* with real pubkeys.

    Entries are written without the ``0x`` prefix, in the order given, and
    keep the indentation of the line they replace. Only list items that
    belong to the ``GENESIS_VALIDATORS:`` key are touched, so other YAML lists
    in the same file are left alone. If the file has no such key, every list
    item is rewritten (the previous behaviour of this function).

    Args:
        path: YAML file to rewrite in place.
        real_pubkeys: Pubkeys as hex strings, normally ``0x``-prefixed.

    Exits via ``die`` when the file is missing or when there are more
    validator entries than pubkeys.
    """
    if not path.exists():
        die(f"File not found: {path}")
        return
    lines = path.read_text().splitlines()
    keys = iter(k.removeprefix("0x") for k in real_pubkeys)

    section_key = "GENESIS_VALIDATORS:"
    has_section = any(line.strip().startswith(section_key) for line in lines)
    # Without a recognisable section header, fall back to patching every list item.
    in_section = not has_section

    new_lines: list[str] = []
    for line in lines:
        stripped = line.strip()
        if has_section:
            if stripped.startswith(section_key):
                in_section = True
                new_lines.append(line)
                continue
            # Any other non-blank, non-comment, non-item line ends the section.
            if in_section and stripped and not stripped.startswith(("- ", "#")):
                in_section = False

        if in_section and stripped.startswith("- "):
            key = next(keys, None)
            if key is None:
                die(f"{path} has more validator entries than available pubkeys "
                    f"({len(real_pubkeys)})")
                return
            indent = line[:len(line) - len(line.lstrip())]
            new_lines.append(f"{indent}- {key}")
        else:
            new_lines.append(line)
    path.write_text("\n".join(new_lines) + "\n")
+ + Uses two passes of the qlean generate-genesis command: + 1. With 'shadow' flag -> correct shadow ENRs/IPs but fake zero pubkeys. + 2. Without 'shadow' flag -> real XMSS key files but 127.0.0.1 IPs. + Then merges: real key files + real pubkeys into the shadow-config genesis. + """ + reuse_keys = genesis_dir.exists() and _has_real_keys(genesis_dir, n) + if reuse_keys: + print(f"==> Reusing existing keys from {genesis_dir}") + else: + print(f"==> Generating real genesis for {n} nodes (subnet_count={subnet_count})...") + + with tempfile.TemporaryDirectory(prefix="qlean-real-shadow-") as tmp_cfg_dir_str: + tmp_cfg = Path(tmp_cfg_dir_str) + + # Pass 1: generate genesis with shadow flag -> correct ENRs, fake pubkeys + print(" [1/2] Generating shadow-config genesis (ENRs/IPs)...") + _run_qlean(tmp_cfg, n, subnet_count, shadow=True) + + if reuse_keys: + # Copy existing keys into the fresh shadow config + print(" [2/2] Using existing key files...") + src_keys = genesis_dir / "hash-sig-keys" + dst_keys = tmp_cfg / "hash-sig-keys" + shutil.copytree(src_keys, dst_keys, dirs_exist_ok=True) + else: + # Pass 2: generate genesis without shadow flag -> real key files + print(" [2/2] Generating real key files...") + with tempfile.TemporaryDirectory(prefix="qlean-real-keys-") as tmp_keys_dir_str: + tmp_keys = Path(tmp_keys_dir_str) + _run_qlean(tmp_keys, n, subnet_count, shadow=False) + + # Copy real key files (.ssz) into shadow genesis + src_keys = tmp_keys / "hash-sig-keys" + dst_keys = tmp_cfg / "hash-sig-keys" + dst_keys.mkdir(parents=True, exist_ok=True) + + for f in src_keys.iterdir(): + if f.suffix in (".ssz", ".json"): + shutil.copy2(f, dst_keys / f.name) + + # Copy the real manifest (has real pubkeys) + real_manifest = src_keys / "validator-keys-manifest.yaml" + if real_manifest.exists(): + shutil.copy2(real_manifest, dst_keys / "validator-keys-manifest.yaml") + + # Read real pubkeys from the manifest + manifest = dst_keys / "validator-keys-manifest.yaml" + if not 
manifest.exists(): + die(f"validator-keys-manifest.yaml not found in {dst_keys}") + real_pubkeys = _read_pubkeys_from_manifest(manifest) + + # Copy merged genesis to target directory + if genesis_dir.exists(): + shutil.rmtree(genesis_dir) + shutil.copytree(tmp_cfg, genesis_dir) + + # Patch pubkeys in config files (replace zero pubkeys with real ones) + _patch_genesis_validators( + genesis_dir / "config.yaml", real_pubkeys) + _patch_file_pubkeys( + genesis_dir / "annotated_validators.yaml", real_pubkeys) + + print(f"==> Genesis ready at {genesis_dir}") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Run qlean Shadow simulation with real signatures/aggregation") + parser.add_argument("nodes", nargs="?", type=int, default=64, + help="Number of nodes (default: 64)") + parser.add_argument("stop_time", nargs="?", default=STOP_TIME, + help=f"Simulation stop time (default: {STOP_TIME})") + parser.add_argument("--no-build", action="store_true", + help="Skip Docker image existence check and build") + parser.add_argument("--build", action="store_true", + help="Force rebuild Docker image") + parser.add_argument("--generate-genesis", action="store_true", + help="Generate genesis directory with real XMSS keys") + parser.add_argument("--generate-topology", action="store_true", + help="Generate GML topology with bandwidth and regions") + parser.add_argument("--genesis-dir", type=str, default=None, + help="Path to genesis directory (default: /tmp/qlean-simulations/-real)") + parser.add_argument("--subnet-count", type=int, default=SUBNET_COUNT, + help=f"Number of aggregator/subnet nodes (default: {SUBNET_COUNT})") + args = parser.parse_args() + + n = args.nodes + stop_time = args.stop_time if args.stop_time.endswith("s") else f"{args.stop_time}s" + max_bootnodes = MAX_BOOTNODES + subnet_count = args.subnet_count + + # Determine genesis directory + sim_dir = Path(f"/tmp/qlean-simulations/{n}-real") + if args.genesis_dir: + genesis_dir = 
Path(args.genesis_dir).resolve() + else: + genesis_dir = sim_dir + + topology_dir = sim_dir + output_dir = Path(f"/tmp/qlean-real-sim-{n}/output") + data_dir = Path(f"/tmp/qlean-real-sim-{n}/data") + + # Build or check image + if args.build or (not args.no_build and not docker_image_exists(IMAGE)): + build_image() + else: + print(f"==> Using existing image: {IMAGE}") + + # Generate genesis if requested + if args.generate_genesis: + generate_genesis(n, subnet_count, genesis_dir) + + # Generate topology if requested + if args.generate_topology: + print(f"==> Generating topology for {n} nodes...") + topology_dir.mkdir(parents=True, exist_ok=True) + subprocess.run([ + sys.executable, + str(PROJECT / "scripts" / "gen_topology.py"), + str(n), + str(topology_dir) + ], check=True) + + # Validate genesis directory + if not genesis_dir.is_dir(): + die( + f"Genesis directory not found: {genesis_dir}\n" + f" Use --generate-genesis to create it, or --genesis-dir to specify a path." + ) + if not topology_dir.is_dir(): + die(f"Topology directory not found: {topology_dir}") + + print(f"==> Nodes: {n}") + print(f"==> Stop time: {stop_time}") + print(f"==> Max-bootnodes: {max_bootnodes}") + print(f"==> Subnet count: {subnet_count}") + print(f"==> Genesis: {genesis_dir}") + print(f"==> Topology: {topology_dir}") + print(f"==> Output: {output_dir}") + + # Clean previous run data + for d in (output_dir, data_dir): + if d.exists(): + print(f"==> Cleaning previous {d.name}...") + shutil.rmtree(d) + d.mkdir(parents=True, exist_ok=True) + + # Run simulation + cmd = [ + "docker", "run", "--rm", + "--platform", "linux/arm64", + "--security-opt", "seccomp=unconfined", + "--shm-size", SHM_SIZE, + "-v", f"{genesis_dir}:/genesis:ro", + "-v", f"{topology_dir}:/topology:ro", + "-v", f"{output_dir}:/output", + "-v", f"{data_dir}:/data", + "-e", "TOPOLOGY_DIR=/topology", + "-e", f"STOP_TIME={stop_time}", + "-e", f"MAX_BOOTNODES={max_bootnodes}", + "-e", f"UDP_PORT_BASE={UDP_PORT_BASE}", + "-e", 
f"METRICS_PORT_BASE={METRICS_PORT_BASE}", + IMAGE, + "/genesis", + ] + + print(f"==> {' '.join(cmd)}") + print("=" * 60) + subprocess.run(cmd, check=True) + print(f"\n==> Done. Output in {output_dir}/shadow.data/") + + +if __name__ == "__main__": + main() diff --git a/scripts/run-shadow.py b/scripts/run-shadow.py new file mode 100755 index 00000000..95269a10 --- /dev/null +++ b/scripts/run-shadow.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python3 +"""Run a Shadow simulation for qlean with a single command. + +Usage: + uv run scripts/run-shadow.py # default: 64 nodes, 180s + uv run scripts/run-shadow.py 4 # 4 nodes + uv run scripts/run-shadow.py 128 300 # 128 nodes, 300s stop time + uv run scripts/run-shadow.py 64 --no-build # skip image check + uv run scripts/run-shadow.py 64 --generate-genesis --subnet-count 4 +""" + +import argparse +import os +import shutil +import subprocess +import sys +import tempfile +from pathlib import Path + +PROJECT = Path(__file__).resolve().parent.parent +IMAGE = "qlean-shadow:latest" +STOP_TIME = "180s" +UDP_PORT_BASE = 10000 +METRICS_PORT_BASE = 9100 +SHM_SIZE = "4g" + +MAX_BOOTNODES = 50 +SUBNET_COUNT = 1 + + +def die(msg: str) -> None: + print(f"ERROR: {msg}", file=sys.stderr) + sys.exit(1) + + +def docker_image_exists(name: str) -> bool: + result = subprocess.run( + ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}", name], + capture_output=True, text=True + ) + return name in result.stdout + + +def build_image() -> None: + print("==> Building Docker image (this may take a few minutes)...") + subprocess.run( + ["docker", "build", "-f", str(PROJECT / "Dockerfile.shadow"), + "--build-arg", "QLEAN_ENABLE_SHADOW=ON", + "-t", IMAGE, str(PROJECT)], + check=True + ) + + +def generate_genesis(n: int, subnet_count: int, genesis_dir: Path) -> None: + """Generate a shadow genesis directory with the given subnet count.""" + print(f"==> Generating shadow genesis for {n} nodes (subnet_count={subnet_count})...") + + with 
tempfile.TemporaryDirectory(prefix="qlean-shadow-gen-") as tmp_dir_str: + tmp_dir = Path(tmp_dir_str) + subprocess.run([ + "docker", "run", "--rm", + "--platform", "linux/arm64", + "--entrypoint", "/opt/qlean/bin/qlean", + "-v", f"{tmp_dir}:/genesis", + IMAGE, + "generate-genesis", "/genesis", str(n), str(subnet_count), "shadow", + ], check=True) + + if genesis_dir.exists(): + shutil.rmtree(genesis_dir) + shutil.copytree(tmp_dir, genesis_dir) + + print(f"==> Genesis ready at {genesis_dir}") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run qlean Shadow simulation") + parser.add_argument("nodes", nargs="?", type=int, default=64, + help="Number of nodes (default: 64)") + parser.add_argument("stop_time", nargs="?", default=STOP_TIME, + help=f"Simulation stop time (default: {STOP_TIME})") + parser.add_argument("--no-build", action="store_true", + help="Skip Docker image existence check and build") + parser.add_argument("--build", action="store_true", + help="Force rebuild Docker image") + parser.add_argument("--generate-genesis", action="store_true", + help="Generate genesis directory") + parser.add_argument("--generate-topology", action="store_true", + help="Generate GML topology with bandwidth and regions") + parser.add_argument("--genesis-dir", type=str, default=None, + help="Path to genesis directory (default: /tmp/qlean-simulations/-fake)") + parser.add_argument("--subnet-count", type=int, default=SUBNET_COUNT, + help=f"Number of subnets/aggregators (default: {SUBNET_COUNT})") + args = parser.parse_args() + + n = args.nodes + stop_time = args.stop_time if args.stop_time.endswith("s") else f"{args.stop_time}s" + max_bootnodes = MAX_BOOTNODES + subnet_count = args.subnet_count + + # Determine genesis directory + sim_dir = Path(f"/tmp/qlean-simulations/{n}-fake") + if args.genesis_dir: + genesis_dir = Path(args.genesis_dir).resolve() + else: + genesis_dir = sim_dir + + topology_dir = sim_dir + output_dir = Path(f"/tmp/qlean-sim-{n}/output") 
+ data_dir = Path(f"/tmp/qlean-sim-{n}/data") + + # Build image if needed + if args.build or (not args.no_build and not docker_image_exists(IMAGE)): + build_image() + else: + print(f"==> Using existing image: {IMAGE}") + + # Generate genesis if requested + if args.generate_genesis: + generate_genesis(n, subnet_count, genesis_dir) + + # Generate topology if requested + if args.generate_topology: + print(f"==> Generating topology for {n} nodes...") + topology_dir.mkdir(parents=True, exist_ok=True) + subprocess.run([ + sys.executable, + str(PROJECT / "scripts" / "gen_topology.py"), + str(n), + str(topology_dir) + ], check=True) + + if not genesis_dir.is_dir(): + die(f"Genesis directory not found: {genesis_dir}") + if not topology_dir.is_dir(): + die(f"Topology directory not found: {topology_dir}") + + print(f"==> Nodes: {n}") + print(f"==> Stop time: {stop_time}") + print(f"==> Max-bootnodes: {max_bootnodes}") + print(f"==> Subnet count: {subnet_count}") + print(f"==> Genesis: {genesis_dir}") + print(f"==> Topology: {topology_dir}") + print(f"==> Output: {output_dir}") + + # Clean previous run data (stale blocks break finalization) + for d in (output_dir, data_dir): + if d.exists(): + print(f"==> Cleaning previous {d.name}...") + shutil.rmtree(d) + d.mkdir(parents=True, exist_ok=True) + + # Run simulation + cmd = [ + "docker", "run", "--rm", + "--platform", "linux/arm64", + "--security-opt", "seccomp=unconfined", + "--shm-size", SHM_SIZE, + "-v", f"{genesis_dir}:/genesis:ro", + "-v", f"{topology_dir}:/topology:ro", + "-v", f"{output_dir}:/output", + "-v", f"{data_dir}:/data", + "-e", "TOPOLOGY_DIR=/topology", + "-e", f"STOP_TIME={stop_time}", + "-e", f"MAX_BOOTNODES={max_bootnodes}", + "-e", f"UDP_PORT_BASE={UDP_PORT_BASE}", + "-e", f"METRICS_PORT_BASE={METRICS_PORT_BASE}", + IMAGE, + "/genesis", + ] + + print(f"==> {' '.join(cmd)}") + print("=" * 60) + subprocess.run(cmd, check=True) + print(f"\n==> Done. 
Output in {output_dir}/shadow.data/") + + +if __name__ == "__main__": + main() diff --git a/scripts/stats-real.py b/scripts/stats-real.py new file mode 100755 index 00000000..a8ccd5b1 --- /dev/null +++ b/scripts/stats-real.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +"""Print attestation propagation stats from a Real Shadow simulation run. + +Usage: + uv run scripts/stats-real.py # default: 64 nodes, all slots + uv run scripts/stats-real.py 4 # 4-node run + uv run scripts/stats-real.py 64 5 # 64 nodes, focus on slot 5 + uv run scripts/stats-real.py 64 --all # show all per-slot P99 values +""" + +import argparse +import re +import sys +from collections import defaultdict +from pathlib import Path + + +def parse_events(data_dir: str): + """Parse RECEIVE-ATTESTATION events from shadow stdout files.""" + hosts_dir = Path(data_dir) / "hosts" + if not hosts_dir.is_dir(): + hosts_dir = Path(data_dir) / "shadow.data" / "hosts" + if not hosts_dir.is_dir(): + print(f"ERROR: hosts directory not found under {data_dir}", file=sys.stderr) + sys.exit(1) + + node_events: dict[str, list[dict]] = defaultdict(list) + for host_dir in sorted(hosts_dir.iterdir()): + if not host_dir.is_dir(): + continue + for stdout_file in sorted(host_dir.glob("*.stdout")): + with open(stdout_file) as f: + for line in f: + if "RECEIVE-ATTESTATION" in line: + evt = _parse_line(line) + if evt: + evt["node"] = host_dir.name + node_events[host_dir.name].append(evt) + + total = sum(len(v) for v in node_events.values()) + if total == 0: + print("ERROR: No RECEIVE-ATTESTATION events found", file=sys.stderr) + sys.exit(1) + return node_events + + +def _parse_line(line: str) -> dict | None: + m = re.search( + r'\["LEAN-INTEROP-TEST",\s*(\d+),\s*"RECEIVE-ATTESTATION",\s*\[(\d+),\s*\[([^\]]+)\]', + line, + ) + if not m: + return None + parts = [p.strip().strip('"') for p in m.group(3).split(",")] + if len(parts) < 5: + return None + return { + "ts_ms": int(m.group(1)), + "validator_id": int(m.group(2)), + "slot": 
int(parts[3]), + "block_hash": parts[4], + } + + +def compute_p99(node_events, genesis_ms, target_slot=None): + """Compute per-slot P99 time-to-95%-coverage across all nodes.""" + slot_publishers: dict[int, set[int]] = defaultdict(set) + for events in node_events.values(): + for evt in events: + slot_publishers[evt["slot"]].add(evt["validator_id"]) + + rows = [] + for slot in sorted(slot_publishers.keys()): + if target_slot is not None and slot != target_slot: + continue + + pubs = slot_publishers[slot] + n_publishers = len(pubs) + if n_publishers < 3: + continue + + node_times = [] + for node, events in node_events.items(): + received = {} + for evt in events: + if evt["slot"] != slot: + continue + offset = evt["ts_ms"] - genesis_ms + if evt["validator_id"] not in received or offset < received[evt["validator_id"]]: + received[evt["validator_id"]] = offset + + if len(received) < 3: + continue + + times = sorted(received.values()) + idx = min(len(times) - 1, int(0.95 * n_publishers) - 1) + node_times.append(times[idx] - times[0]) + + if node_times: + s = sorted(node_times) + n = len(s) + rows.append((slot, n, n_publishers, s[n // 2], s[int(0.95 * n)], s[int(0.99 * n)])) + return rows + + +def main(): + parser = argparse.ArgumentParser(description="Print attestation propagation stats for real simulation") + parser.add_argument("nodes", nargs="?", type=int, default=64, + help="Number of nodes (default: 64)") + parser.add_argument("slot", nargs="?", type=int, default=None, + help="Focus on a single slot (default: show summary)") + parser.add_argument("--all", action="store_true", + help="Show all per-slot P99 values") + args = parser.parse_args() + + data_dir = f"/tmp/qlean-real-sim-{args.nodes}/output/shadow.data" + node_events = parse_events(data_dir) + + # Estimate genesis time (round to nearest second) + genesis_ms = None + for events in node_events.values(): + for evt in events: + if genesis_ms is None or evt["ts_ms"] < genesis_ms: + genesis_ms = evt["ts_ms"] + 
genesis_ms = (genesis_ms // 1_000_000) * 1_000_000 + + nodes = len(node_events) + total = sum(len(v) for v in node_events.values()) + slots = sorted({e["slot"] for events in node_events.values() for e in events}) + print(f"Nodes: {nodes} Events: {total} Slots: {slots[0]}-{slots[-1]} ({len(slots)} slots)") + print() + + rows = compute_p99(node_events, genesis_ms, target_slot=args.slot) + + if args.slot is not None: + for slot, n, pubs, p50, p95, p99 in rows: + print(f"Slot {slot}: {n} nodes, {pubs} publishers") + print(f" P50: {p50:7.0f} ms") + print(f" P95: {p95:7.0f} ms") + print(f" P99: {p99:7.0f} ms") + return + + if args.all: + print(f"{'Slot':>5} {'Nodes':>5} {'Pubs':>5} {'P50(ms)':>8} {'P95(ms)':>8} {'P99(ms)':>8}") + print("-" * 47) + for slot, n, pubs, p50, p95, p99 in rows: + print(f"{slot:5d} {n:5d} {pubs:5d} {p50:8.0f} {p95:8.0f} {p99:8.0f}") + return + + # Summary: aggregate P99 across all (node, slot) pairs + all_times = [] + for events in node_events.values(): + for evt in events: + all_times.append(evt["ts_ms"] - genesis_ms) + all_times.sort() + n_all = len(all_times) + + print(f"Per-slot P99 time-to-95%-coverage ({len(rows)} slots):") + for slot, n, pubs, p50, p95, p99 in rows: + print(f" Slot {slot:3d}: P99={p99:6.0f} ms ({n} nodes, {pubs} pubs)") + + # Collect all per-node-per-slot P99 times for aggregate + all_p99 = [] + for _, _, _, _, _, p99 in rows: + all_p99.append(p99) + all_p99.sort() + n = len(all_p99) + print(f"\nAggregate P99 across all slots: {all_p99[min(int(0.99 * n), n - 1)]:.0f} ms") + + +if __name__ == "__main__": + main() diff --git a/scripts/stats-shadow.py b/scripts/stats-shadow.py new file mode 100755 index 00000000..5976f1a2 --- /dev/null +++ b/scripts/stats-shadow.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +"""Print attestation propagation stats from a Shadow simulation run. 
+ +Usage: + uv run scripts/stats-shadow.py # default: 64 nodes, all slots + uv run scripts/stats-shadow.py 4 # 4-node run + uv run scripts/stats-shadow.py 64 5 # 64 nodes, focus on slot 5 + uv run scripts/stats-shadow.py 64 --all # show all per-slot P99 values +""" + +import argparse +import re +import sys +from collections import defaultdict +from pathlib import Path + + +def parse_events(data_dir: str): + """Parse RECEIVE-ATTESTATION events from shadow stdout files.""" + hosts_dir = Path(data_dir) / "hosts" + if not hosts_dir.is_dir(): + hosts_dir = Path(data_dir) / "shadow.data" / "hosts" + if not hosts_dir.is_dir(): + print(f"ERROR: hosts directory not found under {data_dir}", file=sys.stderr) + sys.exit(1) + + node_events: dict[str, list[dict]] = defaultdict(list) + for host_dir in sorted(hosts_dir.iterdir()): + if not host_dir.is_dir(): + continue + for stdout_file in sorted(host_dir.glob("*.stdout")): + with open(stdout_file) as f: + for line in f: + if "RECEIVE-ATTESTATION" in line: + evt = _parse_line(line) + if evt: + evt["node"] = host_dir.name + node_events[host_dir.name].append(evt) + + total = sum(len(v) for v in node_events.values()) + if total == 0: + print("ERROR: No RECEIVE-ATTESTATION events found", file=sys.stderr) + sys.exit(1) + return node_events + + +def _parse_line(line: str) -> dict | None: + m = re.search( + r'\["LEAN-INTEROP-TEST",\s*(\d+),\s*"RECEIVE-ATTESTATION",\s*\[(\d+),\s*\[([^\]]+)\]', + line, + ) + if not m: + return None + parts = [p.strip().strip('"') for p in m.group(3).split(",")] + if len(parts) < 5: + return None + return { + "ts_ms": int(m.group(1)), + "validator_id": int(m.group(2)), + "slot": int(parts[3]), + "block_hash": parts[4], + } + + +def compute_p99(node_events, genesis_ms, target_slot=None): + """Compute per-slot P99 time-to-95%-coverage across all nodes.""" + slot_publishers: dict[int, set[int]] = defaultdict(set) + for events in node_events.values(): + for evt in events: + 
slot_publishers[evt["slot"]].add(evt["validator_id"]) + + rows = [] + for slot in sorted(slot_publishers.keys()): + if target_slot is not None and slot != target_slot: + continue + + pubs = slot_publishers[slot] + n_publishers = len(pubs) + if n_publishers < 3: + continue + + node_times = [] + for node, events in node_events.items(): + received = {} + for evt in events: + if evt["slot"] != slot: + continue + offset = evt["ts_ms"] - genesis_ms + if evt["validator_id"] not in received or offset < received[evt["validator_id"]]: + received[evt["validator_id"]] = offset + + if len(received) < 3: + continue + + times = sorted(received.values()) + idx = min(len(times) - 1, int(0.95 * n_publishers) - 1) + node_times.append(times[idx] - times[0]) + + if node_times: + s = sorted(node_times) + n = len(s) + rows.append((slot, n, n_publishers, s[n // 2], s[int(0.95 * n)], s[int(0.99 * n)])) + return rows + + +def main(): + parser = argparse.ArgumentParser(description="Print attestation propagation stats") + parser.add_argument("nodes", nargs="?", type=int, default=64, + help="Number of nodes (default: 64)") + parser.add_argument("slot", nargs="?", type=int, default=None, + help="Focus on a single slot (default: show summary)") + parser.add_argument("--all", action="store_true", + help="Show all per-slot P99 values") + args = parser.parse_args() + + data_dir = f"/tmp/qlean-sim-{args.nodes}/output/shadow.data" + node_events = parse_events(data_dir) + + # Estimate genesis time (round to nearest second) + genesis_ms = None + for events in node_events.values(): + for evt in events: + if genesis_ms is None or evt["ts_ms"] < genesis_ms: + genesis_ms = evt["ts_ms"] + genesis_ms = (genesis_ms // 1_000_000) * 1_000_000 + + nodes = len(node_events) + total = sum(len(v) for v in node_events.values()) + slots = sorted({e["slot"] for events in node_events.values() for e in events}) + print(f"Nodes: {nodes} Events: {total} Slots: {slots[0]}-{slots[-1]} ({len(slots)} slots)") + print() + + 
rows = compute_p99(node_events, genesis_ms, target_slot=args.slot) + + if args.slot is not None: + for slot, n, pubs, p50, p95, p99 in rows: + print(f"Slot {slot}: {n} nodes, {pubs} publishers") + print(f" P50: {p50:7.0f} ms") + print(f" P95: {p95:7.0f} ms") + print(f" P99: {p99:7.0f} ms") + return + + if args.all: + print(f"{'Slot':>5} {'Nodes':>5} {'Pubs':>5} {'P50(ms)':>8} {'P95(ms)':>8} {'P99(ms)':>8}") + print("-" * 47) + for slot, n, pubs, p50, p95, p99 in rows: + print(f"{slot:5d} {n:5d} {pubs:5d} {p50:8.0f} {p95:8.0f} {p99:8.0f}") + return + + # Summary: aggregate P99 across all (node, slot) pairs + all_times = [] + for events in node_events.values(): + for evt in events: + all_times.append(evt["ts_ms"] - genesis_ms) + all_times.sort() + n_all = len(all_times) + + print(f"Per-slot P99 time-to-95%-coverage ({len(rows)} slots):") + for slot, n, pubs, p50, p95, p99 in rows: + print(f" Slot {slot:3d}: P99={p99:6.0f} ms ({n} nodes, {pubs} pubs)") + + # Collect all per-node-per-slot P99 times for aggregate + all_p99 = [] + for _, _, _, _, _, p99 in rows: + all_p99.append(p99) + all_p99.sort() + n = len(all_p99) + print(f"\nAggregate P99 across all slots: {all_p99[min(int(0.99 * n), n - 1)]:.0f} ms") + + +if __name__ == "__main__": + main() diff --git a/src/app/impl/validator_keys_manifest_impl.cpp b/src/app/impl/validator_keys_manifest_impl.cpp index 80999156..c94e7fb0 100644 --- a/src/app/impl/validator_keys_manifest_impl.cpp +++ b/src/app/impl/validator_keys_manifest_impl.cpp @@ -24,9 +24,12 @@ namespace lean::app { auto yaml = yaml::read(config.genesisDir() / "annotated_validators.yaml"); auto yaml_items = yaml.map(config.nodeId()); for (auto &&yaml_item : yaml_items.list()) { - auto yaml_pubkey_hex = yaml_item.map("pubkey_hex"); + auto yaml_pubkey_hex = yaml_item.map("pubkey_hex").str(); + if (yaml_pubkey_hex.starts_with("0x")) { + yaml_pubkey_hex = yaml_pubkey_hex.substr(2); + } auto public_key = - 
crypto::xmss::XmssPublicKey::fromHex(yaml_pubkey_hex.str()).value(); + crypto::xmss::XmssPublicKey::fromHex(yaml_pubkey_hex).value(); auto privkey_file = yaml_item.map("privkey_file").str(); crypto::xmss::XmssKeypair keypair; if constexpr (QLEAN_ENABLE_SHADOW) { diff --git a/src/commands/generate_genesis.hpp b/src/commands/generate_genesis.hpp index a516aeef..56871527 100644 --- a/src/commands/generate_genesis.hpp +++ b/src/commands/generate_genesis.hpp @@ -159,6 +159,18 @@ inline int cmdGenerateGenesis(auto &&getArg) { } }); + build_yaml(genesis_directory / "annotated_validators.yaml", + [&](YAML::Node &yaml) { + for (auto &peer : peers) { + YAML::Node entry; + entry["index"] = peer.index; + entry["pubkey_hex"] = + "0x" + xmss_public_keys.at(peer.index).toHex(); + entry["privkey_file"] = xmss_private_key_name(peer.index); + yaml[node_id(peer.index)].push_back(entry); + } + }); + build_yaml(genesis_directory / "validator-config.yaml", [&](YAML::Node &yaml) { yaml["shuffle"] = "roundrobin"; diff --git a/src/modules/networking/networking.cpp b/src/modules/networking/networking.cpp index 347f166f..a98f9042 100644 --- a/src/modules/networking/networking.cpp +++ b/src/modules/networking/networking.cpp @@ -572,6 +572,11 @@ namespace lean::modules { peer_id.has_value() ? 
peer_id->toBase58() : "unknown", signed_attestation.validator_id); + SL_INFO(self->logger_, + "{}", + leanInteropTestLog("RECEIVE-ATTESTATION", + leanInteropTest(signed_attestation))); + auto &head = signed_attestation.data.head; if (not self->block_tree_->has(head.root)) { if (head.slot <= self->block_tree_->lastFinalized().slot) { diff --git a/src/modules/production/read_config_yaml.hpp b/src/modules/production/read_config_yaml.hpp index 90be20af..e2302515 100644 --- a/src/modules/production/read_config_yaml.hpp +++ b/src/modules/production/read_config_yaml.hpp @@ -51,23 +51,34 @@ namespace lean { for (auto &&[i, yaml_validator] : std::views::zip( std::views::iota(size_t{0}, yaml_genesis_validators.size()), yaml_genesis_validators)) { - if (not yaml_validator.IsMap()) { + crypto::xmss::XmssPublicKey attestation_pubkey; + crypto::xmss::XmssPublicKey proposal_pubkey; + if (yaml_validator.IsMap()) { + auto yaml_attestation_pubkey = yaml_validator["attestation_pubkey"]; + if (not yaml_attestation_pubkey.IsScalar()) { + return ConfigYamlError::INVALID; + } + BOOST_OUTCOME_TRY(auto pk, + crypto::xmss::XmssPublicKey::fromHex( + yaml_attestation_pubkey.as())); + attestation_pubkey = pk; + auto yaml_proposal_pubkey = yaml_validator["proposal_pubkey"]; + if (not yaml_proposal_pubkey.IsScalar()) { + return ConfigYamlError::INVALID; + } + BOOST_OUTCOME_TRY(pk, + crypto::xmss::XmssPublicKey::fromHex( + yaml_proposal_pubkey.as())); + proposal_pubkey = pk; + } else if (yaml_validator.IsScalar()) { + BOOST_OUTCOME_TRY(auto pk, + crypto::xmss::XmssPublicKey::fromHex( + yaml_validator.as())); + attestation_pubkey = pk; + proposal_pubkey = pk; + } else { return ConfigYamlError::INVALID; } - auto yaml_attestation_pubkey = yaml_validator["attestation_pubkey"]; - if (not yaml_attestation_pubkey.IsScalar()) { - return ConfigYamlError::INVALID; - } - BOOST_OUTCOME_TRY(auto attestation_pubkey, - crypto::xmss::XmssPublicKey::fromHex( - yaml_attestation_pubkey.as())); - auto 
yaml_proposal_pubkey = yaml_validator["proposal_pubkey"]; - if (not yaml_proposal_pubkey.IsScalar()) { - return ConfigYamlError::INVALID; - } - BOOST_OUTCOME_TRY(auto proposal_pubkey, - crypto::xmss::XmssPublicKey::fromHex( - yaml_proposal_pubkey.as())); validators.emplace_back(Validator{ .attestation_pubkey = attestation_pubkey, .proposal_pubkey = proposal_pubkey,