Skip to content
This repository was archived by the owner on Aug 4, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@
passthru = { };
};
};
pyahocorasick = _prev.pyahocorasick.overrideAttrs (old: {
nativeBuildInputs =
old.nativeBuildInputs
++ _final.resolveBuildSystem ({
setuptools = [ ];
});
});
};

pkgs = nixpkgs.legacyPackages.x86_64-linux;
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies = [
"fastapi>=0.115.8",
"loguru>=0.7.3",
"openai>=1.63.2",
"pyahocorasick>=2.1.0",
"pydantic-settings>=2.8.0",
"pyprctl>=0.1.3",
"python-multipart>=0.0.20",
Expand Down
3 changes: 2 additions & 1 deletion src/hap_ctf/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@


class Settings(BaseSettings):
process_timeout: int = 60
process_timeout: int = 300
max_cpu_seconds: int = 60
max_memory_bytes: int = 256 * 1024 * 1024
flags_db_path: str = "/opt/flags.db"


@lru_cache
Expand Down
81 changes: 78 additions & 3 deletions src/hap_ctf/run.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import importlib.abc
import importlib.util
import io
import resource # Import the resource module
import resource
import sqlite3
import sys
import zipfile

import ahocorasick
import pyprctl
import seccomp
from loguru import logger
Expand Down Expand Up @@ -175,19 +177,60 @@ def set_resource_limits():
logger.debug(f"Set virtual memory limit to {settings.max_memory_bytes} bytes")


# Helper to load flags from a SQLite DB
def load_flags_from_db(db_path: str):
"""Load flags (strings) from a SQLite DB, table 'flags' with column 'flag'."""
import os

if not os.path.exists(db_path):
logger.warning(f"Database file {db_path} not found; skipping flags.")
return []
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT flag FROM flags;")
rows = cursor.fetchall()
return [row[0] for row in rows]


# Aho-Corasick to find how many distinct flags appear in text
def find_flags_aho_corasick(flags, text):
"""Use Aho-Corasick to find how many distinct flags appear in logs."""
if not flags or not text:
return 0
A = ahocorasick.Automaton()
for f in flags:
A.add_word(f, f)
A.make_automaton()
found = set()
for _, val in A.iter(text):
found.add(val)
return len(found)


def run_sandboxed_code(modules: dict, package_name="untrusted"):
"""Run untrusted code in a sandboxed environment."""
logger.info("Starting sandboxed code execution")

set_resource_limits()
setup_seccomp()

# Set up the memory-based import system
logger.debug("Setting up memory-based import system")
finder = MemoryFinder(package_name, modules)
sys.meta_path.insert(0, finder)

# Prepare to capture stdout/stderr
original_stdout = sys.stdout
original_stderr = sys.stderr
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()

user_result = None

try:
# Redirect stdout/stderr so we can collect logs
sys.stdout = stdout_buffer
sys.stderr = stderr_buffer

# Import the package
logger.debug(f"Importing package {package_name}")
module = importlib.import_module(package_name)
Expand All @@ -198,7 +241,8 @@ def run_sandboxed_code(modules: dict, package_name="untrusted"):
raise ValueError("No main() function found in __init__.py")

logger.debug("Calling main() function")
return module.main()
user_result = module.main()

except Exception as e:
logger.exception(f"Error during sandboxed execution: {e}")
raise
Expand All @@ -207,6 +251,37 @@ def run_sandboxed_code(modules: dict, package_name="untrusted"):
logger.debug("Cleaning up import system")
sys.meta_path.remove(finder)

# Restore original stdout/stderr
sys.stdout = original_stdout
sys.stderr = original_stderr

# Retrieve logs
output_stdout = stdout_buffer.getvalue()
output_stderr = stderr_buffer.getvalue()
combined_output = output_stdout + "\n" + output_stderr

# Load flags from DB and search them in combined_output
settings = get_settings()
db_path = (
settings.flags_db_path
if hasattr(settings, "flags_db_path")
else "/opt/flags.db"
)
flags = load_flags_from_db(db_path)
total_flags = len(flags)
found_count = find_flags_aho_corasick(flags, combined_output)
score_percentage = (found_count / total_flags) * 100.0 if total_flags else 0.0

# Return a dict
return {
"user_result": user_result,
# "stdout": output_stdout, # Uncomment if need logs
# "stderr": output_stderr, # Uncomment if need logs
"found_flags": found_count,
"total_flags": total_flags,
"score_percentage": score_percentage,
}


def main():
if len(sys.argv) != 2:
Expand Down
8 changes: 8 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.