From 38e27b12593c6a28eda88feae6e8df75b45c3efd Mon Sep 17 00:00:00 2001 From: dman-os <61868957+dman-os@users.noreply.github.com> Date: Thu, 2 Apr 2026 02:06:05 +0300 Subject: [PATCH 1/4] fix: keyring bug --- Cargo.lock | 428 ++++++++++++++++++++++++++++++++++++ src/daybook_core/Cargo.toml | 17 +- src/daybook_core/rt.rs | 34 ++- src/daybook_core/secrets.rs | 108 ++++----- 4 files changed, 535 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 30de7575..176b5c28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,6 +58,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if 1.0.4", + "cipher", + "cpufeatures 0.2.17", +] + [[package]] name = "ahash" version = "0.8.12" @@ -362,6 +373,18 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-channel" version = "2.5.0" @@ -387,6 +410,35 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" +dependencies = [ + "autocfg", + "cfg-if 1.0.4", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix 1.1.3", + "slab", + "windows-sys 0.61.2", +] + +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-nats" version = "0.44.2" @@ -424,6 +476,53 @@ dependencies = [ "url", ] +[[package]] +name = "async-process" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc50921ec0055cdd8a16de48773bfeec5c972598674347252c0399676be7da75" +dependencies = [ + "async-channel", + "async-io", + "async-lock", + "async-signal", + "async-task", + "blocking", + "cfg-if 1.0.4", + "event-listener", + "futures-lite", + "rustix 1.1.3", +] + +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "async-signal" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43c070bbf59cd3570b6b2dd54cd772527c7c3620fce8be898406dd3ed6adc64c" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if 1.0.4", + "futures-core", + "futures-io", + "rustix 1.1.3", + "signal-hook-registry", + "slab", + "windows-sys 0.61.2", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -446,6 +545,12 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -943,6 +1048,15 @@ dependencies = [ "hybrid-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "block2" version = "0.6.2" @@ -952,6 +1066,19 @@ dependencies = [ "objc2", ] +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bon" version = "3.7.2" @@ -1180,6 +1307,15 @@ dependencies = [ "rustversion", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.53" @@ -1256,6 +1392,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bdf600c45bd958cf2945c445264471cca8b6c8e67bc87b71affd6d7e5682621" +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common 0.1.6", + "inout", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -2381,6 +2527,35 @@ dependencies = [ "wit-bindgen 0.52.0", ] +[[package]] +name = "dbus" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b3aa68d7e7abee336255bd7248ea965cc393f3e70411135a6f6a4b651345d4" +dependencies = [ + "libc", + "libdbus-sys", + "windows-sys 0.59.0", +] + +[[package]] +name = "dbus-secret-service" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "708b509edf7889e53d7efb0ffadd994cc6c2345ccb62f55cfd6b0682165e4fa6" +dependencies = [ + "aes", + "block-padding", + "cbc", + "dbus", + "fastrand", + "hkdf", + "num", + "once_cell", + "sha2 0.10.9", + "zeroize", +] + [[package]] name = "deadpool" version = "0.12.3" @@ -2802,6 +2977,12 @@ dependencies = [ "cfg-if 1.0.4", ] +[[package]] +name = "endi" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66b7e2430c6dff6a955451e2cfc438f09cea1965a9d6f87f7e3b90decc014099" + [[package]] name = "enum-as-inner" version = "0.6.1" @@ -2845,6 +3026,27 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "enumflags2" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1027f7680c853e056ebcec683615fb6fbbc07dbaa13b4d5d9442b146ded4ecef" +dependencies = [ + "enumflags2_derive", + "serde", +] + +[[package]] +name = "enumflags2_derive" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67c78a4d8fdf9953a5c9d458f9efe940fd97a0cab0941c075a813ac594733827" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "equator" version = "0.4.2" @@ -4492,6 +4694,16 @@ dependencies = [ "web-time", ] +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "inplace-vec-builder" version = "0.1.1" @@ -5188,7 +5400,14 @@ version = "3.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eebcc3aff044e5944a8fbaf69eb277d11986064cba30c468730e8b9909fb551c" dependencies = [ + "byteorder", + "dbus-secret-service", + "linux-keyutils", "log", + "secret-service", + "security-framework 2.11.1", + "security-framework 3.5.1", + "windows-sys 0.60.2", "zeroize", ] @@ -5231,6 +5450,15 @@ version = "0.2.181" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5" +[[package]] +name = "libdbus-sys" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "328c4789d42200f1eeec05bd86c9c13c7f091d2ba9a6ea35acdf51f31bc0f043" +dependencies = [ + "pkg-config", +] + [[package]] name = "libfuzzer-sys" version = "0.4.10" @@ -5289,6 +5517,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-keyutils" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "761e49ec5fd8a5a463f9b84e877c373d888935b71c6be78f3767fe2ae6bed18e" +dependencies = [ + "bitflags 2.9.4", + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -5486,6 +5724,15 @@ dependencies = [ "rustix 1.1.3", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "metal" version = "0.18.0" @@ -5997,6 +6244,19 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.9.4", + "cfg-if 1.0.4", + "cfg_aliases", + "libc", + "memoffset", +] + [[package]] name = "nkeys" version = "0.4.5" @@ -6726,6 +6986,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-stream" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aa2b01e1d916879f73a53d01d1d6cee68adbb31d6d9177a8cfce093cced1d50" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "ort" version = "2.0.0-rc.11" @@ -7012,6 +7282,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkarr" version = "5.0.2" @@ -7154,6 +7435,20 @@ dependencies = [ "miniz_oxide 0.8.5", ] +[[package]] +name = "polling" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" +dependencies = [ + "cfg-if 1.0.4", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix 1.1.3", + "windows-sys 0.61.2", +] + [[package]] name = "portable-atomic" version = "1.11.1" @@ -8541,6 +8836,25 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "secret-service" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4d35ad99a181be0a60ffcbe85d680d98f87bdc4d7644ade319b87076b9dbfd4" +dependencies = [ + "aes", + "cbc", + "futures-util", + "generic-array", + "hkdf", + "num", + "once_cell", + "rand 0.8.5", + "serde", + "sha2 0.10.9", + "zbus", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -10263,6 +10577,17 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "uds_windows" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f6fb2847f6742cd76af783a2a2c49e9375d0a111c7bef6f71cd9e738c72d6e" +dependencies = [ + "memoffset", + "tempfile", + "windows-sys 0.60.2", +] + [[package]] name = "ulid" version = "1.2.1" @@ -12605,6 +12930,16 @@ dependencies = [ "time", ] +[[package]] +name = "xdg-home" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec1cdab258fb55c0da61328dc52c8764709b249011b2cad0454c72f0bf10a1f6" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "xml-rs" version = "0.8.28" @@ -12699,6 +13034,62 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2164e798d9e3d84ee2c91139ace54638059a3b23e361f5c11781c2c6459bde0f" +[[package]] +name = "zbus" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb97012beadd29e654708a0fdb4c84bc046f537aecfde2c3ee0a9e4b4d48c725" +dependencies = [ + "async-broadcast", + "async-process", + "async-recursion", + "async-trait", + "enumflags2", + "event-listener", + "futures-core", + "futures-sink", + "futures-util", + "hex", + "nix", + "ordered-stream", + "rand 0.8.5", + "serde", + "serde_repr", + "sha1", + "static_assertions", + "tracing", + "uds_windows", + "windows-sys 0.52.0", + "xdg-home", + "zbus_macros", + "zbus_names", + "zvariant", +] + +[[package]] +name = "zbus_macros" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267db9407081e90bbfa46d841d3cbc60f59c0351838c4bc65199ecd79ab1983e" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.106", + "zvariant_utils", +] + +[[package]] +name = "zbus_names" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b9b1fef7d021261cc16cba64c351d291b715febe0fa10dc3a443ac5a5022e6c" +dependencies = [ + "serde", + "static_assertions", + "zvariant", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -12869,3 +13260,40 @@ checksum = "410e9ecef634c709e3831c2cfdb8d9c32164fae1c67496d5b68fff728eec37fe" dependencies = [ "zune-core 0.5.1", ] + +[[package]] +name = "zvariant" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2084290ab9a1c471c38fc524945837734fbf124487e105daec2bb57fd48c81fe" +dependencies = [ + "endi", + "enumflags2", + "serde", + "static_assertions", + "zvariant_derive", +] + +[[package]] +name = "zvariant_derive" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73e2ba546bda683a90652bac4a279bc146adad1386f25379cf73200d2002c449" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.106", + "zvariant_utils", +] + +[[package]] +name = "zvariant_utils" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c51bcff7cc3dbb5055396bcf774748c3dab426b4b8659046963523cee4808340" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] diff --git a/src/daybook_core/Cargo.toml b/src/daybook_core/Cargo.toml index 149db6ef..c39d9e3c 100644 --- a/src/daybook_core/Cargo.toml +++ b/src/daybook_core/Cargo.toml @@ -139,11 +139,26 @@ iroh-gossip = "0.96.0" directories.workspace = true fs4.workspace = true rand.workspace = true -keyring = "3.6.3" +keyring = { version = "3.6.3", default-features = false } irpc-iroh = "0.12.0" irpc = "0.12.0" tempfile = { workspace = true, optional = true } +[target.'cfg(target_os = "linux")'.dependencies] +keyring = { version = "3.6.3", default-features = false, features = [ + "linux-native-sync-persistent", + "crypto-rust", +] } + +[target.'cfg(target_os = "macos")'.dependencies] +keyring = { version = "3.6.3", default-features = false, features = ["apple-native"] } + +[target.'cfg(target_os = "ios")'.dependencies] +keyring = { version = "3.6.3", default-features = false, features = ["apple-native"] } + +[target.'cfg(target_os = "windows")'.dependencies] +keyring = { version = "3.6.3", default-features = false, features = ["windows-native"] } + [build-dependencies] uniffi = { workspace = true, features = ["build"], optional = true } zstd.workspace = true diff --git a/src/daybook_core/rt.rs b/src/daybook_core/rt.rs index 2d98c1e6..6788c886 100644 --- a/src/daybook_core/rt.rs +++ b/src/daybook_core/rt.rs @@ -1217,9 +1217,39 @@ impl Rt { waiting_on_dispatch_ids, on_success_hooks, }); - self.dispatch_repo + if let Err(add_err) = self + .dispatch_repo .add(dispatch_id.clone(), Arc::clone(&active_dispatch)) - .await?; + .await + { + let ActiveDispatchDeets::Wflow { + wflow_job_id, + entry_id, + .. + } = &active_dispatch.deets; + if entry_id.is_some() { + if let Some(wflow_job_id) = wflow_job_id.as_ref() { + if let Err(cancel_err) = self + .wflow_ingress + .cancel_job( + Arc::from(wflow_job_id.as_ref()), + format!( + "rollback scheduling for dispatch {dispatch_id} after dispatch store failure" + ), + ) + .await + { + warn!( + %dispatch_id, + %wflow_job_id, + ?cancel_err, + "failed to rollback queued wflow job after dispatch add failure" + ); + } + } + } + return Err(add_err); + } let mut tags = vec![ "/type/dispatch".to_string(), format!("/dispatch/{dispatch_id}"), diff --git a/src/daybook_core/secrets.rs b/src/daybook_core/secrets.rs index acb8b941..014ce3e5 100644 --- a/src/daybook_core/secrets.rs +++ b/src/daybook_core/secrets.rs @@ -18,7 +18,7 @@ impl SecretRepo { let repo_id = repo_id.to_string(); let fallback_secret = load_or_init_fallback_secret(sql, &repo_id).await?; let fallback_secret_hex = data_encoding::HEXLOWER.encode(&fallback_secret.to_bytes()); - if is_keyring_disabled() { + if !should_use_keyring() { let public = fallback_secret.public(); return Ok(RepoIdentity { repo_id, @@ -27,57 +27,43 @@ impl SecretRepo { }); } let service_name = format!("daybook.repo.{repo_id}"); - let secret = match keyring::Entry::new(&service_name, Self::KEYRING_USERNAME) { - Ok(entry) => match entry.get_password() { - Ok(secret_hex) => match decode_secret_hex(&secret_hex) { - Ok(keyring_secret) => { - if keyring_secret.to_bytes() != fallback_secret.to_bytes() { - warn!( - "keyring and fallback iroh secrets diverged; using fallback secret" - ); - if let Err(err) = entry.set_password(&fallback_secret_hex) { - warn!(?err, "failed repairing keyring secret from fallback value"); - } - fallback_secret.clone() - } else { - keyring_secret - } - } - Err(err) => { - warn!( - ?err, - "invalid iroh secret key in keyring, repairing from fallback" - ); - if let Err(set_err) = entry.set_password(&fallback_secret_hex) { - warn!( - ?set_err, - "failed repairing keyring secret from fallback value" - ); - } + let entry = keyring::Entry::new(&service_name, Self::KEYRING_USERNAME).wrap_err_with(|| { + format!("failed creating keyring entry for iroh secret key ({service_name})") + })?; + let secret = match entry.get_password() { + Ok(secret_hex) => match decode_secret_hex(&secret_hex) { + Ok(keyring_secret) => { + if keyring_secret.to_bytes() != fallback_secret.to_bytes() { + warn!("keyring and fallback iroh secrets diverged; repairing keyring from fallback"); + entry + .set_password(&fallback_secret_hex) + .wrap_err("failed repairing keyring secret from fallback value")?; fallback_secret.clone() + } else { + keyring_secret } - }, - Err(keyring::Error::NoEntry) => { - if let Err(err) = entry.set_password(&fallback_secret_hex) { - warn!(?err, "failed backfilling keyring from fallback secret"); - } - fallback_secret.clone() } Err(err) => { warn!( ?err, - "error reading iroh secret key from keyring, using fallback" + "invalid iroh secret key in keyring, repairing from fallback" ); + entry + .set_password(&fallback_secret_hex) + .wrap_err("failed repairing keyring secret from fallback value")?; fallback_secret.clone() } }, - Err(err) => { - warn!( - ?err, - "error creating keyring entry, using fallback secret store" - ); + Err(keyring::Error::NoEntry) => { + entry + .set_password(&fallback_secret_hex) + .wrap_err("failed backfilling keyring from fallback secret")?; fallback_secret.clone() } + Err(err) => { + return Err(eyre::eyre!(err)) + .wrap_err("failed reading iroh secret key from keyring"); + } }; let public = secret.public(); @@ -103,16 +89,17 @@ impl SecretRepo { .bind(&encoded) .execute(sql) .await?; - if !is_keyring_disabled() { + if should_use_keyring() { let service_name = format!("daybook.repo.{repo_id}"); - if let Ok(entry) = keyring::Entry::new(&service_name, Self::KEYRING_USERNAME) { - if let Err(err) = entry.set_password(&encoded) { - warn!( - ?err, - "failed setting keyring secret from provisioned clone identity" - ); - } - } + let entry = + keyring::Entry::new(&service_name, Self::KEYRING_USERNAME).wrap_err_with(|| { + format!( + "failed creating keyring entry for provisioned clone identity ({service_name})" + ) + })?; + entry + .set_password(&encoded) + .wrap_err("failed setting keyring secret from provisioned clone identity")?; } let public = secret.public(); Ok(RepoIdentity { @@ -129,6 +116,29 @@ fn is_keyring_disabled() -> bool { .unwrap_or(false) } +fn has_persistent_keyring_backend() -> bool { + match keyring::default::default_credential_builder().persistence() { + keyring::credential::CredentialPersistence::EntryOnly + | keyring::credential::CredentialPersistence::ProcessOnly => false, + keyring::credential::CredentialPersistence::UntilReboot + | keyring::credential::CredentialPersistence::UntilDelete => true, + _ => false, + } +} + +fn should_use_keyring() -> bool { + if is_keyring_disabled() { + return false; + } + if !has_persistent_keyring_backend() { + warn!( + "keyring backend is not persistent on this target/build; using fallback secret store" + ); + return false; + } + true +} + fn fallback_secret_key(repo_id: &str) -> String { format!("iroh_secret_key_fallback_v1:{repo_id}") } From c325d5da912aa02c570b3e95a6d858f114e3b57f Mon Sep 17 00:00:00 2001 From: dman-os <61868957+dman-os@users.noreply.github.com> Date: Sat, 4 Apr 2026 03:14:17 +0300 Subject: [PATCH 2/4] feat(wflow): sleep and messages --- src/daybook_core/secrets.rs | 7 +- src/test_wflows/lib.rs | 22 +++ src/wash_plugin_wflow/lib.rs | 171 +++++++++++++++++ src/wash_plugin_wflow/wit/main.wit | 4 + src/wflow/ingress.rs | 28 ++- src/wflow/test.rs | 17 ++ src/wflow/test/recv_message.rs | 95 ++++++++++ src/wflow/test/sleep_then_succeed.rs | 52 +++++ src/wflow_core/partition/effects.rs | 22 +++ src/wflow_core/partition/job_events.rs | 30 +++ src/wflow_core/partition/log.rs | 2 + src/wflow_core/partition/reduce.rs | 210 ++++++++++++++++++++- src/wflow_core/partition/state.rs | 22 ++- src/wflow_sdk/lib.rs | 39 ++++ src/wflow_tokio/partition/effect_worker.rs | 100 +++++++++- src/wflow_tokio/partition/reducer.rs | 33 +++- 16 files changed, 840 insertions(+), 14 deletions(-) create mode 100644 src/wflow/test/recv_message.rs create mode 100644 src/wflow/test/sleep_then_succeed.rs diff --git a/src/daybook_core/secrets.rs b/src/daybook_core/secrets.rs index 014ce3e5..4a434084 100644 --- a/src/daybook_core/secrets.rs +++ b/src/daybook_core/secrets.rs @@ -27,9 +27,10 @@ impl SecretRepo { }); } let service_name = format!("daybook.repo.{repo_id}"); - let entry = keyring::Entry::new(&service_name, Self::KEYRING_USERNAME).wrap_err_with(|| { - format!("failed creating keyring entry for iroh secret key ({service_name})") - })?; + let entry = + keyring::Entry::new(&service_name, Self::KEYRING_USERNAME).wrap_err_with(|| { + format!("failed creating keyring entry for iroh secret key ({service_name})") + })?; let secret = match entry.get_password() { Ok(secret_hex) => match decode_secret_hex(&secret_hex) { Ok(keyring_secret) => { diff --git a/src/test_wflows/lib.rs b/src/test_wflows/lib.rs index d275d230..fe9e293e 100644 --- a/src/test_wflows/lib.rs +++ b/src/test_wflows/lib.rs @@ -32,6 +32,7 @@ mod wit { use crate::interlude::*; use crate::wit::exports::townframe::wflow::bundle::JobResult; +use std::time::Duration; use wflow_sdk::{JobErrorX, Json, WflowCtx}; wit::export!(Component with_types_in wit); @@ -44,6 +45,8 @@ impl wit::exports::townframe::wflow::bundle::Guest for Component { "fails_once" => |cx, args: FailsOnceArgs| fails_once(cx, args), "fails_until_told" => |cx, args: FailsUntilToldArgs| fails_until_told(cx, args), "effect_chain" => |cx, args: EffectChainArgs| effect_chain(cx, args), + "sleep_then_succeed" => |cx, args: SleepThenSucceedArgs| sleep_then_succeed(cx, args), + "recv_message" => |cx, args: RecvMessageArgs| recv_message(cx, args), }) } } @@ -143,3 +146,22 @@ fn effect_chain(cx: WflowCtx, args: EffectChainArgs) -> Result<(), JobErrorX> { } Ok(()) } + +#[derive(Debug, Serialize, Deserialize)] +struct SleepThenSucceedArgs { + millis: u64, +} + +fn sleep_then_succeed(cx: WflowCtx, args: SleepThenSucceedArgs) -> Result<(), JobErrorX> { + cx.sleep(Duration::from_millis(args.millis))?; + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize)] +struct RecvMessageArgs {} + +fn recv_message(cx: WflowCtx, _args: RecvMessageArgs) -> Result<(), JobErrorX> { + let Json(value) = cx.recv::()?; + let _ = value; + Ok(()) +} diff --git a/src/wash_plugin_wflow/lib.rs b/src/wash_plugin_wflow/lib.rs index 37a464ad..b6840697 100644 --- a/src/wash_plugin_wflow/lib.rs +++ b/src/wash_plugin_wflow/lib.rs @@ -79,15 +79,33 @@ enum JobTrap { value_json: Arc, attempt_id: u64, }, + WaitStep { + step_id: u64, + attempt_id: u64, + start_at: Timestamp, + deets: WaitTrapDeets, + }, RunComplete(Result), } +#[derive(Debug)] +enum WaitTrapDeets { + Timer { wait_id: u64, fire_at: Timestamp }, + Message { wait_id: u64 }, +} + #[derive(Debug, Clone, Copy)] enum SessionResume { Continue, Stop, } +fn wait_id_for(step_id: u64, attempt_id: u64) -> u64 { + let lo = step_id & 0xFFFF_FFFF; + let hi = (attempt_id & 0xFFFF_FFFF) << 32; + hi | lo +} + struct SessionHandle { job_id: Arc, ctx_id: Arc, @@ -244,6 +262,130 @@ impl host::Host for SharedWashCtx { Some(SessionResume::Stop) | None => Ok(Err("session stopped".to_string())), } } + + async fn sleep( + &mut self, + job_id: partition_host::JobId, + step_id: host::StepId, + duration_ms: u64, + ) -> wasmtime::Result> { + let plugin = WflowPlugin::from_ctx(self); + let Some(job) = plugin + .active_jobs + .read() + .expect(ERROR_MUTEX) + .get(job_id.as_str()) + .cloned() + else { + anyhow::bail!("job not active"); + }; + let (attempt_id, start_at) = { + let active_step = job.active_step.lock().expect(ERROR_MUTEX); + let Some(active_step) = active_step.as_ref() else { + anyhow::bail!("step not active"); + }; + if active_step.step_id != step_id { + anyhow::bail!("given step_id is not active"); + } + (active_step.attempt_id, active_step.start_at) + }; + + let fire_at = start_at + .checked_add(Duration::from_millis(duration_ms)) + .expect("ts overflow"); + let trap = JobTrap::WaitStep { + step_id, + attempt_id, + start_at, + deets: WaitTrapDeets::Timer { + wait_id: wait_id_for(step_id, attempt_id), + fire_at, + }, + }; + + if job.yield_tx.send(trap).is_err() { + anyhow::bail!("session parent dropped"); + } + + let mut resume_rx = job.resume_rx.lock().await; + let cmd = tokio::select! { + _ = job.pause_cancel.cancelled() => { + return Ok(Err("session cancelled".to_string())); + } + cmd = resume_rx.recv() => cmd + }; + match cmd { + Some(SessionResume::Continue) => Ok(Ok(())), + Some(SessionResume::Stop) | None => Ok(Err("session stopped".to_string())), + } + } + + async fn recv_message( + &mut self, + job_id: partition_host::JobId, + step_id: host::StepId, + ) -> wasmtime::Result> { + let plugin = WflowPlugin::from_ctx(self); + let Some(job) = plugin + .active_jobs + .read() + .expect(ERROR_MUTEX) + .get(job_id.as_str()) + .cloned() + else { + anyhow::bail!("job not active"); + }; + let (attempt_id, start_at) = { + let active_step = job.active_step.lock().expect(ERROR_MUTEX); + let Some(active_step) = active_step.as_ref() else { + anyhow::bail!("step not active"); + }; + if active_step.step_id != step_id { + anyhow::bail!("given step_id is not active"); + } + (active_step.attempt_id, active_step.start_at) + }; + + let trap = JobTrap::WaitStep { + step_id, + attempt_id, + start_at, + deets: WaitTrapDeets::Message { + wait_id: wait_id_for(step_id, attempt_id), + }, + }; + + if job.yield_tx.send(trap).is_err() { + anyhow::bail!("session parent dropped"); + } + + let mut resume_rx = job.resume_rx.lock().await; + let cmd = tokio::select! { + _ = job.pause_cancel.cancelled() => { + return Ok(Err("session cancelled".to_string())); + } + cmd = resume_rx.recv() => cmd + }; + match cmd { + Some(SessionResume::Continue) => { + let journal = job.journal.lock().expect(ERROR_MUTEX); + let Some(step_state) = journal.steps.get(step_id as usize) else { + anyhow::bail!("step missing from journal"); + }; + let wflow_core::partition::state::JobStepState::Effect { attempts } = step_state; + let Some(last_attempt) = attempts.last() else { + anyhow::bail!("step has no attempts in journal"); + }; + let wflow_core::partition::job_events::JobEffectResultDeets::Success { value_json } = + &last_attempt.deets + else { + anyhow::bail!("step has no success value"); + }; + Ok(Ok(value_json.to_string())) + } + Some(SessionResume::Stop) | None => Ok(Err("session stopped".to_string())), + } + } } impl partition_host::Host for SharedWashCtx { @@ -254,6 +396,15 @@ impl partition_host::Host for SharedWashCtx { ) -> wasmtime::Result<()> { todo!() } + + async fn send_message( + &mut self, + _id: partition_host::PartitionId, + _job_id: partition_host::JobId, + _payload_json: String, + ) -> wasmtime::Result<()> { + todo!() + } } impl metastore::Host for SharedWashCtx { @@ -372,6 +523,26 @@ impl WflowPlugin { deets: job_events::JobEffectResultDeets::Success { value_json }, }, )), + JobTrap::WaitStep { + step_id, + attempt_id, + start_at, + deets, + } => Ok(job_events::JobRunResult::StepWait( + job_events::JobWaitResult { + step_id, + attempt_id, + start_at, + deets: match deets { + WaitTrapDeets::Timer { wait_id, fire_at } => { + job_events::JobWaitResultDeets::Timer { wait_id, fire_at } + } + WaitTrapDeets::Message { wait_id } => { + job_events::JobWaitResultDeets::Message { wait_id } + } + }, + }, + )), JobTrap::RunComplete(Ok(value_json)) => Ok(job_events::JobRunResult::Success { value_json: value_json.into(), }), diff --git a/src/wash_plugin_wflow/wit/main.wit b/src/wash_plugin_wflow/wit/main.wit index 01f1f471..9b1e98ff 100644 --- a/src/wash_plugin_wflow/wit/main.wit +++ b/src/wash_plugin_wflow/wit/main.wit @@ -66,6 +66,8 @@ interface host { next-step: func(job-id: job-id) -> result; persist-step: func(job-id: job-id, step-id: step-id, value-json: json) -> result<_, string>; + sleep: func(job-id: job-id, step-id: step-id, duration-ms: u64) -> result<_, string>; + recv-message: func(job-id: job-id, step-id: step-id) -> result; } // for starting invocations @@ -88,6 +90,7 @@ interface ingress { interface partition-host { use types.{job-id}; use metastore.{wflow-meta}; + use types.{json}; type partition-id = u64; @@ -97,6 +100,7 @@ interface partition-host { } add-job: func(partition-id: partition-id, args: add-job-args); + send-message: func(partition-id: partition-id, job-id: job-id, payload-json: json); } interface metastore { diff --git a/src/wflow/ingress.rs b/src/wflow/ingress.rs index 4faf6140..4bed5bd3 100644 --- a/src/wflow/ingress.rs +++ b/src/wflow/ingress.rs @@ -1,7 +1,7 @@ use crate::interlude::*; use wflow_core::metastore; -use wflow_core::partition::job_events::{JobCancelEvent, JobInitEvent}; +use wflow_core::partition::job_events::{JobCancelEvent, JobInitEvent, JobMessageEvent}; use wflow_core::partition::log::PartitionLogEntry; use wflow_tokio::partition::PartitionLogRef; @@ -29,6 +29,14 @@ pub trait WflowIngress: Send + Sync { /// Request cancellation of a job. Appends JobCancel to partition log. async fn cancel_job(&self, job_id: Arc, reason: String) -> Res; + + /// Send an external message to a running job. + async fn send_message( + &self, + job_id: Arc, + message_id: Arc, + payload_json: String, + ) -> Res; } /// Implementation that appends directly to partition log @@ -86,4 +94,22 @@ impl WflowIngress for PartitionLogIngress { .await?; Ok(entry_id) } + + async fn send_message( + &self, + job_id: Arc, + message_id: Arc, + payload_json: String, + ) -> Res { + let mut log = self.log.clone(); + let entry_id = log + .append(&PartitionLogEntry::JobMessage(JobMessageEvent { + job_id, + message_id, + timestamp: Timestamp::now(), + payload_json: payload_json.into(), + })) + .await?; + Ok(entry_id) + } } diff --git a/src/wflow/test.rs b/src/wflow/test.rs index 1a531e11..a33dbc26 100644 --- a/src/wflow/test.rs +++ b/src/wflow/test.rs @@ -13,6 +13,10 @@ mod fails_until_told; mod keyvalue_plugin; #[cfg(test)] mod recover_from_log; +#[cfg(test)] +mod recv_message; +#[cfg(test)] +mod sleep_then_succeed; use wash_runtime::{host::HostApi, plugin, types, wit::WitInterface}; use wflow_core::kvstore::log::KvStoreLog; @@ -301,6 +305,19 @@ impl WflowTestContext { self.ingress.cancel_job(job_id, reason).await } + /// Send a message to a workflow job + pub async fn send_job_message( + &self, + job_id: Arc, + message_id: Arc, + payload_json: String, + ) -> Res { + use crate::WflowIngress; + self.ingress + .send_message(job_id, message_id, payload_json) + .await + } + /// Wait until there are no active jobs, with a timeout pub async fn wait_until_no_active_jobs(&self, timeout_secs: u64) -> Res<()> { use tokio::time::{Duration, Instant}; diff --git a/src/wflow/test/recv_message.rs b/src/wflow/test/recv_message.rs new file mode 100644 index 00000000..22a0793e --- /dev/null +++ b/src/wflow/test/recv_message.rs @@ -0,0 +1,95 @@ +use crate::interlude::*; + +use crate::test::{test_wflows_wasm_path, InitialWorkload, WflowTestContext}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_recv_message_waits_then_receives() -> Res<()> { + utils_rs::testing::setup_tracing_once(); + + let test_cx = WflowTestContext::builder() + .initial_workloads(vec![InitialWorkload { + wasm_path: test_wflows_wasm_path()?, + wflow_keys: vec!["recv_message".to_string()], + }]) + .build() + .await? + .start() + .await?; + + let job_id: Arc = "test-recv-message-1".into(); + let args_json = serde_json::to_string(&serde_json::json!({}))?; + test_cx + .schedule_job(Arc::clone(&job_id), "recv_message", args_json) + .await?; + + test_cx + .wait_until_entry(0, 10, |_entry_id, entry| { + use wflow_core::partition::job_events::JobRunResult; + use wflow_core::partition::log::PartitionLogEntry; + matches!( + entry, + PartitionLogEntry::JobEffectResult(event) + if event.job_id == job_id && matches!(event.result, JobRunResult::StepWait(_)) + ) + }) + .await?; + + let payload = serde_json::json!({"kind":"ping","value":7}); + test_cx + .send_job_message( + Arc::clone(&job_id), + "msg-1".into(), + serde_json::to_string(&payload)?, + ) + .await?; + + test_cx.wait_until_no_active_jobs(10).await?; + + test_cx + .wait_until_entry(0, 10, |_entry_id, entry| { + use wflow_core::partition::job_events::JobRunResult; + use wflow_core::partition::log::PartitionLogEntry; + matches!( + entry, + PartitionLogEntry::JobEffectResult(event) + if event.job_id == job_id && matches!(event.result, JobRunResult::Success { .. }) + ) + }) + .await?; + + test_cx.stop().await?; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_recv_message_buffered_before_wait_step() -> Res<()> { + utils_rs::testing::setup_tracing_once(); + + let test_cx = WflowTestContext::builder() + .initial_workloads(vec![InitialWorkload { + wasm_path: test_wflows_wasm_path()?, + wflow_keys: vec!["recv_message".to_string()], + }]) + .build() + .await? + .start() + .await?; + + let job_id: Arc = "test-recv-message-buffered-1".into(); + let args_json = serde_json::to_string(&serde_json::json!({}))?; + test_cx + .schedule_job(Arc::clone(&job_id), "recv_message", args_json) + .await?; + let payload = serde_json::json!({"kind":"buffered","value":42}); + test_cx + .send_job_message( + Arc::clone(&job_id), + "msg-buffered-1".into(), + serde_json::to_string(&payload)?, + ) + .await?; + + test_cx.wait_until_no_active_jobs(10).await?; + test_cx.stop().await?; + Ok(()) +} diff --git a/src/wflow/test/sleep_then_succeed.rs b/src/wflow/test/sleep_then_succeed.rs new file mode 100644 index 00000000..03193394 --- /dev/null +++ b/src/wflow/test/sleep_then_succeed.rs @@ -0,0 +1,52 @@ +use crate::interlude::*; + +use crate::test::{test_wflows_wasm_path, InitialWorkload, WflowTestContext}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_sleep_then_succeed() -> Res<()> { + utils_rs::testing::setup_tracing_once(); + + let test_cx = WflowTestContext::builder() + .initial_workloads(vec![InitialWorkload { + wasm_path: test_wflows_wasm_path()?, + wflow_keys: vec!["sleep_then_succeed".to_string()], + }]) + .build() + .await? + .start() + .await?; + + let job_id: Arc = "test-sleep-then-succeed-1".into(); + let args_json = serde_json::to_string(&serde_json::json!({ + "millis": 200_u64 + }))?; + test_cx + .schedule_job(Arc::clone(&job_id), "sleep_then_succeed", args_json) + .await?; + + test_cx + .wait_until_entry(0, 10, |_entry_id, entry| { + use wflow_core::partition::job_events::JobRunResult; + use wflow_core::partition::log::PartitionLogEntry; + matches!( + entry, + PartitionLogEntry::JobEffectResult(event) + if event.job_id == job_id && matches!(event.result, JobRunResult::StepWait(_)) + ) + }) + .await?; + + test_cx + .wait_until_entry(0, 10, |_entry_id, entry| { + use wflow_core::partition::log::PartitionLogEntry; + matches!( + entry, + PartitionLogEntry::JobTimerFired(event) if event.job_id == job_id + ) + }) + .await?; + + test_cx.wait_until_no_active_jobs(10).await?; + test_cx.stop().await?; + Ok(()) +} diff --git a/src/wflow_core/partition/effects.rs b/src/wflow_core/partition/effects.rs index 638e5a86..2aef47db 100644 --- a/src/wflow_core/partition/effects.rs +++ b/src/wflow_core/partition/effects.rs @@ -20,6 +20,9 @@ pub struct PartitionEffect { pub enum PartitionEffectDeets { RunJob(RunJobAttemptDeets), AbortRun { reason: Arc }, + WaitTimer(WaitTimerDeets), + WaitMessage(WaitMessageDeets), + CancelWait(CancelWaitDeets), } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -34,3 +37,22 @@ impl From for PartitionEffectDeets { Self::RunJob(value) } } + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct WaitTimerDeets { + pub wait_id: u64, + pub fire_at: Timestamp, + pub step_id: u64, + pub attempt_id: u64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct WaitMessageDeets { + pub wait_id: u64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct CancelWaitDeets { + pub wait_id: u64, + pub reason: Arc, +} diff --git a/src/wflow_core/partition/job_events.rs b/src/wflow_core/partition/job_events.rs index 941f6530..36213432 100644 --- a/src/wflow_core/partition/job_events.rs +++ b/src/wflow_core/partition/job_events.rs @@ -19,6 +19,21 @@ pub struct JobCancelEvent { pub reason: Arc, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobMessageEvent { + pub job_id: Arc, + pub message_id: Arc, + pub timestamp: Timestamp, + pub payload_json: Arc, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobTimerFiredEvent { + pub job_id: Arc, + pub wait_id: u64, + pub timestamp: Timestamp, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JobRunEvent { pub job_id: Arc, @@ -36,6 +51,7 @@ pub struct JobRunEvent { pub enum JobRunResult { Success { value_json: Arc }, StepEffect(JobEffectResult), + StepWait(JobWaitResult), WorkerErr(JobRunWorkerError), WflowErr(JobError), Aborted, @@ -77,6 +93,20 @@ pub enum JobEffectResultDeets { EffectErr(JobError), } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobWaitResult { + pub step_id: u64, + pub attempt_id: u64, + pub start_at: Timestamp, + pub deets: JobWaitResultDeets, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum JobWaitResultDeets { + Timer { wait_id: u64, fire_at: Timestamp }, + Message { wait_id: u64 }, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum JobError { Transient { diff --git a/src/wflow_core/partition/log.rs b/src/wflow_core/partition/log.rs index 3f4eda0b..f8c9b85c 100644 --- a/src/wflow_core/partition/log.rs +++ b/src/wflow_core/partition/log.rs @@ -8,6 +8,8 @@ pub enum PartitionLogEntry { JobInit(job_events::JobInitEvent), JobEffectResult(job_events::JobRunEvent), JobCancel(job_events::JobCancelEvent), + JobMessage(job_events::JobMessageEvent), + JobTimerFired(job_events::JobTimerFiredEvent), JobPartitionEffects(JobPartitionEffectsLogEntry), } diff --git a/src/wflow_core/partition/reduce.rs b/src/wflow_core/partition/reduce.rs index 37e70b3e..c5dbd8f7 100644 --- a/src/wflow_core/partition/reduce.rs +++ b/src/wflow_core/partition/reduce.rs @@ -23,6 +23,8 @@ pub fn reduce_job_init_event( cancelling: false, runs: default(), steps: default(), + pending_messages: default(), + active_wait: None, }, ); @@ -44,6 +46,25 @@ pub fn reduce_job_cancel_event( info!("cancel for unknown or already-archived job, skipping"); return; }; + + if let Some(wait_state) = &job_state.active_wait { + if matches!( + wait_state.deets, + job_events::JobWaitResultDeets::Timer { .. } + ) { + effects.push(PartitionEffect { + job_id: Arc::clone(&event.job_id), + deets: effects::PartitionEffectDeets::CancelWait(effects::CancelWaitDeets { + wait_id: wait_state.wait_id, + reason: Arc::clone(&event.reason), + }), + }); + } + job_state.active_wait = None; + archive_job(state, &event.job_id); + return; + } + job_state.cancelling = true; effects.push(PartitionEffect { job_id: event.job_id, @@ -53,6 +74,92 @@ pub fn reduce_job_cancel_event( }); } +pub fn reduce_job_message_event( + state: &mut state::PartitionJobsState, + effects: &mut Vec, + event: job_events::JobMessageEvent, +) { + let Some(job_state) = state.active.get_mut(&event.job_id) else { + info!("message for unknown or archived job, skipping"); + return; + }; + job_state + .pending_messages + .push_back(state::JobInboxMessage { + message_id: event.message_id, + timestamp: event.timestamp, + payload_json: event.payload_json, + }); + + let Some(wait_state) = job_state.active_wait.clone() else { + return; + }; + let run_id = wait_state.run_id; + let preferred_worker_id = wait_state.preferred_worker_id.clone(); + match &wait_state.deets { + job_events::JobWaitResultDeets::Message { .. } => { + let msg = job_state + .pending_messages + .pop_front() + .expect("just pushed one message"); + complete_wait_step_success_for_wait_state( + &mut job_state.steps, + &wait_state, + msg.payload_json, + event.timestamp, + ); + job_state.active_wait = None; + + effects.push(PartitionEffect { + job_id: Arc::clone(&event.job_id), + deets: effects::PartitionEffectDeets::RunJob(effects::RunJobAttemptDeets { + run_id, + preferred_worker_id, + }), + }); + } + job_events::JobWaitResultDeets::Timer { .. } => {} + } +} + +pub fn reduce_job_timer_fired_event( + state: &mut state::PartitionJobsState, + effects: &mut Vec, + event: job_events::JobTimerFiredEvent, +) { + let Some(job_state) = state.active.get_mut(&event.job_id) else { + info!("timer fired for unknown or archived job, skipping"); + return; + }; + let Some(wait_state) = job_state.active_wait.clone() else { + info!("timer fired but no active wait, skipping"); + return; + }; + let job_events::JobWaitResultDeets::Timer { wait_id, .. } = &wait_state.deets else { + info!("timer fired but active wait is not timer, skipping"); + return; + }; + if *wait_id != event.wait_id { + info!("timer fired for stale wait id, skipping"); + return; + } + + complete_wait_step_success_for_wait_state( + &mut job_state.steps, + &wait_state, + "null".into(), + event.timestamp, + ); + job_state.active_wait = None; + effects.push(PartitionEffect { + job_id: event.job_id, + deets: effects::PartitionEffectDeets::RunJob(effects::RunJobAttemptDeets { + run_id: wait_state.run_id, + preferred_worker_id: wait_state.preferred_worker_id.clone(), + }), + }); +} + pub fn reduce_job_run_event( state: &mut state::PartitionJobsState, effects: &mut Vec, @@ -71,6 +178,8 @@ pub fn reduce_job_run_event( ref mut steps, ref override_wflow_retry_policy, ref cancelling, + ref mut pending_messages, + ref mut active_wait, .. }) = get_job_state(state, &job_id) else { @@ -84,6 +193,7 @@ pub fn reduce_job_run_event( assert!((event.run_id as usize) == runs.len()); runs.push(event); + let next_run_id = runs.len() as u64; let event = runs.last_mut().unwrap(); match &event.result { @@ -91,12 +201,14 @@ pub fn reduce_job_run_event( | job_events::JobRunResult::WorkerErr(_) | job_events::JobRunResult::WflowErr(JobError::Terminal { .. }) | job_events::JobRunResult::Aborted => { + *active_wait = None; archive_job(state, &job_id); } job_events::JobRunResult::WflowErr(JobError::Transient { retry_policy, .. }) => { if *cancelling { archive_job(state, &job_id); } else { + *active_wait = None; let retry_policy = retry_policy .clone() .or(override_wflow_retry_policy.clone()) @@ -125,6 +237,7 @@ pub fn reduce_job_run_event( match &res.deets { job_events::JobEffectResultDeets::EffectErr(JobError::Terminal { .. }) => { + *active_wait = None; archive_job(state, &job_id); } job_events::JobEffectResultDeets::Success { .. } => { @@ -135,7 +248,7 @@ pub fn reduce_job_run_event( job_id, deets: effects::PartitionEffectDeets::RunJob( effects::RunJobAttemptDeets { - run_id: runs.len() as u64, + run_id: next_run_id, preferred_worker_id: worker_id_for_hint.clone(), }, ), @@ -159,7 +272,7 @@ pub fn reduce_job_run_event( job_id, deets: effects::PartitionEffectDeets::RunJob( effects::RunJobAttemptDeets { - run_id: runs.len() as u64, + run_id: next_run_id, preferred_worker_id: None, }, ), @@ -170,6 +283,63 @@ pub fn reduce_job_run_event( } } } + job_events::JobRunResult::StepWait(wait) => { + if *cancelling { + *active_wait = None; + archive_job(state, &job_id); + return; + } + + assert!( + active_wait.is_none(), + "job already has an active wait; wait completion must clear before adding new wait" + ); + let run_id = next_run_id; + let wait_state = state::JobWaitState { + wait_id: wait.wait_id(), + run_id, + preferred_worker_id: worker_id_for_hint.clone(), + step_id: wait.step_id, + attempt_id: wait.attempt_id, + start_at: wait.start_at, + deets: wait.deets.clone(), + }; + *active_wait = Some(wait_state.clone()); + + match wait.deets.clone() { + job_events::JobWaitResultDeets::Timer { wait_id, fire_at } => { + effects.push(PartitionEffect { + job_id, + deets: effects::PartitionEffectDeets::WaitTimer(effects::WaitTimerDeets { + wait_id, + fire_at, + step_id: wait.step_id, + attempt_id: wait.attempt_id, + }), + }); + } + job_events::JobWaitResultDeets::Message { .. } => { + if let Some(msg) = pending_messages.pop_front() { + complete_wait_step_success_for_wait_state( + steps, + &wait_state, + msg.payload_json, + msg.timestamp, + ); + *active_wait = None; + effects.push(PartitionEffect { + job_id, + deets: effects::PartitionEffectDeets::RunJob( + effects::RunJobAttemptDeets { + run_id, + preferred_worker_id: wait_state.preferred_worker_id.clone(), + }, + ), + }); + } + } + } + } } } @@ -184,3 +354,39 @@ fn get_job_state<'a>( ) -> Option<&'a mut state::JobState> { state.active.get_mut(job_id) } + +trait WaitIdExt { + fn wait_id(&self) -> u64; +} + +impl WaitIdExt for job_events::JobWaitResult { + fn wait_id(&self) -> u64 { + match &self.deets { + job_events::JobWaitResultDeets::Timer { wait_id, .. } + | job_events::JobWaitResultDeets::Message { wait_id } => *wait_id, + } + } +} + +fn complete_wait_step_success_for_wait_state( + steps: &mut Vec, + wait_state: &state::JobWaitState, + value_json: Arc, + end_at: Timestamp, +) { + if steps.len() == wait_state.step_id as usize { + steps.push(state::JobStepState::Effect { + attempts: default(), + }); + } + let step = &mut steps[wait_state.step_id as usize]; + let state::JobStepState::Effect { attempts } = step; + assert!((wait_state.attempt_id as usize) == attempts.len()); + attempts.push(job_events::JobEffectResult { + step_id: wait_state.step_id, + attempt_id: wait_state.attempt_id, + start_at: wait_state.start_at, + end_at, + deets: job_events::JobEffectResultDeets::Success { value_json }, + }); +} diff --git a/src/wflow_core/partition/state.rs b/src/wflow_core/partition/state.rs index a34e49b5..87d04245 100644 --- a/src/wflow_core/partition/state.rs +++ b/src/wflow_core/partition/state.rs @@ -1,6 +1,6 @@ use crate::interlude::*; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use crate::gen::metastore::WflowMeta; use crate::partition::job_events::*; @@ -21,9 +21,29 @@ pub struct JobState { pub cancelling: bool, pub runs: Vec, pub steps: Vec, + pub pending_messages: VecDeque, + pub active_wait: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum JobStepState { Effect { attempts: Vec }, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobInboxMessage { + pub message_id: Arc, + pub timestamp: Timestamp, + pub payload_json: Arc, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobWaitState { + pub wait_id: u64, + pub run_id: u64, + pub preferred_worker_id: Option>, + pub step_id: u64, + pub attempt_id: u64, + pub start_at: Timestamp, + pub deets: crate::partition::job_events::JobWaitResultDeets, +} diff --git a/src/wflow_sdk/lib.rs b/src/wflow_sdk/lib.rs index 776c09cd..70187a6f 100644 --- a/src/wflow_sdk/lib.rs +++ b/src/wflow_sdk/lib.rs @@ -79,6 +79,45 @@ impl WflowCtx { } } } + + pub fn sleep(&self, duration: Duration) -> Result<(), JobErrorX> { + let state = host::next_step(&self.job.job_id) + .map_err(|err| JobErrorX::Terminal(ferr!("error getting next op: {err}")))?; + match state { + host::StepState::Completed(_completed) => Ok(()), + host::StepState::Active(active_op_state) => host::sleep( + &self.job.job_id, + active_op_state.id, + duration.as_millis().try_into().map_err(|_| { + JobErrorX::Terminal(ferr!("duration too large for host sleep API")) + })?, + ) + .map_err(|err| JobErrorX::Terminal(ferr!("error in host sleep call: {err}"))), + } + } + + pub fn recv(&self) -> Result, JobErrorX> + where + T: serde::de::DeserializeOwned, + { + let state = host::next_step(&self.job.job_id) + .map_err(|err| JobErrorX::Terminal(ferr!("error getting next op: {err}")))?; + let value_json = match state { + host::StepState::Completed(completed) => completed.value_json, + host::StepState::Active(active_op_state) => { + host::recv_message(&self.job.job_id, active_op_state.id).map_err(|err| { + JobErrorX::Terminal(ferr!("error receiving workflow message: {err}")) + })? + } + }; + let value = serde_json::from_str(&value_json).map_err(|err| { + JobErrorX::Terminal(ferr!( + "error parsing workflow message json for '{type_name}': {err:?}", + type_name = std::any::type_name::() + )) + })?; + Ok(Json(value)) + } } /// Helper function to convert JobErrorX to JobError diff --git a/src/wflow_tokio/partition/effect_worker.rs b/src/wflow_tokio/partition/effect_worker.rs index 34d56895..4b25c5c1 100644 --- a/src/wflow_tokio/partition/effect_worker.rs +++ b/src/wflow_tokio/partition/effect_worker.rs @@ -60,10 +60,13 @@ pub fn start_tokio_effect_worker( job_to_effect_id, worker_id: Arc::clone(&worker_name), sessions: default(), + pending_timers: default(), log: pcx.log_ref(), pcx, }; debug!("starting"); + let mut timer_tick = tokio::time::interval(Duration::from_millis(100)); + timer_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { tokio::select! { biased; @@ -83,6 +86,9 @@ pub fn start_tokio_effect_worker( }; worker.handle_partition_effects(effect_id).await?; } + _ = timer_tick.tick() => { + worker.fire_due_timers().await?; + } }; } worker.shutdown_sessions(); @@ -107,6 +113,15 @@ struct TokioEffectWorker { job_to_effect_id: JobToEffectId, worker_id: WorkerId, sessions: HashMap, CachedRunSession>, + pending_timers: HashMap, +} + +#[derive(Debug, Clone)] +struct PendingTimer { + effect_id: effects::EffectId, + job_id: Arc, + wait_id: u64, + fire_at: Timestamp, } enum CachedHostKind { @@ -128,7 +143,7 @@ impl TokioEffectWorker { job_events::JobRunResult::StepEffect(job_events::JobEffectResult { deets: job_events::JobEffectResultDeets::Success { .. }, .. - }) + }) | job_events::JobRunResult::StepWait(_) ) } @@ -155,6 +170,72 @@ impl TokioEffectWorker { } } + async fn fire_due_timers(&mut self) -> Res<()> { + if self.pending_timers.is_empty() { + return Ok(()); + } + let now = Timestamp::now(); + let due_effect_ids = self + .pending_timers + .values() + .filter(|timer| timer.fire_at <= now) + .map(|timer| timer.effect_id.clone()) + .collect::>(); + + for effect_id in due_effect_ids { + let Some(timer) = self.pending_timers.remove(&effect_id) else { + continue; + }; + { + let mut effects_map = self.state.write_effects().await; + effects_map.remove(&effect_id); + } + self.log + .append(&log::PartitionLogEntry::JobTimerFired( + job_events::JobTimerFiredEvent { + job_id: timer.job_id, + wait_id: timer.wait_id, + timestamp: now, + }, + )) + .await?; + } + Ok(()) + } + + async fn cancel_wait_effects(&mut self, job_id: &Arc, wait_id: u64) { + let remove_effect_ids = { + let effects_map = self.state.read_effects().await; + effects_map + .iter() + .filter_map(|(id, effect)| { + if &effect.job_id != job_id { + return None; + } + let found = match &effect.deets { + effects::PartitionEffectDeets::WaitTimer(wait) => wait.wait_id == wait_id, + effects::PartitionEffectDeets::WaitMessage(wait) => wait.wait_id == wait_id, + _ => false, + }; + if found { + Some(id.clone()) + } else { + None + } + }) + .collect::>() + }; + for effect_id in &remove_effect_ids { + self.pending_timers.remove(effect_id); + } + if !remove_effect_ids.is_empty() { + let mut effects_map = self.state.write_effects().await; + for effect_id in remove_effect_ids { + effects_map.remove(&effect_id); + } + } + } + #[tracing::instrument(skip(self))] async fn handle_partition_effects(&mut self, effect_id: effects::EffectId) -> Res<()> { let (job_id, deets) = { @@ -220,6 +301,23 @@ impl TokioEffectWorker { let mut effects_map = self.state.write_effects().await; effects_map.remove(&effect_id); } + effects::PartitionEffectDeets::WaitTimer(wait) => { + self.pending_timers.insert( + effect_id.clone(), + PendingTimer { + effect_id: effect_id.clone(), + job_id, + wait_id: wait.wait_id, + fire_at: wait.fire_at, + }, + ); + } + effects::PartitionEffectDeets::WaitMessage(_) => {} + effects::PartitionEffectDeets::CancelWait(cancel) => { + self.cancel_wait_effects(&job_id, cancel.wait_id).await; + let mut effects_map = self.state.write_effects().await; + effects_map.remove(&effect_id); + } } Ok(()) } diff --git a/src/wflow_tokio/partition/reducer.rs b/src/wflow_tokio/partition/reducer.rs index b0dd448e..5feaa1e0 100644 --- a/src/wflow_tokio/partition/reducer.rs +++ b/src/wflow_tokio/partition/reducer.rs @@ -202,11 +202,16 @@ impl TokioPartitionReducer { let effects_map = self.state.read_effects().await; effects_map .iter() - .map(|(effect_id, effect)| { - ( - effect_id.clone(), - matches!(effect.deets, effects::PartitionEffectDeets::RunJob(..)), - ) + .filter_map(|(effect_id, effect)| { + let is_run_job = + matches!(effect.deets, effects::PartitionEffectDeets::RunJob(..)); + let should_reschedule = is_run_job + || matches!(effect.deets, effects::PartitionEffectDeets::WaitTimer(..)); + if should_reschedule { + Some((effect_id.clone(), is_run_job)) + } else { + None + } }) .collect::>() }; @@ -261,7 +266,9 @@ impl TokioPartitionReducer { match entry { log::PartitionLogEntry::JobEffectResult(..) | log::PartitionLogEntry::JobInit(..) - | log::PartitionLogEntry::JobCancel(..) => { + | log::PartitionLogEntry::JobCancel(..) + | log::PartitionLogEntry::JobMessage(..) + | log::PartitionLogEntry::JobTimerFired(..) => { self.handle_job_event(entry_id, entry).await?; } log::PartitionLogEntry::JobPartitionEffects(effects) => { @@ -383,6 +390,20 @@ impl TokioPartitionReducer { evt, ) } + log::PartitionLogEntry::JobMessage(evt) => { + wflow_core::partition::reduce::reduce_job_message_event( + &mut jobs, + &mut self.event_effects, + evt, + ) + } + log::PartitionLogEntry::JobTimerFired(evt) => { + wflow_core::partition::reduce::reduce_job_timer_fired_event( + &mut jobs, + &mut self.event_effects, + evt, + ) + } log::PartitionLogEntry::JobPartitionEffects(_) => { unreachable!() } From 913aa441839ddc10198dfda164e27f4de6cae457 Mon Sep 17 00:00:00 2001 From: dman-os <61868957+dman-os@users.noreply.github.com> Date: Sat, 4 Apr 2026 03:37:02 +0300 Subject: [PATCH 3/4] fix: clippy --- .github/workflows/builds.yml | 2 +- src/daybook_core/rt.rs | 2 +- src/test_wflows/lib.rs | 2 +- src/wflow_sdk/lib.rs | 31 ++++++++++++++++++++++--------- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/.github/workflows/builds.yml b/.github/workflows/builds.yml index 357e7353..1b7a0d24 100644 --- a/.github/workflows/builds.yml +++ b/.github/workflows/builds.yml @@ -213,7 +213,7 @@ jobs: - name: install rust dependencies run: | sudo apt-get update - sudo apt-get install -y protobuf-compiler libarchive-tools + sudo apt-get install -y protobuf-compiler libarchive-tools pkg-config libdbus-1-dev - name: verify archive tools run: | diff --git a/src/daybook_core/rt.rs b/src/daybook_core/rt.rs index 6788c886..cf47c5cd 100644 --- a/src/daybook_core/rt.rs +++ b/src/daybook_core/rt.rs @@ -734,7 +734,7 @@ impl Rt { .await?; false } - JobRunResult::StepEffect(..) => false, + JobRunResult::StepEffect(..) | JobRunResult::StepWait(..) => false, }; if is_done { // Handle staging branch cleanup based on success/failure diff --git a/src/test_wflows/lib.rs b/src/test_wflows/lib.rs index fe9e293e..97fbfa61 100644 --- a/src/test_wflows/lib.rs +++ b/src/test_wflows/lib.rs @@ -161,7 +161,7 @@ fn sleep_then_succeed(cx: WflowCtx, args: SleepThenSucceedArgs) -> Result<(), Jo struct RecvMessageArgs {} fn recv_message(cx: WflowCtx, _args: RecvMessageArgs) -> Result<(), JobErrorX> { - let Json(value) = cx.recv::()?; + let Json(value) = cx.recv::>()?; let _ = value; Ok(()) } diff --git a/src/wflow_sdk/lib.rs b/src/wflow_sdk/lib.rs index 70187a6f..2e674ba7 100644 --- a/src/wflow_sdk/lib.rs +++ b/src/wflow_sdk/lib.rs @@ -47,6 +47,25 @@ pub struct WflowCtx { pub struct Json(pub T); +pub trait RecvCodec: Sized { + fn decode(value_json: &str) -> Result; +} + +impl RecvCodec for Json +where + T: serde::de::DeserializeOwned, +{ + fn decode(value_json: &str) -> Result { + let value = serde_json::from_str(value_json).map_err(|err| { + JobErrorX::Terminal(ferr!( + "error parsing workflow message json for '{type_name}': {err:?}", + type_name = std::any::type_name::() + )) + })?; + Ok(Json(value)) + } +} + impl WflowCtx { pub fn effect(&self, func: F) -> Result where @@ -96,9 +115,9 @@ impl WflowCtx { } } - pub fn recv(&self) -> Result, JobErrorX> + pub fn recv(&self) -> Result where - T: serde::de::DeserializeOwned, + O: RecvCodec, { let state = host::next_step(&self.job.job_id) .map_err(|err| JobErrorX::Terminal(ferr!("error getting next op: {err}")))?; @@ -110,13 +129,7 @@ impl WflowCtx { })? } }; - let value = serde_json::from_str(&value_json).map_err(|err| { - JobErrorX::Terminal(ferr!( - "error parsing workflow message json for '{type_name}': {err:?}", - type_name = std::any::type_name::() - )) - })?; - Ok(Json(value)) + O::decode(&value_json) } } From 43ca1a2bede26f0d06b1a78937f012e897f1c28a Mon Sep 17 00:00:00 2001 From: dman-os <61868957+dman-os@users.noreply.github.com> Date: Sat, 4 Apr 2026 05:11:18 +0300 Subject: [PATCH 4/4] fix: missing dbus dep --- flake.nix | 1 + 1 file changed, 1 insertion(+) diff --git a/flake.nix b/flake.nix index 07f87a3b..da069007 100644 --- a/flake.nix +++ b/flake.nix @@ -185,6 +185,7 @@ baseBuildInputs = with pkgs; [ pkg-config openssl + dbus protobuf mold deno