Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this repository are documented in this file.

## [Unreleased]

### Fixed

- `openclaw/fetchai-openclaw-orchestrator`: enforce `scan_directory` sandbox to `DEMO_PROJECTS_DIR` by default — orchestrator and connector policies reject paths like `~/Documents`; planner and executor normalize paths; opt-in `OPENCLAW_EXTENDED_PATHS` restores broader local paths for development

### Added
- `security-scanner-agent/`: LLM-powered code security analysis agent that scans code snippets via ASI:One and returns structured vulnerability reports (type, severity, line number, description, suggested fix). Built on a multi-agent Bureau using the standard Agent Chat Protocol; ASI:One-compatible and discoverable on Agentverse.
- `ticketlens-agent/`: Live real-time travel discovery AI agent powered by TicketLens MCP. High-precision reasoning utilizing the ASI1 LLM, persistent `uAgents` storage, and directly actionable booking deep links.
Expand Down
2 changes: 2 additions & 0 deletions openclaw/fetchai-openclaw-orchestrator/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ ASI_ONE_MODEL=asi1

# Demo / Testing
DEMO_PROJECTS_DIR=./demo_projects
# Set to true only for local dev if you need scan_directory outside demo_projects
# OPENCLAW_EXTENDED_PATHS=false

# Mailbox (set to true so ASI:One can reach the agent)
USE_MAILBOX=true
Expand Down
22 changes: 7 additions & 15 deletions openclaw/fetchai-openclaw-orchestrator/connector/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dataclasses import dataclass, field
from pathlib import Path

from shared.paths import default_allowed_scan_paths
from shared.schemas import RejectionReason, TaskPlan, TaskStep

logger = logging.getLogger(__name__)
Expand All @@ -35,16 +36,6 @@
"generate_health_report",
}

_DEMO_DIR = os.getenv("DEMO_PROJECTS_DIR", "./demo_projects")

DEFAULT_ALLOWED_PATHS: list[str] = [
os.path.expanduser("~/projects"),
os.path.expanduser("~/Documents"),
"/tmp",
str(Path(_DEMO_DIR).resolve()), # demo directory (safe testing)
str(Path(".").resolve()), # current working directory
]


# ---------------------------------------------------------------------------
# Policy
Expand All @@ -58,9 +49,7 @@ class LocalPolicy:
allowed_actions: set[str] = field(
default_factory=lambda: set(DEFAULT_ALLOWED_ACTIONS)
)
allowed_paths: list[str] = field(
default_factory=lambda: list(DEFAULT_ALLOWED_PATHS)
)
allowed_paths: list[str] = field(default_factory=default_allowed_scan_paths)
require_user_confirmation: bool = True
allow_background_execution: bool = False

Expand All @@ -75,11 +64,14 @@ def check_action(self, step: TaskStep) -> RejectionReason | None:
return None

def check_path(self, step: TaskStep) -> RejectionReason | None:
if step.action != "scan_directory":
return None

raw_path = step.params.get("path")
if raw_path is None:
return None # no path param → OK
return None

resolved = str(Path(os.path.expanduser(raw_path)).resolve())
resolved = str(Path(os.path.expanduser(str(raw_path))).resolve())
for allowed in self.allowed_paths:
allowed_resolved = str(Path(allowed).resolve())
if resolved.startswith(allowed_resolved):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from pathlib import Path
from typing import Any

from shared.paths import normalize_scan_directory_path

logger = logging.getLogger(__name__)

