Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ __pycache__/
archive/
.omx/
.clawd-agents/
.jules/
.port_sessions/

# Rust build artifacts (also in rust/.gitignore)
target/

# Claude Code local artifacts (not shared)
.claude/settings.local.json
.claude/sessions/
.port_sessions/

# Editor and OS artifacts
*.swp
Expand All @@ -18,4 +19,3 @@ target/
.DS_Store
.vscode/
.idea/
.port_sessions/
16 changes: 7 additions & 9 deletions src/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import functools
from dataclasses import dataclass, field
from functools import cached_property
Comment thread
badMade marked this conversation as resolved.


@dataclass(frozen=True)
Expand All @@ -17,14 +17,12 @@ class PortingModule:
name: str
responsibility: str
source_hint: str
status: str = 'planned'
status: str = "planned"

# ⚡ Bolt Optimization: Cache the concatenated and lowercased search string.
# Why: Prevents repetitive string allocations and `.lower()` calls during command/tool routing.
# Impact: Reduces _score runtime by ~70% (from ~4.5ms to ~1.3ms per 1000 items in benchmarks).
@functools.cached_property
@cached_property
def search_text(self) -> str:
return f"{self.name} {self.source_hint} {self.responsibility}".lower()
# ⚡ Bolt: Cache lowercased concatenated strings to avoid redundant string allocations in routing loops
return f"{self.name}\0{self.source_hint}\0{self.responsibility}".lower()
Comment thread
badMade marked this conversation as resolved.


@dataclass(frozen=True)
Expand All @@ -38,7 +36,7 @@ class UsageSummary:
input_tokens: int = 0
output_tokens: int = 0

