diff --git a/Cargo.lock b/Cargo.lock index ee3c544ba..80314cf1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -339,7 +339,7 @@ version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99fce0350197dcd4ba4e9a7dd43915d908c0eb0e7352755791709a705e1c76b6" dependencies = [ - "darling", + "darling 0.23.0", "proc-macro2", "quote", "syn 2.0.117", @@ -1322,14 +1322,38 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core 0.21.3", + "darling_macro 0.21.3", +] + [[package]] name = "darling" version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.23.0", + "darling_macro 0.23.0", +] + +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.117", ] [[package]] @@ -1345,13 +1369,24 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core 0.21.3", + "quote", + "syn 2.0.117", +] + [[package]] name = "darling_macro" version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" dependencies = [ - "darling_core", + "darling_core 0.23.0", "quote", "syn 2.0.117", ] @@ -2504,6 +2539,12 @@ version = "1.0.15" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pastey" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5a797f0e07bdf071d15742978fc3128ec6c22891c31a3a931513263904c982a" + [[package]] name = "pathdiff" version = "0.2.3" @@ -3327,6 +3368,7 @@ dependencies = [ "dashmap", "derive_more", "eyre", + "libc", "libloading", "paste", "quanta", @@ -3342,6 +3384,7 @@ dependencies = [ "revm-primitives", "revm-state", "revmc-backend", + "revmc-builtins", "revmc-codegen", "revmc-context", "revmc-llvm", @@ -3350,6 +3393,8 @@ dependencies = [ "tempfile", "tracing", "tracing-subscriber 0.3.23", + "wait-timeout", + "wincode", ] [[package]] @@ -3687,7 +3732,7 @@ version = "3.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3db8978e608f1fe7357e211969fd9abdcae80bac1ba7a3369bb7eb6b404eb65" dependencies = [ - "darling", + "darling 0.23.0", "proc-macro2", "quote", "syn 2.0.117", @@ -4438,6 +4483,31 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "wincode" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37095eb18dd6254c66217edc61a29d83d51f8818de8a2ffe88e4584ad73fb5f9" +dependencies = [ + "pastey", + "proc-macro2", + "quote", + "thiserror 2.0.18", + "wincode-derive", +] + +[[package]] +name = "wincode-derive" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e262d55d1261f31e2cfe49cc6385a421d14d99faa0526bbe3cc1bda0d3005c62" +dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "windows-core" version = "0.62.2" diff --git a/Cargo.toml b/Cargo.toml index b0d6a6d06..da3dd5939 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ 
-63,6 +63,7 @@ revm-statetest-types = { version = "17.0.0", default-features = false } crossbeam-channel = "0.5" crossbeam-queue = "0.3" +libc = "0.2" rayon = "1.10" color-eyre = "0.6" criterion = { package = "codspeed-criterion-compat", version = "4", default-features = false, features = ["cargo_bench_support"] } @@ -72,6 +73,8 @@ tracing-subscriber = "0.3" tracing-tracy = "0.11" paste = "1.0" quanta = "0.12" +wait-timeout = "0.2" +wincode = { version = "0.5.4", features = ["derive"] } [profile.dev] opt-level = 3 diff --git a/crates/revmc-builtins/src/ir.rs b/crates/revmc-builtins/src/ir.rs index 07d407658..da35636c7 100644 --- a/crates/revmc-builtins/src/ir.rs +++ b/crates/revmc-builtins/src/ir.rs @@ -126,6 +126,7 @@ macro_rules! builtins { #[allow(unused_variables)] impl Builtin { pub const COUNT: usize = builtins!(@count $($ident),*); + pub const ALL: &[Self; Self::COUNT] = &[$(Self::$ident),*]; pub const fn name(self) -> &'static str { match self { @@ -139,6 +140,13 @@ macro_rules! builtins { } } + pub fn parse(name: &str) -> Option { + match name { + $(stringify!($name) => Some(Self::$ident),)* + _ => None, + } + } + pub fn ret(self, $bcx: &mut B) -> Option { $($types_init)* match self { diff --git a/crates/revmc-cli/src/main.rs b/crates/revmc-cli/src/main.rs index 700ede0d6..fb2d8c17d 100644 --- a/crates/revmc-cli/src/main.rs +++ b/crates/revmc-cli/src/main.rs @@ -21,6 +21,9 @@ enum Command { } fn main() -> Result<()> { + if revmc::runtime::maybe_run_jit_helper()?.is_break() { + return Ok(()); + } if std::env::var_os("RUST_BACKTRACE").is_none() { // SAFETY: This is called at the very beginning of main, before any other threads are // spawned. 
diff --git a/crates/revmc-codegen/src/compiler/mod.rs b/crates/revmc-codegen/src/compiler/mod.rs index b8a4c1fee..efc565737 100644 --- a/crates/revmc-codegen/src/compiler/mod.rs +++ b/crates/revmc-codegen/src/compiler/mod.rs @@ -502,9 +502,8 @@ impl EvmCompiler { Ok(()) } - /// (AOT) Finalizes the module and writes the compiled object to the given writer. + /// Finalizes the module and writes the compiled object to the given writer. pub fn write_object(&mut self, w: W) -> Result<()> { - ensure!(self.is_aot(), "cannot write AOT object during JIT compilation"); self.finalize()?; { let _t = self.remarks.time(|r| &r.codegen); diff --git a/crates/revmc-llvm/src/lib.rs b/crates/revmc-llvm/src/lib.rs index 0820bf506..1257eb028 100644 --- a/crates/revmc-llvm/src/lib.rs +++ b/crates/revmc-llvm/src/lib.rs @@ -36,7 +36,7 @@ use revmc_backend::{ }; use std::{ cell::Cell, - ffi::CString, + ffi::{CStr, CString}, fmt::{self, Write}, iter, mem::ManuallyDrop, @@ -453,6 +453,20 @@ impl OrcJitState { } } +fn link_jit_object_in_dylib( + orc: &OrcJitState, + jd: orc::JITDylibRef, + symbol_name: &CStr, + object: &[u8], + symbols: &[(CString, usize)], +) -> Result<(usize, orc::ResourceTracker)> { + orc.global.define_builtins(symbols); + let tracker = jd.create_resource_tracker(); + orc.global.jit.add_object_with_rt(symbol_name, object, &tracker).map_err(error_msg)?; + let addr = orc.global.jit.lookup_in(jd, symbol_name).map_err(error_msg)?; + Ok((addr, tracker)) +} + /// Wraps a module in a [`orc::ThreadSafeModule`] for transfer to LLJIT. /// /// Uses a raw pointer cast to work around `Module<'ctx>` invariance — the module's @@ -738,6 +752,44 @@ impl EvmLlvmBackend { Ok(()) } + /// Returns pending absolute symbols collected while translating the current JIT module. 
+ pub fn pending_symbol_names(&self) -> Vec { + self.orc + .as_ref() + .map(|orc| orc.pending_symbols.iter().map(|(name, _)| name.clone()).collect()) + .unwrap_or_default() + } + + /// Links a relocatable object into this backend's JITDylib and returns the function address + /// and resource tracker that owns the linked code. + pub fn link_jit_object( + &mut self, + symbol_name: &CStr, + object: &[u8], + symbols: &[(CString, usize)], + ) -> Result<(usize, orc::ResourceTracker)> { + let orc = self.ensure_orc()?; + let (addr, tracker) = + link_jit_object_in_dylib(orc, orc.jd(), symbol_name, object, symbols)?; + orc.loaded_trackers.push(tracker); + Ok((addr, orc.loaded_trackers.pop().unwrap())) + } + + /// Links a relocatable object into a fresh JITDylib and returns the function address, + /// resource tracker, and guard for the JITDylib that owns the linked code. + pub fn link_jit_object_in_fresh_dylib( + &mut self, + symbol_name: &CStr, + object: &[u8], + symbols: &[(CString, usize)], + ) -> Result<(usize, orc::ResourceTracker, Arc)> { + let orc = self.ensure_orc()?; + let jd = orc.global.create_jit_dylib(); + let jd_guard = Arc::new(JitDylibGuard { global: orc.global, jd }); + let (addr, tracker) = link_jit_object_in_dylib(orc, jd, symbol_name, object, symbols)?; + Ok((addr, tracker, jd_guard)) + } + /// Pops and returns the [`ResourceTracker`](orc::ResourceTracker) for the last committed /// JIT module. 
/// diff --git a/crates/revmc-llvm/src/orc.rs b/crates/revmc-llvm/src/orc.rs index cdf04bf5e..125b123ce 100644 --- a/crates/revmc-llvm/src/orc.rs +++ b/crates/revmc-llvm/src/orc.rs @@ -17,7 +17,10 @@ use crate::llvm_string; use inkwell::{ context::Context, llvm_sys::{ - core::{LLVMContextCreate, LLVMModuleCreateWithNameInContext}, + core::{ + LLVMContextCreate, LLVMCreateMemoryBufferWithMemoryRangeCopy, + LLVMModuleCreateWithNameInContext, + }, error::*, orc2::{lljit::*, *}, prelude::*, @@ -1398,6 +1401,23 @@ impl LLJIT { }) } + /// Add a relocatable object file to the given ResourceTracker's JITDylib. + pub fn add_object_with_rt( + &self, + name: &CStr, + object: &[u8], + rt: &ResourceTracker, + ) -> Result<(), LLVMString> { + let buf = unsafe { + LLVMCreateMemoryBufferWithMemoryRangeCopy( + object.as_ptr().cast(), + object.len(), + name.as_ptr(), + ) + }; + cvt(unsafe { LLVMOrcLLJITAddObjectFileWithRT(self.as_inner(), rt.as_inner(), buf) }) + } + /// Gets the execution session. pub fn get_execution_session(&self) -> ExecutionSessionRef<'_> { unsafe { ExecutionSessionRef::from_inner(LLVMOrcLLJITGetExecutionSession(self.as_inner())) } diff --git a/crates/revmc-runtime/Cargo.toml b/crates/revmc-runtime/Cargo.toml index 8ac038c5f..5daa6040a 100644 --- a/crates/revmc-runtime/Cargo.toml +++ b/crates/revmc-runtime/Cargo.toml @@ -25,6 +25,7 @@ revmc-backend.workspace = true revmc-codegen.workspace = true revmc-context = { workspace = true, features = ["evm"] } revmc-llvm = { workspace = true, optional = true } +revmc-builtins = { workspace = true, features = ["ir"] } alloy-primitives = { workspace = true, features = ["std", "map-fxhash"] } alloy-evm = { workspace = true, optional = true } @@ -45,11 +46,14 @@ crossbeam-queue.workspace = true dashmap = "6" derive_more = { version = "2", default-features = false, features = ["debug"] } eyre.workspace = true +libc.workspace = true libloading = "0.9" tempfile = "3.10" quanta.workspace = true rayon.workspace = true 
tracing.workspace = true +wait-timeout.workspace = true +wincode.workspace = true [dev-dependencies] revmc-statetest = { path = "../revmc-statetest" } diff --git a/crates/revmc-runtime/src/runtime/backend.rs b/crates/revmc-runtime/src/runtime/backend.rs index d1e196fa2..b827df4a3 100644 --- a/crates/revmc-runtime/src/runtime/backend.rs +++ b/crates/revmc-runtime/src/runtime/backend.rs @@ -7,7 +7,10 @@ use crate::{ storage::{ ArtifactKey, ArtifactManifest, ArtifactStore, BackendSelection, RuntimeCacheKey, }, - worker::{AotSuccess, CompileJob, SyncNotifier, WorkerPool, WorkerResult, WorkerSuccess}, + worker::{ + AotSuccess, CompileJob, JitCodeBacking, JitObjectSuccess, SyncNotifier, WorkerPool, + WorkerResult, WorkerSuccess, + }, }, }; use alloy_primitives::{ @@ -19,6 +22,7 @@ use crossbeam_queue::ArrayQueue; use dashmap::DashMap; use quanta::Instant; use std::{ + ffi::CString, mem, ops::ControlFlow, sync::{Arc, atomic::Ordering}, @@ -27,6 +31,8 @@ use std::{ #[cfg(feature = "llvm")] use crate::llvm::jit_memory_usage; +#[cfg(feature = "llvm")] +use revmc_context::RawEvmCompilerFn; /// The resident map type: code_hash+spec_id → compiled program. pub(crate) type ResidentMap = DashMap, DefaultHashBuilder>; @@ -56,6 +62,65 @@ fn jit_total_bytes() -> usize { } } +#[cfg(feature = "llvm")] +struct JitObjectLinker { + backend: Option, +} + +#[cfg(feature = "llvm")] +impl JitObjectLinker { + const fn new() -> Self { + Self { backend: None } + } + + fn link( + &mut self, + success: &JitObjectSuccess, + ) -> eyre::Result<(EvmCompilerFn, Arc)> { + let backend = match &mut self.backend { + Some(backend) => backend, + None => self.backend.insert(crate::EvmLlvmBackend::new(false)?), + }; + + let symbol_name = CString::new(success.symbol_name.clone())?; + let builtin_symbols = success + .builtin_symbols + .iter() + .map(|name| { + let addr = revmc_builtins::Builtin::parse(name) + .ok_or_else(|| eyre::eyre!("unknown builtin symbol: {name}"))? 
+ .addr(); + Ok((CString::new(name.as_str())?, addr)) + }) + .collect::>>()?; + let (addr, tracker, jd_guard) = backend.link_jit_object_in_fresh_dylib( + &symbol_name, + &success.object_bytes, + &builtin_symbols, + )?; + let func = + EvmCompilerFn::new(unsafe { std::mem::transmute::(addr) }); + Ok((func, Arc::new(JitCodeBacking::new(tracker, jd_guard)))) + } +} + +#[cfg(not(feature = "llvm"))] +struct JitObjectLinker; + +#[cfg(not(feature = "llvm"))] +impl JitObjectLinker { + const fn new() -> Self { + Self + } + + fn link( + &mut self, + _success: &JitObjectSuccess, + ) -> eyre::Result<(EvmCompilerFn, Arc)> { + eyre::bail!("LLVM backend not available") + } +} + /// Commands sent to the backend thread on the bounded command channel. /// /// Lookup-observed events are NOT carried here — they go through the @@ -71,6 +136,10 @@ pub(crate) enum Command { ClearPersisted, /// Clear both resident and persisted. ClearAll, + /// Pause out-of-process helper execution. + Pause, + /// Resume out-of-process helper execution. + Resume, /// Shut down the backend. Shutdown, } @@ -135,6 +204,8 @@ struct BackendState { entries: HashMap, /// Worker pool for JIT compilation. workers: WorkerPool, + /// Backend-thread-owned linker for out-of-process JIT objects. + jit_object_linker: JitObjectLinker, /// Receiver for worker results. result_rx: chan::Receiver, /// Artifact store for persisted artifacts. 
@@ -161,6 +232,8 @@ impl BackendState { Command::ClearResident => self.handle_clear_resident(), Command::ClearPersisted => self.handle_clear_persisted(), Command::ClearAll => self.handle_clear_all(), + Command::Pause => self.workers.pause(), + Command::Resume => self.workers.resume(), Command::Shutdown => return ControlFlow::Break(()), } ControlFlow::Continue(()) @@ -376,6 +449,7 @@ impl BackendState { } fn handle_clear_resident(&mut self) { + self.workers.cancel_in_flight(); self.inner.resident.clear(); self.resident_meta.clear(); // Notify any pending sync callers before clearing entries. @@ -479,6 +553,9 @@ impl BackendState { Ok(WorkerSuccess::Aot(success)) => { self.handle_aot_success(result.key, success); } + Ok(WorkerSuccess::JitObject(success)) => { + self.handle_jit_object_success(result.key, success, result.compile_duration); + } Err(err) => { self.entries.remove(&result.key); self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed); @@ -495,6 +572,41 @@ impl BackendState { notify(); } + fn handle_jit_object_success( + &mut self, + key: RuntimeCacheKey, + success: JitObjectSuccess, + compile_duration: std::time::Duration, + ) { + match self.jit_object_linker.link(&success) { + Ok((func, backing)) => { + let program = Arc::new(CompiledProgram::new_jit(key, func, backing)); + self.insert_resident(key, program); + self.entries.remove(&key); + self.inner.stats.compilations_succeeded.fetch_add(1, Ordering::Relaxed); + + debug!( + code_hash = %key.code_hash, + spec_id = ?key.spec_id, + compile_time = ?compile_duration, + object_len = success.object_bytes.len(), + "JIT object linked and published to resident map", + ); + } + Err(err) => { + self.entries.remove(&key); + self.inner.stats.compilations_failed.fetch_add(1, Ordering::Relaxed); + + warn!( + code_hash = %key.code_hash, + error = %err, + compile_time = ?compile_duration, + "failed to link JIT object", + ); + } + } + } + fn handle_aot_success(&mut self, key: RuntimeCacheKey, success: 
AotSuccess) { let artifact_key = ArtifactKey { runtime: key, @@ -700,7 +812,7 @@ pub(crate) fn run( let (result_tx, result_rx) = chan::unbounded::(); - let workers = WorkerPool::new(result_tx, config.clone()); + let workers = WorkerPool::new(result_tx, config.clone(), Arc::clone(&inner.stats)); let sweep_interval = config.tuning.eviction_sweep_interval; let event_drain_interval = config.tuning.event_drain_interval; @@ -717,6 +829,7 @@ pub(crate) fn run( resident_meta: preload_meta, entries: HashMap::default(), workers, + jit_object_linker: JitObjectLinker::new(), result_rx, store: config.store, tuning: config.tuning, @@ -760,3 +873,50 @@ pub(crate) fn run( state.workers.shutdown(); while state.result_rx.try_recv().is_ok() {} } + +#[cfg(all(test, feature = "llvm"))] +mod tests { + use super::*; + use crate::runtime::worker::{ + CompileJob, WorkerSuccess, compile_jit_object_artifact, create_compiler, + }; + use revm_primitives::hardfork::SpecId; + + /// PUSH1 0x42 PUSH0 MSTORE PUSH1 0x20 PUSH0 RETURN. 
+ const BYTECODE_RET42: &[u8] = &[0x60, 0x42, 0x5f, 0x52, 0x60, 0x20, 0x5f, 0xf3]; + + fn compile_jit_object(symbol_name: &str) -> JitObjectSuccess { + let config = RuntimeConfig::default(); + let mut compiler = create_compiler(&config, true).unwrap(); + let job = CompileJob { + kind: CompilationKind::Jit, + key: RuntimeCacheKey { code_hash: keccak256(BYTECODE_RET42), spec_id: SpecId::CANCUN }, + bytecode: Bytes::copy_from_slice(BYTECODE_RET42), + symbol_name: symbol_name.to_owned(), + opt_level: config.tuning.jit_opt_level, + sync_notifier: SyncNotifier::none(), + generation: 0, + }; + + match compile_jit_object_artifact(&job, &mut compiler).unwrap() { + WorkerSuccess::JitObject(success) => success, + _ => unreachable!(), + } + } + + #[test] + fn jit_object_linker_relinks_live_symbol_name() { + let success = compile_jit_object("jit_duplicate_symbol"); + let success2 = JitObjectSuccess { + symbol_name: success.symbol_name.clone(), + object_bytes: success.object_bytes.clone(), + builtin_symbols: success.builtin_symbols.clone(), + }; + + let mut linker = JitObjectLinker::new(); + let (_first_func, _first_backing) = linker.link(&success).unwrap(); + let (_second_func, _second_backing) = linker + .link(&success2) + .expect("same symbol should link while previous backing remains alive"); + } +} diff --git a/crates/revmc-runtime/src/runtime/config.rs b/crates/revmc-runtime/src/runtime/config.rs index 449756f32..e807c8f6c 100644 --- a/crates/revmc-runtime/src/runtime/config.rs +++ b/crates/revmc-runtime/src/runtime/config.rs @@ -1,10 +1,15 @@ //! Runtime configuration. 
-use crate::{CompileTimings, runtime::storage::ArtifactStore}; +use crate::{CompileTimings, eyre, runtime::storage::ArtifactStore}; use alloy_primitives::B256; use revm_context_interface::cfg::GasParams; use revm_primitives::hardfork::SpecId; -use std::{path::PathBuf, sync::Arc, time::Duration}; +use std::{path::PathBuf, str::FromStr, sync::Arc, time::Duration}; + +const JIT_MODE_ENV: &str = "REVMC_JIT_MODE"; +const JIT_HELPER_PATH_ENV: &str = "REVMC_JIT_HELPER_PATH"; +const JIT_HELPER_MEMORY_LIMIT_ENV: &str = "REVMC_JIT_HELPER_MEMORY_LIMIT_BYTES"; +const JIT_HELPER_CPU_COUNT_ENV: &str = "REVMC_JIT_HELPER_CPU_COUNT"; /// Runtime configuration. #[derive(Clone, derive_more::Debug)] @@ -76,6 +81,20 @@ pub struct RuntimeConfig { /// Defaults to `false`. pub aot: bool, + /// Where JIT compilation work runs. + /// + /// Defaults to [`JitMode::InProcess`]. + pub jit_mode: JitMode, + + /// Helper executable used when [`jit_mode`](Self::jit_mode) + /// is [`JitMode::OutOfProcess`]. + /// + /// When `None`, the runtime spawns `std::env::current_exe()` and expects it + /// to call [`super::maybe_run_jit_helper`] during startup. + /// + /// Defaults to `None`. + pub jit_helper_path: Option, + /// Blocking mode: every lookup synchronously JIT-compiles on miss and never /// falls back to the interpreter. /// @@ -123,6 +142,53 @@ pub enum CompilationKind { Aot, } +/// Where JIT compilation work runs. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum JitMode { + /// Compile on background threads in this process. + #[default] + InProcess, + /// Compile in a helper process and link the result into this process. + /// + /// This is reserved for the out-of-process JIT implementation and is + /// disabled by default. 
+ OutOfProcess, +} + +impl FromStr for JitMode { + type Err = String; + + fn from_str(s: &str) -> Result { + Ok(match s { + "in-process" => Self::InProcess, + "out-of-process" => Self::OutOfProcess, + _ => return Err(format!("unknown JIT mode: {s}")), + }) + } +} + +impl RuntimeConfig { + /// Applies runtime environment overrides. + /// + /// Recognized variables are `REVMC_JIT_MODE`, `REVMC_JIT_HELPER_PATH`, + /// `REVMC_JIT_HELPER_MEMORY_LIMIT_BYTES`, and `REVMC_JIT_HELPER_CPU_COUNT`. + pub fn with_env_overrides(mut self) -> eyre::Result { + if let Some(mode) = env_var(JIT_MODE_ENV) { + self.jit_mode = mode.parse().map_err(|e: String| eyre::eyre!("{JIT_MODE_ENV}: {e}"))?; + } + if let Some(path) = env_path(JIT_HELPER_PATH_ENV) { + self.jit_helper_path = Some(path); + } + if let Some(limit) = parse_env_u64(JIT_HELPER_MEMORY_LIMIT_ENV)? { + self.tuning.jit_helper_memory_limit_bytes = limit; + } + if let Some(count) = parse_env_usize(JIT_HELPER_CPU_COUNT_ENV)? { + self.tuning.jit_helper_cpu_count = count; + } + Ok(self) + } +} + impl Default for RuntimeConfig { fn default() -> Self { Self { @@ -136,6 +202,8 @@ impl Default for RuntimeConfig { no_dse: false, gas_params: None, aot: false, + jit_mode: JitMode::default(), + jit_helper_path: None, blocking: false, on_compilation: None, } @@ -185,6 +253,31 @@ pub struct RuntimeTuning { /// Defaults to `min(max(1, cpus/2), 4)`. pub jit_worker_count: usize, + /// Timeout for a single out-of-process JIT compilation job. + /// + /// When exceeded, the helper process is killed and a fresh helper is spawned for + /// the next job. Only applies to [`JitMode::OutOfProcess`]. + /// + /// Defaults to `5s`. + pub jit_timeout: Duration, + + /// Maximum address space for the out-of-process JIT helper, in bytes. + /// + /// `0` disables the limit. On Unix this is applied with `RLIMIT_AS` before + /// the helper process starts executing. + /// + /// Defaults to `0`. 
+ pub jit_helper_memory_limit_bytes: u64, + + /// Maximum CPU count for the out-of-process JIT helper. + /// + /// `0` disables the limit. On Linux this limits the helper's CPU affinity + /// to the first N CPUs from the helper's current affinity mask before the + /// helper process starts executing. + /// + /// Defaults to `0`. + pub jit_helper_cpu_count: usize, + /// Capacity of the per-worker job queue. /// /// Defaults to `64`. @@ -253,6 +346,9 @@ impl Default for RuntimeTuning { jit_max_bytecode_len: 0, jit_max_pending_jobs: 2048, jit_worker_count: worker_count, + jit_timeout: Duration::from_secs(5), + jit_helper_memory_limit_bytes: 0, + jit_helper_cpu_count: 0, jit_worker_queue_capacity: 64, jit_opt_level: crate::OptimizationLevel::default(), aot_opt_level: crate::OptimizationLevel::default(), @@ -263,3 +359,24 @@ impl Default for RuntimeTuning { } } } + +fn parse_env_u64(name: &str) -> eyre::Result> { + let Some(value) = env_var(name) else { return Ok(None) }; + value.parse().map(Some).map_err(|e| eyre::eyre!("{name}: {e}")) +} + +fn parse_env_usize(name: &str) -> eyre::Result> { + let Some(value) = env_var(name) else { return Ok(None) }; + value.parse().map(Some).map_err(|e| eyre::eyre!("{name}: {e}")) +} + +fn env_var(name: &str) -> Option { + std::env::var(name).ok() +} + +fn env_path(name: &str) -> Option { + std::env::var_os(name).map(|path| { + let path = PathBuf::from(path); + path.canonicalize().unwrap_or(path) + }) +} diff --git a/crates/revmc-runtime/src/runtime/mod.rs b/crates/revmc-runtime/src/runtime/mod.rs index efa4842a2..bc5919775 100644 --- a/crates/revmc-runtime/src/runtime/mod.rs +++ b/crates/revmc-runtime/src/runtime/mod.rs @@ -16,9 +16,10 @@ use crossbeam_queue::ArrayQueue; use revm_primitives::{B256, hardfork::SpecId, hints_util::cold_path}; use stats::RuntimeStats; use std::{ + ops::ControlFlow, sync::{ Arc, - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, }, time::Duration, }; @@ -30,7 +31,7 @@ pub use 
api::{ }; mod config; -pub use config::{CompilationEvent, CompilationKind, RuntimeConfig, RuntimeTuning}; +pub use config::{CompilationEvent, CompilationKind, JitMode, RuntimeConfig, RuntimeTuning}; mod backend; @@ -43,8 +44,30 @@ pub use storage::{ RuntimeCacheKey, StoredArtifact, }; +#[cfg(all(feature = "llvm", unix))] +mod out_of_process; + mod worker; +/// Runs the out-of-process JIT helper if this process was launched as one. +/// +/// Returns [`ControlFlow::Break`] after the helper request has been handled and +/// the caller should exit immediately. Normal application startup should +/// continue on [`ControlFlow::Continue`]. +pub fn maybe_run_jit_helper() -> eyre::Result> { + #[cfg(all(feature = "llvm", unix))] + { + out_of_process::maybe_run_jit_helper() + } + #[cfg(not(all(feature = "llvm", unix)))] + { + if std::env::var_os("REVMC_JIT_HELPER").is_some() { + eyre::bail!("out-of-process JIT helper is only available on Unix with LLVM") + } + Ok(ControlFlow::Continue(())) + } +} + #[cfg(test)] mod tests; @@ -64,9 +87,11 @@ pub(crate) struct BackendShared { /// Lock-free queue of events. #[debug(skip)] events: EventQueue, + /// Number of active out-of-process helper pauses. + pause_depth: AtomicUsize, /// Shared stats counters. #[debug(skip)] - stats: RuntimeStats, + stats: Arc, } /// Inner state for [`JitBackend`]. Owns the backend thread lifecycle. @@ -124,7 +149,7 @@ impl JitBackend { /// Call [`set_enabled`](Self::set_enabled) to lazily spawn the backend thread with /// a default [`RuntimeConfig`]. pub fn disabled() -> Self { - Self::new(RuntimeConfig::default()).expect("default config cannot fail") + Self::new_inner(RuntimeConfig::default()).expect("default config cannot fail") } /// Creates a backend from the given config. @@ -133,10 +158,19 @@ impl JitBackend { /// immediately and AOT artifacts are preloaded. Otherwise, both are deferred until the /// first [`set_enabled(true)`](Self::set_enabled) call. 
pub fn new(mut config: RuntimeConfig) -> eyre::Result { + config = config.with_env_overrides()?; + Self::new_inner(config) + } + + fn new_inner(mut config: RuntimeConfig) -> eyre::Result { if config.blocking { config.enabled = true; config.tuning.jit_hot_threshold = 0; } + #[cfg(not(unix))] + if config.jit_mode == JitMode::OutOfProcess { + eyre::bail!("out-of-process JIT is only available on Unix"); + } let enabled = config.enabled; let (tx, rx) = chan::bounded::(config.tuning.channel_capacity); @@ -145,7 +179,8 @@ impl JitBackend { let shared = Arc::new(BackendShared { resident: ResidentMap::default(), events, - stats: RuntimeStats::default(), + pause_depth: AtomicUsize::new(0), + stats: Arc::new(RuntimeStats::default()), }); let this = Self { inner: Arc::new(BackendInner { @@ -324,6 +359,39 @@ impl JitBackend { self.inner.enabled.load(Ordering::Relaxed) } + /// Pauses out-of-process helper execution. + /// + /// Resident compiled functions are still returned by [`lookup`](Self::lookup), and lookup + /// events are still processed for stats, hotness tracking, and compilation dispatch. In + /// out-of-process mode, the helper process group is stopped until the pause depth returns to + /// zero, so dispatched helper requests remain buffered and resume once the helper continues. + /// In in-process mode, pause only tracks pause depth. + pub fn pause(&self) { + if self.inner.shared.pause_depth.fetch_add(1, Ordering::Relaxed) == 0 { + let _ = self.inner.tx.send(Command::Pause); + } + } + + /// Resumes out-of-process helper execution once all active pauses have been released. + pub fn resume(&self) { + if self + .inner + .shared + .pause_depth + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |depth| { + Some(depth.saturating_sub(1)) + }) + .is_ok_and(|depth| depth == 1) + { + let _ = self.inner.tx.send(Command::Resume); + } + } + + /// Returns whether out-of-process helper execution is paused. 
+ pub fn is_paused(&self) -> bool { + self.inner.shared.pause_depth.load(Ordering::Relaxed) != 0 + } + /// Sets whether the runtime is enabled, spawning the backend thread on first enable. /// /// When `enabled` is `true` and the backend thread has not been spawned yet, this @@ -354,6 +422,7 @@ impl JitBackend { debug!( blocking = self.inner.blocking, workers = config.tuning.jit_worker_count, + jit_mode = ?config.jit_mode, hot_threshold = config.tuning.jit_hot_threshold, channel_capacity = config.tuning.channel_capacity, "spawning backend thread", diff --git a/crates/revmc-runtime/src/runtime/out_of_process.rs b/crates/revmc-runtime/src/runtime/out_of_process.rs new file mode 100644 index 000000000..556b1125f --- /dev/null +++ b/crates/revmc-runtime/src/runtime/out_of_process.rs @@ -0,0 +1,956 @@ +//! Out-of-process JIT helper process and IPC. + +use crate::{ + CompileTimings, OptimizationLevel, eyre, + runtime::{ + config::{CompilationKind, RuntimeConfig}, + stats::RuntimeStats, + storage::RuntimeCacheKey, + worker::{ + CompileJob, CompilerState, CompilerTarget, JitObjectSuccess, SyncNotifier, + WorkerResult, WorkerSuccess, compile_with_state, + }, + }, +}; +use alloy_primitives::{B256, Bytes}; +use crossbeam_channel as chan; +use rayon::ThreadPoolBuilder; +use revm_context_interface::cfg::{GasParams, gas_params::GasId}; +use revm_primitives::hardfork::SpecId; +use std::{ + cell::RefCell, + collections::HashMap, + io::{BufReader, BufWriter, Read, Write}, + ops::ControlFlow, + os::unix::process::CommandExt, + path::PathBuf, + process::{Child, ChildStdin, Command, Stdio}, + sync::{ + Arc, Condvar, Mutex, + atomic::{AtomicBool, AtomicU64, Ordering}, + }, + thread::JoinHandle, + time::{Duration, Instant}, +}; +use wait_timeout::ChildExt; +use wincode::{SchemaRead, SchemaWrite}; + +const HELPER_ENV: &str = "REVMC_JIT_HELPER"; +const GAS_PARAM_COUNT: usize = 256; +const HELPER_PAUSE_TIMEOUT: Duration = Duration::from_millis(100); + +type GasParamPairs = Vec<(u8, u64)>; 
+type PendingResponses = Arc>>>>; +type PendingPauses = Arc>>>>; + +/// Runs the out-of-process JIT helper if this process was launched as one. +pub(super) fn maybe_run_jit_helper() -> eyre::Result> { + if std::env::var_os(HELPER_ENV).is_none() { + return Ok(ControlFlow::Continue(())); + } + run_jit_helper_stdio()?; + Ok(ControlFlow::Break(())) +} + +/// Compiles a job in the out-of-process helper. +pub(super) fn compile_job( + job: CompileJob, + config: &RuntimeConfig, + helper: &HelperProcess, +) -> WorkerResult { + let t0 = Instant::now(); + let (outcome, timings) = match run_helper_job(&job, config, helper) { + Ok(result) => (result.outcome, result.timings), + Err(error) => (Err(error), CompileTimings::default()), + }; + WorkerResult { + key: job.key, + outcome, + kind: job.kind, + sync_notifier: job.sync_notifier, + generation: job.generation, + compile_duration: t0.elapsed(), + timings, + } +} + +fn run_helper_job( + job: &CompileJob, + config: &RuntimeConfig, + helper: &HelperProcess, +) -> Result { + helper.compile(job, config) +} + +struct HelperJobResult { + outcome: Result, + timings: CompileTimings, +} + +struct HelperIo { + stdin: BufWriter, +} + +pub(super) struct HelperProcess { + inner: Mutex>>, + paused: Arc, + stats: Arc, +} + +impl HelperProcess { + pub(super) fn new(stats: Arc) -> Self { + Self { inner: Mutex::new(None), paused: Arc::new(AtomicBool::new(false)), stats } + } + + fn compile(&self, job: &CompileJob, config: &RuntimeConfig) -> Result { + let helper = { + let mut slot = self.inner.lock().unwrap(); + if slot.as_ref().is_none_or(|helper| !helper.matches_config(config)) { + let restarting = slot.is_some(); + debug!("spawning JIT helper"); + match HelperProcessInner::spawn(config, self.stats.clone(), self.paused.clone()) { + Ok(helper) => { + if self.paused.load(Ordering::Relaxed) { + helper.pause(); + } + self.stats.jit_helper_spawns.fetch_add(1, Ordering::Relaxed); + if restarting { + self.stats.jit_helper_restarts.fetch_add(1, 
Ordering::Relaxed); + } + *slot = Some(Arc::new(helper)); + } + Err(err) => { + self.stats.jit_helper_spawn_failures.fetch_add(1, Ordering::Relaxed); + return Err(err); + } + } + } + slot.as_ref().unwrap().clone() + }; + + match helper.compile(job, config) { + Ok(result) => Ok(result), + Err(err) => { + let mut slot = self.inner.lock().unwrap(); + if slot.as_ref().is_some_and(|current| Arc::ptr_eq(current, &helper)) { + warn!(error = %err, "discarding JIT helper after failed job"); + self.stats.jit_helper_restarts.fetch_add(1, Ordering::Relaxed); + *slot = None; + } + Err(err) + } + } + } + + pub(super) fn cancel_in_flight(&self) { + if let Some(helper) = self.inner.lock().unwrap().take() { + helper.kill(); + } + } + + pub(super) fn pause(&self) { + self.stats.jit_helper_pause_requests.fetch_add(1, Ordering::Relaxed); + self.paused.store(true, Ordering::Relaxed); + if let Some(helper) = self.inner.lock().unwrap().as_ref() { + helper.pause(); + } + } + + pub(super) fn resume(&self) { + self.stats.jit_helper_resume_requests.fetch_add(1, Ordering::Relaxed); + self.paused.store(false, Ordering::Relaxed); + if let Some(helper) = self.inner.lock().unwrap().as_ref() { + helper.resume(); + } + } +} + +struct HelperProcessInner { + path: PathBuf, + init: HelperInit, + child: Mutex, + io: Mutex, + pending: PendingResponses, + pending_pauses: PendingPauses, + reader: Mutex>>, + next_job_id: AtomicU64, + shutdown_timeout: Duration, + stats: Arc, + paused: Arc, +} + +impl HelperProcessInner { + fn spawn( + config: &RuntimeConfig, + stats: Arc, + paused: Arc, + ) -> Result { + let path = match &config.jit_helper_path { + Some(path) => path.clone(), + None => { + std::env::current_exe().map_err(|e| format!("failed to locate current exe: {e}"))? 
+ } + }; + let init = helper_init(config); + let mut command = Command::new(&path); + command + .env(HELPER_ENV, "1") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()); + apply_helper_limits(&mut command, config); + + let mut child = command.spawn().map_err(|e| format!("failed to spawn JIT helper: {e}"))?; + let mut stdin = BufWriter::new(child.stdin.take().ok_or("helper stdin unavailable")?); + write_init(&mut stdin, &init).map_err(|e| format!("failed to write helper init: {e}"))?; + stdin.flush().map_err(|e| format!("failed to flush helper init: {e}"))?; + let stdout = child.stdout.take().ok_or("helper stdout unavailable")?; + let pending = PendingResponses::default(); + let pending_pauses = PendingPauses::default(); + let reader_pending = pending.clone(); + let reader_pending_pauses = pending_pauses.clone(); + let reader_stats = stats.clone(); + let reader = std::thread::spawn(move || { + let mut stdout = BufReader::new(stdout); + loop { + let result = read_helper_response(&mut stdout); + match result { + Ok(HelperResponseMessage::Job(id, result)) => { + if let Some(tx) = reader_pending.lock().unwrap().remove(&id) { + let _ = tx.send(Ok(result)); + } + } + Ok(HelperResponseMessage::Paused(id)) => { + if let Some(tx) = reader_pending_pauses.lock().unwrap().remove(&id) { + let _ = tx.send(Ok(())); + } + } + Err(error) => { + reader_stats.jit_helper_disconnects.fetch_add(1, Ordering::Relaxed); + for (_, tx) in reader_pending.lock().unwrap().drain() { + let _ = tx.send(Err(error.clone())); + } + for (_, tx) in reader_pending_pauses.lock().unwrap().drain() { + let _ = tx.send(Err(error.clone())); + } + break; + } + } + } + }); + Ok(Self { + path, + init, + child: Mutex::new(child), + io: Mutex::new(HelperIo { stdin }), + pending, + pending_pauses, + reader: Mutex::new(Some(reader)), + next_job_id: AtomicU64::new(0), + shutdown_timeout: config.tuning.shutdown_timeout, + stats, + paused, + }) + } + + fn matches_config(&self, config: 
&RuntimeConfig) -> bool { + if self.init != helper_init(config) { + return false; + } + match &config.jit_helper_path { + Some(path) => self.path == *path, + None => std::env::current_exe().map(|path| self.path == path).unwrap_or(false), + } + } + + fn compile(&self, job: &CompileJob, config: &RuntimeConfig) -> Result { + let id = self.next_job_id.fetch_add(1, Ordering::Relaxed); + let (tx, rx) = chan::bounded(1); + self.pending.lock().unwrap().insert(id, tx); + + { + let mut io = self.io.lock().unwrap(); + if let Err(e) = write_job(&mut io.stdin, id, job) { + self.pending.lock().unwrap().remove(&id); + return Err(format!("failed to write helper job: {e}")); + } + if let Err(e) = io.stdin.flush() { + self.pending.lock().unwrap().remove(&id); + return Err(format!("failed to flush helper job: {e}")); + } + } + + loop { + match rx.recv_timeout(config.tuning.jit_timeout) { + Ok(result) => return result, + Err(chan::RecvTimeoutError::Timeout) if self.paused.load(Ordering::Relaxed) => { + continue; + } + Err(chan::RecvTimeoutError::Timeout) => { + warn!(timeout = ?config.tuning.jit_timeout, "JIT helper timed out"); + self.pending.lock().unwrap().remove(&id); + self.stats.jit_helper_timeouts.fetch_add(1, Ordering::Relaxed); + self.kill(); + return Err(format!( + "JIT helper timed out after {:?}; helper will be restarted", + config.tuning.jit_timeout + )); + } + Err(chan::RecvTimeoutError::Disconnected) => { + let status = self.child.lock().unwrap().try_wait().ok().flatten(); + let message = match status { + Some(status) => format!("JIT helper exited with {status}"), + None => "JIT helper disconnected".into(), + }; + warn!(message, "JIT helper disconnected"); + self.stats.jit_helper_disconnects.fetch_add(1, Ordering::Relaxed); + return Err(format!("{message}; helper will be restarted")); + } + } + } + } + + fn kill(&self) -> bool { + let mut child = self.child.lock().unwrap(); + if matches!(child.try_wait(), Ok(Some(_))) { + return true; + } + kill_helper(&mut child); + 
match child.wait_timeout(self.shutdown_timeout) { + Ok(Some(_)) => true, + Ok(None) => { + warn!(timeout = ?self.shutdown_timeout, "timed out waiting for JIT helper exit"); + false + } + Err(err) => { + warn!(%err, "failed to wait for JIT helper exit"); + false + } + } + } + + fn pause(&self) { + let id = self.next_job_id.fetch_add(1, Ordering::Relaxed); + let (tx, rx) = chan::bounded(1); + self.pending_pauses.lock().unwrap().insert(id, tx); + + let send_result = { + let mut io = self.io.lock().unwrap(); + write_pause(&mut io.stdin, id).and_then(|()| io.stdin.flush()) + }; + + if let Err(err) = send_result { + self.pending_pauses.lock().unwrap().remove(&id); + self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed); + warn!(%err, "failed to request graceful JIT helper pause"); + } else { + match rx.recv_timeout(HELPER_PAUSE_TIMEOUT) { + Ok(Ok(())) => { + self.stats.jit_helper_pause_acknowledgements.fetch_add(1, Ordering::Relaxed); + } + Ok(Err(err)) => { + self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed); + warn!(%err, "JIT helper graceful pause failed"); + } + Err(chan::RecvTimeoutError::Timeout) => { + self.pending_pauses.lock().unwrap().remove(&id); + self.stats.jit_helper_pause_timeouts.fetch_add(1, Ordering::Relaxed); + warn!(timeout = ?HELPER_PAUSE_TIMEOUT, "timed out waiting for JIT helper pause"); + } + Err(chan::RecvTimeoutError::Disconnected) => { + self.stats.jit_helper_pause_failures.fetch_add(1, Ordering::Relaxed); + warn!("JIT helper disconnected before pause acknowledgement"); + } + } + } + + self.signal(libc::SIGSTOP, "pause"); + } + + fn resume(&self) { + self.signal(libc::SIGCONT, "resume"); + let send_result = { + let mut io = self.io.lock().unwrap(); + write_resume(&mut io.stdin).and_then(|()| io.stdin.flush()) + }; + if let Err(err) = send_result { + self.stats.jit_helper_resume_failures.fetch_add(1, Ordering::Relaxed); + warn!(%err, "failed to request JIT helper resume"); + } + } + + fn signal(&self, signal: 
libc::c_int, action: &str) { + let mut child = self.child.lock().unwrap(); + if matches!(child.try_wait(), Ok(Some(_))) { + return; + } + signal_helper(&child, signal, action); + } +} + +impl Drop for HelperProcessInner { + fn drop(&mut self) { + let exited = self.kill(); + if exited && let Some(reader) = self.reader.lock().unwrap().take() { + let _ = reader.join(); + } + } +} + +fn apply_helper_limits(command: &mut Command, config: &RuntimeConfig) { + let memory_limit = config.tuning.jit_helper_memory_limit_bytes; + let cpu_count = config.tuning.jit_helper_cpu_count; + + // SAFETY: `pre_exec` runs in the child after fork and before exec. The closure only calls + // libc process/resource/affinity syscalls and constructs an `io::Error` if they fail. + unsafe { + command.pre_exec(move || { + set_process_group()?; + if memory_limit > 0 { + set_rlimit(libc::RLIMIT_AS as _, memory_limit)?; + } + if cpu_count > 0 { + limit_cpu_affinity(cpu_count)?; + } + Ok(()) + }); + } +} + +fn set_process_group() -> std::io::Result<()> { + if unsafe { libc::setpgid(0, 0) } != 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) +} + +fn kill_helper(child: &mut Child) { + let pid = child.id() as libc::pid_t; + if unsafe { libc::kill(-pid, libc::SIGKILL) } == 0 { + return; + } + + let err = std::io::Error::last_os_error(); + if err.raw_os_error() != Some(libc::ESRCH) { + warn!(%err, "failed to kill JIT helper process group"); + } + if let Err(err) = child.kill() { + warn!(%err, "failed to kill JIT helper"); + } +} + +fn signal_helper(child: &Child, signal: libc::c_int, action: &str) { + let pid = child.id() as libc::pid_t; + if unsafe { libc::kill(-pid, signal) } == 0 { + return; + } + + let err = std::io::Error::last_os_error(); + if err.raw_os_error() != Some(libc::ESRCH) { + warn!(%err, signal, action, "failed to signal JIT helper process group"); + } +} + +fn set_rlimit(resource: libc::c_int, value: u64) -> std::io::Result<()> { + let value = 
libc::rlim_t::try_from(value).unwrap_or(libc::rlim_t::MAX); + let limit = libc::rlimit { rlim_cur: value, rlim_max: value }; + if unsafe { libc::setrlimit(resource as _, &limit) } != 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +fn limit_cpu_affinity(cpu_count: usize) -> std::io::Result<()> { + let mut current = unsafe { std::mem::zeroed::() }; + let size = std::mem::size_of::(); + if unsafe { libc::sched_getaffinity(0, size, &mut current) } != 0 { + return Err(std::io::Error::last_os_error()); + } + + let mut limited = unsafe { std::mem::zeroed::() }; + let mut remaining = cpu_count; + for cpu in 0..(8 * size) { + if unsafe { libc::CPU_ISSET(cpu, ¤t) } { + unsafe { libc::CPU_SET(cpu, &mut limited) }; + remaining -= 1; + if remaining == 0 { + break; + } + } + } + if remaining == cpu_count { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "current CPU affinity mask is empty", + )); + } + + if unsafe { libc::sched_setaffinity(0, size, &limited) } != 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) +} + +#[cfg(not(any(target_os = "linux", target_os = "android")))] +fn limit_cpu_affinity(_cpu_count: usize) -> std::io::Result<()> { + Ok(()) +} + +#[derive(Clone, PartialEq, Eq, SchemaWrite, SchemaRead)] +struct HelperInit { + debug_assertions: bool, + no_dedup: bool, + no_dse: bool, + dump_dir: Option, + gas_params: Option, + jit_worker_count: usize, + compiler_recycle_threshold: usize, +} + +#[derive(SchemaWrite, SchemaRead)] +enum HelperRequest { + Init(HelperInit), + Compile(HelperCompile), + Pause { id: u64 }, + Resume, +} + +#[derive(SchemaWrite, SchemaRead)] +struct HelperCompile { + id: u64, + code_hash: [u8; 32], + spec_id: u8, + opt_level: u8, + symbol_name: String, + bytecode: Vec, +} + +#[derive(SchemaWrite, SchemaRead)] +enum HelperResponse { + Ok { + id: u64, + symbol_name: String, + object_bytes: Vec, + builtin_symbols: Vec, + timings: 
HelperTimings, + }, + Err { + id: u64, + error: String, + timings: HelperTimings, + }, + Paused { + id: u64, + }, +} + +#[derive(Clone, Copy, Default, SchemaWrite, SchemaRead)] +struct HelperTimings { + parse: u64, + translate: u64, + optimize: u64, + codegen: u64, +} + +fn write_init(w: &mut BufWriter, init: &HelperInit) -> std::io::Result<()> { + write_message(w, &HelperRequest::Init(init.clone())) +} + +fn write_job( + w: &mut BufWriter, + id: u64, + job: &CompileJob, +) -> std::io::Result<()> { + let req = HelperRequest::Compile(HelperCompile { + id, + code_hash: job.key.code_hash.0, + spec_id: job.key.spec_id as u8, + opt_level: opt_level_to_u8(job.opt_level), + symbol_name: job.symbol_name.clone(), + bytecode: job.bytecode.to_vec(), + }); + write_message(w, &req) +} + +fn write_pause(w: &mut BufWriter, id: u64) -> std::io::Result<()> { + write_message(w, &HelperRequest::Pause { id }) +} + +fn write_resume(w: &mut BufWriter) -> std::io::Result<()> { + write_message(w, &HelperRequest::Resume) +} + +enum HelperResponseMessage { + Job(u64, HelperJobResult), + Paused(u64), +} + +fn read_helper_response( + r: &mut BufReader, +) -> Result { + match read_message(r).map_err(|e| format!("failed to decode helper result: {e}"))? 
{ + HelperResponse::Ok { id, symbol_name, object_bytes, builtin_symbols, timings } => { + Ok(HelperResponseMessage::Job( + id, + HelperJobResult { + outcome: Ok(WorkerSuccess::JitObject(JitObjectSuccess { + symbol_name, + object_bytes: Bytes::from(object_bytes), + builtin_symbols, + })), + timings: timings.into(), + }, + )) + } + HelperResponse::Err { id, error, timings } => Ok(HelperResponseMessage::Job( + id, + HelperJobResult { outcome: Err(error), timings: timings.into() }, + )), + HelperResponse::Paused { id } => Ok(HelperResponseMessage::Paused(id)), + } +} + +impl From for HelperTimings { + fn from(timings: CompileTimings) -> Self { + Self { + parse: duration_to_nanos(timings.parse), + translate: duration_to_nanos(timings.translate), + optimize: duration_to_nanos(timings.optimize), + codegen: duration_to_nanos(timings.codegen), + } + } +} + +impl From for CompileTimings { + fn from(timings: HelperTimings) -> Self { + Self { + parse: Duration::from_nanos(timings.parse), + translate: Duration::from_nanos(timings.translate), + optimize: Duration::from_nanos(timings.optimize), + codegen: Duration::from_nanos(timings.codegen), + } + } +} + +fn duration_to_nanos(duration: Duration) -> u64 { + u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX) +} + +thread_local! 
{ + static HELPER_COMPILER: RefCell> = const { RefCell::new(None) }; +} + +#[derive(Default)] +struct HelperPauseState { + paused: Mutex, + resumed: Condvar, +} + +impl HelperPauseState { + fn pause(&self) { + *self.paused.lock().unwrap() = true; + } + + fn resume(&self) { + *self.paused.lock().unwrap() = false; + self.resumed.notify_all(); + } + + fn wait_resumed(&self) { + let mut paused = self.paused.lock().unwrap(); + while *paused { + paused = self.resumed.wait(paused).unwrap(); + } + } +} + +fn run_jit_helper_stdio() -> eyre::Result<()> { + let mut stdin = BufReader::new(std::io::stdin().lock()); + let config = Arc::new(read_helper_init(&mut stdin)?); + let worker_count = config.tuning.jit_worker_count.max(1); + let pool = ThreadPoolBuilder::new() + .num_threads(worker_count) + .thread_name(|i| format!("revmc-helper-{i:02}")) + .exit_handler(|_| { + HELPER_COMPILER.with_borrow_mut(Option::take); + }) + .build()?; + let stdout = Arc::new(Mutex::new(BufWriter::new(std::io::stdout()))); + let pause_state = Arc::new(HelperPauseState::default()); + + loop { + let request = match read_helper_request(&mut stdin) { + Ok(request) => request, + Err(err) if is_unexpected_eof(&err) => break, + Err(err) => return Err(err), + }; + match request { + HelperWork::Compile { id, job } => { + let config = Arc::clone(&config); + let stdout = Arc::clone(&stdout); + let pause_state = Arc::clone(&pause_state); + pool.spawn_fifo(move || { + let result = HELPER_COMPILER.with_borrow_mut(|compiler| { + compile_with_state(job, &config, CompilerTarget::JitObject, compiler) + }); + pause_state.wait_resumed(); + let mut stdout = stdout.lock().unwrap(); + if let Err(err) = write_helper_result(&mut stdout, id, result) + .and_then(|()| stdout.flush().map_err(Into::into)) + { + error!(%err, "failed to write helper result"); + std::process::exit(1); + } + }); + } + HelperWork::Pause { id } => { + pause_state.pause(); + let mut stdout = stdout.lock().unwrap(); + write_pause_ack(&mut stdout, id)?; 
+ stdout.flush()?; + } + HelperWork::Resume => { + pause_state.resume(); + } + } + } + + Ok(()) +} + +fn read_helper_init(stdin: &mut BufReader) -> eyre::Result { + match read_message(stdin)? { + HelperRequest::Init(init) => runtime_config_from_init(init), + HelperRequest::Compile(_) | HelperRequest::Pause { .. } | HelperRequest::Resume => { + eyre::bail!("JIT helper received request before init") + } + } +} + +enum HelperWork { + Compile { id: u64, job: CompileJob }, + Pause { id: u64 }, + Resume, +} + +fn read_helper_request(stdin: &mut BufReader) -> eyre::Result { + let req = match read_message(stdin)? { + HelperRequest::Compile(req) => req, + HelperRequest::Pause { id } => return Ok(HelperWork::Pause { id }), + HelperRequest::Resume => return Ok(HelperWork::Resume), + HelperRequest::Init(_) => eyre::bail!("JIT helper received duplicate init"), + }; + let spec_id = SpecId::try_from_u8(req.spec_id).ok_or_else(|| eyre::eyre!("invalid spec id"))?; + let opt_level = opt_level_from_u8(req.opt_level)?; + + let job = CompileJob { + kind: CompilationKind::Jit, + key: RuntimeCacheKey { code_hash: B256::from(req.code_hash), spec_id }, + bytecode: Bytes::from(req.bytecode), + symbol_name: req.symbol_name, + opt_level, + sync_notifier: SyncNotifier::none(), + generation: 0, + }; + Ok(HelperWork::Compile { id: req.id, job }) +} + +fn write_helper_result( + stdout: &mut BufWriter, + id: u64, + result: WorkerResult, +) -> eyre::Result<()> { + let timings = result.timings.into(); + let response = match result.outcome { + Ok(WorkerSuccess::JitObject(success)) => HelperResponse::Ok { + id, + symbol_name: success.symbol_name, + object_bytes: success.object_bytes.to_vec(), + builtin_symbols: success.builtin_symbols, + timings, + }, + Ok(_) => unreachable!(), + Err(error) => HelperResponse::Err { id, error, timings }, + }; + write_message(stdout, &response)?; + Ok(()) +} + +fn write_pause_ack(stdout: &mut BufWriter, id: u64) -> eyre::Result<()> { + write_message(stdout, 
&HelperResponse::Paused { id })?; + Ok(()) +} + +fn write_message(w: &mut BufWriter, message: &T) -> std::io::Result<()> +where + T: wincode::SchemaWrite, +{ + wincode::serialize_into(w, message) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) +} + +fn read_message(r: &mut BufReader) -> std::io::Result +where + T: wincode::SchemaReadOwned, +{ + wincode::deserialize_from(r).map_err(|e| std::io::Error::new(read_error_kind(&e), e)) +} + +fn read_error_kind(err: &wincode::ReadError) -> std::io::ErrorKind { + match err { + wincode::ReadError::Io(wincode::io::ReadError::ReadSizeLimit(_)) => { + std::io::ErrorKind::UnexpectedEof + } + _ => std::io::ErrorKind::InvalidData, + } +} + +fn is_unexpected_eof(err: &eyre::Report) -> bool { + err.downcast_ref::() + .is_some_and(|err| err.kind() == std::io::ErrorKind::UnexpectedEof) +} + +fn opt_level_to_u8(level: OptimizationLevel) -> u8 { + match level { + OptimizationLevel::None => 0, + OptimizationLevel::Less => 1, + OptimizationLevel::Default => 2, + OptimizationLevel::Aggressive => 3, + } +} + +fn opt_level_from_u8(level: u8) -> eyre::Result { + Ok(match level { + 0 => OptimizationLevel::None, + 1 => OptimizationLevel::Less, + 2 => OptimizationLevel::Default, + 3 => OptimizationLevel::Aggressive, + _ => eyre::bail!("invalid optimization level"), + }) +} + +fn gas_params_to_pairs(gas_params: &GasParams) -> GasParamPairs { + (0..GAS_PARAM_COUNT) + .map(|i| { + let id = i as u8; + (id, gas_params.get(GasId::new(id))) + }) + .collect() +} + +fn gas_params_from_pairs(pairs: GasParamPairs) -> eyre::Result { + if pairs.len() != GAS_PARAM_COUNT { + eyre::bail!("invalid gas params length: {}", pairs.len()); + } + + let mut table = [0; GAS_PARAM_COUNT]; + let mut seen = [false; GAS_PARAM_COUNT]; + for (id, value) in pairs { + let index = usize::from(id); + if seen[index] { + eyre::bail!("duplicate gas param id: {id}"); + } + seen[index] = true; + table[index] = value; + } + if let Some((id, _)) = 
seen.iter().enumerate().find(|(_, seen)| !**seen) { + eyre::bail!("missing gas param id: {id}"); + } + + Ok(GasParams::new(Arc::new(table))) +} + +fn helper_init(config: &RuntimeConfig) -> HelperInit { + HelperInit { + debug_assertions: config.debug_assertions, + no_dedup: config.no_dedup, + no_dse: config.no_dse, + dump_dir: config.dump_dir.as_ref().map(|path| path.to_string_lossy().into_owned()), + gas_params: config.gas_params.as_ref().map(gas_params_to_pairs), + jit_worker_count: config.tuning.jit_worker_count, + compiler_recycle_threshold: config.tuning.compiler_recycle_threshold, + } +} + +fn runtime_config_from_init(init: HelperInit) -> eyre::Result { + let mut config = RuntimeConfig { + dump_dir: init.dump_dir.map(PathBuf::from), + debug_assertions: init.debug_assertions, + no_dedup: init.no_dedup, + no_dse: init.no_dse, + gas_params: init.gas_params.map(gas_params_from_pairs).transpose()?, + ..Default::default() + }; + config.tuning.jit_worker_count = init.jit_worker_count; + config.tuning.compiler_recycle_threshold = init.compiler_recycle_threshold; + Ok(config) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn helper_pause_state_blocks_until_resume() { + let pause_state = Arc::new(HelperPauseState::default()); + pause_state.pause(); + + let (tx, rx) = chan::bounded(1); + let thread_pause_state = Arc::clone(&pause_state); + let thread = std::thread::spawn(move || { + thread_pause_state.wait_resumed(); + tx.send(()).unwrap(); + }); + + assert!(rx.recv_timeout(Duration::from_millis(50)).is_err()); + pause_state.resume(); + rx.recv_timeout(Duration::from_secs(1)).unwrap(); + thread.join().unwrap(); + } + + #[test] + fn helper_pause_protocol_roundtrips() { + let mut request = Vec::new(); + { + let mut writer = BufWriter::new(&mut request); + write_pause(&mut writer, 42).unwrap(); + writer.flush().unwrap(); + } + let mut reader = BufReader::new(request.as_slice()); + match read_helper_request(&mut reader).unwrap() { + HelperWork::Pause { id } => 
assert_eq!(id, 42), + HelperWork::Compile { .. } | HelperWork::Resume => panic!("unexpected helper request"), + } + + let mut response = Vec::new(); + { + let mut writer = BufWriter::new(&mut response); + write_pause_ack(&mut writer, 42).unwrap(); + writer.flush().unwrap(); + } + let mut reader = BufReader::new(response.as_slice()); + match read_helper_response(&mut reader).unwrap() { + HelperResponseMessage::Paused(id) => assert_eq!(id, 42), + HelperResponseMessage::Job(..) => panic!("unexpected helper response"), + } + } + + #[test] + fn helper_pause_resume_requests_update_stats() { + let stats = Arc::new(RuntimeStats::default()); + let helper = HelperProcess::new(Arc::clone(&stats)); + + helper.pause(); + helper.pause(); + helper.resume(); + + let snapshot = stats.snapshot(crate::runtime::stats::RuntimeStatsGauges::default()); + assert_eq!(snapshot.jit_helper_pause_requests, 2); + assert_eq!(snapshot.jit_helper_pause_acknowledgements, 0); + assert_eq!(snapshot.jit_helper_pause_failures, 0); + assert_eq!(snapshot.jit_helper_pause_timeouts, 0); + assert_eq!(snapshot.jit_helper_resume_requests, 1); + assert_eq!(snapshot.jit_helper_resume_failures, 0); + } +} diff --git a/crates/revmc-runtime/src/runtime/stats.rs b/crates/revmc-runtime/src/runtime/stats.rs index c02a32c71..b8d1a9700 100644 --- a/crates/revmc-runtime/src/runtime/stats.rs +++ b/crates/revmc-runtime/src/runtime/stats.rs @@ -19,6 +19,28 @@ pub(crate) struct RuntimeStats { pub(crate) compilations_succeeded: AtomicU64, /// Total number of failed compilations (JIT + AOT). pub(crate) compilations_failed: AtomicU64, + /// Total number of out-of-process JIT helper processes spawned successfully. + pub(crate) jit_helper_spawns: AtomicU64, + /// Total number of failed out-of-process JIT helper spawn attempts. + pub(crate) jit_helper_spawn_failures: AtomicU64, + /// Total number of out-of-process JIT helpers discarded for restart. 
+ pub(crate) jit_helper_restarts: AtomicU64, + /// Total number of out-of-process JIT helper job timeouts. + pub(crate) jit_helper_timeouts: AtomicU64, + /// Total number of out-of-process JIT helper disconnects. + pub(crate) jit_helper_disconnects: AtomicU64, + /// Total number of out-of-process JIT helper pause requests. + pub(crate) jit_helper_pause_requests: AtomicU64, + /// Total number of graceful out-of-process JIT helper pause acknowledgements. + pub(crate) jit_helper_pause_acknowledgements: AtomicU64, + /// Total number of graceful out-of-process JIT helper pause failures. + pub(crate) jit_helper_pause_failures: AtomicU64, + /// Total number of graceful out-of-process JIT helper pause acknowledgement timeouts. + pub(crate) jit_helper_pause_timeouts: AtomicU64, + /// Total number of out-of-process JIT helper resume requests. + pub(crate) jit_helper_resume_requests: AtomicU64, + /// Total number of out-of-process JIT helper resume request failures. + pub(crate) jit_helper_resume_failures: AtomicU64, } /// Gauge values sampled at snapshot time. @@ -64,6 +86,28 @@ pub struct RuntimeStatsSnapshot { pub compilations_succeeded: u64, /// Total number of failed compilations (JIT + AOT). pub compilations_failed: u64, + /// Total number of out-of-process JIT helper processes spawned successfully. + pub jit_helper_spawns: u64, + /// Total number of failed out-of-process JIT helper spawn attempts. + pub jit_helper_spawn_failures: u64, + /// Total number of out-of-process JIT helpers discarded for restart. + pub jit_helper_restarts: u64, + /// Total number of out-of-process JIT helper job timeouts. + pub jit_helper_timeouts: u64, + /// Total number of out-of-process JIT helper disconnects. + pub jit_helper_disconnects: u64, + /// Total number of out-of-process JIT helper pause requests. + pub jit_helper_pause_requests: u64, + /// Total number of graceful out-of-process JIT helper pause acknowledgements. 
+ pub jit_helper_pause_acknowledgements: u64, + /// Total number of graceful out-of-process JIT helper pause failures. + pub jit_helper_pause_failures: u64, + /// Total number of graceful out-of-process JIT helper pause acknowledgement timeouts. + pub jit_helper_pause_timeouts: u64, + /// Total number of out-of-process JIT helper resume requests. + pub jit_helper_resume_requests: u64, + /// Total number of out-of-process JIT helper resume request failures. + pub jit_helper_resume_failures: u64, } impl RuntimeStatsSnapshot { @@ -101,6 +145,19 @@ impl RuntimeStats { compilations_dispatched: dispatched, compilations_succeeded: succeeded, compilations_failed: failed, + jit_helper_spawns: self.jit_helper_spawns.load(Ordering::Relaxed), + jit_helper_spawn_failures: self.jit_helper_spawn_failures.load(Ordering::Relaxed), + jit_helper_restarts: self.jit_helper_restarts.load(Ordering::Relaxed), + jit_helper_timeouts: self.jit_helper_timeouts.load(Ordering::Relaxed), + jit_helper_disconnects: self.jit_helper_disconnects.load(Ordering::Relaxed), + jit_helper_pause_requests: self.jit_helper_pause_requests.load(Ordering::Relaxed), + jit_helper_pause_acknowledgements: self + .jit_helper_pause_acknowledgements + .load(Ordering::Relaxed), + jit_helper_pause_failures: self.jit_helper_pause_failures.load(Ordering::Relaxed), + jit_helper_pause_timeouts: self.jit_helper_pause_timeouts.load(Ordering::Relaxed), + jit_helper_resume_requests: self.jit_helper_resume_requests.load(Ordering::Relaxed), + jit_helper_resume_failures: self.jit_helper_resume_failures.load(Ordering::Relaxed), } } } diff --git a/crates/revmc-runtime/src/runtime/tests.rs b/crates/revmc-runtime/src/runtime/tests.rs index fe36af84e..be20e47ae 100644 --- a/crates/revmc-runtime/src/runtime/tests.rs +++ b/crates/revmc-runtime/src/runtime/tests.rs @@ -283,6 +283,32 @@ fn set_enabled_toggle() { assert!(matches!(tb.lookup(req), LookupDecision::Interpret(InterpretReason::Disabled))); } +#[test] +fn 
pause_processes_lookup_events() { + let tb = TestBackend::with_tuning(RuntimeTuning { jit_worker_count: 0, ..Default::default() }); + let req = TestBackend::req_cancun(&[0x00]); + + assert!(!tb.is_paused()); + tb.pause(); + tb.pause(); + assert!(tb.is_paused()); + assert!(matches!(tb.lookup(req.clone()), LookupDecision::Interpret(InterpretReason::NotReady))); + + let stats = tb.wait_stats(|s| s.lookup_misses == 1); + assert_eq!(stats.lookup_misses, 1); + assert_eq!(stats.lookup_hits, 0); + + tb.resume(); + assert!(tb.is_paused()); + tb.resume(); + assert!(!tb.is_paused()); + assert!(matches!(tb.lookup(req), LookupDecision::Interpret(InterpretReason::NotReady))); + + let stats = tb.wait_stats(|s| s.lookup_misses == 2); + assert_eq!(stats.lookup_misses, 2); + assert_eq!(stats.lookup_hits, 0); +} + #[test] fn lookup_increments_miss_counter() { let tb = TestBackend::new(RuntimeConfig { enabled: true, ..Default::default() }); diff --git a/crates/revmc-runtime/src/runtime/worker.rs b/crates/revmc-runtime/src/runtime/worker.rs index 727c219e2..6fe661874 100644 --- a/crates/revmc-runtime/src/runtime/worker.rs +++ b/crates/revmc-runtime/src/runtime/worker.rs @@ -13,6 +13,7 @@ use crate::{ CompileTimings, EvmCompilerFn, OptimizationLevel, runtime::{ config::{CompilationKind, RuntimeConfig}, + stats::RuntimeStats, storage::RuntimeCacheKey, }, }; @@ -33,8 +34,14 @@ use std::{ use crate::{ EvmCompiler, EvmLlvmBackend, Linker, llvm::{JitDylibGuard, orc::ResourceTracker}, + runtime::config::JitMode, }; +#[cfg(all(feature = "llvm", unix))] +type OutOfProcessHelper = Option>; +#[cfg(not(all(feature = "llvm", unix)))] +type OutOfProcessHelper = (); + /// Notifier for synchronous compilation requests. /// /// Wraps an optional sender that is notified when the compilation @@ -102,6 +109,8 @@ pub(crate) enum WorkerSuccess { Jit(JitSuccess), /// AOT compilation produced shared-library bytes. Aot(AotSuccess), + /// JIT compilation produced relocatable object bytes to link in the parent. 
+ JitObject(JitObjectSuccess), } /// Successful JIT compilation output. @@ -114,6 +123,15 @@ pub(crate) struct JitSuccess { } /// Successful AOT compilation output. +pub(crate) struct JitObjectSuccess { + /// The symbol name in the object file. + pub(crate) symbol_name: String, + /// The raw relocatable object bytes. + pub(crate) object_bytes: Bytes, + /// Builtin absolute symbols referenced by the object. + pub(crate) builtin_symbols: Vec, +} + pub(crate) struct AotSuccess { /// The symbol name in the shared library. pub(crate) symbol_name: String, @@ -167,6 +185,8 @@ impl Drop for JitCodeBacking { pub(crate) struct WorkerPool { /// Rayon pool used to execute compilation jobs. pool: Option, + /// Out-of-process helper used by this worker pool. + out_of_process_helper: OutOfProcessHelper, /// Sender for worker results. result_tx: chan::Sender, /// Runtime configuration shared by spawned jobs. @@ -181,9 +201,14 @@ pub(crate) struct WorkerPool { impl WorkerPool { /// Creates and starts the worker pool. 
- pub(crate) fn new(result_tx: chan::Sender, config: RuntimeConfig) -> Self { + pub(crate) fn new( + result_tx: chan::Sender, + config: RuntimeConfig, + stats: Arc, + ) -> Self { let worker_count = config.tuning.jit_worker_count; let queue_capacity = worker_count.saturating_mul(config.tuning.jit_worker_queue_capacity); + let out_of_process_helper = create_out_of_process_helper(&config, stats); let pool = (worker_count > 0).then(|| { ThreadPoolBuilder::new() .num_threads(worker_count) @@ -195,6 +220,7 @@ impl WorkerPool { Self { pool, + out_of_process_helper, result_tx, config: Arc::new(config), queued: Arc::new(AtomicUsize::new(0)), @@ -230,9 +256,10 @@ impl WorkerPool { let queued = self.queued.clone(); let result_tx = self.result_tx.clone(); let config = self.config.clone(); + let out_of_process_helper = self.out_of_process_helper.clone(); pool.spawn_fifo(move || { queued.fetch_sub(1, Ordering::AcqRel); - let result = compile_job(job, &config); + let result = compile_job(job, &config, out_of_process_helper); let _ = result_tx.send(result); }); Ok(()) @@ -241,11 +268,27 @@ impl WorkerPool { /// Shuts down all workers after draining queued jobs. pub(crate) fn shutdown(&mut self) { self.shutdown.store(true, Ordering::Release); + self.cancel_in_flight(); if let Some(pool) = &self.pool { pool.broadcast(|_| clear_thread_local_compilers()); } self.pool.take(); } + + /// Cancels any in-flight compilation that can be interrupted externally. + pub(crate) fn cancel_in_flight(&self) { + cancel_out_of_process_helper(&self.out_of_process_helper); + } + + /// Pauses out-of-process helper execution. + pub(crate) fn pause(&self) { + pause_out_of_process_helper(&self.out_of_process_helper); + } + + /// Resumes out-of-process helper execution. 
+ pub(crate) fn resume(&self) { + resume_out_of_process_helper(&self.out_of_process_helper); + } } impl Drop for WorkerPool { @@ -254,6 +297,52 @@ impl Drop for WorkerPool { } } +#[cfg(all(feature = "llvm", unix))] +fn create_out_of_process_helper( + config: &RuntimeConfig, + stats: Arc, +) -> OutOfProcessHelper { + (config.jit_mode == JitMode::OutOfProcess) + .then(|| Arc::new(super::out_of_process::HelperProcess::new(stats))) +} + +#[cfg(not(all(feature = "llvm", unix)))] +fn create_out_of_process_helper( + _config: &RuntimeConfig, + _stats: Arc, +) -> OutOfProcessHelper { +} + +#[cfg(all(feature = "llvm", unix))] +fn cancel_out_of_process_helper(helper: &OutOfProcessHelper) { + if let Some(helper) = helper { + helper.cancel_in_flight(); + } +} + +#[cfg(not(all(feature = "llvm", unix)))] +fn cancel_out_of_process_helper(_helper: &OutOfProcessHelper) {} + +#[cfg(all(feature = "llvm", unix))] +fn pause_out_of_process_helper(helper: &OutOfProcessHelper) { + if let Some(helper) = helper { + helper.pause(); + } +} + +#[cfg(not(all(feature = "llvm", unix)))] +fn pause_out_of_process_helper(_helper: &OutOfProcessHelper) {} + +#[cfg(all(feature = "llvm", unix))] +fn resume_out_of_process_helper(helper: &OutOfProcessHelper) { + if let Some(helper) = helper { + helper.resume(); + } +} + +#[cfg(not(all(feature = "llvm", unix)))] +fn resume_out_of_process_helper(_helper: &OutOfProcessHelper) {} + #[cfg(feature = "llvm")] fn clear_thread_local_compilers() { JIT_COMPILER.with_borrow_mut(Option::take); @@ -264,22 +353,70 @@ fn clear_thread_local_compilers() { fn clear_thread_local_compilers() {} #[cfg(feature = "llvm")] -fn compile_job(job: CompileJob, config: &RuntimeConfig) -> WorkerResult { +fn compile_job( + job: CompileJob, + config: &RuntimeConfig, + out_of_process_helper: OutOfProcessHelper, +) -> WorkerResult { trace!(?job, "received job"); match job.kind { - CompilationKind::Jit => { - JIT_COMPILER.with_borrow_mut(|state| compile_with_state(job, config, state)) - } - 
CompilationKind::Aot => { - AOT_COMPILER.with_borrow_mut(|state| compile_with_state(job, config, state)) + CompilationKind::Jit if config.jit_mode == JitMode::OutOfProcess => { + compile_out_of_process_job(job, config, out_of_process_helper) } + CompilationKind::Jit => JIT_COMPILER + .with_borrow_mut(|state| compile_with_state(job, config, CompilerTarget::Jit, state)), + CompilationKind::Aot => AOT_COMPILER + .with_borrow_mut(|state| compile_with_state(job, config, CompilerTarget::Aot, state)), + } +} + +#[cfg(all(feature = "llvm", unix))] +fn compile_out_of_process_job( + job: CompileJob, + config: &RuntimeConfig, + helper: OutOfProcessHelper, +) -> WorkerResult { + let helper = helper.expect("missing out-of-process JIT helper"); + super::out_of_process::compile_job(job, config, &helper) +} + +#[cfg(all(feature = "llvm", not(unix)))] +fn compile_out_of_process_job( + job: CompileJob, + _config: &RuntimeConfig, + _helper: OutOfProcessHelper, +) -> WorkerResult { + WorkerResult { + key: job.key, + outcome: Err("out-of-process JIT is only available on Unix".into()), + kind: job.kind, + sync_notifier: job.sync_notifier, + generation: job.generation, + compile_duration: Duration::ZERO, + timings: CompileTimings::default(), } } #[cfg(feature = "llvm")] -fn compile_with_state( +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum CompilerTarget { + Jit, + JitObject, + Aot, +} + +#[cfg(feature = "llvm")] +impl CompilerTarget { + const fn is_aot_backend(self) -> bool { + matches!(self, Self::JitObject | Self::Aot) + } +} + +#[cfg(feature = "llvm")] +pub(super) fn compile_with_state( job: CompileJob, config: &RuntimeConfig, + target: CompilerTarget, state_slot: &mut Option, ) -> WorkerResult { let _span = match job.kind { @@ -293,7 +430,7 @@ fn compile_with_state( let t0 = Instant::now(); if state_slot.is_none() { - match CompilerState::new(config, job.kind) { + match CompilerState::new(config, target) { Ok(s) => *state_slot = Some(s), Err(e) => { error!(error = 
%e, "failed to create LLVM backend"); @@ -322,9 +459,10 @@ fn compile_with_state( } compiler.set_opt_level(job.opt_level); - let outcome = match job.kind { - CompilationKind::Jit => compile_jit_artifact(&job, compiler), - CompilationKind::Aot => compile_aot_artifact(&job, compiler), + let outcome = match target { + CompilerTarget::Jit => compile_jit_artifact(&job, compiler), + CompilerTarget::JitObject => compile_jit_object_artifact(&job, compiler), + CompilerTarget::Aot => compile_aot_artifact(&job, compiler), }; let timings = compiler.take_timings(); @@ -337,7 +475,7 @@ fn compile_with_state( && state.compilations_since_recycle >= config.tuning.compiler_recycle_threshold { debug!(compilations_since_recycle = state.compilations_since_recycle, "recycling compiler"); - match CompilerState::new(config, job.kind) { + match CompilerState::new(config, target) { Ok(new_state) => { *state_slot = Some(new_state); revmc_llvm::global_gc(); @@ -361,16 +499,16 @@ fn compile_with_state( } #[cfg(feature = "llvm")] -struct CompilerState { +pub(super) struct CompilerState { compiler: EvmCompiler, compilations_since_recycle: usize, } #[cfg(feature = "llvm")] impl CompilerState { - fn new(config: &RuntimeConfig, kind: CompilationKind) -> Result { + fn new(config: &RuntimeConfig, target: CompilerTarget) -> Result { Ok(Self { - compiler: create_compiler(config, kind == CompilationKind::Aot)?, + compiler: create_compiler(config, target.is_aot_backend())?, compilations_since_recycle: 0, }) } @@ -383,7 +521,7 @@ thread_local! { } #[cfg(feature = "llvm")] -fn create_compiler( +pub(super) fn create_compiler( config: &RuntimeConfig, aot: bool, ) -> Result, String> { @@ -428,6 +566,43 @@ fn compile_jit_artifact( } /// Compiles a single bytecode to a shared library and returns the raw bytes. 
+#[cfg(feature = "llvm")] +pub(super) fn compile_jit_object_artifact( + job: &CompileJob, + compiler: &mut EvmCompiler, +) -> Result { + compiler + .translate(&job.symbol_name, &job.bytecode[..], job.key.spec_id) + .map_err(|e| format!("JIT object translate failed: {e}"))?; + + let mut object_bytes = Vec::new(); + compiler + .write_object(&mut object_bytes) + .map_err(|e| format!("JIT object write failed: {e}"))?; + let mut builtin_symbols: Vec = compiler + .backend() + .pending_symbol_names() + .into_iter() + .map(|name| name.to_string_lossy().into_owned()) + .collect(); + if builtin_symbols.is_empty() { + builtin_symbols = + revmc_builtins::Builtin::ALL.iter().map(|builtin| builtin.name().to_string()).collect(); + } + + debug!( + bytecode_len = job.bytecode.len(), + object_len = object_bytes.len(), + "JIT object compilation succeeded", + ); + + Ok(WorkerSuccess::JitObject(JitObjectSuccess { + symbol_name: job.symbol_name.clone(), + object_bytes: Bytes::from(object_bytes), + builtin_symbols, + })) +} + #[cfg(feature = "llvm")] fn compile_aot_artifact( job: &CompileJob, @@ -470,7 +645,11 @@ fn compile_aot_artifact( } #[cfg(not(feature = "llvm"))] -fn compile_job(job: CompileJob, _config: &RuntimeConfig) -> WorkerResult { +fn compile_job( + job: CompileJob, + _config: &RuntimeConfig, + _helper: OutOfProcessHelper, +) -> WorkerResult { WorkerResult { key: job.key, outcome: Err("LLVM backend not available".into()), diff --git a/docs/out-of-process-jit.md b/docs/out-of-process-jit.md new file mode 100644 index 000000000..d9a88e912 --- /dev/null +++ b/docs/out-of-process-jit.md @@ -0,0 +1,57 @@ +# Out-of-process JIT exploration + +`RuntimeConfig::jit_mode` now has an `OutOfProcess` variant, disabled by default. This document records the current prototype and the remaining work needed to make it production-ready. + +## Recommended architecture + +Keep execution and ORC linking in the parent process. 
Move only translation, optimization, and object emission to a helper process that owns the background worker pool. + +The helper process receives `CompileJob`s over IPC and returns relocatable object bytes plus timings/errors. The parent process then adds the object to its existing ORC `LLJIT`, resolves the symbol to a local `EvmCompilerFn`, and owns the `ResourceTracker`/`JitDylibGuard` exactly as it does today. This preserves the current runtime API and keeps compiled function pointers valid in the caller process. + +Using LLVM ORC's remote executor APIs (`ExecutorProcessControl`, `SimpleRemoteEPC`, etc.) would put the executable code in the child process. That is not transparent for the current `EvmCompilerFn` API, because callers need a local function pointer and direct calls into `revm` state. + +## LLVM ORC support needed + +- Add C/Rust bindings to add an already-compiled object buffer to a specific `JITDylib` with a `ResourceTracker`: + - `LLJIT::add_object_file_with_rt` via `LLVMOrcLLJITAddObjectFileWithRT` if available, or a small C++ wrapper around `ObjectLayer::add` / `object::ObjectFile` materialization. + - Continue to perform `lookup_in(jd, symbol)` in the parent after linking. +- Keep builtin absolute symbols, process symbol generators, perf/debug plugins, and memory accounting in the parent ORC instance. The child only emits relocatable objects with unresolved external symbols. +- Preserve per-entry eviction by creating the `ResourceTracker` in the parent before adding the object and returning it through `JitCodeBacking`. +- Replace the current object capture TLS path for out-of-process jobs: object bytes are already returned by the helper, so disassembly/debug dumping should consume those bytes directly. + +## Runtime/IPC work + +Current prototype: + +- `RuntimeConfig::jit_mode = JitMode::OutOfProcess` makes the runtime keep a persistent helper process, owned by its worker pool and shared by that pool's worker threads, spawned via `RuntimeConfig::jit_helper_path`, or `std::env::current_exe()` when unset.
+- `RuntimeConfig::default()` uses plain in-process defaults. `JitBackend::new` applies `RuntimeConfig::with_env_overrides()`, which recognizes `REVMC_JIT_MODE=out-of-process` and `REVMC_JIT_MODE=in-process`; other spellings are rejected. `REVMC_JIT_HELPER_PATH` overrides the helper executable path. Test harnesses should point this at a binary that calls `revmc::runtime::maybe_run_jit_helper()` at startup, such as `target/debug/revmc`. +- `REVMC_JIT_HELPER_MEMORY_LIMIT_BYTES` applies a Unix `RLIMIT_AS` limit to helper processes before `exec`. +- `REVMC_JIT_HELPER_CPU_COUNT` limits helper CPU affinity on Linux before `exec`. +- Helper binaries must call `revmc::runtime::maybe_run_jit_helper()` at process startup. `revmc-cli` does this already. +- Workers send length-prefixed wincode-serialized JIT object requests to the helper over stdin and receive length-prefixed wincode-serialized responses from stdout. +- The parent links returned object bytes into its local ORC instance, resolves the symbol, and constructs `JitCodeBacking` with a parent-owned `ResourceTracker`. +- `RuntimeTuning::jit_timeout` bounds each helper compilation; timed-out helpers are killed and replaced on the next job. +- Clearing resident code or shutting down the runtime kills the helper process so in-flight out-of-process compiles can be interrupted instead of waiting for LLVM to finish. + +## Fork-only helper startup + +Using `fork()` without `exec` is not safe with the current lazy helper model. The helper is spawned from a runtime worker after the backend has started threads, and a child forked from a multithreaded process can only safely run async-signal-safe operations until `exec`. The helper would immediately need to run normal Rust code, allocate, use locks, deserialize IPC messages, and initialize/run LLVM, so it does not fit that rule. + +Avoiding LLVM translation in the parent is not sufficient. 
The parent still uses LLVM ORC to link returned object files, and the Rust runtime, allocator, tracing, channels, and other libraries can already hold process-global locks or thread-local state by the time the helper is spawned. + +A fork-only helper may only be viable as an early fork server: fork during single-threaded startup before any LLVM initialization or backend worker creation, then let the child own all helper-side Rust/LLVM state. That would require an explicit startup path instead of the current lazy spawn. + +Still needed for the runtime/IPC work (beyond the current prototype): + +- Move the worker pool into a single helper process; the parent should only enqueue IPC requests. +- Carry the remaining data in the IPC payloads: generation, timings, and richer errors. +- Either run AOT jobs in the helper too or explicitly route them through the existing in-process AOT path; the first option gives consistent isolation. +- Define graceful shutdown semantics for queued helper jobs; the current implementation kills the helper for cancellation/shutdown. +- Treat helper crash as worker-pool failure: fail pending synchronous jobs, drop pending async jobs, and optionally respawn. + +## Open questions + +- Stability requirements for the private IPC protocol. +- Whether debug dumps should be written by the child, the parent, or both. +- Whether compiler recycling is still needed per helper worker once the whole helper can be restarted. +- How to expose helper process configuration such as executable path, environment, and restart policy without making the default API noisy.