# Default scan path — uses demo directory to avoid leaking real data
Expand All @@ -41,14 +43,15 @@ def scan_directory(params: dict[str, Any]) -> dict[str, Any]:
Defaults to the demo projects directory for safe testing.
"""
raw_path = params.get("path", _DEFAULT_SCAN_PATH)
resolved_path = normalize_scan_directory_path(str(raw_path))
if str(raw_path) != resolved_path:
logger.info(
"scan_directory path normalized from %s to %s",
raw_path,
resolved_path,
)

# Sanitise: resolve relative to project root, never expose home dir
if raw_path.startswith("~"):
# In testing mode, redirect ~ paths to the demo directory
raw_path = _DEFAULT_SCAN_PATH
logger.info("Redirected home-relative path to demo directory: %s", raw_path)

root = Path(raw_path).resolve()
root = Path(resolved_path).resolve()

if not root.exists():
return {"error": f"Path does not exist: {root}", "scanned_path": str(root)}
Expand Down
18 changes: 16 additions & 2 deletions openclaw/fetchai-openclaw-orchestrator/orchestrator/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import os
import re

from shared.paths import normalize_scan_directory_path
from shared.schemas import StepType, TaskConstraints, TaskPlan, TaskStep

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -328,6 +329,19 @@ def _plan_with_keywords(objective: str) -> TaskPlan:
return plan


# ---------------------------------------------------------------------------
# Path enforcement
# ---------------------------------------------------------------------------


def _enforce_scan_directory_paths(plan: TaskPlan) -> TaskPlan:
"""Force scan_directory steps to use the demo sandbox (unless extended mode)."""
for step in plan.steps:
if step.action == "scan_directory":
step.params["path"] = normalize_scan_directory_path(step.params.get("path"))
return plan


# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
Expand All @@ -344,7 +358,7 @@ def plan_objective(objective: str) -> TaskPlan:
# Try LLM first
plan = _plan_with_llm(objective)
if plan is not None:
return plan
return _enforce_scan_directory_paths(plan)

# Fallback to keywords
return _plan_with_keywords(objective)
return _enforce_scan_directory_paths(_plan_with_keywords(objective))
19 changes: 18 additions & 1 deletion openclaw/fetchai-openclaw-orchestrator/orchestrator/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
• User ownership validation
• Quota / rate limits
• Workflow allowlist
• scan_directory path sandbox (demo projects only by default)
• (Future) paid feature gating
"""

Expand All @@ -14,6 +15,7 @@
import time
from dataclasses import dataclass, field

from shared.paths import is_path_under_demo
from shared.schemas import RejectionReason, TaskPlan

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -71,6 +73,21 @@ def check_rate_limit(self, user_id: str) -> RejectionReason | None:
self._timestamps[user_id].append(now)
return None

def check_scan_paths(self, plan: TaskPlan) -> RejectionReason | None:
for step in plan.steps:
if step.action != "scan_directory":
continue
raw_path = step.params.get("path")
if raw_path is None:
continue
if not is_path_under_demo(str(raw_path)):
logger.warning(
"scan_directory path '%s' is outside demo sandbox",
raw_path,
)
return RejectionReason.PATH_NOT_ALLOWED
return None

