diff --git a/.DS_Store b/.DS_Store index 51010bf..3916e7a 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml new file mode 100644 index 0000000..ffe52a6 --- /dev/null +++ b/.github/workflows/rust-ci.yml @@ -0,0 +1,58 @@ +name: Test & Lint CI + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main, develop ] + +env: + CARGO_TERM_COLOR: always + +jobs: + check: + name: Rust CI Checks + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo- + + - name: Check formatting + run: | + cargo fmt --all -- --check + if [ $? -ne 0 ]; then + echo "::error::Code is not formatted. Please run 'cargo fmt' locally." + exit 1 + fi + + - name: Run clippy + run: | + cargo clippy --all --all-targets -- -D warnings + if [ $? -ne 0 ]; then + echo "::error::Clippy found issues. Please fix them locally." + exit 1 + fi + + - name: Run tests + run: | + cargo test --all + if [ $? -ne 0 ]; then + echo "::error::Tests failed. Please fix them locally." + exit 1 + fi \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 263916a..c0ccc7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1430,6 +1430,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags 2.9.1", + "cexpr", + "clang-sys", + "itertools 0.10.5", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.104", +] + [[package]] name = "bindgen" version = "0.70.1" @@ -1448,6 +1468,24 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "bindgen" +version = "0.71.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" +dependencies = [ + "bitflags 2.9.1", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "proc-macro2", + "quote", + "regex", + "rustc-hash 2.1.1", + "shlex", + "syn 2.0.104", +] + [[package]] name = "bit-set" version = "0.8.0" @@ -1802,6 +1840,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "c-kzg" version = "2.1.1" @@ -4467,6 +4515,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.174" @@ -4526,7 +4580,7 @@ version = "0.14.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78a09b56be5adbcad5aa1197371688dc6bb249a26da3bca2011ee2fb987ebfb" dependencies = [ - "bindgen", + "bindgen 0.70.1", "errno", "libc", ] @@ -4542,6 +4596,21 @@ dependencies = [ "redox_syscall", ] +[[package]] +name = "librocksdb-sys" +version = "0.17.1+9.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b7869a512ae9982f4d46ba482c2a304f1efd80c6412a3d4bf57bb79a619679f" +dependencies = [ + "bindgen 0.69.5", + "bzip2-sys", + "cc", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + [[package]] name = "libsecp256k1" version = "0.7.2" @@ -7275,7 +7344,7 @@ name = "reth-mdbx-sys" version = "1.5.1" source = "git+https://github.com/paradigmxyz/reth?tag=v1.5.1#dbe7ee9c21392f360ff01f6307480f5d7dd73a3a" dependencies = [ - "bindgen", + "bindgen 0.70.1", "cc", ] @@ -8896,6 +8965,16 @@ dependencies = [ "byteorder", ] +[[package]] +name = "rocksdb" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26ec73b20525cb235bad420f911473b69f9fe27cc856c5461bccd7e4af037f43" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rolling-file" version = "0.2.0" @@ -9315,8 +9394,10 @@ dependencies = [ "eyre", "reth-revm", "reth-tracing", + "rocksdb", "serde", "serde_json", + "serde_yaml", "tokio", "toml", ] @@ -9336,6 +9417,7 @@ dependencies = [ "alloy-sol-types", "chrono", "eyre", + "futures", "futures-util", "once_cell", "rayon", @@ -9353,6 +9435,7 @@ dependencies = [ "reth-revm", "reth-tracing", "reth-transaction-pool", + "rocksdb", "searcher-reth-manager", "serde_json", "tokio", @@ -9562,6 +9645,19 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap 2.10.0", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "serdect" version = "0.2.0" @@ -10669,6 +10765,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "unsigned-varint" version = "0.8.0" @@ -11820,6 +11922,7 @@ version = "2.0.15+zstd.1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" dependencies = [ + "bindgen 0.71.1", "cc", "pkg-config", ] diff --git a/Cargo.toml b/Cargo.toml index 8c28643..51f8a60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,3 +85,9 @@ rayon = "1.10.0" reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } chrono = { version = "0.4", default-features = false, features = ["clock"] } once_cell = "1" + +#rocksdb +rocksdb = "0.23.0" + +# todo: replace it with toml +serde_yaml = "0.9.34" \ No newline at end of file diff --git a/bin/searcher-reth/src/macros.rs b/bin/searcher-reth/src/macros.rs new file mode 100644 index 0000000..108f2a8 --- /dev/null +++ b/bin/searcher-reth/src/macros.rs @@ -0,0 +1,21 @@ +#[macro_export] +macro_rules! install_strategy { + ( + $builder:expr, + $config:expr, + $wallet:expr, + $signal_manager:expr, + $exex_id:expr, + $strategy_type:ty + ) => {{ + use searcher_reth_extension::strategy::core::strategy::Strategy; + + let cfg = $config.read().unwrap().get_strategy($exex_id).unwrap(); + let strategy = <$strategy_type>::new(cfg); + let searcher_exex = searcher_reth_extension::exex::SearcherExEx::new( + $wallet.clone(), + $signal_manager.subscribe(), + ); + $builder.install_exex($exex_id, move |ctx| searcher_exex.run(ctx, strategy)) + }}; +} diff --git a/bin/searcher-reth/src/main.rs b/bin/searcher-reth/src/main.rs index 339e787..de23c60 100644 --- a/bin/searcher-reth/src/main.rs +++ b/bin/searcher-reth/src/main.rs @@ -1,3 +1,5 @@ +mod macros; + use std::sync::{Arc, RwLock}; use clap::Parser; @@ -5,14 +7,14 @@ use reth::chainspec::EthereumChainSpecParser; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::error; use searcher_reth_extension::{ - exex::SearcherExEx, relayer_pool::WalletPool, - strategy::{ - core::strategy::Strategy, path_finding::PathFinder, profit_reporter::init_reporter, - }, + strategy::{liquidator::Liquidator, path_finding::PathFinder, profit_reporter::init_reporter}, util::signal_manager::SignalManager, }; -use searcher_reth_manager::{common::PATH_FINDER_EXEX_ID, manager::ConfigManager}; +use searcher_reth_manager::{ + common::{LIQUIDATOR_EXEX_ID, PATH_FINDER_EXEX_ID}, + manager::ConfigManager, +}; fn main() -> eyre::Result<()> { let config = Arc::new(RwLock::new(ConfigManager::from_file("env.toml")?)); @@ -33,18 +35,24 @@ fn main() -> eyre::Result<()> { std::process::exit(0); }); - // Install Exex for various strategies - let searcher_exex = SearcherExEx::new(wallet, signal_manager.subscribe()); + // Install strategies let mut node_builder = builder.node(EthereumNode::default()); - - // Install PathFinder strategy - node_builder = node_builder.install_exex(PATH_FINDER_EXEX_ID, { - let path_finder_cfg = - config.clone().read().unwrap().get_strategy(PATH_FINDER_EXEX_ID).unwrap(); - let path_finder = PathFinder::new(path_finder_cfg); - move |ctx| searcher_exex.run(ctx, path_finder) - }); - + node_builder = install_strategy!( + node_builder, + config, + wallet, + signal_manager, + PATH_FINDER_EXEX_ID, + PathFinder + ); + node_builder = install_strategy!( + node_builder, + config, + wallet, + signal_manager, + LIQUIDATOR_EXEX_ID, + Liquidator + ); // TODO: Add other strategies here as needed let handle = node_builder.launch().await?; handle.wait_for_node_exit().await diff --git a/crates/extension/Cargo.toml b/crates/extension/Cargo.toml index 2fcbfd9..7742f87 100644 --- a/crates/extension/Cargo.toml +++ b/crates/extension/Cargo.toml @@ -44,5 +44,4 @@ searcher-reth-strategy.workspace = true searcher-reth-manager.workspace = true rayon.workspace = true -reth-transaction-pool.workspace = true - +reth-transaction-pool.workspace = true \ No newline at end of file diff --git a/crates/extension/src/exex.rs b/crates/extension/src/exex.rs index 706b098..95484a1 100644 --- a/crates/extension/src/exex.rs +++ b/crates/extension/src/exex.rs @@ -1,5 +1,6 @@ use std::{future::Future, sync::Arc}; +use crate::relayer_pool::{RelayerMessage, RelayerPool, WalletPool}; use eyre::Result; use futures_util::StreamExt; use reth::network::NetworkInfo; @@ -15,8 +16,6 @@ use searcher_reth_manager::SignalType; use searcher_reth_strategy::core::strategy::Strategy; use tokio::sync::broadcast; -use crate::relayer_pool::{RelayerMessage, RelayerPool, WalletPool}; - pub struct SearcherExEx { pub wallet: Arc, pub signal_rx: broadcast::Receiver, diff --git a/crates/manager/Cargo.toml b/crates/manager/Cargo.toml index f215a3b..878c296 100644 --- a/crates/manager/Cargo.toml +++ b/crates/manager/Cargo.toml @@ -17,3 +17,5 @@ alloy-primitives.workspace = true reth-revm.workspace = true serde_json.workspace = true alloy-serde = "1.0.20" +rocksdb.workspace = true +serde_yaml.workspace = true diff --git a/crates/manager/src/common.rs b/crates/manager/src/common.rs index 7028090..57f58b9 100644 --- a/crates/manager/src/common.rs +++ b/crates/manager/src/common.rs @@ -3,16 +3,23 @@ use reth_revm::state::Bytecode; use reth_tracing::tracing; use serde::{Deserialize, Serialize}; -use crate::{gas::GasConfig, strategy::path_finder::PathFinderConfig, types::CandidateEntry}; +use crate::{ + gas::GasConfig, + strategy::{liquidator::LiquidatorConfig, path_finder::PathFinderConfig}, + types::StrategyCandidates, +}; // Strategy configuration pub const PATH_FINDER_EXEX_ID: &str = "path-finder"; +pub const LIQUIDATOR_EXEX_ID: &str = "liquidator"; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "kebab-case", tag = "type")] pub enum StrategyConfig { #[serde(rename = "path-finder")] PathFinder(PathFinderConfig), - // #[serde(rename = "liquidation")] Liquidation(LiquidationConfig), + #[serde(rename = "liquidator")] + Liquidator(LiquidatorConfig), // #[serde(rename = "arbitrage")] Arbitrage(ArbitrageConfig), } @@ -23,7 +30,7 @@ pub trait CommonStrategyConfig { fn get_contract(&self) -> Bytecode; fn get_profit_range(&self) -> (U256, U256); fn get_gas_config(&self) -> GasConfig; - fn load_candidates(&self, chain_id: u64) -> Vec; + fn load_candidates(&self, chain_id: u64) -> StrategyCandidates; } pub const ONE_ETHER: u128 = 1_000_000_000_000_000_000; @@ -31,8 +38,8 @@ pub const ONE_ETHER: u128 = 1_000_000_000_000_000_000; impl CommonStrategyConfig for StrategyConfig { fn get_exex_id(&self) -> &'static str { match self { - StrategyConfig::PathFinder(_) => PATH_FINDER_EXEX_ID, - // TODO: Add other strategy configurations + StrategyConfig::PathFinder(_) => "path-finder", + StrategyConfig::Liquidator(_) => "liquidator", } } @@ -42,6 +49,7 @@ impl CommonStrategyConfig for StrategyConfig { U256::from(config.min_liquidity.parse::().unwrap() * ONE_ETHER), U256::from(config.max_liquidity.parse::().unwrap() * ONE_ETHER), ), + StrategyConfig::Liquidator(_) => (U256::ZERO, U256::ZERO), } } @@ -56,6 +64,16 @@ impl CommonStrategyConfig for StrategyConfig { tracing::warn!("Invalid vault address, using default ZERO address"); Address::ZERO } + }, + StrategyConfig::Liquidator(config) => match config.vault.parse() { + Ok(address) => { + tracing::info!("Using vault address: {:?}", address); + address + } + Err(_) => { + tracing::warn!("Invalid vault address, using default ZERO address"); + Address::ZERO + } }, // TODO: Add other strategy configurations } } @@ -81,6 +99,26 @@ impl CommonStrategyConfig for StrategyConfig { Bytecode::default() // Return an empty bytecode on error } } + } + StrategyConfig::Liquidator(config) => { + tracing::info!("Loading strategy contract for Liquidator: {:?}", config.contract); + let bytes = match hex::decode(config.contract.clone()) { + Ok(vec) => alloy_primitives::Bytes::from(vec), + Err(e) => { + tracing::error!("Failed to decode contract hex: {}", e); + return Bytecode::default(); + } + }; + match Bytecode::new_raw_checked(bytes) { + Ok(code) => { + tracing::info!("Loaded strategy contract: {:?}", code); + code + } + Err(e) => { + tracing::error!("Failed to load strategy contract: {}", e); + Bytecode::default() // Return an empty bytecode on error + } + } } // TODO: Add other strategy configurations } } @@ -94,6 +132,14 @@ impl CommonStrategyConfig for StrategyConfig { U256::from((max_profit * (ONE_ETHER as f64)) as u128), U256::from((min_profit * (ONE_ETHER as f64)) as u128), ) + } + StrategyConfig::Liquidator(config) => { + let max_profit = config.max_profit.parse::().unwrap(); + let min_profit = config.min_profit.parse::().unwrap(); + ( + U256::from((max_profit * (ONE_ETHER as f64)) as u128), + U256::from((min_profit * (ONE_ETHER as f64)) as u128), + ) } // TODO: Add other strategy configurations } } @@ -101,10 +147,11 @@ impl CommonStrategyConfig for StrategyConfig { fn get_gas_config(&self) -> GasConfig { match self { StrategyConfig::PathFinder(config) => config.gas_config.clone(), + StrategyConfig::Liquidator(config) => config.gas_config.clone(), } } - fn load_candidates(&self, chain_id: u64) -> Vec { + fn load_candidates(&self, chain_id: u64) -> StrategyCandidates { match self { StrategyConfig::PathFinder(config) => { match config.load_candidates(chain_id) { @@ -114,11 +161,27 @@ impl CommonStrategyConfig for StrategyConfig { candidates.len(), chain_id ); - candidates + StrategyCandidates::Candidates(candidates) } Err(e) => { tracing::error!("Failed to load candidates: {}", e); - vec![] // Return an empty vector on error + StrategyCandidates::Candidates(vec![]) // Return an empty vector on error + } + } + } + StrategyConfig::Liquidator(config) => { + match config.load_processors(chain_id) { + Ok(processors) => { + tracing::info!( + "Loaded {} processors for chain_id: {}", + processors.len(), + chain_id + ); + StrategyCandidates::Processors(processors) + } + Err(e) => { + tracing::error!("Failed to load processors: {}", e); + StrategyCandidates::Processors(vec![]) // Return an empty vector on error } } } diff --git a/crates/manager/src/strategy/liquidator/config.yaml b/crates/manager/src/strategy/liquidator/config.yaml new file mode 100644 index 0000000..e69de29 diff --git a/crates/manager/src/strategy/liquidator/mod.rs b/crates/manager/src/strategy/liquidator/mod.rs new file mode 100644 index 0000000..298a2d3 --- /dev/null +++ b/crates/manager/src/strategy/liquidator/mod.rs @@ -0,0 +1,60 @@ +use crate::{gas::GasConfig, types::ProcessorEntry}; +use eyre::eyre; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, fs::File, io::BufReader, path::PathBuf}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct LiquidatorConfig { + pub vault: String, + pub contract: String, + pub gas_config: GasConfig, + pub path: PathBuf, + + pub max_liquidity: String, + /* Maximum liquidity to use for path finding ex. 1000 * + * 1ether(1000 USDC) */ + pub min_liquidity: String, + /* Minimum liquidity to use for path finding ex. 100 * + * 1ether(100 USDC) */ + pub max_profit: String, + pub min_profit: String, +} + +// { +// "1": [ // chain_id = 1 +// { +// "table": "aave_execute_borrow", +// "processor": "AaveExecuteBorrow" +// }, +// { +// "table": "another_table", +// "processor": "AnotherProcessor" +// } +// ], +// "42161": [ // chain_id = 42161(Arbitrum) +// { +// "table": "dolomite_borrow", +// "processor": "DolomiteBorrow" +// } +// ] +// } + +impl LiquidatorConfig { + pub fn load_processors(&self, chain_id: u64) -> eyre::Result> { + if !self.path.exists() { + return Err(eyre!("Routes JSON file not found at: {}", self.path.display())); + } + + let file = File::open(&self.path)?; + let processors: HashMap> = + serde_json::from_reader(BufReader::new(file)) + .map_err(|e| eyre!("Failed to parse routes JSON: {}", e))?; + + let chain_processors = processors + .get(&chain_id.to_string()) + .ok_or_else(|| eyre!("No processors found for chain_id: {}", chain_id))?; + + Ok(chain_processors.clone()) + } +} diff --git a/crates/manager/src/strategy/mod.rs b/crates/manager/src/strategy/mod.rs index e028dcf..256a179 100644 --- a/crates/manager/src/strategy/mod.rs +++ b/crates/manager/src/strategy/mod.rs @@ -1 +1,2 @@ +pub mod liquidator; pub mod path_finder; diff --git a/crates/manager/src/types.rs b/crates/manager/src/types.rs index 3a93254..0a2cf1b 100644 --- a/crates/manager/src/types.rs +++ b/crates/manager/src/types.rs @@ -36,6 +36,18 @@ pub struct CandidateEntry { pub initial_token: Address, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProcessorEntry { + pub table: String, + pub processor: String, +} + +#[derive(Debug, Clone)] +pub enum StrategyCandidates { + Candidates(Vec), + Processors(Vec), +} + pub type CandidateMap = HashMap>; #[cfg(test)] diff --git a/crates/strategy/Cargo.toml b/crates/strategy/Cargo.toml index f8c27e8..b0fd138 100644 --- a/crates/strategy/Cargo.toml +++ b/crates/strategy/Cargo.toml @@ -34,6 +34,7 @@ alloy-rpc-types.workspace = true # async futures-util.workspace = true +futures.workspace = true tokio.workspace = true # misc @@ -44,6 +45,9 @@ reth-transaction-pool.workspace = true #searcher-reth searcher-reth-manager.workspace = true +#rocksdb +rocksdb.workspace = true + serde_json.workspace = true reqwest.workspace = true chrono.workspace = true diff --git a/crates/strategy/src/lib.rs b/crates/strategy/src/lib.rs index 87f78a2..161dd82 100644 --- a/crates/strategy/src/lib.rs +++ b/crates/strategy/src/lib.rs @@ -1,3 +1,4 @@ pub mod core; +pub mod liquidator; pub mod path_finding; pub mod profit_reporter; diff --git a/crates/strategy/src/liquidator/datasets/aave_execute_borrow.rs b/crates/strategy/src/liquidator/datasets/aave_execute_borrow.rs new file mode 100644 index 0000000..10e12c6 --- /dev/null +++ b/crates/strategy/src/liquidator/datasets/aave_execute_borrow.rs @@ -0,0 +1,59 @@ +use crate::liquidator::db_writer::RocksDB; +use alloy_primitives::{Address, address}; +use alloy_sol_types::{SolEvent, sol}; +use eyre::Result; +use reth_primitives::{Block, Receipt, RecoveredBlock}; +use reth_tracing::tracing::debug; + +// Pool contarct address +const ARBITRUM_AAVE_CONTRACT_ADDRESS: Address = + address!("0x794a61358D6845594F94dc1DB02A252b5b4814aD"); + +sol! { + event Borrow( + address indexed reserve, + address user, + address indexed onBehalfOf, + uint256 amount, + uint8 interestRateMode, + uint256 borrowRate, + uint16 indexed referralCode + ); +} + +pub async fn process_aave_execute_borrow( + block_data: &(RecoveredBlock, Vec), + db: &RocksDB, +) -> Result<()> { + let block = &block_data.0; + let receipts = &block_data.1; + + // Iterate through transactions and their receipts + for (tx_idx, (tx, receipt)) in block.body().transactions.iter().zip(receipts.iter()).enumerate() + { + // Process each log in the receipt + for (log_idx, log) in receipt.logs.iter().enumerate() { + // Filter on dolomite contract address + if log.address == ARBITRUM_AAVE_CONTRACT_ADDRESS { + // First check if this log matches our event signature + if log.topics().get(0) != Some(&Borrow::SIGNATURE_HASH) { + continue; + } + + match Borrow::decode_raw_log(log.topics(), &log.data.data) { + Ok(create) => { + db.save( + "aave_borrow", + &format!("{}", create.onBehalfOf), + &format!("{}_{}", create.reserve, create.amount), + ); + } + Err(e) => { + debug!("Failed to decode aave borrow event: {:?}", e); + } + } + } + } + } + Ok(()) +} diff --git a/crates/strategy/src/liquidator/datasets/dolomite_borrow_position.rs b/crates/strategy/src/liquidator/datasets/dolomite_borrow_position.rs new file mode 100644 index 0000000..7094fe1 --- /dev/null +++ b/crates/strategy/src/liquidator/datasets/dolomite_borrow_position.rs @@ -0,0 +1,54 @@ +use crate::liquidator::db_writer::RocksDB; +use alloy_primitives::{Address, address}; +use alloy_sol_types::{SolEvent, sol}; +use eyre::Result; +use reth_primitives::{Block, Receipt, RecoveredBlock}; +use reth_tracing::tracing::debug; + +// BorrowPositionProxyV2 address +const BERA_DOLOMITE_CONTRACT_ADDRESS: Address = + address!("0xC06271eb97d960F4034DDF953e16271CcB2B10BD"); + +sol! { + event BorrowPositionOpen( + address indexed _borrower, + uint256 indexed _borrowAccountNumber + ); +} + +pub async fn process_dolomite_borrow_positions( + block_data: &(RecoveredBlock, Vec), + db: &RocksDB, +) -> Result<()> { + let block = &block_data.0; + let receipts = &block_data.1; + + // Iterate through transactions and their receipts + for (tx_idx, (tx, receipt)) in block.body().transactions.iter().zip(receipts.iter()).enumerate() + { + // Process each log in the receipt + for (log_idx, log) in receipt.logs.iter().enumerate() { + // Filter on dolomite contract address + if log.address == BERA_DOLOMITE_CONTRACT_ADDRESS { + // First check if this log matches our event signature + if log.topics().get(0) != Some(&BorrowPositionOpen::SIGNATURE_HASH) { + continue; + } + + match BorrowPositionOpen::decode_raw_log(log.topics(), &log.data.data) { + Ok(create) => { + db.save( + "dolomite_borrow_positions", + &format!("{}", create._borrowAccountNumber), + &format!("{}", create._borrower), + ); + } + Err(e) => { + debug!("Failed to decode dolomite borrow position open event: {:?}", e); + } + } + } + } + } + Ok(()) +} diff --git a/crates/strategy/src/liquidator/datasets/mod.rs b/crates/strategy/src/liquidator/datasets/mod.rs new file mode 100644 index 0000000..635209d --- /dev/null +++ b/crates/strategy/src/liquidator/datasets/mod.rs @@ -0,0 +1,2 @@ +pub mod aave_execute_borrow; +pub mod dolomite_borrow_position; diff --git a/crates/strategy/src/liquidator/db_writer.rs b/crates/strategy/src/liquidator/db_writer.rs new file mode 100644 index 0000000..03ac864 --- /dev/null +++ b/crates/strategy/src/liquidator/db_writer.rs @@ -0,0 +1,84 @@ +use rocksdb::{ColumnFamilyDescriptor, DB, Options}; +use std::{collections::HashSet, sync::Arc}; + +// database table 이름 타입으로 정의 +pub enum TableName { + DolomiteBorrowPositions, + AaveExecuteBorrow, +} + +impl TableName { + pub fn from_str(s: &str) -> Option { + match s { + "dolomite_borrow_positions" => Some(TableName::DolomiteBorrowPositions), + "aave_execute_borrow" => Some(TableName::AaveExecuteBorrow), + _ => None, + } + } + + pub fn as_str(&self) -> &'static str { + match self { + TableName::DolomiteBorrowPositions => "dolomite_borrow_positions", + TableName::AaveExecuteBorrow => "aave_execute_borrow", + } + } +} + +#[derive(Clone)] +pub struct RocksDB { + db: Arc, +} + +impl RocksDB { + pub fn init(file_path: &str, required_cfs: &[&str]) -> Self { + let existing_cfs = DB::list_cf(&Options::default(), file_path) + .unwrap_or_else(|_| vec!["default".to_string()]); + let existing_cf_set: HashSet = existing_cfs.iter().cloned().collect(); + + let cf_descriptors: Vec<_> = existing_cfs + .iter() + .map(|cf_name| ColumnFamilyDescriptor::new(cf_name, Options::default())) + .collect(); + + let mut db = DB::open_cf_descriptors(&Options::default(), file_path, cf_descriptors) + .expect("Failed to open DB"); + + for cf in required_cfs { + if !existing_cf_set.contains(*cf) { + db.create_cf(*cf, &Options::default()) + .expect(&format!("Failed to create CF: {}", cf)); + } + } + + RocksDB { db: Arc::new(db) } + } + + pub fn save(&self, cf: &str, k: &str, v: &str) -> bool { + let cf = self.db.cf_handle(cf).expect("missing CF"); + self.db.put_cf(cf, k.as_bytes(), v.as_bytes()).is_ok() + } + + pub fn find(&self, cf: &str, k: &str) -> Option { + let cf = self.db.cf_handle(cf).expect("missing CF"); + match self.db.get_cf(cf, k.as_bytes()) { + Ok(Some(v)) => { + let result = String::from_utf8(v).unwrap(); + println!("Finding '{}' returns '{}'", k, result); + Some(result) + } + Ok(None) => { + println!("Finding '{}' returns None", k); + None + } + Err(e) => { + println!("Error retrieving value for {}: {}", k, e); + None + } + } + } + + pub fn delete(&self, cf: &str, k: &str) -> bool { + let cf = self.db.cf_handle(cf).expect("missing CF"); + self.db.delete_cf(cf, k.as_bytes()).is_ok() + } +} diff --git a/crates/strategy/src/liquidator/liquidator.rs b/crates/strategy/src/liquidator/liquidator.rs new file mode 100644 index 0000000..8e4421c --- /dev/null +++ b/crates/strategy/src/liquidator/liquidator.rs @@ -0,0 +1,251 @@ +use crate::{ + core::strategy::Strategy, + liquidator::{ + LiquidationPayload, + datasets::{ + aave_execute_borrow::process_aave_execute_borrow, + dolomite_borrow_position::process_dolomite_borrow_positions, + }, + db_writer::{RocksDB, TableName}, + }, +}; +use alloy_eips::NumHash; +use alloy_primitives::Address; +use alloy_rpc_types::AccessList; +use eyre::{Error, Result}; +use reth_primitives::{Block, Receipt, RecoveredBlock}; +use reth_provider::{BlockHashReader, DBProvider, LatestStateProviderRef, StateCommitmentProvider}; +use reth_revm::state::Bytecode; +use reth_tracing::tracing; +use reth_transaction_pool::PoolTransaction; +use searcher_reth_manager::{ + common::{CommonStrategyConfig, StrategyConfig}, + gas::GasConfig, + types::StrategyCandidates, +}; +use std::time::Instant; + +struct ProcessorInfo { + table_name: &'static str, + processor_name: String, + processor: for<'a> fn( + &'a (RecoveredBlock, Vec), + &'a RocksDB, + ) -> futures::future::BoxFuture<'a, Result<()>>, +} + +impl ProcessorInfo { + fn new( + table_name: &'static str, + processor_name: &str, + processor: for<'a> fn( + &'a (RecoveredBlock, Vec), + &'a RocksDB, + ) -> futures::future::BoxFuture<'a, Result<()>>, + ) -> Self { + Self { table_name, processor_name: processor_name.to_string(), processor } + } +} + +pub struct Liquidator { + processors: Vec, + config: StrategyConfig, +} + +impl Strategy for Liquidator { + // TODO : define action for liquidator + type Action = LiquidationPayload; + + fn new(config: StrategyConfig) -> Self { + Self { processors: Vec::new(), config: config.clone() } + } + + fn gas_config(&self) -> GasConfig { + self.config.get_gas_config() + } + + fn get_code(&self) -> Bytecode { + Bytecode::default() + } + + fn get_vault(&self) -> Address { + self.config.get_vault() + } + + fn prepare(&mut self, chain_id: u64) { + tracing::info!( + target: "liquidator", + event = "prepare_enter", + chain_id = chain_id, + "Entered prepare()" + ); + let config_result = self.config.load_candidates(chain_id); + let processors = match config_result { + StrategyCandidates::Processors(c) => c, + _ => vec![], + }; + // processors for 문 돌면서 add_processor 해줘야함 + for processor in processors { + self.add_processor(processor.table, processor.processor); + } + + tracing::info!("Initialized liquidator with processors: {:?}", self.list_processors()); + tracing::info!( + target: "liquidator", + event = "prepare_exit", + chain_id = chain_id, + "Exiting prepare()" + ); + } + + fn find_profitable_candidates< + T: PoolTransaction, + DB: DBProvider + BlockHashReader + StateCommitmentProvider, + >( + &mut self, + block: NumHash, + latest_state_provider: LatestStateProviderRef<'_, DB>, + pending_txs: Vec, + ) -> Result, AccessList)>, Error> { + // TODO : define actions for find_profitable_candidates stage + // 1. get liquidations candidates from db + // 2. filter candidates which can be liquidated + // 3. return the candidates + // 4. store new candidates that indexer processed + Ok(None) + } +} + +impl Liquidator { + // add_processor adds indexer processor to liquidator + fn add_processor(&mut self, table_name: String, processor_name: String) { + let table = match TableName::from_str(&table_name) { + Some(t) => t, + None => { + return; + } // Skip if table name is not recognized + }; + + let processor = match table { + TableName::DolomiteBorrowPositions => { + ProcessorInfo::new(table.as_str(), &processor_name, |block_data, db| { + Box::pin(process_dolomite_borrow_positions(block_data, db)) + }) + } + TableName::AaveExecuteBorrow => { + ProcessorInfo::new(table.as_str(), &processor_name, |block_data, db| { + Box::pin(process_aave_execute_borrow(block_data, db)) + }) + } + }; + self.processors.push(processor); + } + + fn list_processors(&self) -> Vec<&str> { + self.processors.iter().map(|p| p.processor_name.as_str()).collect() + } + + async fn process_blocks( + &self, + blocks_and_receipts: impl Iterator, &Vec)>, + db: &RocksDB, + ) -> Result<()> { + // Convert the iterator items into owned values directly + let blocks_and_receipts: Vec<_> = blocks_and_receipts + .map(|(block, receipts)| (block.clone(), receipts.clone())) + .collect(); + + for (block, receipts) in blocks_and_receipts { + let block_number = block.number; + let block_data = (block, receipts); + if let Err(e) = self.process_block_data(&block_data, db).await { + tracing::warn!("Failed to process block {}: {}", block_number, e); + } + } + + Ok(()) + } + + async fn process_block_data( + &self, + block_data: &(RecoveredBlock, Vec), + db: &RocksDB, + ) -> Result<()> { + let block_number = block_data.0.number; + + // Create a vector to store all processing tasks + let mut tasks = Vec::new(); + + // Spawn a task for each enabled processor + for processor in &self.processors { + // Clone necessary data for the task + let processor_name = processor.processor_name.clone(); + let processor_fn = processor.processor; + let block_data = block_data.clone(); + let db = db.clone(); // Clone db before spawn + + // Spawn the task + let task = tokio::spawn(async move { + let event_start_time = Instant::now(); + match processor_fn(&block_data, &db).await { + Ok(()) => Ok((processor_name, event_start_time.elapsed())), + Err(e) => Err((processor_name, e.to_string())), + } + }); + + tasks.push(task); + } + + // Wait for all tasks to complete and collect results + let mut total_records = 0; + let mut event_results = Vec::new(); + let mut failed_events = Vec::new(); + + for task in tasks { + match task.await { + Ok(Ok((name, duration))) => { + total_records += 1; // Assuming each event writes one record for now + event_results.push((name, duration)); + } + Ok(Err((name, error))) => { + failed_events.push((name, error)); + } + Err(e) => { + tracing::warn!("Task join error: {}", e); + } + } + } + + // Sort events by name for consistent logging + event_results.sort_by(|a, b| a.0.cmp(&b.0)); + + // Create a consolidated success log + if !event_results.is_empty() { + let events_summary: Vec = event_results + .iter() + .map(|(name, time)| format!("{}({}, {:.2}s)", name, 1, time.as_secs_f64())) + .collect(); + + tracing::info!( + "exex{{id=\"exex-indexer\"}}: Block {} processed - Events: [{}], Total records: {}", + block_number, + events_summary.join(", "), + total_records + ); + } + + // Create a consolidated error log + if !failed_events.is_empty() { + let failure_summary: Vec = + failed_events.iter().map(|(name, error)| format!("{}: {}", name, error)).collect(); + + tracing::warn!( + "exex{{id=\"exex-indexer\"}}: Block {} failures - {}", + block_number, + failure_summary.join(", ") + ); + } + + Ok(()) + } +} diff --git a/crates/strategy/src/liquidator/mod.rs b/crates/strategy/src/liquidator/mod.rs new file mode 100644 index 0000000..af775d1 --- /dev/null +++ b/crates/strategy/src/liquidator/mod.rs @@ -0,0 +1,9 @@ +mod datasets; +mod db_writer; +mod liquidator; +mod types; + +pub use datasets::*; +pub use db_writer::*; +pub use liquidator::*; +pub use types::*; diff --git a/crates/strategy/src/liquidator/types.rs b/crates/strategy/src/liquidator/types.rs new file mode 100644 index 0000000..e08d478 --- /dev/null +++ b/crates/strategy/src/liquidator/types.rs @@ -0,0 +1,34 @@ +use alloy_sol_types::sol; + +sol! { + #[derive(Debug)] + struct LiquidationPayload { + uint8 lendingType; // Protocol type: 1 = AAVE V3, 2+ = future protocols + uint256 liquidateAmount; // Amount of debt to repay (e.g., 1000 ETH) + address liquidateToken; // Debt token address to repay (e.g., WETH, DAI) + address collateralToken; // Collateral token to receive (e.g., WBTC, USDC) + address lendingProtocol; // Protocol contract address (e.g., AAVE Pool) + address user; // User address to liquidate (must have HF < 1) + } + + function getProfit( + uint256 initialAmt, + address stableToken, // USDC or other stable token address + LiquidationPayload memory payload + ) external view returns (uint256 finalAmount); + + #[derive(Debug)] + struct LiquidationPayloads { + uint256 amount; + LiquidationPayload payload; + address swapRouterBefore; + address swapRouterAfter; + bytes swapDataBefore; + bytes swapDataAfter; + } + + function execute( + address stableToken, + LiquidationPayloads[] memory liquidations + ) external returns (uint256 totalProfit); +} diff --git a/crates/strategy/src/path_finding/path_finder.rs b/crates/strategy/src/path_finding/path_finder.rs index 16f5535..88be320 100644 --- a/crates/strategy/src/path_finding/path_finder.rs +++ b/crates/strategy/src/path_finding/path_finder.rs @@ -32,7 +32,7 @@ use reth_transaction_pool::PoolTransaction; use searcher_reth_manager::{ common::{CommonStrategyConfig, ONE_ETHER, StrategyConfig}, gas::GasConfig, - types::CandidateEntry, + types::{CandidateEntry, StrategyCandidates}, }; use crate::path_finding::types::executeCall; @@ -81,7 +81,11 @@ impl Strategy for PathFinder { chain_id = chain_id, "Entered prepare()" ); - let candidates = self.config.load_candidates(chain_id); + let config_result = self.config.load_candidates(chain_id); + let candidates = match config_result { + StrategyCandidates::Candidates(c) => c, + _ => vec![], + }; tracing::info!( target: "path-finder", event = "candidates_count", diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..14eb32b --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "nightly" +components = ["rustfmt", "clippy"] \ No newline at end of file