Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ async def run_autonomous_agent(
# Check if all features are already complete (before starting a new session)
# Skip this check if running as initializer (needs to create features first)
if not is_initializer and iteration == 1:
passing, in_progress, total = count_passing_tests(project_dir)
passing, in_progress, total, _nhi = count_passing_tests(project_dir)
if total > 0 and passing == total:
print("\n" + "=" * 70)
print(" ALL FEATURES ALREADY COMPLETE!")
Expand Down Expand Up @@ -358,7 +358,7 @@ async def run_autonomous_agent(
print_progress_summary(project_dir)

# Check if all features are complete - exit gracefully if done
passing, in_progress, total = count_passing_tests(project_dir)
passing, in_progress, total, _nhi = count_passing_tests(project_dir)
if total > 0 and passing == total:
print("\n" + "=" * 70)
print(" ALL FEATURES COMPLETE!")
Expand Down
29 changes: 27 additions & 2 deletions api/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ class Feature(Base):

__tablename__ = "features"

# Composite index for common status query pattern (passes, in_progress)
# Composite index for common status query pattern (passes, in_progress, needs_human_input)
# Used by feature_get_stats, get_ready_features, and other status queries
__table_args__ = (
Index('ix_feature_status', 'passes', 'in_progress'),
Index('ix_feature_status', 'passes', 'in_progress', 'needs_human_input'),
)

id = Column(Integer, primary_key=True, index=True)
Expand All @@ -61,6 +61,11 @@ class Feature(Base):
# NULL/empty = no dependencies (backwards compatible)
dependencies = Column(JSON, nullable=True, default=None)

# Human input: agent can request structured input from a human
needs_human_input = Column(Boolean, nullable=False, default=False, index=True)
human_input_request = Column(JSON, nullable=True, default=None) # Agent's structured request
human_input_response = Column(JSON, nullable=True, default=None) # Human's response

def to_dict(self) -> dict:
"""Convert feature to dictionary for JSON serialization."""
return {
Expand All @@ -75,6 +80,10 @@ def to_dict(self) -> dict:
"in_progress": self.in_progress if self.in_progress is not None else False,
# Dependencies: NULL/empty treated as empty list for backwards compat
"dependencies": self.dependencies if self.dependencies else [],
# Human input fields
"needs_human_input": self.needs_human_input if self.needs_human_input is not None else False,
"human_input_request": self.human_input_request,
"human_input_response": self.human_input_response,
}

def get_dependencies_safe(self) -> list[int]:
Expand Down Expand Up @@ -302,6 +311,21 @@ def _is_network_path(path: Path) -> bool:
return False


def _migrate_add_human_input_columns(engine) -> None:
"""Add human input columns to existing databases that don't have them."""
with engine.connect() as conn:
result = conn.execute(text("PRAGMA table_info(features)"))
columns = [row[1] for row in result.fetchall()]

if "needs_human_input" not in columns:
conn.execute(text("ALTER TABLE features ADD COLUMN needs_human_input BOOLEAN DEFAULT 0"))
if "human_input_request" not in columns:
conn.execute(text("ALTER TABLE features ADD COLUMN human_input_request TEXT DEFAULT NULL"))
if "human_input_response" not in columns:
conn.execute(text("ALTER TABLE features ADD COLUMN human_input_response TEXT DEFAULT NULL"))
conn.commit()


def _migrate_add_schedules_tables(engine) -> None:
"""Create schedules and schedule_overrides tables if they don't exist."""
from sqlalchemy import inspect
Expand Down Expand Up @@ -425,6 +449,7 @@ def create_database(project_dir: Path) -> tuple:
_migrate_fix_null_boolean_fields(engine)
_migrate_add_dependencies_column(engine)
_migrate_add_testing_columns(engine)
_migrate_add_human_input_columns(engine)

# Migrate to add schedules tables
_migrate_add_schedules_tables(engine)
Expand Down
102 changes: 97 additions & 5 deletions mcp_server/feature_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,20 @@ def feature_get_stats() -> str:
result = session.query(
func.count(Feature.id).label('total'),
func.sum(case((Feature.passes == True, 1), else_=0)).label('passing'),
func.sum(case((Feature.in_progress == True, 1), else_=0)).label('in_progress')
func.sum(case((Feature.in_progress == True, 1), else_=0)).label('in_progress'),
func.sum(case((Feature.needs_human_input == True, 1), else_=0)).label('needs_human_input')
).first()

total = result.total or 0
passing = int(result.passing or 0)
in_progress = int(result.in_progress or 0)
needs_human_input = int(result.needs_human_input or 0)
percentage = round((passing / total) * 100, 1) if total > 0 else 0.0

return json.dumps({
"passing": passing,
"in_progress": in_progress,
"needs_human_input": needs_human_input,
"total": total,
"percentage": percentage
})
Expand Down Expand Up @@ -221,6 +224,7 @@ def feature_get_summary(
"name": feature.name,
"passes": feature.passes,
"in_progress": feature.in_progress,
"needs_human_input": feature.needs_human_input if feature.needs_human_input is not None else False,
"dependencies": feature.dependencies or []
})
finally:
Expand Down Expand Up @@ -401,11 +405,11 @@ def feature_mark_in_progress(
"""
session = get_session()
try:
# Atomic claim: only succeeds if feature is not already claimed or passing
# Atomic claim: only succeeds if feature is not already claimed, passing, or blocked for human input
result = session.execute(text("""
UPDATE features
SET in_progress = 1
WHERE id = :id AND passes = 0 AND in_progress = 0
WHERE id = :id AND passes = 0 AND in_progress = 0 AND needs_human_input = 0
"""), {"id": feature_id})
session.commit()

Expand All @@ -418,6 +422,8 @@ def feature_mark_in_progress(
return json.dumps({"error": f"Feature with ID {feature_id} is already passing"})
if feature.in_progress:
return json.dumps({"error": f"Feature with ID {feature_id} is already in-progress"})
if getattr(feature, 'needs_human_input', False):
return json.dumps({"error": f"Feature with ID {feature_id} is blocked waiting for human input"})
return json.dumps({"error": "Failed to mark feature in-progress for unknown reason"})

# Fetch the claimed feature
Expand Down Expand Up @@ -455,11 +461,14 @@ def feature_claim_and_get(
if feature.passes:
return json.dumps({"error": f"Feature with ID {feature_id} is already passing"})

# Try atomic claim: only succeeds if not already claimed
if getattr(feature, 'needs_human_input', False):
return json.dumps({"error": f"Feature with ID {feature_id} is blocked waiting for human input"})

# Try atomic claim: only succeeds if not already claimed and not blocked for human input
result = session.execute(text("""
UPDATE features
SET in_progress = 1
WHERE id = :id AND passes = 0 AND in_progress = 0
WHERE id = :id AND passes = 0 AND in_progress = 0 AND needs_human_input = 0
"""), {"id": feature_id})
session.commit()

Expand Down Expand Up @@ -806,6 +815,8 @@ def feature_get_ready(
for f in all_features:
if f.passes or f.in_progress:
continue
if getattr(f, 'needs_human_input', False):
continue
deps = f.dependencies or []
if all(dep_id in passing_ids for dep_id in deps):
ready.append(f.to_dict())
Expand Down Expand Up @@ -888,6 +899,8 @@ def feature_get_graph() -> str:

if f.passes:
status = "done"
elif getattr(f, 'needs_human_input', False):
status = "needs_human_input"
elif blocking:
status = "blocked"
elif f.in_progress:
Expand Down Expand Up @@ -984,6 +997,85 @@ def feature_set_dependencies(
return json.dumps({"error": f"Failed to set dependencies: {str(e)}"})


@mcp.tool()
def feature_request_human_input(
feature_id: Annotated[int, Field(description="The ID of the feature that needs human input", ge=1)],
prompt: Annotated[str, Field(min_length=1, description="Explain what you need from the human and why")],
fields: Annotated[list[dict], Field(min_length=1, description="List of input fields to collect")]
) -> str:
"""Request structured input from a human for a feature that is blocked.

Use this ONLY when the feature genuinely cannot proceed without human intervention:
- Creating API keys or external accounts
- Choosing between design approaches that require human preference
- Configuring external services the agent cannot access
- Providing credentials or secrets

Do NOT use this for issues you can solve yourself (debugging, reading docs, etc.).

The feature will be moved out of in_progress and into a "needs human input" state.
Once the human provides their response, the feature returns to the pending queue
and will include the human's response when you pick it up again.

Args:
feature_id: The ID of the feature that needs human input
prompt: A clear explanation of what you need and why
fields: List of input fields, each with:
- id (str): Unique field identifier
- label (str): Human-readable label
- type (str): "text", "textarea", "select", or "boolean" (default: "text")
- required (bool): Whether the field is required (default: true)
- placeholder (str, optional): Placeholder text
- options (list, optional): For select type: [{value, label}]

Returns:
JSON with success confirmation or error message
"""
# Validate fields
for i, field in enumerate(fields):
if "id" not in field or "label" not in field:
return json.dumps({"error": f"Field at index {i} missing required 'id' or 'label'"})

request_data = {
"prompt": prompt,
"fields": fields,
}

session = get_session()
try:
# Atomically set needs_human_input, clear in_progress, store request, clear previous response
result = session.execute(text("""
UPDATE features
SET needs_human_input = 1,
in_progress = 0,
human_input_request = :request,
human_input_response = NULL
WHERE id = :id AND passes = 0
"""), {"id": feature_id, "request": json.dumps(request_data)})
session.commit()

if result.rowcount == 0:
feature = session.query(Feature).filter(Feature.id == feature_id).first()
if feature is None:
return json.dumps({"error": f"Feature with ID {feature_id} not found"})
if feature.passes:
return json.dumps({"error": f"Feature with ID {feature_id} is already passing"})
return json.dumps({"error": "Failed to request human input for unknown reason"})

feature = session.query(Feature).filter(Feature.id == feature_id).first()
return json.dumps({
"success": True,
"feature_id": feature_id,
"name": feature.name,
"message": f"Feature '{feature.name}' is now blocked waiting for human input"
})
except Exception as e:
session.rollback()
return json.dumps({"error": f"Failed to request human input: {str(e)}"})
finally:
session.close()


@mcp.tool()
def ask_user(
questions: Annotated[list[dict], Field(description="List of questions to ask, each with question, header, options (list of {label, description}), and multiSelect (bool)")]
Expand Down
8 changes: 7 additions & 1 deletion parallel_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,9 @@ def get_resumable_features(
for fd in feature_dicts:
if not fd.get("in_progress") or fd.get("passes"):
continue
# Skip if blocked for human input
if fd.get("needs_human_input"):
continue
# Skip if already running in this orchestrator instance
if fd["id"] in running_ids:
continue
Expand Down Expand Up @@ -536,11 +539,14 @@ def get_ready_features(
running_ids.update(batch_ids)

ready = []
skipped_reasons = {"passes": 0, "in_progress": 0, "running": 0, "failed": 0, "deps": 0}
skipped_reasons = {"passes": 0, "in_progress": 0, "running": 0, "failed": 0, "deps": 0, "needs_human_input": 0}
for fd in feature_dicts:
if fd.get("passes"):
skipped_reasons["passes"] += 1
continue
if fd.get("needs_human_input"):
skipped_reasons["needs_human_input"] += 1
continue
if fd.get("in_progress"):
skipped_reasons["in_progress"] += 1
continue
Expand Down
59 changes: 38 additions & 21 deletions progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,54 +62,71 @@ def has_features(project_dir: Path) -> bool:
return False


def count_passing_tests(project_dir: Path) -> tuple[int, int, int]:
def count_passing_tests(project_dir: Path) -> tuple[int, int, int, int]:
"""
Count passing, in_progress, and total tests via direct database access.
Count passing, in_progress, total, and needs_human_input tests via direct database access.

Args:
project_dir: Directory containing the project

Returns:
(passing_count, in_progress_count, total_count)
(passing_count, in_progress_count, total_count, needs_human_input_count)
"""
from autoforge_paths import get_features_db_path
db_file = get_features_db_path(project_dir)
if not db_file.exists():
return 0, 0, 0
return 0, 0, 0, 0

try:
with closing(_get_connection(db_file)) as conn:
cursor = conn.cursor()
# Single aggregate query instead of 3 separate COUNT queries
# Handle case where in_progress column doesn't exist yet (legacy DBs)
# Single aggregate query instead of separate COUNT queries
# Handle case where columns don't exist yet (legacy DBs)
try:
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN passes = 1 THEN 1 ELSE 0 END) as passing,
SUM(CASE WHEN in_progress = 1 THEN 1 ELSE 0 END) as in_progress
SUM(CASE WHEN in_progress = 1 THEN 1 ELSE 0 END) as in_progress,
SUM(CASE WHEN needs_human_input = 1 THEN 1 ELSE 0 END) as needs_human_input
FROM features
""")
row = cursor.fetchone()
total = row[0] or 0
passing = row[1] or 0
in_progress = row[2] or 0
needs_human_input = row[3] or 0
except sqlite3.OperationalError:
# Fallback for databases without in_progress column
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN passes = 1 THEN 1 ELSE 0 END) as passing
FROM features
""")
row = cursor.fetchone()
total = row[0] or 0
passing = row[1] or 0
in_progress = 0
return passing, in_progress, total
# Fallback for databases without newer columns
try:
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN passes = 1 THEN 1 ELSE 0 END) as passing,
SUM(CASE WHEN in_progress = 1 THEN 1 ELSE 0 END) as in_progress
FROM features
""")
row = cursor.fetchone()
total = row[0] or 0
passing = row[1] or 0
in_progress = row[2] or 0
needs_human_input = 0
except sqlite3.OperationalError:
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN passes = 1 THEN 1 ELSE 0 END) as passing
FROM features
""")
row = cursor.fetchone()
total = row[0] or 0
passing = row[1] or 0
in_progress = 0
needs_human_input = 0
return passing, in_progress, total, needs_human_input
except Exception as e:
print(f"[Database error in count_passing_tests: {e}]")
return 0, 0, 0
return 0, 0, 0, 0


def get_all_passing_features(project_dir: Path) -> list[dict]:
Expand Down Expand Up @@ -234,7 +251,7 @@ def print_session_header(session_num: int, is_initializer: bool) -> None:

def print_progress_summary(project_dir: Path) -> None:
"""Print a summary of current progress."""
passing, in_progress, total = count_passing_tests(project_dir)
passing, in_progress, total, _needs_human_input = count_passing_tests(project_dir)

if total > 0:
percentage = (passing / total) * 100
Expand Down
Loading