def check_plan(self, plan: TaskPlan) -> RejectionReason | None:
if len(plan.steps) > self.max_steps_per_plan:
logger.warning(
Expand All @@ -86,7 +103,7 @@ def check_plan(self, plan: TaskPlan) -> RejectionReason | None:
logger.warning("Action '%s' not in allowlist", step.action)
return RejectionReason.ACTION_NOT_ALLOWED

return None
return self.check_scan_paths(plan)

def validate(self, user_id: str, plan: TaskPlan) -> RejectionReason | None:
"""Run all policy checks. Returns *None* on success."""
Expand Down
54 changes: 54 additions & 0 deletions openclaw/fetchai-openclaw-orchestrator/shared/paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Path sandbox helpers for OpenClaw scan_directory."""

from __future__ import annotations

import os
from pathlib import Path

_EXTENDED_PATHS_ENV = "OPENCLAW_EXTENDED_PATHS"


def demo_projects_dir() -> Path:
return Path(os.getenv("DEMO_PROJECTS_DIR", "./demo_projects")).resolve()


def extended_paths_enabled() -> bool:
return os.getenv(_EXTENDED_PATHS_ENV, "").strip().lower() in ("1", "true", "yes")


def default_allowed_scan_paths() -> list[str]:
"""Default connector allowlist: demo directory only unless extended mode is on."""
demo = str(demo_projects_dir())
if not extended_paths_enabled():
return [demo]
return [
demo,
os.path.expanduser("~/projects"),
os.path.expanduser("~/Documents"),
"/tmp",
str(Path(".").resolve()),
]


def is_path_under_demo(raw_path: str) -> bool:
resolved = Path(os.path.expanduser(raw_path)).resolve()
demo = demo_projects_dir()
if resolved == demo:
return True
try:
resolved.relative_to(demo)
return True
except ValueError:
return False


def normalize_scan_directory_path(raw_path: str | None) -> str:
"""Return a scan path confined to the demo directory (default mode)."""
demo = demo_projects_dir()
if extended_paths_enabled() and raw_path:
if str(raw_path).startswith("~"):
return str(demo)
return str(Path(os.path.expanduser(str(raw_path))).resolve())
if raw_path and is_path_under_demo(str(raw_path)):
return str(Path(os.path.expanduser(str(raw_path))).resolve())
return str(demo)
9 changes: 6 additions & 3 deletions openclaw/fetchai-openclaw-orchestrator/tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import tempfile

from shared.schemas import StepType, TaskPlan, TaskStatus, TaskStep
from connector.executor import execute_plan
from shared.paths import demo_projects_dir
from shared.schemas import StepType, TaskPlan, TaskStatus, TaskStep


def test_execute_summarise_text():
Expand Down Expand Up @@ -50,12 +51,13 @@ def test_execute_mixed_steps():


def test_scan_directory_nonexistent_path():
missing = demo_projects_dir() / "_nonexistent_subdir_xyz"
plan = TaskPlan(
steps=[
TaskStep(
type=StepType.LOCAL,
action="scan_directory",
params={"path": "/nonexistent/path/xyz"},
params={"path": str(missing)},
)
]
)
Expand All @@ -66,8 +68,9 @@ def test_scan_directory_nonexistent_path():
assert "error" in scan_out


def test_pipeline_chaining():
def test_pipeline_chaining(monkeypatch):
"""scan_directory → generate_report should chain outputs."""
monkeypatch.setenv("OPENCLAW_EXTENDED_PATHS", "1")
with tempfile.TemporaryDirectory() as tmpdir:
plan = TaskPlan(
steps=[
Expand Down
33 changes: 33 additions & 0 deletions openclaw/fetchai-openclaw-orchestrator/tests/test_paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Tests for shared path sandbox helpers."""

import os

from orchestrator.planner import _enforce_scan_directory_paths
from shared.paths import demo_projects_dir, is_path_under_demo, normalize_scan_directory_path
from shared.schemas import StepType, TaskPlan, TaskStep


def test_is_path_under_demo_rejects_documents():
assert is_path_under_demo(os.path.expanduser("~/Documents")) is False


def test_normalize_forces_demo_by_default(monkeypatch):
monkeypatch.delenv("OPENCLAW_EXTENDED_PATHS", raising=False)
assert normalize_scan_directory_path(
os.path.expanduser("~/Documents")
) == str(demo_projects_dir())


def test_planner_enforce_rewrites_scan_path(monkeypatch):
monkeypatch.delenv("OPENCLAW_EXTENDED_PATHS", raising=False)
plan = TaskPlan(
steps=[
TaskStep(
type=StepType.LOCAL,
action="scan_directory",
params={"path": os.path.expanduser("~/Documents")},
)
]
)
_enforce_scan_directory_paths(plan)
assert is_path_under_demo(plan.steps[0].params["path"])
59 changes: 58 additions & 1 deletion openclaw/fetchai-openclaw-orchestrator/tests/test_policy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Tests for both Fetch-side and local policy engines."""

from orchestrator.policy import FetchPolicy
import os

from connector.policy import LocalPolicy
from orchestrator.policy import FetchPolicy
from shared.paths import demo_projects_dir
from shared.schemas import (
RejectionReason,
StepType,
Expand Down Expand Up @@ -46,6 +49,32 @@ def test_rate_limit(self):
assert policy.validate("u_1", plan) is None
assert policy.validate("u_1", plan) == RejectionReason.QUOTA_EXCEEDED

def test_scan_directory_outside_demo_rejected(self):
policy = FetchPolicy()
plan = TaskPlan(
steps=[
TaskStep(
type=StepType.LOCAL,
action="scan_directory",
params={"path": os.path.expanduser("~/Documents")},
)
]
)
assert policy.validate("u_1", plan) == RejectionReason.PATH_NOT_ALLOWED

def test_scan_directory_under_demo_passes(self):
policy = FetchPolicy()
plan = TaskPlan(
steps=[
TaskStep(
type=StepType.LOCAL,
action="scan_directory",
params={"path": str(demo_projects_dir())},
)
]
)
assert policy.validate("u_1", plan) is None


# =========================================================================
# Local policy
Expand Down Expand Up @@ -88,3 +117,31 @@ def test_path_inside_sandbox_passes(self):
]
)
assert policy.validate_plan(plan) is None

def test_default_policy_rejects_documents_path(self, monkeypatch):
monkeypatch.delenv("OPENCLAW_EXTENDED_PATHS", raising=False)
policy = LocalPolicy()
plan = TaskPlan(
steps=[
TaskStep(
type=StepType.LOCAL,
action="scan_directory",
params={"path": os.path.expanduser("~/Documents")},
)
]
)
assert policy.validate_plan(plan) == RejectionReason.PATH_NOT_ALLOWED

def test_default_policy_allows_demo_path(self, monkeypatch):
monkeypatch.delenv("OPENCLAW_EXTENDED_PATHS", raising=False)
policy = LocalPolicy()
plan = TaskPlan(
steps=[
TaskStep(
type=StepType.LOCAL,
action="scan_directory",
params={"path": str(demo_projects_dir())},
)
]
)
assert policy.validate_plan(plan) is None
Loading