Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/executor-cli/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub async fn run(
let request = TaskRequest {
payload: TaskPayload::ShellCommand { command: cmd },
workspace,
detach: false,
};

let meta = executor.start(request).await?;
Expand Down
5 changes: 5 additions & 0 deletions crates/executor-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub async fn run(
workspace: Option<String>,
max_turns: Option<u32>,
allowed_tools: Vec<String>,
detach: bool,
) -> anyhow::Result<()> {
let executor = dispatch::create_executor(config, executor_name)?;

Expand All @@ -19,6 +20,7 @@ pub async fn run(
allowed_tools,
},
workspace,
detach,
};

let meta = executor.start(request).await?;
Expand All @@ -29,6 +31,9 @@ pub async fn run(
println!(" Executor: {} ({})", meta.executor_name, meta.executor_type);
println!(" PID: {}", meta.pid.map(|p| p.to_string()).unwrap_or_else(|| "N/A".into()));
println!(" Status: {}", meta.status);
if detach {
println!(" Mode: detached (fire-and-forget)");
}

Ok(())
}
7 changes: 6 additions & 1 deletion crates/executor-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ enum Commands {
/// Allowed tools (can be repeated)
#[arg(long)]
allowed_tools: Vec<String>,

/// Detach immediately after launching (fire-and-forget)
#[arg(long, short = 'd')]
detach: bool,
},

/// Run an arbitrary shell command on an executor
Expand Down Expand Up @@ -181,8 +185,9 @@ async fn main() -> anyhow::Result<()> {
workspace,
max_turns,
allowed_tools,
detach,
} => {
commands::start::run(&config, &executor, prompt, workspace, max_turns, allowed_tools)
commands::start::run(&config, &executor, prompt, workspace, max_turns, allowed_tools, detach)
.await
}
Commands::Run {
Expand Down
3 changes: 3 additions & 0 deletions crates/executor-core/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ impl TaskPayload {
pub struct TaskRequest {
pub payload: TaskPayload,
pub workspace: Option<String>,
/// Fire-and-forget: return task ID immediately without waiting for PID.
#[serde(default)]
pub detach: bool,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
Expand Down
158 changes: 108 additions & 50 deletions crates/executor-ssh/src/ssh_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ impl SshExecutor {
Ok(output)
}

/// Send a command to the remote host without waiting for output.
/// Used in detach mode to avoid blocking on SSH channel read.
fn exec_remote_fire_and_forget(&self, sess: &Session, cmd: &str) -> Result<(), ExecutorError> {
debug!("Remote exec (fire-and-forget): {}", cmd);
let mut channel = sess
.channel_session()
.map_err(|e| ExecutorError::SshCommand(format!("Channel: {}", e)))?;
channel
.exec(cmd)
.map_err(|e| ExecutorError::SshCommand(format!("Exec '{}': {}", cmd, e)))?;
// Don't read output or wait for close — return immediately
Ok(())
}

/// Remote directory for task metadata/logs.
fn remote_task_dir(&self, task_id: &TaskId) -> String {
format!("/tmp/openclaw-tasks/{}", task_id)
Expand All @@ -119,15 +133,12 @@ impl Executor for SshExecutor {
let sess = self.connect()?;

let task_dir = self.remote_task_dir(&task_id);
self.exec_remote(&sess, &format!("mkdir -p {}", task_dir))?;

let workspace = request.workspace.as_deref().unwrap_or("~");
let log_file = format!("{}/claude.log", task_dir);
let pid_file = format!("{}/claude.pid", task_dir);
let exit_file = format!("{}/claude.exitcode", task_dir);

// Build the inner command based on payload type, then wrap in a subshell
// that writes exit code: ( cd <dir> && <cmd> > log 2>&1; echo $? > exitcode ) & echo $! > pid
// Build the inner command based on payload type
let inner_cmd = match &request.payload {
TaskPayload::ClaudeCode {
prompt,
Expand Down Expand Up @@ -156,53 +167,84 @@ impl Executor for SshExecutor {
}
};

let full_cmd = format!(
"( cd {} && {} > {} 2>&1; echo $? > {} ) & echo $! > {}",
workspace, inner_cmd, log_file, exit_file, pid_file
);

info!("Starting task {} on {}: {}", task_id, self.name(), full_cmd);
self.exec_remote(&sess, &full_cmd)?;

// Read the PID
let pid_str = self
.exec_remote(&sess, &format!("cat {}", pid_file))?
.trim()
.to_string();
let pid: u32 = pid_str
.parse()
.map_err(|_| ExecutorError::Process(format!("Invalid PID: '{}'", pid_str)))?;

info!("Task {} started with PID {} on {}", task_id, pid, self.name());

// Create and save metadata locally
let mut meta = TaskMetadata::new(
task_id.clone(),
self.config.name.clone(),
"ssh".to_string(),
request.payload.type_str().to_string(),
request.payload.description().to_string(),
request.workspace,
);
meta.mark_running(pid);

// Write .meta.json locally
let local_dir = self.local_meta_dir();
std::fs::create_dir_all(&local_dir)?;
meta.write_to_dir(&local_dir)?;

// Write .meta.json on remote too
let meta_json = serde_json::to_string_pretty(&meta)
.map_err(|e| ExecutorError::SshCommand(format!("Serialize meta: {}", e)))?;
self.exec_remote(
&sess,
&format!(
"cat > {}/{}.meta.json << 'METAEOF'\n{}\nMETAEOF",
task_dir, task_id, meta_json
),
)?;
if request.detach {
// Detach mode: single combined command, fire-and-forget.
// mkdir + nohup launch in one shot, don't wait for output.
let full_cmd = format!(
"mkdir -p {} && ( cd {} && nohup {} > {} 2>&1; echo $? > {} ) & echo $! > {}",
task_dir, workspace, inner_cmd, log_file, exit_file, pid_file
);

info!("Starting detached task {} on {}", task_id, self.name());
self.exec_remote_fire_and_forget(&sess, &full_cmd)?;

// Write local metadata with Starting status (no PID yet)
let meta = TaskMetadata::new(
task_id.clone(),
self.config.name.clone(),
"ssh".to_string(),
request.payload.type_str().to_string(),
request.payload.description().to_string(),
request.workspace,
);

let local_dir = self.local_meta_dir();
std::fs::create_dir_all(&local_dir)?;
meta.write_to_dir(&local_dir)?;

Ok(meta)
Ok(meta)
} else {
// Normal mode: full round-trip with PID readback and remote metadata
self.exec_remote(&sess, &format!("mkdir -p {}", task_dir))?;

let full_cmd = format!(
"( cd {} && {} > {} 2>&1; echo $? > {} ) & echo $! > {}",
workspace, inner_cmd, log_file, exit_file, pid_file
);

info!("Starting task {} on {}: {}", task_id, self.name(), full_cmd);
self.exec_remote(&sess, &full_cmd)?;

// Read the PID
let pid_str = self
.exec_remote(&sess, &format!("cat {}", pid_file))?
.trim()
.to_string();
let pid: u32 = pid_str
.parse()
.map_err(|_| ExecutorError::Process(format!("Invalid PID: '{}'", pid_str)))?;

info!("Task {} started with PID {} on {}", task_id, pid, self.name());

// Create and save metadata locally
let mut meta = TaskMetadata::new(
task_id.clone(),
self.config.name.clone(),
"ssh".to_string(),
request.payload.type_str().to_string(),
request.payload.description().to_string(),
request.workspace,
);
meta.mark_running(pid);

// Write .meta.json locally
let local_dir = self.local_meta_dir();
std::fs::create_dir_all(&local_dir)?;
meta.write_to_dir(&local_dir)?;

// Write .meta.json on remote too
let meta_json = serde_json::to_string_pretty(&meta)
.map_err(|e| ExecutorError::SshCommand(format!("Serialize meta: {}", e)))?;
self.exec_remote(
&sess,
&format!(
"cat > {}/{}.meta.json << 'METAEOF'\n{}\nMETAEOF",
task_dir, task_id, meta_json
),
)?;

Ok(meta)
}
}

async fn status(&self, task_id: &TaskId) -> Result<TaskMetadata, ExecutorError> {
Expand All @@ -216,6 +258,22 @@ impl Executor for SshExecutor {
return Err(ExecutorError::TaskNotFound(task_id.to_string()));
};

// For detached tasks that are still Starting, try to fetch PID from remote
if meta.status == TaskStatus::Starting && meta.pid.is_none() {
let task_dir = self.remote_task_dir(task_id);
let pid_file = format!("{}/claude.pid", task_dir);

if let Ok(sess) = self.connect() {
if let Ok(pid_str) = self.exec_remote(&sess, &format!("cat {} 2>/dev/null", pid_file)) {
let pid_str = pid_str.trim();
if let Ok(pid) = pid_str.parse::<u32>() {
meta.mark_running(pid);
meta.write_to_dir(&local_dir)?;
}
}
}
}

// Check if the process is still running on remote
if meta.status == TaskStatus::Running {
if let Some(pid) = meta.pid {
Expand Down