diff --git a/README.md b/README.md index 4f0c7bf..55bb889 100644 --- a/README.md +++ b/README.md @@ -393,11 +393,13 @@ Verification: Input: - GitHub webhook `pull_request.*` -- built-in PR monitor state changes +- built-in PR monitor state changes (status + reviews) - CLI thin client `clawhip github pr-status-changed ...` Behavior: - emit `github.pr-status-changed` +- emit `github.pr-review-submitted` when `emit_pr_reviews = true` on a monitored repo (per-PR polling of `/repos/{owner}/{repo}/pulls/{n}/reviews`, baseline-then-emit) +- review payload carries `payload.review.state` (one of `approved`, `changes_requested`, `commented`, `dismissed`), `payload.review.body`, and `payload.sender.login` - route via `github.*` - apply repo filter - prepend route mention if configured @@ -405,6 +407,7 @@ Behavior: Verification: - open real PR - merge / close PR +- submit a real review on a PR with `emit_pr_reviews = true` - confirm final Discord message body in target channel ### 6. Git commit preset family @@ -592,7 +595,9 @@ Verification: - `github.issue-opened` - `github.issue-commented` - `github.issue-closed` +- `github.issues-labeled` - `github.pr-status-changed` +- `github.pr-review-submitted` ### Git family - `git.commit` @@ -655,6 +660,35 @@ sink = "discord" channel = "1480171113253175356" format = "alert" allow_dynamic_tokens = false + +# Forward labeled-issue events to IYENsystem so its `iyen:auto-fix` / +# `iyen:declined` / `iyen:review` label triggers can fire (see +# [providers.iyensystem]). `github.issues-labeled` carries +# `payload.sender.login` and `payload.label.name` so IYENsystem's +# SafetyPolicy gate can validate the (actor, label) pair before enqueuing +# work. `iyen:review` on a PR (clawhip emits `issues-labeled` for both +# issues and PRs โ€” IYENsystem's PrReviewWorkflow gates on +# `payload.issue.pull_request` to distinguish) hands the PR to +# IYENsystem's review lane. +[[routes]] +event = "github.issues-labeled" +filter = { repo = "IYENTeam/example-repo" } +sink = "iyensystem" +allow_dynamic_tokens = false + +# Forward submitted PR reviews to IYENsystem so its ReviewResultHandler +# can decide on merges. `approved` reviews coming from outside the +# system (an OpenClaw / human reviewer, not IYENsystem's own +# reviewer-bot) hit the auto_merge_allowlist + human_approved gate and +# may trigger `merge_pr` + linked-issue close. `changes_requested` +# reviews log only โ€” IYENsystem doesn't auto-retry under the new +# label-driven design (review submissions are stateless). +# Requires the monitored repo entry to set `emit_pr_reviews = true`. +[[routes]] +event = "github.pr-review-submitted" +filter = { repo = "IYENTeam/example-repo" } +sink = "iyensystem" +allow_dynamic_tokens = false ``` Resolution rules: diff --git a/src/config.rs b/src/config.rs index 2f4c0c6..b9ad371 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,9 +7,9 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use crate::Result; use crate::events::MessageFormat; use crate::source::workspace::{default_workspace_debounce_ms, default_workspace_watch_dirs}; +use crate::Result; #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct AppConfig { @@ -39,6 +39,12 @@ pub struct ProvidersConfig { pub discord: DiscordConfig, #[serde(default)] pub slack: SlackConfig, + #[serde(default)] + pub openclaw: OpenClawConfig, + #[serde(default)] + pub iyensystem: IyenSystemConfig, + #[serde(default)] + pub hermes: HermesConfig, } #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -52,6 +58,39 @@ pub struct DiscordConfig { #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct SlackConfig {} +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct OpenClawConfig { + pub gateway_url: Option, + pub gateway_token: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct IyenSystemConfig { + pub url: Option, + pub auth_token: Option, +} + +/// Configuration for routing events to a Hermes Agent gateway as a +/// decision authority. Mirrors [`OpenClawConfig`]: when both +/// `base_url` and `auth_token` are set, the dispatcher registers a +/// `hermes` sink and routes with `sink = "hermes"` become eligible. +/// +/// Optional knobs: +/// - `instructions`: override the IYEN-domain system prompt baked +/// into [`crate::sink::HermesSink`]. Used when an operator ships a +/// custom Hermes skill. +/// - `model`: pin a specific model id; otherwise Hermes uses its +/// configured default. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct HermesConfig { + pub base_url: Option, + pub auth_token: Option, + #[serde(default)] + pub instructions: Option, + #[serde(default)] + pub model: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DaemonConfig { #[serde(default = "default_bind_host")] @@ -68,9 +107,31 @@ impl DiscordConfig { } } +impl OpenClawConfig { + pub fn is_configured(&self) -> bool { + crate::sink::OpenClawSink::is_configured(&self.gateway_url, &self.gateway_token) + } +} + +impl IyenSystemConfig { + pub fn is_configured(&self) -> bool { + crate::sink::IyenSystemSink::is_configured(&self.url, &self.auth_token) + } +} + +impl HermesConfig { + pub fn is_configured(&self) -> bool { + crate::sink::HermesSink::is_configured(&self.base_url, &self.auth_token) + } +} + impl ProvidersConfig { fn is_empty(&self) -> bool { - self.discord.is_empty() && self.slack.is_empty() + self.discord.is_empty() + && self.slack.is_empty() + && !self.openclaw.is_configured() + && !self.iyensystem.is_configured() + && !self.hermes.is_configured() } } @@ -210,6 +271,8 @@ pub struct MonitorConfig { pub tmux: TmuxMonitorConfig, #[serde(default)] pub workspace: Vec, + #[serde(default)] + pub opencode: OpenCodeMonitorConfig, } impl Default for MonitorConfig { @@ -221,6 +284,7 @@ impl Default for MonitorConfig { git: GitMonitorConfig::default(), tmux: TmuxMonitorConfig::default(), workspace: Vec::new(), + opencode: OpenCodeMonitorConfig::default(), } } } @@ -252,6 +316,8 @@ pub struct GitRepoMonitor { pub emit_issue_opened: bool, #[serde(default)] pub emit_pr_status: bool, + #[serde(default)] + pub emit_pr_reviews: bool, pub channel: Option, pub mention: Option, pub format: Option, @@ -268,6 +334,7 @@ impl Default for GitRepoMonitor { emit_branch_changes: true, emit_issue_opened: true, emit_pr_status: false, + emit_pr_reviews: false, channel: None, mention: None, format: None, @@ -336,6 +403,25 @@ impl Default for WorkspaceMonitor { } } +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct OpenCodeMonitorConfig { + pub url: Option, + #[serde(default = "default_opencode_poll_interval")] + pub poll_interval_secs: u64, + #[serde(default = "default_opencode_idle_threshold")] + pub idle_threshold_secs: u64, + pub channel: Option, + pub mention: Option, + pub format: Option, +} + +fn default_opencode_poll_interval() -> u64 { + 10 +} +fn default_opencode_idle_threshold() -> u64 { + 600 +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CronConfig { #[serde(default = "default_cron_poll_interval_secs")] @@ -601,7 +687,10 @@ impl AppConfig { format!("route #{} ({}) must set a sink", index + 1, route.event).into(), ); } - if !matches!(sink, "discord" | "slack") { + if !matches!( + sink, + "discord" | "slack" | "openclaw" | "iyensystem" | "hermes" + ) { return Err(format!( "route #{} ({}) uses unsupported sink '{}'", index + 1, @@ -650,6 +739,36 @@ impl AppConfig { .into()); } } + "openclaw" => { + if !self.providers.openclaw.is_configured() { + return Err(format!( + "route #{} ({}) uses openclaw sink but [providers.openclaw] is not configured", + index + 1, + route.event + ) + .into()); + } + } + "iyensystem" => { + if !self.providers.iyensystem.is_configured() { + return Err(format!( + "route #{} ({}) uses iyensystem sink but [providers.iyensystem] is not configured", + index + 1, + route.event + ) + .into()); + } + } + "hermes" => { + if !self.providers.hermes.is_configured() { + return Err(format!( + "route #{} ({}) uses hermes sink but [providers.hermes] is not configured", + index + 1, + route.event + ) + .into()); + } + } _ => unreachable!(), } } @@ -1082,6 +1201,7 @@ mod tests { legacy_default_channel: None, }, slack: SlackConfig::default(), + ..ProvidersConfig::default() }, routes: vec![RouteRule { event: "tmux.keyword".into(), @@ -1319,6 +1439,7 @@ message = " ping " legacy_default_channel: None, }, slack: SlackConfig::default(), + ..ProvidersConfig::default() }, cron: CronConfig { poll_interval_secs: 30, diff --git a/src/daemon.rs b/src/daemon.rs index a99eac5..c6a3e18 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -21,10 +21,10 @@ use crate::events::{IncomingEvent, MessageFormat, normalize_event}; use crate::native_hooks::incoming_event_from_native_hook_json; use crate::render::{DefaultRenderer, Renderer}; use crate::router::Router; -use crate::sink::{DiscordSink, Sink, SlackSink}; +use crate::sink::{DiscordSink, HermesSink, IyenSystemSink, OpenClawSink, Sink, SlackSink}; use crate::source::{ - GitHubSource, GitSource, RegisteredTmuxSession, SharedTmuxRegistry, Source, TmuxSource, - WorkspaceSource, list_active_tmux_registrations, + GitHubSource, GitSource, OpenCodeSource, RegisteredTmuxSession, SharedTmuxRegistry, Source, + TmuxSource, WorkspaceSource, list_active_tmux_registrations, }; use crate::update::{self, SharedPendingUpdate}; @@ -54,6 +54,43 @@ pub async fn run( Box::new(DiscordSink::from_config(config.clone())?), ); sinks.insert("slack".into(), Box::new(SlackSink::default())); + if config.providers.openclaw.is_configured() { + let oc = &config.providers.openclaw; + sinks.insert( + "openclaw".into(), + Box::new(OpenClawSink::new( + oc.gateway_url.clone().unwrap_or_default(), + oc.gateway_token.clone().unwrap_or_default(), + )), + ); + println!("clawhip: openclaw sink registered"); + } + if config.providers.iyensystem.is_configured() { + let iy = &config.providers.iyensystem; + sinks.insert( + "iyensystem".into(), + Box::new(IyenSystemSink::new( + iy.url.clone().unwrap_or_default(), + iy.auth_token.clone().unwrap_or_default(), + )), + ); + println!("clawhip: iyensystem sink registered"); + } + if config.providers.hermes.is_configured() { + let hermes_cfg = &config.providers.hermes; + let mut sink = HermesSink::new( + hermes_cfg.base_url.clone().unwrap_or_default(), + hermes_cfg.auth_token.clone().unwrap_or_default(), + ); + if let Some(instructions) = hermes_cfg.instructions.clone() { + sink = sink.with_instructions(instructions); + } + if let Some(model) = hermes_cfg.model.clone() { + sink = sink.with_model(model); + } + sinks.insert("hermes".into(), Box::new(sink)); + println!("clawhip: hermes sink registered"); + } let renderer: Box = Box::new(DefaultRenderer); let router = Router::new(config.clone()); let tmux_registry: SharedTmuxRegistry = Arc::new(RwLock::new(HashMap::new())); @@ -81,6 +118,7 @@ pub async fn run( tx.clone(), ); spawn_source(WorkspaceSource::new(config.clone()), tx.clone()); + spawn_source(OpenCodeSource::new(config.clone()), tx.clone()); spawn_source(CronSource::new(config.clone(), cron_state_path), tx.clone()); let pending_update = update::new_shared_pending_update(); @@ -291,7 +329,26 @@ async fn post_github( let event = match event_name { "issues" if action == "opened" => { - Some(normalize_event(IncomingEvent::github_issue_opened( + let labels: Vec = payload + .pointer("/issue/labels") + .and_then(Value::as_array) + .map(|arr| { + arr.iter() + .filter_map(|l| l.get("name").and_then(Value::as_str).map(String::from)) + .collect() + }) + .unwrap_or_default(); + let body = payload + .pointer("/issue/body") + .and_then(Value::as_str) + .map(|b| { + if b.len() > 200 { + format!("{}...", &b[..200]) + } else { + b.to_string() + } + }); + Some(normalize_event(IncomingEvent::github_issue_opened_rich( payload .pointer("/repository/full_name") .and_then(Value::as_str) @@ -306,6 +363,50 @@ async fn post_github( .and_then(Value::as_str) .unwrap_or("Untitled issue") .to_string(), + payload + .pointer("/issue/html_url") + .and_then(Value::as_str) + .map(String::from), + labels, + body, + None, + ))) + } + "issues" if action == "reopened" => { + Some(normalize_event(IncomingEvent::github_issue_opened( + payload + .pointer("/repository/full_name") + .and_then(Value::as_str) + .unwrap_or("unknown/unknown") + .to_string(), + payload + .pointer("/issue/number") + .and_then(Value::as_u64) + .unwrap_or_default(), + payload + .pointer("/issue/title") + .and_then(Value::as_str) + .unwrap_or("Untitled") + .to_string(), + None, + ))) + } + "issues" if action == "closed" => { + Some(normalize_event(IncomingEvent::github_issue_closed( + payload + .pointer("/repository/full_name") + .and_then(Value::as_str) + .unwrap_or("unknown/unknown") + .to_string(), + payload + .pointer("/issue/number") + .and_then(Value::as_u64) + .unwrap_or_default(), + payload + .pointer("/issue/title") + .and_then(Value::as_str) + .unwrap_or("Untitled") + .to_string(), None, ))) } @@ -379,6 +480,7 @@ async fn post_github( "unknown".to_string(), "opened".to_string(), url, + "".into(), None, ))), "closed" => Some(normalize_event(IncomingEvent::github_pr_status_changed( @@ -388,6 +490,17 @@ async fn post_github( "open".to_string(), "closed".to_string(), url, + "".into(), + None, + ))), + "reopened" => Some(normalize_event(IncomingEvent::github_pr_status_changed( + repo, + number, + title, + "closed".to_string(), + "opened".to_string(), + url, + "".into(), None, ))), _ => None, diff --git a/src/discord.rs b/src/discord.rs index 589c0f2..8fa1d6c 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -105,6 +105,15 @@ impl DiscordClient { SinkTarget::SlackWebhook(_) => { return Err("cannot send Slack webhook via Discord client".into()); } + SinkTarget::OpenClaw => { + return Err("cannot send OpenClaw event via Discord client".into()); + } + SinkTarget::IyenSystem => { + return Err("cannot send IyenSystem event via Discord client".into()); + } + SinkTarget::Hermes => { + return Err("cannot send Hermes event via Discord client".into()); + } }; match result { @@ -286,6 +295,9 @@ fn target_rate_limit_key(target: &SinkTarget) -> String { SinkTarget::DiscordChannel(channel_id) => format!("discord:channel:{channel_id}"), SinkTarget::DiscordWebhook(webhook_url) => format!("discord:webhook:{webhook_url}"), SinkTarget::SlackWebhook(webhook_url) => format!("slack:webhook:{webhook_url}"), + SinkTarget::OpenClaw => "openclaw:gateway".to_string(), + SinkTarget::IyenSystem => "iyensystem:event".to_string(), + SinkTarget::Hermes => "hermes:gateway".to_string(), } } diff --git a/src/dispatch.rs b/src/dispatch.rs index 98a9277..7f858f7 100644 --- a/src/dispatch.rs +++ b/src/dispatch.rs @@ -145,6 +145,17 @@ impl Dispatcher { } }; + eprintln!( + "clawhip dispatch: event={} deliveries={} sinks={}", + event.canonical_kind(), + deliveries.len(), + deliveries + .iter() + .map(|d| d.sink.as_str()) + .collect::>() + .join(",") + ); + for delivery in deliveries { if self.should_batch_routine_delivery(&event, &delivery) && let Some(routine_batcher) = self.routine_batcher.as_mut() @@ -720,6 +731,9 @@ fn sink_target_key(target: &SinkTarget) -> String { SinkTarget::DiscordChannel(channel) => format!("discord-channel:{channel}"), SinkTarget::DiscordWebhook(webhook) => format!("discord-webhook:{webhook}"), SinkTarget::SlackWebhook(webhook) => format!("slack-webhook:{webhook}"), + SinkTarget::OpenClaw => "openclaw".to_string(), + SinkTarget::IyenSystem => "iyensystem".to_string(), + SinkTarget::Hermes => "hermes".to_string(), } } diff --git a/src/event/compat.rs b/src/event/compat.rs index a03c40b..3370083 100644 --- a/src/event/compat.rs +++ b/src/event/compat.rs @@ -507,6 +507,7 @@ mod tests { "".into(), "open".into(), "https://example.test/pr/48".into(), + "".into(), None, ); let merged = IncomingEvent::github_pr_status_changed( @@ -516,6 +517,7 @@ mod tests { "open".into(), "merged".into(), "https://example.test/pr/48".into(), + "".into(), None, ); diff --git a/src/events.rs b/src/events.rs index bd8fd55..66d0350 100644 --- a/src/events.rs +++ b/src/events.rs @@ -292,6 +292,35 @@ impl IncomingEvent { } } + pub fn github_issue_opened_rich( + repo: String, + number: u64, + title: String, + html_url: Option, + labels: Vec, + body_preview: Option, + channel: Option, + ) -> Self { + let mut p = json!({ "repo": repo, "number": number, "title": title }); + if let Some(url) = html_url { + p["html_url"] = json!(url); + } + if !labels.is_empty() { + p["labels"] = json!(labels); + } + if let Some(body) = body_preview { + p["body_preview"] = json!(body); + } + Self { + kind: "github.issue-opened".to_string(), + channel, + mention: None, + format: None, + template: None, + payload: p, + } + } + pub fn github_issue_commented( repo: String, number: u64, @@ -325,6 +354,70 @@ impl IncomingEvent { } } + /// Emitted when a label was added to an issue between two polls. + /// Payload shape mirrors what IYENsystem's `SafetyPolicy::label_trigger_allowed` + /// reads: top-level `repo`/`number`, plus `issue.title`, `label.name`, and + /// `sender.login` so the gate can validate (actor, label) pairs without + /// re-fetching from GitHub. + pub fn github_issues_labeled( + repo: String, + number: u64, + title: String, + label: String, + sender_login: Option, + channel: Option, + ) -> Self { + let mut payload = json!({ + "repo": repo, + "number": number, + "issue": { "title": title }, + "label": { "name": label }, + }); + if let Some(login) = sender_login { + payload["sender"] = json!({ "login": login }); + } + Self { + kind: "github.issues-labeled".to_string(), + channel, + mention: None, + format: None, + template: None, + payload, + } + } + + #[allow(clippy::too_many_arguments)] + pub fn github_pr_review_submitted( + repo: String, + pr_number: u64, + pr_title: String, + review_state: String, + review_body: Option, + sender_login: Option, + channel: Option, + ) -> Self { + let mut payload = json!({ + "repo": repo, + "number": pr_number, + "pr": { "title": pr_title }, + "review": { + "state": review_state, + "body": review_body.unwrap_or_default(), + }, + }); + if let Some(login) = sender_login { + payload["sender"] = json!({ "login": login }); + } + Self { + kind: "github.pr-review-submitted".to_string(), + channel, + mention: None, + format: None, + template: None, + payload, + } + } + pub fn git_commit( repo: String, branch: String, @@ -417,6 +510,7 @@ impl IncomingEvent { } } + #[allow(clippy::too_many_arguments)] pub fn github_pr_status_changed( repo: String, number: u64, @@ -424,6 +518,7 @@ impl IncomingEvent { old_status: String, new_status: String, url: String, + body: String, channel: Option, ) -> Self { Self { @@ -439,6 +534,7 @@ impl IncomingEvent { "old_status": old_status, "new_status": new_status, "url": url, + "body": body, }), } } @@ -1745,7 +1841,7 @@ mod tests { ); assert_eq!( event.render_default(&MessageFormat::Alert).unwrap(), - "๐Ÿšจ CI failed ยท clawhip#58 ยท CI / test ยท failure ยท abcdef1 ยท https://github.com/Yeachan-Heo/clawhip/actions/runs/1" + "๐Ÿšจ **GitHub CI failed**\nTarget: clawhip#58\nWorkflow: CI / test\nStatus: `failure`\nCommit: `abcdef1`\nURL: https://github.com/Yeachan-Heo/clawhip/actions/runs/1" ); assert_eq!(event.channel.as_deref(), Some("alerts")); } @@ -1771,7 +1867,7 @@ mod tests { ); assert_eq!( event.render_default(&MessageFormat::Alert).unwrap(), - "๐Ÿšจ CI started ยท clawhip#58 ยท CI / test ยท in_progress ยท abcdef1 ยท https://github.com/Yeachan-Heo/clawhip/actions/runs/1" + "๐Ÿšจ **GitHub CI started**\nTarget: clawhip#58\nWorkflow: CI / test\nStatus: `in_progress`\nCommit: `abcdef1`\nURL: https://github.com/Yeachan-Heo/clawhip/actions/runs/1" ); } diff --git a/src/hooks/mod.rs b/src/hooks/mod.rs index 1957ee2..accc605 100644 --- a/src/hooks/mod.rs +++ b/src/hooks/mod.rs @@ -292,25 +292,22 @@ mod tests { }) .expect("install"); + let root = dir.path().canonicalize().expect("canonical tempdir"); + assert!(report.generated_files.contains(&root.join(HOOK_SCRIPT))); assert!( report .generated_files - .contains(&dir.path().join(HOOK_SCRIPT)) + .contains(&root.join(CLAWHIP_PROJECT_FILE)) ); assert!( report .generated_files - .contains(&dir.path().join(CLAWHIP_PROJECT_FILE)) + .contains(&root.join(CODEX_HOOKS_FILE)) ); assert!( report .generated_files - .contains(&dir.path().join(CODEX_HOOKS_FILE)) - ); - assert!( - report - .generated_files - .contains(&dir.path().join(CLAUDE_SETTINGS_FILE)) + .contains(&root.join(CLAUDE_SETTINGS_FILE)) ); } diff --git a/src/main.rs b/src/main.rs index 6f0e423..a199062 100644 --- a/src/main.rs +++ b/src/main.rs @@ -122,7 +122,14 @@ async fn real_main() -> Result<()> { url, channel, } => IncomingEvent::github_pr_status_changed( - repo, number, title, old_status, new_status, url, channel, + repo, + number, + title, + old_status, + new_status, + url, + "".into(), + channel, ), }; send_incoming_event(&client, event).await diff --git a/src/render/default.rs b/src/render/default.rs index e08c56d..0978e5d 100644 --- a/src/render/default.rs +++ b/src/render/default.rs @@ -19,6 +19,9 @@ impl Renderer for DefaultRenderer { if event.canonical_kind().starts_with("workspace.") { return render_workspace_event(event.canonical_kind(), payload, format); } + if event.canonical_kind().starts_with("opencode.") { + return render_opencode_event(event.canonical_kind(), payload, format); + } if event.canonical_kind() == "git.commit" && let Some(rendered) = render_aggregated_git_commit(payload, format)? { @@ -72,18 +75,61 @@ impl Renderer for DefaultRenderer { | ("agent.finished", MessageFormat::Raw) | ("agent.failed", MessageFormat::Raw) => serde_json::to_string_pretty(payload)?, - ("github.issue-opened", MessageFormat::Compact) => format!( - "{}#{} opened: {}", - string_field(payload, "repo")?, - payload.field_u64("number")?, - string_field(payload, "title")? - ), - ("github.issue-opened", MessageFormat::Alert) => format!( - "๐Ÿšจ GitHub issue opened in {}: #{} {}", - string_field(payload, "repo")?, - payload.field_u64("number")?, - string_field(payload, "title")? - ), + ("github.issue-opened", MessageFormat::Compact) => { + let repo = string_field(payload, "repo")?; + let number = payload.field_u64("number")?; + let title = string_field(payload, "title")?; + let labels = payload + .get("labels") + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str()) + .map(|s| format!("`{s}`")) + .collect::>() + .join(" ") + }) + .unwrap_or_default(); + let url = payload + .get("html_url") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let link = if url.is_empty() { + format!("**{repo}#{number}**") + } else { + format!("[**{repo}#{number}**]({url})") + }; + let label_str = if labels.is_empty() { + String::new() + } else { + format!(" {labels}") + }; + format!("๐Ÿ†• {link} {title}{label_str}") + } + ("github.issue-opened", MessageFormat::Alert) => { + let repo = string_field(payload, "repo")?; + let number = payload.field_u64("number")?; + let title = string_field(payload, "title")?; + let url = payload + .get("html_url") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let body = payload + .get("body_preview") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let link = if url.is_empty() { + format!("{repo}#{number}") + } else { + format!("[{repo}#{number}]({url})") + }; + let body_str = if body.is_empty() { + String::new() + } else { + format!("\n> {body}") + }; + format!("๐Ÿšจ **New Issue** {link}\n**{title}**{body_str}") + } ("github.issue-opened", MessageFormat::Inline) => format!( "[GitHub] {}#{} {}", string_field(payload, "repo")?, @@ -92,16 +138,17 @@ impl Renderer for DefaultRenderer { ), ("github.issue-opened", MessageFormat::Raw) => serde_json::to_string_pretty(payload)?, ("github.issue-commented", MessageFormat::Compact) => format!( - "{}#{} commented ({} comments): {}", + "๐Ÿ’ฌ {}#{} commented ({} comments): {}", string_field(payload, "repo")?, payload.field_u64("number")?, payload.field_u64("comments")?, string_field(payload, "title")? ), ("github.issue-commented", MessageFormat::Alert) => format!( - "๐Ÿšจ GitHub issue commented in {}: #{} {}", + "๐Ÿšจ **GitHub issue commented**\nRepo: `{}`\nIssue: #{}\nComments: {}\nTitle: {}", string_field(payload, "repo")?, payload.field_u64("number")?, + payload.field_u64("comments")?, string_field(payload, "title")? ), ("github.issue-commented", MessageFormat::Inline) => format!( @@ -114,13 +161,13 @@ impl Renderer for DefaultRenderer { serde_json::to_string_pretty(payload)? } ("github.issue-closed", MessageFormat::Compact) => format!( - "{}#{} closed: {}", + "โœ… {}#{} closed: {}", string_field(payload, "repo")?, payload.field_u64("number")?, string_field(payload, "title")? ), ("github.issue-closed", MessageFormat::Alert) => format!( - "๐Ÿšจ GitHub issue closed in {}: #{} {}", + "๐Ÿšจ **GitHub issue closed**\nRepo: `{}`\nIssue: #{}\nState: closed\nTitle: {}", string_field(payload, "repo")?, payload.field_u64("number")?, string_field(payload, "title")? @@ -174,29 +221,52 @@ impl Renderer for DefaultRenderer { ), ("git.branch-changed", MessageFormat::Raw) => serde_json::to_string_pretty(payload)?, - ("github.pr-status-changed", MessageFormat::Compact) => format!( - "PR {}#{} {} -> {}: {}", - string_field(payload, "repo")?, - payload.field_u64("number")?, - string_field(payload, "old_status")?, - string_field(payload, "new_status")?, - string_field(payload, "title")? - ), - ("github.pr-status-changed", MessageFormat::Alert) => format!( - "๐Ÿšจ PR status changed in {}: #{} {} -> {} ({})", - string_field(payload, "repo")?, - payload.field_u64("number")?, - string_field(payload, "old_status")?, - string_field(payload, "new_status")?, - string_field(payload, "title")? - ), - ("github.pr-status-changed", MessageFormat::Inline) => format!( - "[PR {}#{}] {} -> {}", - string_field(payload, "repo")?, - payload.field_u64("number")?, - string_field(payload, "old_status")?, - string_field(payload, "new_status")? - ), + ("github.pr-status-changed", MessageFormat::Compact) => { + let repo = string_field(payload, "repo")?; + let number = payload.field_u64("number")?; + let old_status = string_field(payload, "old_status")?; + let new_status = string_field(payload, "new_status")?; + let title = string_field(payload, "title")?; + let body = payload.get("body").and_then(Value::as_str).unwrap_or(""); + let url = optional_string_field(payload, "url"); + let target = github_target(&repo, number, url.as_deref(), false); + let fixes = extract_fix_suffix(body); + format!("๐Ÿ”€ PR {target} {old_status} -> {new_status}: {title}{fixes}") + } + ("github.pr-status-changed", MessageFormat::Alert) => { + let repo = string_field(payload, "repo")?; + let number = payload.field_u64("number")?; + let old_status = string_field(payload, "old_status")?; + let new_status = string_field(payload, "new_status")?; + let title = string_field(payload, "title")?; + let body = payload.get("body").and_then(Value::as_str).unwrap_or(""); + let url = optional_string_field(payload, "url"); + let fixes = extract_fix_suffix(body); + let mut lines = vec![ + "๐Ÿšจ **GitHub PR status changed**".to_string(), + format!("Repo: `{repo}`"), + format!("PR: #{}", number), + format!("Status: `{old_status}` โ†’ `{new_status}`"), + format!("Title: {title}"), + ]; + if let Some(url) = url { + lines.push(format!("URL: {url}")); + } + if !fixes.is_empty() { + lines.push(fixes.trim().to_string()); + } + lines.join("\n") + } + ("github.pr-status-changed", MessageFormat::Inline) => { + let repo = string_field(payload, "repo")?; + let number = payload.field_u64("number")?; + let old_status = string_field(payload, "old_status")?; + let new_status = string_field(payload, "new_status")?; + let title = string_field(payload, "title")?; + let body = payload.get("body").and_then(Value::as_str).unwrap_or(""); + let fixes = extract_fix_suffix(body); + format!("[PR {repo}#{number}] {old_status} -> {new_status}: {title}{fixes}") + } ("github.pr-status-changed", MessageFormat::Raw) => { serde_json::to_string_pretty(payload)? } @@ -214,10 +284,7 @@ impl Renderer for DefaultRenderer { | "github.ci-passed" | "github.ci-cancelled", MessageFormat::Alert, - ) => format!( - "๐Ÿšจ {}", - render_github_ci(payload, event.canonical_kind(), true)? - ), + ) => render_github_ci_alert(payload, event.canonical_kind())?, ( "github.ci-started" | "github.ci-failed" @@ -568,6 +635,18 @@ fn tmux_identity(payload: &Value) -> Option { } } +fn github_target(repo: &str, number: u64, url: Option<&str>, bold: bool) -> String { + let label = if bold { + format!("**{repo}#{number}**") + } else { + format!("{repo}#{number}") + }; + match url.filter(|url| !url.is_empty()) { + Some(url) => format!("[{label}]({url})"), + None => label, + } +} + fn render_github_ci(payload: &Value, kind: &str, include_url: bool) -> Result { if payload .get("batched") @@ -597,6 +676,36 @@ fn render_github_ci(payload: &Value, kind: &str, include_url: bool) -> Result Result { + if payload + .get("batched") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Ok(format!( + "๐Ÿšจ {}", + render_batched_github_ci(payload, kind, true)? + )); + } + + let workflow = string_field(payload, "workflow")?; + let state = optional_string_field(payload, "conclusion") + .or_else(|| optional_string_field(payload, "status")) + .ok_or_else(|| "missing GitHub CI state".to_string())?; + let sha = short_sha(&string_field(payload, "sha")?); + let mut lines = vec![ + format!("๐Ÿšจ **GitHub CI {}**", github_ci_action(kind)), + format!("Target: {}", github_ci_target(payload)?), + format!("Workflow: {workflow}"), + format!("Status: `{state}`"), + format!("Commit: `{sha}`"), + ]; + if let Some(url) = optional_string_field(payload, "url") { + lines.push(format!("URL: {url}")); + } + Ok(lines.join("\n")) +} + fn render_batched_github_ci(payload: &Value, kind: &str, include_url: bool) -> Result { let jobs = payload .get("jobs") @@ -719,6 +828,33 @@ fn short_sha(sha: &str) -> String { sha.chars().take(7).collect() } +fn extract_fix_suffix(body: &str) -> String { + let bytes = body.as_bytes(); + let mut i = 0usize; + let mut nums: Vec = Vec::new(); + while i < bytes.len() { + if bytes[i] == b'#' { + i += 1; + let start = i; + while i < bytes.len() && (bytes[i] as char).is_ascii_digit() { + i += 1; + } + if i > start + && let Ok(num) = std::str::from_utf8(&bytes[start..i]).map(|s| s.to_string()) + { + nums.push(num); + } + } else { + i += 1; + } + } + if nums.is_empty() { + String::new() + } else { + format!(" (fixes #{})", nums.join(", #")) + } +} + fn git_repo_label(payload: &Value) -> Result { let repo = string_field(payload, "repo")?; Ok(match worktree_display_name(payload) { @@ -887,6 +1023,50 @@ fn render_workspace_event(kind: &str, payload: &Value, format: &MessageFormat) - } } +fn render_opencode_event(kind: &str, payload: &Value, format: &MessageFormat) -> Result { + let session_id = optional_string_field(payload, "session_id").unwrap_or_default(); + let title = optional_string_field(payload, "title").unwrap_or_default(); + let summary = optional_string_field(payload, "summary").unwrap_or_else(|| kind.to_string()); + + let emoji = match kind { + "opencode.session.created" => "๐ŸŸข", + "opencode.session.ended" => "๐Ÿ”ด", + "opencode.session.idle" => "๐Ÿ’ค", + "opencode.message.assistant" => "๐Ÿค–", + "opencode.message.tool" => "๐Ÿ”ง", + _ => "๐Ÿ“‹", + }; + + let short_id = if session_id.len() > 12 { + &session_id[..12] + } else { + &session_id + }; + let kind_label = kind.strip_prefix("opencode.").unwrap_or(kind); + + match format { + MessageFormat::Compact => { + let mut lines = vec![format!("{emoji} **{kind_label}**")]; + if !title.is_empty() { + lines.push(format!("๐Ÿ“‹ {title}")); + } + lines.push(format!("๐Ÿ†” `{short_id}`")); + Ok(lines.join("\n")) + } + MessageFormat::Alert => { + let mut lines = vec![format!("{emoji} **{kind_label}**")]; + if !title.is_empty() { + lines.push(format!("๐Ÿ“‹ {title}")); + } + lines.push(format!("๐Ÿ†” `{short_id}`")); + lines.push(summary); + Ok(lines.join("\n")) + } + MessageFormat::Inline => Ok(format!("{emoji} [{short_id}] {summary}")), + MessageFormat::Raw => Ok(serde_json::to_string_pretty(payload)?), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/router.rs b/src/router.rs index 00b0be5..f5c1673 100644 --- a/src/router.rs +++ b/src/router.rs @@ -161,9 +161,13 @@ impl Router { .await?; match delivery.target { SinkTarget::DiscordChannel(channel) => Ok((channel, delivery.format, content)), - SinkTarget::DiscordWebhook(_) | SinkTarget::SlackWebhook(_) => { - Err("matched route uses a webhook instead of a channel".into()) - } + SinkTarget::DiscordWebhook(_) + | SinkTarget::SlackWebhook(_) + | SinkTarget::OpenClaw + | SinkTarget::IyenSystem + | SinkTarget::Hermes => Err( + "matched route uses a webhook or non-channel sink (openclaw / iyensystem / hermes) instead of a channel".into(), + ), } } @@ -231,6 +235,9 @@ impl Router { ) .into() }), + "openclaw" => Ok(SinkTarget::OpenClaw), + "iyensystem" => Ok(SinkTarget::IyenSystem), + "hermes" => Ok(SinkTarget::Hermes), other => Err(format!( "unsupported sink '{other}' for event {}", event.canonical_kind() diff --git a/src/sink/hermes.rs b/src/sink/hermes.rs new file mode 100644 index 0000000..cb43d44 --- /dev/null +++ b/src/sink/hermes.rs @@ -0,0 +1,312 @@ +//! Hermes sink โ€” delivers normalized events to a Hermes Agent gateway as +//! an OpenAI-compatible run request. +//! +//! Hermes plays the same role OpenClawSink plays: it is the *decision* +//! authority for label-driven IYEN workflows. clawhip routes a GitHub +//! event here; Hermes inspects the issue/PR, decides the lane (auto-fix / +//! declined / review / leave-for-human), and applies the GitHub label +//! itself via tool calling. clawhip never receives the decision back โ€” +//! the lane label re-enters the system through GitHub โ†’ clawhip +//! GitHubSource โ†’ IyenSystemSink, exactly like the OpenClaw flow. +//! +//! Endpoint shape: Hermes exposes an OpenAI-compatible HTTP API. We use +//! `POST /v1/runs` because: +//! - it returns `{run_id}` immediately (202) and runs the agent in the +//! background, matching the fire-and-forget contract clawhip sinks +//! use today (compare: OpenClawSink::send drops the response body); +//! - it lets Hermes do tool calling (GitHub label apply) inside the +//! same run rather than forcing clawhip to parse a streaming reply +//! and apply the label itself โ€” which would push clawhip out of its +//! "router only" lane; +//! - it doesn't tie the decision lifetime to the HTTP request โ€” Hermes +//! can take as long as it needs without clawhip holding a TCP slot. +//! +//! What this sink deliberately does NOT do: +//! - parse Hermes's reasoning/decision (we never see it; the label +//! coming back through GitHub *is* the decision) +//! - apply GitHub labels (Hermes does that with its own bot identity, +//! same as OpenClaw) +//! - retry on Hermes errors (best-effort delivery is the dispatch +//! contract; if Hermes is down the route just drops, like OpenClaw) + +use async_trait::async_trait; +use reqwest::Client; +use serde_json::{Value, json}; +use std::time::Duration; + +use crate::Result; + +use super::{Sink, SinkMessage, SinkTarget}; + +/// Default Hermes skill / system prompt name. Hermes uses `instructions` +/// (in Responses API style) or a configured skill to scope toolsets and +/// behavior. We point at an IYEN-specific skill by default; deployments +/// can override via [`HermesSink::with_instructions`]. +const DEFAULT_HERMES_INSTRUCTIONS: &str = + "You are the IYEN triage decider. You receive a normalized GitHub event \ + payload from clawhip. For issue events, decide one of: \ + attach label `iyen:auto-fix` (delegate to IYENsystem to open a PR), \ + attach label `iyen:declined` (post a rejection comment, then close), \ + or take no action (leave for a human). For pull-request events, \ + decide: attach label `iyen:review` (delegate review to IYENsystem) \ + or take no action. Apply the chosen label via your GitHub tool using \ + the hermes-bot identity. Do not paste the user's prompt back; act, \ + then stop."; + +#[derive(Clone)] +pub struct HermesSink { + client: Client, + /// Base URL of the Hermes gateway (e.g. `http://127.0.0.1:8000`). + /// Trailing slash is tolerated โ€” see [`Self::endpoint`]. + base_url: String, + /// Bearer token for the Hermes gateway. Hermes accepts standard + /// `Authorization: Bearer โ€ฆ` for OpenAI-compatible endpoints. + auth_token: String, + /// IYEN-domain instructions / system prompt. Customize per + /// deployment by calling [`Self::with_instructions`] before + /// registering with the dispatcher. + instructions: String, + /// Optional model id. None โ‡’ Hermes uses its configured default + /// (matching `hermes chat` behavior). + model: Option, +} + +impl HermesSink { + pub fn new(base_url: String, auth_token: String) -> Self { + let client = Client::builder() + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(15)) + .build() + .unwrap_or_else(|_| Client::new()); + + Self { + client, + base_url, + auth_token, + instructions: DEFAULT_HERMES_INSTRUCTIONS.to_string(), + model: None, + } + } + + /// Override the IYEN instructions. Useful when the operator ships a + /// custom Hermes skill or wants to point at a different prompt. + pub fn with_instructions(mut self, instructions: impl Into) -> Self { + self.instructions = instructions.into(); + self + } + + /// Pin the model id (e.g. `"openai/gpt-4o"`). When unset, Hermes + /// resolves the default model from its own configuration. + pub fn with_model(mut self, model: impl Into) -> Self { + self.model = Some(model.into()); + self + } + + pub fn is_configured(base_url: &Option, auth_token: &Option) -> bool { + base_url + .as_ref() + .map(|u| !u.trim().is_empty()) + .unwrap_or(false) + && auth_token + .as_ref() + .map(|t| !t.trim().is_empty()) + .unwrap_or(false) + } + + /// Resolve the `/v1/runs` URL, tolerating an optional trailing slash + /// on `base_url`. Mirrors [`super::iyensystem::IyenSystemSink::endpoint`]. + fn endpoint(&self) -> String { + format!("{}/v1/runs", self.base_url.trim_end_matches('/')) + } +} + +/// Build the JSON body for `POST /v1/runs`. Shape matches the OpenAI +/// Responses API: an `instructions` field for the system prompt plus +/// an `input` array of typed message parts. The full clawhip payload is +/// embedded as a JSON string so the model โ€” and any tools it calls โ€” +/// see the same data clawhip routed. +/// +/// Why a string instead of structured JSON in `input`: +/// - Responses API's `input` only accepts text/image content parts, +/// not arbitrary JSON +/// - Hermes's tool-calling code sees the same raw payload OpenClaw +/// would have seen, so the IYEN domain prompt can refer to keys +/// like `repo`, `number`, `action` consistently +fn build_hermes_body(message: &SinkMessage, instructions: &str, model: Option<&str>) -> Value { + let payload_text = serde_json::to_string_pretty(&message.payload) + .unwrap_or_else(|_| message.payload.to_string()); + + let user_text = format!( + "clawhip event: {}\n\nNormalized payload:\n{}\n\nRendered summary:\n{}", + message.event_kind, payload_text, message.content + ); + + let mut body = json!({ + "instructions": instructions, + "input": [ + { + "role": "user", + "content": [ + { "type": "input_text", "text": user_text } + ] + } + ], + // Hermes's /v1/runs returns 202 + run_id and runs the agent in + // the background. We do not stream events back; the *decision* + // re-enters clawhip as a GitHub label change. + "stream": false, + // Carry the trace metadata so Hermes can log/correlate without + // having to parse it out of the user text. + "metadata": { + "clawhip_event_kind": message.event_kind, + "source": "clawhip" + } + }); + + if let Some(model) = model { + body["model"] = json!(model); + } + + body +} + +#[async_trait] +impl Sink for HermesSink { + async fn send(&self, _target: &SinkTarget, message: &SinkMessage) -> Result<()> { + let url = self.endpoint(); + let body = build_hermes_body(message, &self.instructions, self.model.as_deref()); + + eprintln!( + "clawhip hermes sink: event_kind={} url={}", + message.event_kind, url + ); + + let response = self + .client + .post(&url) + .header("Authorization", format!("Bearer {}", self.auth_token)) + .json(&body) + .send() + .await + .map_err(|e| format!("Hermes request to {url} failed: {e}"))?; + + // Hermes returns 202 for accepted background runs; some configs + // may return 200. Anything else (4xx/5xx) is a delivery error. + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!("Hermes POST /v1/runs failed: {status} โ€” {body}").into()); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::events::MessageFormat; + + fn message(kind: &str, payload: Value) -> SinkMessage { + SinkMessage { + event_kind: kind.into(), + format: MessageFormat::Compact, + content: "rendered summary line".into(), + payload, + } + } + + #[test] + fn is_configured_requires_both_url_and_token() { + assert!(!HermesSink::is_configured(&None, &None)); + assert!(!HermesSink::is_configured( + &Some("http://localhost:8000".into()), + &None + )); + assert!(!HermesSink::is_configured(&None, &Some("tok".into()))); + assert!(HermesSink::is_configured( + &Some("http://localhost:8000".into()), + &Some("tok".into()) + )); + // Whitespace-only values are treated as unset, matching the + // sibling sink behavior. + assert!(!HermesSink::is_configured( + &Some(" ".into()), + &Some("tok".into()) + )); + assert!(!HermesSink::is_configured( + &Some("http://localhost:8000".into()), + &Some("".into()) + )); + } + + #[test] + fn endpoint_strips_trailing_slash_and_appends_v1_runs() { + let sink = HermesSink::new("http://127.0.0.1:8000/".into(), "tok".into()); + assert_eq!(sink.endpoint(), "http://127.0.0.1:8000/v1/runs"); + let sink2 = HermesSink::new("http://127.0.0.1:8000".into(), "tok".into()); + assert_eq!(sink2.endpoint(), "http://127.0.0.1:8000/v1/runs"); + } + + #[test] + fn body_carries_instructions_and_input_text_with_payload() { + let payload = json!({ + "repo": "Org/Repo", + "number": 42, + "title": "broken", + "sender": {"login": "alice"} + }); + let msg = message("github.issue-opened", payload.clone()); + let body = build_hermes_body(&msg, "test instructions", None); + + assert_eq!(body["instructions"], "test instructions"); + assert_eq!(body["stream"], false); + assert_eq!(body["metadata"]["clawhip_event_kind"], "github.issue-opened"); + assert_eq!(body["metadata"]["source"], "clawhip"); + + // `model` is omitted when the caller did not pin one โ€” Hermes + // falls back to its configured default. + assert!(body.get("model").is_none()); + + // The user-text MUST contain the event kind, payload, and + // rendered summary so the agent sees everything the dispatcher + // had access to. + let user_text = body["input"][0]["content"][0]["text"] + .as_str() + .expect("input[0].content[0].text must be a string"); + assert!(user_text.contains("github.issue-opened")); + assert!(user_text.contains("\"repo\": \"Org/Repo\"")); + assert!(user_text.contains("rendered summary line")); + } + + #[test] + fn body_includes_model_when_pinned() { + let msg = message("github.issue-opened", json!({})); + let body = build_hermes_body(&msg, "", Some("openai/gpt-4o")); + assert_eq!(body["model"], "openai/gpt-4o"); + } + + #[test] + fn with_instructions_overrides_default_prompt() { + let sink = HermesSink::new("http://x".into(), "t".into()) + .with_instructions("custom IYEN prompt"); + assert_eq!(sink.instructions, "custom IYEN prompt"); + // The default constant must NOT leak when an override is set โ€” + // otherwise operators couldn't ship custom skills. + assert_ne!(sink.instructions, DEFAULT_HERMES_INSTRUCTIONS); + } + + #[test] + fn default_instructions_mention_iyen_label_set() { + // This is a contract test: the default prompt must reference + // every label IYEN's workflows trigger on. If a new lane label + // is added (e.g. `iyen:hold`), this test forces an update to + // the default instructions. + assert!(DEFAULT_HERMES_INSTRUCTIONS.contains("iyen:auto-fix")); + assert!(DEFAULT_HERMES_INSTRUCTIONS.contains("iyen:declined")); + assert!(DEFAULT_HERMES_INSTRUCTIONS.contains("iyen:review")); + } +} diff --git a/src/sink/iyensystem.rs b/src/sink/iyensystem.rs new file mode 100644 index 0000000..a47b2cf --- /dev/null +++ b/src/sink/iyensystem.rs @@ -0,0 +1,196 @@ +use async_trait::async_trait; +use reqwest::Client; +use serde_json::{Value, json}; +use std::time::Duration; + +use crate::Result; + +use super::{Sink, SinkMessage, SinkTarget}; + +#[derive(Clone)] +pub struct IyenSystemSink { + client: Client, + url: String, + auth_token: String, +} + +impl IyenSystemSink { + pub fn new(url: String, auth_token: String) -> Self { + let client = Client::builder() + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(15)) + .build() + .unwrap_or_else(|_| Client::new()); + + Self { + client, + url, + auth_token, + } + } + + pub fn is_configured(url: &Option, auth_token: &Option) -> bool { + url.as_ref().map(|u| !u.trim().is_empty()).unwrap_or(false) + && auth_token + .as_ref() + .map(|t| !t.trim().is_empty()) + .unwrap_or(false) + } + + fn endpoint(&self) -> String { + format!("{}/event", self.url.trim_end_matches('/')) + } +} + +fn extract_str<'a>(payload: &'a Value, key: &str) -> Option<&'a str> { + payload.get(key).and_then(|v| v.as_str()) +} + +fn extract_u64(payload: &Value, key: &str) -> Option { + payload.get(key).and_then(|v| v.as_u64()) +} + +fn derive_action(event_kind: &str, payload: &Value) -> String { + if let Some(action) = extract_str(payload, "action") { + return action.to_string(); + } + match event_kind.rsplit_once('.') { + Some((_, suffix)) => suffix.replace('-', "_"), + None => event_kind.replace('-', "_"), + } +} + +fn build_iyensystem_body(message: &SinkMessage) -> Value { + let repo = extract_str(&message.payload, "repo") + .unwrap_or("") + .to_string(); + let number = extract_u64(&message.payload, "number").unwrap_or(0); + let action = derive_action(&message.event_kind, &message.payload); + + json!({ + "event_type": message.event_kind, + "repo": repo, + "number": number, + "action": action, + "payload": message.payload, + }) +} + +#[async_trait] +impl Sink for IyenSystemSink { + async fn send(&self, _target: &SinkTarget, message: &SinkMessage) -> Result<()> { + let url = self.endpoint(); + let body = build_iyensystem_body(message); + + eprintln!( + "clawhip iyensystem sink: event_kind={} url={}", + message.event_kind, url + ); + + let response = self + .client + .post(&url) + .header("Authorization", format!("Bearer {}", self.auth_token)) + .json(&body) + .send() + .await + .map_err(|e| format!("IyenSystem request to {url} failed: {e}"))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!("IyenSystem POST /event failed: {status} โ€” {body}").into()); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::events::MessageFormat; + + fn message(kind: &str, payload: Value) -> SinkMessage { + SinkMessage { + event_kind: kind.into(), + format: MessageFormat::Compact, + content: "rendered".into(), + payload, + } + } + + #[test] + fn is_configured_requires_both_url_and_token() { + assert!(!IyenSystemSink::is_configured(&None, &None)); + assert!(!IyenSystemSink::is_configured( + &Some("http://localhost".into()), + &None + )); + assert!(!IyenSystemSink::is_configured(&None, &Some("token".into()))); + assert!(IyenSystemSink::is_configured( + &Some("http://localhost".into()), + &Some("token".into()) + )); + assert!(!IyenSystemSink::is_configured( + &Some(" ".into()), + &Some("token".into()) + )); + assert!(!IyenSystemSink::is_configured( + &Some("http://localhost".into()), + &Some("".into()) + )); + } + + #[test] + fn endpoint_strips_trailing_slash() { + let sink = IyenSystemSink::new("http://127.0.0.1:25295/".into(), "tok".into()); + assert_eq!(sink.endpoint(), "http://127.0.0.1:25295/event"); + let sink2 = IyenSystemSink::new("http://127.0.0.1:25295".into(), "tok".into()); + assert_eq!(sink2.endpoint(), "http://127.0.0.1:25295/event"); + } + + #[test] + fn body_carries_event_type_repo_number_action_and_full_payload() { + let payload = json!({ + "repo": "Org/Repo", + "number": 42, + "title": "broken", + "sender": {"login": "openclaw-bot"}, + }); + let msg = message("github.issue-opened", payload.clone()); + let body = build_iyensystem_body(&msg); + + assert_eq!(body["event_type"], "github.issue-opened"); + assert_eq!(body["repo"], "Org/Repo"); + assert_eq!(body["number"], 42); + assert_eq!(body["action"], "issue_opened"); + assert_eq!(body["payload"], payload); + } + + #[test] + fn explicit_action_in_payload_overrides_derived_action() { + let payload = json!({ + "repo": "Org/Repo", + "number": 7, + "action": "labeled", + "label": {"name": "iyen:auto-fix"}, + }); + let msg = message("github.issues-labeled", payload); + let body = build_iyensystem_body(&msg); + assert_eq!(body["action"], "labeled"); + } + + #[test] + fn missing_repo_and_number_default_to_safe_values() { + let payload = json!({}); + let msg = message("custom.heartbeat", payload); + let body = build_iyensystem_body(&msg); + assert_eq!(body["repo"], ""); + assert_eq!(body["number"], 0); + assert_eq!(body["action"], "heartbeat"); + } +} diff --git a/src/sink/mod.rs b/src/sink/mod.rs index d9eeea0..500c0dc 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -1,4 +1,7 @@ pub mod discord; +pub mod hermes; +pub mod iyensystem; +pub mod openclaw; pub mod slack; use async_trait::async_trait; @@ -8,6 +11,9 @@ use crate::events::MessageFormat; use serde_json::Value; pub use discord::DiscordSink; +pub use hermes::HermesSink; +pub use iyensystem::IyenSystemSink; +pub use openclaw::OpenClawSink; pub use slack::SlackSink; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -15,6 +21,12 @@ pub enum SinkTarget { DiscordChannel(String), DiscordWebhook(String), SlackWebhook(String), + OpenClaw, + IyenSystem, + /// Hermes Agent gateway โ€” peer to OpenClaw as a decision authority + /// for IYEN label-driven lanes. Carries no per-target data because + /// the gateway URL/token live in `[providers.hermes]` config. + Hermes, } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/src/sink/openclaw.rs b/src/sink/openclaw.rs new file mode 100644 index 0000000..c6c6f60 --- /dev/null +++ b/src/sink/openclaw.rs @@ -0,0 +1,163 @@ +use async_trait::async_trait; +use reqwest::Client; +use serde_json::json; +use std::time::Duration; + +use crate::Result; + +use super::{Sink, SinkMessage, SinkTarget}; + +#[derive(Clone)] +pub struct OpenClawSink { + client: Client, + gateway_url: String, + gateway_token: String, +} + +impl OpenClawSink { + pub fn new(gateway_url: String, gateway_token: String) -> Self { + let client = Client::builder() + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(15)) + .build() + .unwrap_or_else(|_| Client::new()); + + Self { + client, + gateway_url, + gateway_token, + } + } + + pub fn is_configured(gateway_url: &Option, gateway_token: &Option) -> bool { + gateway_url + .as_ref() + .map(|u| !u.trim().is_empty()) + .unwrap_or(false) + && gateway_token + .as_ref() + .map(|t| !t.trim().is_empty()) + .unwrap_or(false) + } +} + +impl OpenClawSink { + /// Determine the hooks path based on event kind. + /// PR events go to /hooks/pr-review (agent action), + /// everything else goes to /hooks/wake (wake action). + fn hooks_path_for_event(event_kind: &str, payload: &serde_json::Value) -> &'static str { + // Check direct event kind + if event_kind.contains("pr-status-changed") { + return "/hooks/pr-review"; + } + if event_kind.contains("issue-opened") { + return "/hooks/issue-triage"; + } + // Check batched event_kinds array + if let Some(kinds) = payload.get("event_kinds").and_then(|v| v.as_array()) { + for kind in kinds { + if let Some(s) = kind.as_str() { + if s.contains("pr-status-changed") { + return "/hooks/pr-review"; + } + if s.contains("issue-opened") { + return "/hooks/issue-triage"; + } + } + } + } + "/hooks/wake" + } +} + +#[async_trait] +impl Sink for OpenClawSink { + async fn send(&self, _target: &SinkTarget, message: &SinkMessage) -> Result<()> { + let hooks_path = Self::hooks_path_for_event(&message.event_kind, &message.payload); + + let url = format!("{}{}", self.gateway_url.trim_end_matches('/'), hooks_path); + + let body = if hooks_path == "/hooks/pr-review" { + // Send structured JSON so messageTemplate can use {{repo}}, {{number}}, etc. + let mut pr_body = json!({ + "text": message.content, + "content": message.content, + "mode": "now" + }); + // Copy payload fields (repo, number, title, etc.) to top level + if let Some(obj) = message.payload.as_object() { + for (k, v) in obj { + pr_body[k] = v.clone(); + } + } + // Also check batched payloads for event_kinds + if let Some(kinds) = message + .payload + .get("event_kinds") + .and_then(|v| v.as_array()) + { + pr_body["event_kinds"] = json!(kinds); + } + pr_body + } else { + json!({ + "text": format!( + "[clawhip:{}] {}\n\nPayload: {}", + message.event_kind, + message.content, + serde_json::to_string_pretty(&message.payload).unwrap_or_default() + ), + "mode": "now" + }) + }; + + // Log which hooks path is being used + eprintln!( + "clawhip openclaw sink: event_kind={} hooks_path={} url={}", + message.event_kind, hooks_path, url + ); + + let response = self + .client + .post(&url) + .header("Authorization", format!("Bearer {}", self.gateway_token)) + .json(&body) + .send() + .await + .map_err(|e| format!("OpenClaw request to {hooks_path} failed: {e}"))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "".to_string()); + return Err(format!("OpenClaw {hooks_path} failed: {status} โ€” {body}").into()); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn is_configured_requires_both_url_and_token() { + assert!(!OpenClawSink::is_configured(&None, &None)); + assert!(!OpenClawSink::is_configured( + &Some("http://localhost".into()), + &None + )); + assert!(!OpenClawSink::is_configured(&None, &Some("token".into()))); + assert!(OpenClawSink::is_configured( + &Some("http://localhost".into()), + &Some("token".into()) + )); + assert!(!OpenClawSink::is_configured( + &Some("".into()), + &Some("token".into()) + )); + } +} diff --git a/src/slack.rs b/src/slack.rs index 5888a34..ecfd166 100644 --- a/src/slack.rs +++ b/src/slack.rs @@ -19,9 +19,13 @@ impl SlackClient { pub async fn send(&self, target: &SinkTarget, message: &SinkMessage) -> Result<()> { match target { SinkTarget::SlackWebhook(webhook_url) => self.send_webhook(webhook_url, message).await, - SinkTarget::DiscordChannel(_) | SinkTarget::DiscordWebhook(_) => { - Err("cannot send Discord target via Slack client".into()) - } + SinkTarget::DiscordChannel(_) + | SinkTarget::DiscordWebhook(_) + | SinkTarget::OpenClaw + | SinkTarget::IyenSystem + | SinkTarget::Hermes => Err( + "cannot send Discord/OpenClaw/IyenSystem/Hermes target via Slack client".into(), + ), } } diff --git a/src/source/git.rs b/src/source/git.rs index f098e81..2bc1c45 100644 --- a/src/source/git.rs +++ b/src/source/git.rs @@ -646,7 +646,14 @@ mod tests { assert_eq!(branch_event.kind, "git.branch-changed"); assert_eq!(branch_event.payload["repo"], "clawhip"); assert_eq!(branch_event.payload["repo_path"], path_str(&root)); - assert_eq!(branch_event.payload["worktree_path"], path_str(&worktree)); + let expected_worktree = canonical_path_string(&worktree); + let expected_worktree_norm = expected_worktree.trim_start_matches("/private").to_string(); + assert_eq!( + branch_event.payload["worktree_path"] + .as_str() + .map(|path| path.trim_start_matches("/private")), + Some(expected_worktree_norm.as_str()) + ); assert_eq!(branch_event.payload["old_branch"], "feat/issue-115"); assert_eq!(branch_event.payload["new_branch"], "feat/issue-115-v2"); assert!(rx.try_recv().is_err()); @@ -660,7 +667,10 @@ mod tests { assert_eq!(commit_event.kind, "git.commit"); assert_eq!(commit_event.payload["repo"], "clawhip"); assert_eq!(commit_event.payload["repo_path"], path_str(&root)); - assert_eq!(commit_event.payload["worktree_path"], path_str(&worktree)); + assert_eq!( + commit_event.payload["worktree_path"], + canonical_path_string(&worktree) + ); assert_eq!(commit_event.payload["branch"], "feat/issue-115-v2"); assert_eq!(commit_event.payload["summary"], "worktree commit"); assert!(rx.try_recv().is_err()); @@ -741,4 +751,11 @@ mod tests { fn path_str(path: &Path) -> &str { path.to_str().unwrap() } + + fn canonical_path_string(path: &Path) -> String { + path.canonicalize() + .unwrap_or_else(|_| path.to_path_buf()) + .to_string_lossy() + .into_owned() + } } diff --git a/src/source/github.rs b/src/source/github.rs index 2b1b07b..bd51a1e 100644 --- a/src/source/github.rs +++ b/src/source/github.rs @@ -58,8 +58,20 @@ impl Source for GitHubSource { struct GitHubRepoState { issues: HashMap, + issues_ready: bool, prs: HashMap, + prs_ready: bool, + pr_reviews: HashMap>, + pr_reviews_ready: bool, ci: HashMap, + ci_ready: bool, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +struct ReviewSnapshot { + state: String, + body: String, + actor: Option, } #[derive(Clone)] @@ -67,6 +79,9 @@ struct IssueSnapshot { title: String, state: String, comments: u64, + html_url: String, + labels: Vec, + body: String, } #[derive(Clone)] @@ -76,6 +91,7 @@ struct PullRequestSnapshot { url: String, head_branch: String, head_sha: String, + body: String, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -154,7 +170,7 @@ async fn poll_github( state: &mut HashMap, ) -> Result<()> { for repo in &config.monitors.git.repos { - if !repo.emit_issue_opened && !repo.emit_pr_status { + if !repo.emit_issue_opened && !repo.emit_pr_status && !repo.emit_pr_reviews { continue; } @@ -170,43 +186,92 @@ async fn poll_github( }; let previous = state.get(&repo.path); - let issues = match poll_issues(config, github_client, repo, &snapshot, previous, tx).await { - Ok(issues) => issues, - Err(error) => { - eprintln!( - "clawhip source GitHub issue processing failed for {}: {error}", - repo.path - ); - previous - .map(|entry| entry.issues.clone()) - .unwrap_or_default() - } - }; - let prs = + let (issues, issues_ready) = + match poll_issues(config, github_client, repo, &snapshot, previous, tx).await { + Ok(result) => result, + Err(error) => { + eprintln!( + "clawhip source GitHub issue processing failed for {}: {error}", + repo.path + ); + ( + previous + .map(|entry| entry.issues.clone()) + .unwrap_or_default(), + previous.map(|entry| entry.issues_ready).unwrap_or(false), + ) + } + }; + let (prs, prs_ready) = match poll_pull_requests(config, github_client, repo, &snapshot, previous, tx).await { - Ok(prs) => prs, + Ok(result) => result, Err(error) => { eprintln!( "clawhip source GitHub pull request processing failed for {}: {error}", repo.path ); - previous.map(|entry| entry.prs.clone()).unwrap_or_default() + ( + previous.map(|entry| entry.prs.clone()).unwrap_or_default(), + previous.map(|entry| entry.prs_ready).unwrap_or(false), + ) } }; - let ci = match poll_ci_statuses(config, github_client, repo, &snapshot, previous, &prs, tx) - .await + let (pr_reviews, pr_reviews_ready) = + match poll_pr_reviews(config, github_client, repo, &snapshot, previous, &prs, tx).await + { + Ok(result) => result, + Err(error) => { + eprintln!( + "clawhip source GitHub PR review processing failed for {}: {error}", + repo.path + ); + ( + previous + .map(|entry| entry.pr_reviews.clone()) + .unwrap_or_default(), + previous + .map(|entry| entry.pr_reviews_ready) + .unwrap_or(false), + ) + } + }; + let (ci, ci_ready) = match poll_ci_statuses( + config, + github_client, + repo, + &snapshot, + previous, + &prs, + tx, + ) + .await { - Ok(ci) => ci, + Ok(result) => result, Err(error) => { eprintln!( "clawhip source GitHub CI processing failed for {}: {error}", repo.path ); - previous.map(|entry| entry.ci.clone()).unwrap_or_default() + ( + previous.map(|entry| entry.ci.clone()).unwrap_or_default(), + previous.map(|entry| entry.ci_ready).unwrap_or(false), + ) } }; - state.insert(repo.path.clone(), GitHubRepoState { issues, prs, ci }); + state.insert( + repo.path.clone(), + GitHubRepoState { + issues, + issues_ready, + prs, + prs_ready, + pr_reviews, + pr_reviews_ready, + ci, + ci_ready, + }, + ); } Ok(()) @@ -219,38 +284,61 @@ async fn poll_issues( snapshot: &GitSnapshot, previous: Option<&GitHubRepoState>, tx: &mpsc::Sender, -) -> Result> { +) -> Result<(HashMap, bool)> { if !repo.emit_issue_opened { - return Ok(previous - .map(|entry| entry.issues.clone()) - .unwrap_or_default()); + return Ok(( + previous + .map(|entry| entry.issues.clone()) + .unwrap_or_default(), + previous.map(|entry| entry.issues_ready).unwrap_or(false), + )); } let Some(client) = github_client else { - return Ok(previous - .map(|entry| entry.issues.clone()) - .unwrap_or_default()); + return Ok(( + previous + .map(|entry| entry.issues.clone()) + .unwrap_or_default(), + previous.map(|entry| entry.issues_ready).unwrap_or(false), + )); }; match fetch_issues(client, &config.monitors.github_api_base, repo, snapshot).await { Ok(issues) => { - if let Some(previous) = previous { - for event in - collect_issue_events(repo, &snapshot.repo_name, &previous.issues, &issues) + if let Some(previous) = previous.filter(|entry| entry.issues_ready) { + for event in collect_issue_events( + client, + &config.monitors.github_api_base, + snapshot.github_repo.as_deref(), + repo, + &snapshot.repo_name, + &previous.issues, + &issues, + ) + .await { send_event(tx, event).await?; } + } else { + eprintln!( + "clawhip source GitHub issue baseline established for {}; suppressing initial {} issue events", + repo.path, + issues.len() + ); } - Ok(issues) + Ok((issues, true)) } Err(error) => { eprintln!( "clawhip source GitHub issue polling failed for {}: {error}", repo.path ); - Ok(previous - .map(|entry| entry.issues.clone()) - .unwrap_or_default()) + Ok(( + previous + .map(|entry| entry.issues.clone()) + .unwrap_or_default(), + previous.map(|entry| entry.issues_ready).unwrap_or(false), + )) } } } @@ -262,18 +350,24 @@ async fn poll_pull_requests( snapshot: &GitSnapshot, previous: Option<&GitHubRepoState>, tx: &mpsc::Sender, -) -> Result> { +) -> Result<(HashMap, bool)> { if !repo.emit_pr_status { - return Ok(previous.map(|entry| entry.prs.clone()).unwrap_or_default()); + return Ok(( + previous.map(|entry| entry.prs.clone()).unwrap_or_default(), + previous.map(|entry| entry.prs_ready).unwrap_or(false), + )); } let Some(client) = github_client else { - return Ok(previous.map(|entry| entry.prs.clone()).unwrap_or_default()); + return Ok(( + previous.map(|entry| entry.prs.clone()).unwrap_or_default(), + previous.map(|entry| entry.prs_ready).unwrap_or(false), + )); }; match fetch_pull_requests(client, &config.monitors.github_api_base, repo, snapshot).await { Ok(prs) => { - if let Some(previous) = previous { + if let Some(previous) = previous.filter(|entry| entry.prs_ready) { for (number, pr) in &prs { match previous.prs.get(number) { Some(old) if old.status == pr.status => {} @@ -288,6 +382,7 @@ async fn poll_pull_requests( .unwrap_or_else(|| "".to_string()), pr.status.clone(), pr.url.clone(), + pr.body.clone(), repo.channel.clone(), ) .with_mention(repo.mention.clone()) @@ -297,17 +392,137 @@ async fn poll_pull_requests( } } } + } else { + eprintln!( + "clawhip source GitHub PR baseline established for {}; suppressing initial {} PR events", + repo.path, + prs.len() + ); } - Ok(prs) + Ok((prs, true)) } Err(error) => { eprintln!( "clawhip source GitHub polling failed for {}: {error}", repo.path ); - Ok(previous.map(|entry| entry.prs.clone()).unwrap_or_default()) + Ok(( + previous.map(|entry| entry.prs.clone()).unwrap_or_default(), + previous.map(|entry| entry.prs_ready).unwrap_or(false), + )) + } + } +} + +async fn poll_pr_reviews( + config: &AppConfig, + github_client: Option<&reqwest::Client>, + repo: &GitRepoMonitor, + snapshot: &GitSnapshot, + previous: Option<&GitHubRepoState>, + prs: &HashMap, + tx: &mpsc::Sender, +) -> Result<(HashMap>, bool)> { + if !repo.emit_pr_reviews { + return Ok(( + previous + .map(|entry| entry.pr_reviews.clone()) + .unwrap_or_default(), + previous + .map(|entry| entry.pr_reviews_ready) + .unwrap_or(false), + )); + } + + let Some(client) = github_client else { + return Ok(( + previous + .map(|entry| entry.pr_reviews.clone()) + .unwrap_or_default(), + previous + .map(|entry| entry.pr_reviews_ready) + .unwrap_or(false), + )); + }; + + let Some(github_repo) = snapshot.github_repo.as_deref() else { + return Ok((HashMap::new(), false)); + }; + + let mut current = HashMap::new(); + let mut fetch_failed = false; + for (number, pr) in prs { + if pr.status == "merged" { + continue; + } + match fetch_pr_reviews( + client, + &config.monitors.github_api_base, + github_repo, + *number, + ) + .await + { + Ok(reviews) => { + current.insert(*number, reviews); + } + Err(error) => { + eprintln!( + "clawhip source GitHub PR review fetch failed for {github_repo}#{number}: {error}" + ); + fetch_failed = true; + if let Some(prev_reviews) = previous.and_then(|entry| entry.pr_reviews.get(number)) + { + current.insert(*number, prev_reviews.clone()); + } + } + } + } + + let baseline_ready = previous + .map(|entry| entry.pr_reviews_ready) + .unwrap_or(false); + if !baseline_ready { + eprintln!( + "clawhip source GitHub PR review baseline established for {}; suppressing initial reviews", + repo.path + ); + return Ok((current, true)); + } + + for (number, reviews) in ¤t { + let Some(pr) = prs.get(number) else { continue }; + let prev_reviews = previous + .and_then(|entry| entry.pr_reviews.get(number)) + .cloned() + .unwrap_or_default(); + for (review_id, review) in reviews { + if prev_reviews.contains_key(review_id) { + continue; + } + send_event( + tx, + IncomingEvent::github_pr_review_submitted( + snapshot.repo_name.clone(), + *number, + pr.title.clone(), + review.state.clone(), + if review.body.is_empty() { + None + } else { + Some(review.body.clone()) + }, + review.actor.clone(), + repo.channel.clone(), + ) + .with_mention(repo.mention.clone()) + .with_format(repo.format.clone()), + ) + .await?; } } + + Ok((current, !fetch_failed)) } async fn poll_ci_statuses( @@ -318,13 +533,19 @@ async fn poll_ci_statuses( previous: Option<&GitHubRepoState>, prs: &HashMap, tx: &mpsc::Sender, -) -> Result> { +) -> Result<(HashMap, bool)> { if !repo.emit_pr_status { - return Ok(previous.map(|entry| entry.ci.clone()).unwrap_or_default()); + return Ok(( + previous.map(|entry| entry.ci.clone()).unwrap_or_default(), + previous.map(|entry| entry.ci_ready).unwrap_or(false), + )); } let Some(client) = github_client else { - return Ok(previous.map(|entry| entry.ci.clone()).unwrap_or_default()); + return Ok(( + previous.map(|entry| entry.ci.clone()).unwrap_or_default(), + previous.map(|entry| entry.ci_ready).unwrap_or(false), + )); }; let open_prs = prs @@ -343,19 +564,28 @@ async fn poll_ci_statuses( .await { Ok(ci) => { - let empty = HashMap::new(); - let previous_ci = previous.map(|entry| &entry.ci).unwrap_or(&empty); - for event in collect_ci_events(repo, &snapshot.repo_name, previous_ci, &ci) { - send_event(tx, event).await?; + if let Some(previous) = previous.filter(|entry| entry.ci_ready) { + for event in collect_ci_events(repo, &snapshot.repo_name, &previous.ci, &ci) { + send_event(tx, event).await?; + } + } else { + eprintln!( + "clawhip source GitHub CI baseline established for {}; suppressing initial {} CI events", + repo.path, + ci.len() + ); } - Ok(ci) + Ok((ci, true)) } Err(error) => { eprintln!( "clawhip source GitHub CI polling failed for {}: {error}", repo.path ); - Ok(previous.map(|entry| entry.ci.clone()).unwrap_or_default()) + Ok(( + previous.map(|entry| entry.ci.clone()).unwrap_or_default(), + previous.map(|entry| entry.ci_ready).unwrap_or(false), + )) } } } @@ -393,7 +623,10 @@ async fn github_get( Ok(response) } -fn collect_issue_events( +async fn collect_issue_events( + client: &reqwest::Client, + api_base: &str, + github_repo: Option<&str>, repo: &GitRepoMonitor, repo_name: &str, previous: &HashMap, @@ -403,10 +636,13 @@ fn collect_issue_events( for (number, issue) in current { match previous.get(number) { None => events.push( - IncomingEvent::github_issue_opened( + IncomingEvent::github_issue_opened_rich( repo_name.to_string(), *number, issue.title.clone(), + Some(issue.html_url.clone()), + issue.labels.clone(), + body_preview(&issue.body), repo.channel.clone(), ) .with_mention(repo.mention.clone()) @@ -438,6 +674,31 @@ fn collect_issue_events( .with_format(repo.format.clone()), ); } + let added: Vec<&String> = issue + .labels + .iter() + .filter(|name| !old.labels.contains(name)) + .collect(); + for label_name in added { + let actor = match github_repo { + Some(gh) => { + fetch_label_actor(client, api_base, gh, *number, label_name).await + } + None => None, + }; + events.push( + IncomingEvent::github_issues_labeled( + repo_name.to_string(), + *number, + issue.title.clone(), + label_name.clone(), + actor, + repo.channel.clone(), + ) + .with_mention(repo.mention.clone()) + .with_format(repo.format.clone()), + ); + } } } } @@ -452,36 +713,27 @@ fn collect_ci_events( ) -> Vec { let mut events = Vec::new(); for (key, ci) in current { - let changed = previous - .get(key) - .map(|old| old.status != ci.status || old.conclusion != ci.conclusion) - .unwrap_or(true); + let Some(old) = previous.get(key) else { + // GitHub's Actions APIs are eventually consistent and may surface + // older completed runs after clawhip restarts, after pagination + // churn, or after a transient API failure. Treating every newly + // discovered terminal run as a fresh event replays stale CI pass/fail + // notifications. Only emit a first-seen CI event while the run is + // still active; terminal pass/fail/cancel notifications require a + // prior observed state transition. + if is_terminal_ci(&ci.status) { + continue; + } + events.push(ci_event(repo, repo_name, ci)); + continue; + }; + + let changed = old.status != ci.status || old.conclusion != ci.conclusion; if !changed { continue; } - let mut event = IncomingEvent::github_ci( - ci.event_kind(), - repo_name.to_string(), - ci.pr_number, - ci.workflow.clone(), - ci.status.clone(), - ci.conclusion.clone(), - ci.sha.clone(), - ci.url.clone(), - ci.branch.clone(), - repo.channel.clone(), - ) - .with_mention(repo.mention.clone()) - .with_format(repo.format.clone()); - if let Some(payload) = event.payload.as_object_mut() { - if let Some(run_id) = &ci.run_id { - payload.insert("run_id".to_string(), json!(run_id)); - } - payload.insert("run_job_count".to_string(), json!(ci.run_job_count)); - payload.insert("run_all_terminal".to_string(), json!(ci.run_all_terminal)); - } - events.push(event); + events.push(ci_event(repo, repo_name, ci)); } events.sort_by(|left, right| { @@ -497,6 +749,87 @@ fn collect_ci_events( events } +fn ci_event(repo: &GitRepoMonitor, repo_name: &str, ci: &GitHubCISnapshot) -> IncomingEvent { + let mut event = IncomingEvent::github_ci( + ci.event_kind(), + repo_name.to_string(), + ci.pr_number, + ci.workflow.clone(), + ci.status.clone(), + ci.conclusion.clone(), + ci.sha.clone(), + ci.url.clone(), + ci.branch.clone(), + repo.channel.clone(), + ) + .with_mention(repo.mention.clone()) + .with_format(repo.format.clone()); + if let Some(payload) = event.payload.as_object_mut() { + if let Some(run_id) = &ci.run_id { + payload.insert("run_id".to_string(), json!(run_id)); + } + payload.insert("run_job_count".to_string(), json!(ci.run_job_count)); + payload.insert("run_all_terminal".to_string(), json!(ci.run_all_terminal)); + } + event +} + +fn is_terminal_ci(status: &str) -> bool { + status == "completed" +} + +/// Look up who applied a specific label to an issue, by scanning the +/// issue's events feed for the most recent `labeled` action carrying +/// that label name. Returns `None` when the call fails or the actor +/// is unknown โ€” callers must NOT block emission on this lookup. +/// +/// The events endpoint returns oldest-first; we walk in reverse so +/// repeated label cycles (label โ†’ unlabel โ†’ label) attribute to the +/// most recent labeler. +async fn fetch_label_actor( + client: &reqwest::Client, + api_base: &str, + github_repo: &str, + issue_number: u64, + label_name: &str, +) -> Option { + #[derive(Deserialize)] + struct IssueEvent { + event: String, + #[serde(default)] + actor: Option, + #[serde(default)] + label: Option, + } + + #[derive(Deserialize)] + struct EventActor { + login: String, + } + + let response = github_get( + client, + api_base, + &format!("repos/{github_repo}/issues/{issue_number}/events"), + &[("per_page", "100")], + &format!("issue events for {github_repo}#{issue_number}"), + ) + .await + .ok()?; + let events: Vec = response.json().await.ok()?; + events + .into_iter() + .rev() + .find(|e| { + e.event == "labeled" + && e.label + .as_ref() + .map(|l| l.name == label_name) + .unwrap_or(false) + }) + .and_then(|e| e.actor.map(|a| a.login)) +} + async fn fetch_issues( client: &reqwest::Client, api_base: &str, @@ -526,6 +859,9 @@ async fn fetch_issues( title: issue.title, state: issue.state, comments: issue.comments, + html_url: issue.html_url, + labels: issue.labels.into_iter().map(|label| label.name).collect(), + body: issue.body, }, ) }) @@ -567,12 +903,54 @@ async fn fetch_pull_requests( url: pull.html_url, head_branch: pull.head.reference, head_sha: pull.head.sha, + body: pull.body, + }, + ) + }) + .collect()) +} + +async fn fetch_pr_reviews( + client: &reqwest::Client, + api_base: &str, + github_repo: &str, + pr_number: u64, +) -> Result> { + let response = github_get( + client, + api_base, + &format!("repos/{github_repo}/pulls/{pr_number}/reviews"), + &[("per_page", "100")], + &format!("PR reviews for {github_repo}#{pr_number}"), + ) + .await?; + let reviews: Vec = response.json().await?; + Ok(reviews + .into_iter() + .filter(|review| !review.state.is_empty() && review.state != "PENDING") + .map(|review| { + ( + review.id, + ReviewSnapshot { + state: normalize_review_state(&review.state), + body: review.body, + actor: review.user.map(|user| user.login), }, ) }) .collect()) } +fn normalize_review_state(raw: &str) -> String { + match raw.to_uppercase().as_str() { + "APPROVED" => "approved".to_string(), + "CHANGES_REQUESTED" => "changes_requested".to_string(), + "COMMENTED" => "commented".to_string(), + "DISMISSED" => "dismissed".to_string(), + other => other.to_lowercase(), + } +} + async fn fetch_ci_statuses( client: &reqwest::Client, api_base: &str, @@ -746,9 +1124,31 @@ struct GitHubIssue { state: String, comments: u64, #[serde(default)] + html_url: String, + #[serde(default)] + body: String, + #[serde(default)] + labels: Vec, + #[serde(default)] pull_request: Option, } +#[derive(Deserialize)] +struct GitHubLabel { + name: String, +} + +fn body_preview(body: &str) -> Option { + let collapsed = body.split_whitespace().collect::>().join(" "); + if collapsed.is_empty() { + None + } else if collapsed.chars().count() > 180 { + Some(collapsed.chars().take(177).collect::() + "โ€ฆ") + } else { + Some(collapsed) + } +} + impl GitHubIssue { fn is_pull_request(&self) -> bool { self.pull_request.is_some() @@ -763,6 +1163,8 @@ struct GitHubPullRequest { html_url: String, merged_at: Option, head: GitHubPullRequestHead, + #[serde(default)] + body: String, } #[derive(Deserialize)] @@ -772,6 +1174,22 @@ struct GitHubPullRequestHead { sha: String, } +#[derive(Deserialize)] +struct GitHubPullRequestReview { + id: u64, + #[serde(default)] + state: String, + #[serde(default)] + body: String, + #[serde(default)] + user: Option, +} + +#[derive(Deserialize)] +struct GitHubReviewUser { + login: String, +} + #[derive(Deserialize)] struct GitHubCheckRunsResponse { check_runs: Vec, @@ -849,11 +1267,24 @@ mod tests { title: "live issue".into(), state: "open".into(), comments: 0, + html_url: "https://example.test/issues/2".into(), + labels: Vec::new(), + body: String::new(), }, )] .into_iter() .collect(); - let events = collect_issue_events(&repo, "clawhip", &previous, ¤t); + let client = build_github_client(None).unwrap(); + let events = collect_issue_events( + &client, + "http://127.0.0.1:1", + None, + &repo, + "clawhip", + &previous, + ¤t, + ) + .await; assert_eq!(events.len(), 1); assert_eq!(events[0].canonical_kind(), "github.issue-opened"); assert_eq!(events[0].payload["repo"], "clawhip"); @@ -886,8 +1317,8 @@ mod tests { assert!(content.contains("live issue")); } - #[test] - fn issue_comment_and_close_events_are_emitted() { + #[tokio::test] + async fn issue_comment_and_close_events_are_emitted() { let repo = GitRepoMonitor { path: "/tmp/clawhip".into(), name: Some("clawhip".into()), @@ -899,6 +1330,9 @@ mod tests { title: "live issue".into(), state: "open".into(), comments: 0, + html_url: String::new(), + labels: Vec::new(), + body: String::new(), }, )] .into_iter() @@ -909,11 +1343,24 @@ mod tests { title: "live issue".into(), state: "closed".into(), comments: 1, + html_url: String::new(), + labels: Vec::new(), + body: String::new(), }, )] .into_iter() .collect(); - let events = collect_issue_events(&repo, "clawhip", &previous, ¤t); + let client = build_github_client(None).unwrap(); + let events = collect_issue_events( + &client, + "http://127.0.0.1:1", + None, + &repo, + "clawhip", + &previous, + ¤t, + ) + .await; assert!( events .iter() @@ -926,6 +1373,136 @@ mod tests { ); } + #[tokio::test] + async fn newly_added_label_emits_issues_labeled_event_with_actor_login() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut buf = vec![0_u8; 4096]; + let _ = stream.read(&mut buf).await.unwrap(); + let body = json!([ + { "event": "renamed", "actor": {"login": "human-author"} }, + { "event": "labeled", "label": {"name": "iyen:auto-fix"}, "actor": {"login": "openclaw-bot"} } + ]) + .to_string(); + let response = format!( + "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\n\r\n{}", + body.len(), + body + ); + stream.write_all(response.as_bytes()).await.unwrap(); + }); + + let repo = GitRepoMonitor { + path: "/tmp/clawhip".into(), + ..GitRepoMonitor::default() + }; + let previous = [( + 42_u64, + IssueSnapshot { + title: "bug".into(), + state: "open".into(), + comments: 0, + html_url: String::new(), + labels: vec!["bug".into()], + body: String::new(), + }, + )] + .into_iter() + .collect(); + let current = [( + 42_u64, + IssueSnapshot { + title: "bug".into(), + state: "open".into(), + comments: 0, + html_url: String::new(), + labels: vec!["bug".into(), "iyen:auto-fix".into()], + body: String::new(), + }, + )] + .into_iter() + .collect(); + + let client = build_github_client(None).unwrap(); + let api_base = format!("http://{addr}"); + let events = collect_issue_events( + &client, + &api_base, + Some("Org/Repo"), + &repo, + "Repo", + &previous, + ¤t, + ) + .await; + + server.await.unwrap(); + assert_eq!(events.len(), 1); + let event = &events[0]; + assert_eq!(event.canonical_kind(), "github.issues-labeled"); + assert_eq!(event.payload["repo"], "Repo"); + assert_eq!(event.payload["number"], 42); + assert_eq!(event.payload["label"]["name"], "iyen:auto-fix"); + assert_eq!(event.payload["sender"]["login"], "openclaw-bot"); + assert_eq!(event.payload["issue"]["title"], "bug"); + } + + #[tokio::test] + async fn unchanged_labels_do_not_emit_labeled_event() { + let repo = GitRepoMonitor { + path: "/tmp/clawhip".into(), + ..GitRepoMonitor::default() + }; + let labels = vec!["bug".into(), "iyen:auto-fix".into()]; + let previous = [( + 42_u64, + IssueSnapshot { + title: "bug".into(), + state: "open".into(), + comments: 0, + html_url: String::new(), + labels: labels.clone(), + body: String::new(), + }, + )] + .into_iter() + .collect(); + let current = [( + 42_u64, + IssueSnapshot { + title: "bug".into(), + state: "open".into(), + comments: 0, + html_url: String::new(), + labels, + body: String::new(), + }, + )] + .into_iter() + .collect(); + let client = build_github_client(None).unwrap(); + let events = collect_issue_events( + &client, + "http://127.0.0.1:1", + Some("Org/Repo"), + &repo, + "Repo", + &previous, + ¤t, + ) + .await; + assert!( + !events + .iter() + .any(|e| e.canonical_kind() == "github.issues-labeled"), + "label set unchanged โ†’ no labeled event should be emitted" + ); + } + fn ci_snapshot( pr_number: u64, workflow: &str, @@ -999,18 +1576,14 @@ mod tests { let (tx, mut rx) = mpsc::channel(4); let prs = HashMap::new(); - let ci = poll_ci_statuses(&config, Some(&client), &repo, &snapshot, None, &prs, &tx) - .await - .unwrap(); + let (ci, ci_ready) = + poll_ci_statuses(&config, Some(&client), &repo, &snapshot, None, &prs, &tx) + .await + .unwrap(); assert_eq!(ci.len(), 1); - let event = rx.recv().await.unwrap(); - assert_eq!(event.canonical_kind(), "github.ci-failed"); - assert_eq!(event.payload["repo"], json!("claw-code")); - assert_eq!(event.payload["workflow"], json!("Rust CI")); - assert_eq!(event.payload["branch"], json!("main")); - assert_eq!(event.payload["run_id"], json!("24007460067")); - assert!(event.payload.get("number").is_none()); + assert!(ci_ready); + assert!(rx.try_recv().is_err()); let req = server.await.unwrap(); assert!(req.contains("GET /repos/ultraworkers/claw-code/actions/runs?")); @@ -1097,6 +1670,7 @@ mod tests { url: "https://github.com/org/repo/pull/42".into(), head_branch: "feat/pr".into(), head_sha: "prsha".into(), + body: "PR body".into(), }; let open_prs = vec![(42_u64, &pr)]; @@ -1165,6 +1739,22 @@ mod tests { ); } + #[test] + fn newly_discovered_terminal_ci_state_is_suppressed() { + let repo = GitRepoMonitor { + path: "/tmp/clawhip".into(), + ..GitRepoMonitor::default() + }; + let previous = HashMap::new(); + let current_ci = ci_snapshot(58, "CI / test", "completed", Some("success")); + let current = [(current_ci.dedupe_key(), current_ci)] + .into_iter() + .collect(); + + let events = collect_ci_events(&repo, "clawhip", &previous, ¤t); + assert!(events.is_empty()); + } + #[test] fn unchanged_ci_state_is_suppressed() { let repo = GitRepoMonitor { @@ -1356,4 +1946,53 @@ mod tests { source_task.abort(); let _ = source_task.await; } + + #[test] + fn normalize_review_state_maps_github_uppercase_to_clawhip_lowercase() { + assert_eq!(normalize_review_state("APPROVED"), "approved"); + assert_eq!( + normalize_review_state("CHANGES_REQUESTED"), + "changes_requested" + ); + assert_eq!(normalize_review_state("COMMENTED"), "commented"); + assert_eq!(normalize_review_state("DISMISSED"), "dismissed"); + assert_eq!(normalize_review_state("approved"), "approved"); + } + + #[tokio::test] + async fn fetch_pr_reviews_filters_pending_and_normalizes_state() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut buf = vec![0_u8; 4096]; + let _ = stream.read(&mut buf).await.unwrap(); + let body = json!([ + { "id": 1, "state": "APPROVED", "body": "lgtm", "user": {"login": "alice"} }, + { "id": 2, "state": "CHANGES_REQUESTED", "body": "fix this", "user": {"login": "bob"} }, + { "id": 3, "state": "PENDING", "body": "draft", "user": {"login": "carol"} } + ]) + .to_string(); + let response = format!( + "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\n\r\n{}", + body.len(), + body + ); + stream.write_all(response.as_bytes()).await.unwrap(); + }); + + let client = build_github_client(None).unwrap(); + let api_base = format!("http://{addr}"); + let reviews = fetch_pr_reviews(&client, &api_base, "owner/repo", 7) + .await + .unwrap(); + server.await.unwrap(); + + assert_eq!(reviews.len(), 2, "PENDING reviews must be filtered out"); + assert_eq!(reviews.get(&1).unwrap().state, "approved"); + assert_eq!(reviews.get(&2).unwrap().state, "changes_requested"); + assert_eq!(reviews.get(&1).unwrap().actor.as_deref(), Some("alice")); + } } diff --git a/src/source/mod.rs b/src/source/mod.rs index 9322548..e70dc73 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -5,11 +5,13 @@ use crate::events::IncomingEvent; pub mod git; pub mod github; +pub mod opencode; pub mod tmux; pub mod workspace; pub use git::GitSource; pub use github::GitHubSource; +pub use opencode::OpenCodeSource; pub use tmux::{ RegisteredTmuxSession, SharedTmuxRegistry, TmuxSource, list_active_tmux_registrations, }; diff --git a/src/source/opencode.rs b/src/source/opencode.rs new file mode 100644 index 0000000..01dbdce --- /dev/null +++ b/src/source/opencode.rs @@ -0,0 +1,378 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::time::Duration; + +use reqwest::Client; +use serde::Deserialize; +use serde_json::{Value, json}; +use tokio::sync::mpsc; +use tokio::time::sleep; + +use crate::Result; +use crate::config::AppConfig; +use crate::events::{IncomingEvent, MessageFormat}; +use crate::source::Source; + +pub struct OpenCodeSource { + config: Arc, +} + +impl OpenCodeSource { + pub fn new(config: Arc) -> Self { + Self { config } + } +} + +#[async_trait::async_trait] +impl Source for OpenCodeSource { + fn name(&self) -> &str { + "opencode" + } + + async fn run(&self, tx: mpsc::Sender) -> Result<()> { + let url = match &self.config.monitors.opencode.url { + Some(url) => url.clone(), + None => return Ok(()), // no config โ†’ silent exit + }; + + let poll_interval = + Duration::from_secs(self.config.monitors.opencode.poll_interval_secs.max(1)); + let idle_threshold = Duration::from_secs(self.config.monitors.opencode.idle_threshold_secs); + let channel = self.config.monitors.opencode.channel.clone(); + let mention = self.config.monitors.opencode.mention.clone(); + let format = self.config.monitors.opencode.format.clone(); + + let client = Client::builder() + .timeout(Duration::from_secs(10)) + .build() + .map_err(|e| format!("opencode http client: {e}"))?; + + let mut state = OpenCodeState::default(); + + loop { + if let Err(e) = poll_opencode( + &client, + &url, + &tx, + &mut state, + idle_threshold, + &channel, + &mention, + &format, + ) + .await + { + eprintln!("clawhip opencode poll error: {e}"); + } + sleep(poll_interval).await; + } + } +} + +#[derive(Default)] +struct OpenCodeState { + known_sessions: HashMap, + idle_alerted: HashSet, + warmed_up: bool, +} + +struct SessionSnapshot { + updated_ms: u64, + message_count: usize, + title: String, +} + +#[derive(Deserialize)] +#[allow(dead_code)] // shape mirrors opencode's `/session` JSON; unused fields kept for forward-compat. +struct SessionInfo { + id: String, + #[serde(default)] + title: String, + #[serde(default)] + time: SessionTime, + #[serde(default)] + summary: Option, +} + +#[derive(Deserialize, Default)] +#[allow(dead_code)] // mirrors opencode's session.time JSON shape. +struct SessionTime { + #[serde(default)] + created: u64, + #[serde(default)] + updated: u64, +} + +#[derive(Deserialize)] +struct SessionMessage { + #[serde(default)] + role: String, + #[serde(default)] + parts: Vec, +} + +#[derive(Deserialize)] +struct MessagePart { + #[serde(rename = "type", default)] + kind: String, + #[serde(default)] + text: Option, + #[serde(rename = "toolInvocation", default)] + tool_invocation: Option, +} + +#[allow(clippy::too_many_arguments)] +async fn poll_opencode( + client: &Client, + base_url: &str, + tx: &mpsc::Sender, + state: &mut OpenCodeState, + idle_threshold: Duration, + channel: &Option, + mention: &Option, + format: &Option, +) -> Result<()> { + let sessions: Vec = client + .get(format!("{base_url}/session")) + .send() + .await + .map_err(|e| format!("opencode list sessions: {e}"))? + .json() + .await + .map_err(|e| format!("opencode parse sessions: {e}"))?; + + let current_ids: HashSet = sessions.iter().map(|s| s.id.clone()).collect(); + + let is_warmup = !state.warmed_up; + + // Detect ended sessions (skip during warmup) + if !is_warmup { + let ended: Vec = state + .known_sessions + .keys() + .filter(|id| !current_ids.contains(*id)) + .cloned() + .collect(); + for id in ended { + let snap = state.known_sessions.remove(&id).unwrap(); + state.idle_alerted.remove(&id); + let event = make_event( + "opencode.session.ended", + json!({ + "session_id": id, + "title": snap.title, + "summary": "opencode session ended", + }), + channel, + mention, + format, + ); + let _ = tx.send(event).await; + } + } + + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + for session in sessions { + let is_new = !state.known_sessions.contains_key(&session.id); + + if is_new { + state.idle_alerted.remove(&session.id); + + // Only emit created event after warmup + if !is_warmup { + let event = make_event( + "opencode.session.created", + json!({ + "session_id": &session.id, + "title": &session.title, + "summary": format!("new session: {}", session.title), + }), + channel, + mention, + format, + ); + let _ = tx.send(event).await; + } + + // Fetch current message count so we don't replay old messages + let msg_count = fetch_messages(client, base_url, &session.id) + .await + .map(|msgs| msgs.len()) + .unwrap_or(0); + + state.known_sessions.insert( + session.id.clone(), + SessionSnapshot { + updated_ms: session.time.updated, + message_count: msg_count, + title: session.title.clone(), + }, + ); + + // During warmup, mark already-idle sessions so we don't alert + if is_warmup { + let elapsed = now_ms.saturating_sub(session.time.updated); + if elapsed > idle_threshold.as_millis() as u64 { + state.idle_alerted.insert(session.id.clone()); + } + } + } + + let snap = state.known_sessions.get_mut(&session.id).unwrap(); + + // Check for updates โ€” only clear idle alert if there are actually new messages + if session.time.updated > snap.updated_ms { + let mut had_new_messages; + snap.updated_ms = session.time.updated; + snap.title = session.title.clone(); + + // Fetch messages to see what changed + had_new_messages = false; + if let Ok(messages) = fetch_messages(client, base_url, &session.id).await { + let new_count = messages.len(); + if new_count > snap.message_count { + had_new_messages = true; + // Report new messages + for msg in messages.iter().skip(snap.message_count) { + if msg.role == "assistant" { + let text = msg + .parts + .iter() + .filter_map(|p| { + if p.kind == "text" { + p.text.clone() + } else { + None + } + }) + .collect::>() + .join("\n"); + let tools: Vec = msg + .parts + .iter() + .filter_map(|p| { + if p.kind == "tool-invocation" { + p.tool_invocation + .as_ref() + .and_then(|ti| ti.get("toolName")) + .and_then(|v| v.as_str()) + .map(String::from) + } else { + None + } + }) + .collect(); + + if !tools.is_empty() { + let event = make_event( + "opencode.message.tool", + json!({ + "session_id": &session.id, + "title": &session.title, + "tools": tools, + "summary": format!("tools: {}", tools.join(", ")), + }), + channel, + mention, + format, + ); + let _ = tx.send(event).await; + } + + if !text.is_empty() { + let truncated = if text.len() > 200 { + format!("{}โ€ฆ", &text[..200]) + } else { + text.clone() + }; + let event = make_event( + "opencode.message.assistant", + json!({ + "session_id": &session.id, + "title": &session.title, + "text": truncated, + "summary": format!("assistant: {}", if text.len() > 80 { &text[..80] } else { &text }), + }), + channel, + mention, + format, + ); + let _ = tx.send(event).await; + } + } + } + snap.message_count = new_count; + } + } + + // Only clear idle alert if there were actually new messages + if had_new_messages { + state.idle_alerted.remove(&session.id); + } + } + + // Idle detection + let elapsed_ms = now_ms.saturating_sub(snap.updated_ms); + if elapsed_ms > idle_threshold.as_millis() as u64 + && !state.idle_alerted.contains(&session.id) + { + state.idle_alerted.insert(session.id.clone()); + let idle_mins = elapsed_ms / 60_000; + let event = make_event( + "opencode.session.idle", + json!({ + "session_id": &session.id, + "title": &session.title, + "idle_minutes": idle_mins, + "summary": format!("session idle for {}m: {}", idle_mins, session.title), + }), + channel, + mention, + format, + ); + let _ = tx.send(event).await; + } + } + + if is_warmup { + state.warmed_up = true; + eprintln!( + "clawhip opencode warmup complete: {} existing sessions", + state.known_sessions.len() + ); + } + + Ok(()) +} + +async fn fetch_messages( + client: &Client, + base_url: &str, + session_id: &str, +) -> Result> { + let messages: Vec = client + .get(format!("{base_url}/session/{session_id}/message")) + .send() + .await + .map_err(|e| format!("opencode messages: {e}"))? + .json() + .await + .map_err(|e| format!("opencode parse messages: {e}"))?; + Ok(messages) +} + +fn make_event( + kind: &str, + payload: Value, + channel: &Option, + mention: &Option, + format: &Option, +) -> IncomingEvent { + IncomingEvent::workspace(kind.to_string(), payload, channel.clone()) + .with_mention(mention.clone()) + .with_format(format.clone()) +} diff --git a/src/source/workspace.rs b/src/source/workspace.rs index 45401fc..96bce8e 100644 --- a/src/source/workspace.rs +++ b/src/source/workspace.rs @@ -448,6 +448,7 @@ fn diff_workspace_state( other if other.ends_with("idle-notif-cooldown.json") => { diff_idle_notif_state(matched, previous, current) } + ".status-file" | ".close-status" => diff_status_tag_file(matched, previous, current), _ => None, }?; @@ -824,6 +825,47 @@ fn diff_idle_notif_state( )]) } +fn diff_status_tag_file( + matched: &WorkspaceMatch<'_>, + previous: Option<&Value>, + current: Option<&Value>, +) -> Option> { + let current = current?; + let current_str = current.as_str().unwrap_or("").trim(); + let previous_str = previous.and_then(|v| v.as_str()).unwrap_or("").trim(); + if current_str == previous_str || current_str.is_empty() { + return None; + } + + // Match STATUS: or CLOSE: tags + let status = if current_str.contains("STATUS: CONTINUE") + || current_str.contains("CLOSE: CONTINUE") + { + Some("continue") + } else if current_str.contains("STATUS: BLOCKED") || current_str.contains("CLOSE: BLOCKED") { + Some("blocked") + } else if current_str.contains("STATUS: DONE") || current_str.contains("CLOSE: DONE") { + Some("done") + } else { + None + }; + + let status = status?; + let kind = format!("workspace.status.{}.{}", matched.workspace_name, status); + + let mut payload = base_payload(matched, current) + .with_string("summary", Some(format!("agent status: {}", status))) + .into_object(); + payload.insert("content".into(), Value::String(current_str.to_string())); + payload.insert("status".into(), Value::String(status.to_string())); + + Some(vec![workspace_event( + matched, + &kind, + Value::Object(payload), + )]) +} + fn workspace_event(matched: &WorkspaceMatch<'_>, kind: &str, payload: Value) -> IncomingEvent { IncomingEvent::workspace(kind.to_string(), payload, matched.monitor.channel.clone()) .with_mention(matched.monitor.mention.clone()) @@ -841,12 +883,22 @@ fn base_payload(matched: &WorkspaceMatch<'_>, current: &Value) -> PayloadBuilder .with_string("state_family", Some(matched.state_family.clone())) .with_string("state_dir", Some(matched.watch_dir.display().to_string())) .with_string("state_file", Some(matched.state_file.clone())) + .with_string("path", Some(matched.state_file.clone())) .with_string("tool", string_value(current, "tool")) } fn read_json(path: &Path) -> Option { let raw = std::fs::read_to_string(path).ok()?; - serde_json::from_str(&raw).ok() + // Try JSON first; fall back to plain-text String for non-JSON files + // (e.g. .status-file, .close-status) + serde_json::from_str(&raw).ok().or_else(|| { + let trimmed = raw.trim(); + if trimmed.is_empty() { + None + } else { + Some(Value::String(trimmed.to_string())) + } + }) } fn debounce_for_path(config: &AppConfig, path: &Path) -> Duration { diff --git a/src/tmux_wrapper.rs b/src/tmux_wrapper.rs index 0ae3554..38baeca 100644 --- a/src/tmux_wrapper.rs +++ b/src/tmux_wrapper.rs @@ -730,10 +730,20 @@ mod tests { let monitor_args = TmuxMonitorArgs::from_new_args(&args, &config); assert_eq!(monitor_args.channel.as_deref(), Some("metadata-route")); - assert_eq!( - monitor_args.routing.worktree_path.as_deref(), - Some(repo.path().to_string_lossy().as_ref()) - ); + let expected_worktree = repo + .path() + .canonicalize() + .expect("canonical repo") + .to_string_lossy() + .into_owned(); + // Normalize potential macOS /private prefix differences in canonical paths + let expected_worktree_norm = expected_worktree.trim_start_matches("/private").to_string(); + let actual_worktree_norm = monitor_args + .routing + .worktree_path + .as_deref() + .map(|path| path.trim_start_matches("/private")); + assert_eq!(actual_worktree_norm, Some(expected_worktree_norm.as_str())); assert!(monitor_args.routing.repo_name.is_some()); }