def add_turn(self, prompt: str, output: str) -> 'UsageSummary':
def add_turn(self, prompt: str, output: str) -> "UsageSummary":
return UsageSummary(
input_tokens=self.input_tokens + len(prompt.split()),
output_tokens=self.output_tokens + len(output.split()),
Expand All @@ -52,6 +50,6 @@ class PortingBacklog:

def summary_lines(self) -> list[str]:
return [
f'- {module.name} [{module.status}] — {module.responsibility} (from {module.source_hint})'
f"- {module.name} [{module.status}] — {module.responsibility} (from {module.source_hint})"
for module in self.modules
]
199 changes: 130 additions & 69 deletions src/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,64 +38,70 @@ class RuntimeSession:

def as_markdown(self) -> str:
lines = [
'# Runtime Session',
'',
f'Prompt: {self.prompt}',
'',
'## Context',
"# Runtime Session",
"",
f"Prompt: {self.prompt}",
"",
"## Context",
render_context(self.context),
'',
'## Setup',
f'- Python: {self.setup.python_version} ({self.setup.implementation})',
f'- Platform: {self.setup.platform_name}',
f'- Test command: {self.setup.test_command}',
'',
'## Startup Steps',
*(f'- {step}' for step in self.setup.startup_steps()),
'',
'## System Init',
"",
"## Setup",
f"- Python: {self.setup.python_version} ({self.setup.implementation})",
f"- Platform: {self.setup.platform_name}",
f"- Test command: {self.setup.test_command}",
"",
"## Startup Steps",
*(f"- {step}" for step in self.setup.startup_steps()),
"",
"## System Init",
self.system_init_message,
'',
'## Routed Matches',
"",
"## Routed Matches",
]
if self.routed_matches:
lines.extend(
f'- [{match.kind}] {match.name} ({match.score}) — {match.source_hint}'
f"- [{match.kind}] {match.name} ({match.score}) — {match.source_hint}"
for match in self.routed_matches
)
else:
lines.append('- none')
lines.extend([
'',
'## Command Execution',
*(self.command_execution_messages or ('none',)),
'',
'## Tool Execution',
*(self.tool_execution_messages or ('none',)),
'',
'## Stream Events',
*(f"- {event['type']}: {event}" for event in self.stream_events),
'',
'## Turn Result',
self.turn_result.output,
'',
f'Persisted session path: {self.persisted_session_path}',
'',
self.history.as_markdown(),
])
return '\n'.join(lines)
lines.append("- none")
lines.extend(
[
"",
"## Command Execution",
*(self.command_execution_messages or ("none",)),
"",
"## Tool Execution",
*(self.tool_execution_messages or ("none",)),
"",
"## Stream Events",
*(f"- {event['type']}: {event}" for event in self.stream_events),
"",
"## Turn Result",
self.turn_result.output,
"",
f"Persisted session path: {self.persisted_session_path}",
"",
self.history.as_markdown(),
]
)
return "\n".join(lines)


class PortRuntime:
def route_prompt(self, prompt: str, limit: int = 5) -> list[RoutedMatch]:
tokens = {token.lower() for token in prompt.replace('/', ' ').replace('-', ' ').split() if token}
tokens = {
token.lower()
for token in prompt.replace("/", " ").replace("-", " ").split()
if token
}
by_kind = {
'command': self._collect_matches(tokens, PORTED_COMMANDS, 'command'),
'tool': self._collect_matches(tokens, PORTED_TOOLS, 'tool'),
"command": self._collect_matches(tokens, PORTED_COMMANDS, "command"),
"tool": self._collect_matches(tokens, PORTED_TOOLS, "tool"),
}

selected: list[RoutedMatch] = []
for kind in ('command', 'tool'):
for kind in ("command", "tool"):
if by_kind[kind]:
selected.append(by_kind[kind].pop(0))

Expand All @@ -112,30 +118,59 @@ def bootstrap_session(self, prompt: str, limit: int = 5) -> RuntimeSession:
setup = setup_report.setup
history = HistoryLog()
engine = QueryEnginePort.from_workspace()
history.add('context', f'python_files={context.python_file_count}, archive_available={context.archive_available}')
history.add('registry', f'commands={len(PORTED_COMMANDS)}, tools={len(PORTED_TOOLS)}')
history.add(
"context",
f"python_files={context.python_file_count}, archive_available={context.archive_available}",
)
history.add(
"registry", f"commands={len(PORTED_COMMANDS)}, tools={len(PORTED_TOOLS)}"
)
matches = self.route_prompt(prompt, limit=limit)
registry = build_execution_registry()
command_execs = tuple(registry.command(match.name).execute(prompt) for match in matches if match.kind == 'command' and registry.command(match.name))
tool_execs = tuple(registry.tool(match.name).execute(prompt) for match in matches if match.kind == 'tool' and registry.tool(match.name))
command_execs = tuple(
registry.command(match.name).execute(prompt)
for match in matches
if match.kind == "command" and registry.command(match.name)
)
Comment thread
badMade marked this conversation as resolved.
tool_execs = tuple(
registry.tool(match.name).execute(prompt)
for match in matches
if match.kind == "tool" and registry.tool(match.name)
)
Comment thread
badMade marked this conversation as resolved.
denials = tuple(self._infer_permission_denials(matches))
stream_events = tuple(engine.stream_submit_message(
prompt,
matched_commands=tuple(match.name for match in matches if match.kind == 'command'),
matched_tools=tuple(match.name for match in matches if match.kind == 'tool'),
denied_tools=denials,
))
stream_events = tuple(
engine.stream_submit_message(
prompt,
matched_commands=tuple(
match.name for match in matches if match.kind == "command"
),
matched_tools=tuple(
match.name for match in matches if match.kind == "tool"
),
denied_tools=denials,
)
)
turn_result = engine.submit_message(
prompt,
matched_commands=tuple(match.name for match in matches if match.kind == 'command'),
matched_tools=tuple(match.name for match in matches if match.kind == 'tool'),
matched_commands=tuple(
match.name for match in matches if match.kind == "command"
),
matched_tools=tuple(
match.name for match in matches if match.kind == "tool"
),
denied_tools=denials,
)
persisted_session_path = engine.persist_session()
history.add('routing', f'matches={len(matches)} for prompt={prompt!r}')
history.add('execution', f'command_execs={len(command_execs)} tool_execs={len(tool_execs)}')
history.add('turn', f'commands={len(turn_result.matched_commands)} tools={len(turn_result.matched_tools)} denials={len(turn_result.permission_denials)} stop={turn_result.stop_reason}')
history.add('session_store', persisted_session_path)
history.add("routing", f"matches={len(matches)} for prompt={prompt!r}")
history.add(
"execution",
f"command_execs={len(command_execs)} tool_execs={len(tool_execs)}",
)
history.add(
"turn",
f"commands={len(turn_result.matched_commands)} tools={len(turn_result.matched_tools)} denials={len(turn_result.permission_denials)} stop={turn_result.stop_reason}",
)
history.add("session_store", persisted_session_path)
return RuntimeSession(
prompt=prompt,
context=context,
Expand All @@ -151,34 +186,60 @@ def bootstrap_session(self, prompt: str, limit: int = 5) -> RuntimeSession:
persisted_session_path=persisted_session_path,
)

def run_turn_loop(self, prompt: str, limit: int = 5, max_turns: int = 3, structured_output: bool = False) -> list[TurnResult]:
def run_turn_loop(
self,
prompt: str,
limit: int = 5,
max_turns: int = 3,
structured_output: bool = False,
) -> list[TurnResult]:
engine = QueryEnginePort.from_workspace()
engine.config = QueryEngineConfig(max_turns=max_turns, structured_output=structured_output)
engine.config = QueryEngineConfig(
max_turns=max_turns, structured_output=structured_output
)
matches = self.route_prompt(prompt, limit=limit)
command_names = tuple(match.name for match in matches if match.kind == 'command')
tool_names = tuple(match.name for match in matches if match.kind == 'tool')
command_names = tuple(
match.name for match in matches if match.kind == "command"
)
tool_names = tuple(match.name for match in matches if match.kind == "tool")
results: list[TurnResult] = []
for turn in range(max_turns):
turn_prompt = prompt if turn == 0 else f'{prompt} [turn {turn + 1}]'
turn_prompt = prompt if turn == 0 else f"{prompt} [turn {turn + 1}]"
result = engine.submit_message(turn_prompt, command_names, tool_names, ())
results.append(result)
if result.stop_reason != 'completed':
if result.stop_reason != "completed":
break
return results

def _infer_permission_denials(self, matches: list[RoutedMatch]) -> list[PermissionDenial]:
def _infer_permission_denials(
self, matches: list[RoutedMatch]
) -> list[PermissionDenial]:
denials: list[PermissionDenial] = []
for match in matches:
if match.kind == 'tool' and 'bash' in match.name.lower():
denials.append(PermissionDenial(tool_name=match.name, reason='destructive shell execution remains gated in the Python port'))
if match.kind == "tool" and "bash" in match.name.lower():
denials.append(
PermissionDenial(
tool_name=match.name,
reason="destructive shell execution remains gated in the Python port",
)
)
return denials

def _collect_matches(self, tokens: set[str], modules: tuple[PortingModule, ...], kind: str) -> list[RoutedMatch]:
def _collect_matches(
self, tokens: set[str], modules: tuple[PortingModule, ...], kind: str
) -> list[RoutedMatch]:
matches: list[RoutedMatch] = []
for module in modules:
score = self._score(tokens, module)
if score > 0:
matches.append(RoutedMatch(kind=kind, name=module.name, source_hint=module.source_hint, score=score))
matches.append(
RoutedMatch(
kind=kind,
name=module.name,
source_hint=module.source_hint,
score=score,
)
)
matches.sort(key=lambda item: (-item.score, item.name))
return matches

Expand Down