diff --git a/.gitignore b/.gitignore index eef72e6d..9cf3e7a4 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,10 @@ /tmp # will have compiled files and executables -/target +**/target + +# migration crate is a subpackage; its lock file is autogenerated +migration/Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk diff --git a/.gitmodules b/.gitmodules index 1af6d865..8e328b08 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,5 +1,5 @@ [submodule "rust-lightning"] path = rust-lightning - url = https://github.com/RGB-Tools/rust-lightning.git - branch = rgb + url = https://github.com/dcorral/rust-lightning.git + branch = persistence-layer shallow = true diff --git a/Cargo.lock b/Cargo.lock index 8d2f31b7..e702327d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2374,6 +2374,7 @@ name = "lightning" version = "0.2.2" dependencies = [ "bech32 0.11.1", + "bincode", "bitcoin", "dnssec-prover", "futures", @@ -2386,7 +2387,6 @@ dependencies = [ "possiblyrandom", "rgb-lib", "serde", - "serde_json", "tokio", ] @@ -3630,8 +3630,8 @@ dependencies = [ [[package]] name = "rgb-lib" -version = "0.3.0-beta.5" -source = "git+https://github.com/RGB-Tools/rgb-lib?branch=master#57a88030f522860e8450b6927de2b83005c658a2" +version = "0.3.0-beta.6" +source = "git+https://github.com/RGB-Tools/rgb-lib?tag=0.3.0-beta.6#129982767af09f5729726f4a7ff4518d4646f852" dependencies = [ "amplify", "base64 0.22.1", @@ -3672,8 +3672,8 @@ dependencies = [ [[package]] name = "rgb-lib-migration" -version = "0.3.0-beta.3" -source = "git+https://github.com/RGB-Tools/rgb-lib?branch=master#57a88030f522860e8450b6927de2b83005c658a2" +version = "0.3.0-beta.4" +source = "git+https://github.com/RGB-Tools/rgb-lib?tag=0.3.0-beta.6#129982767af09f5729726f4a7ff4518d4646f852" dependencies = [ "sea-orm-migration", "tokio", @@ -3689,6 +3689,7 @@ dependencies = [ "axum-extra", "baid58", "base64 0.22.1", + "bincode", "biscuit-auth", "bitcoin", "bitcoin-bech32", @@ -3717,7 +3718,9 @@ dependencies = [ "regex", "reqwest 0.12.28", "rgb-lib", + "rln-migration", "scrypt", + "sea-orm", "serde", "serde_json", "serial_test", @@ -3859,6 +3862,14 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "rln-migration" +version = "0.1.0" +dependencies = [ + "sea-orm-migration", + "tokio", +] + [[package]] name = "rsa" version = "0.9.10" diff --git a/Cargo.toml b/Cargo.toml index fed84e29..6153a615 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ axum-extra = "0.9.4" # axum-macros = "0.4.2" # uncomment to use debug_handler baid58 = "0.4.4" base64 = "0.22.1" +bincode = "1" biscuit-auth = "6.0.0" bitcoin = "0.32" bitcoin-bech32 = "0.13" @@ -36,10 +37,16 @@ lightning-rapid-gossip-sync = { version = "0.2.0", path = "./rust-lightning/ligh magic-crypt = "4.0.1" rand = "0.8.5" regex = { version = "1.11", default-features = false } -rgb-lib = { version = "0.3.0-beta.5", features = [ +rgb-lib = { version = "0.3.0-beta.6", features = [ "electrum", "esplora", ] } +rln-migration = { path = "migration" } +sea-orm = { version = "1.1.19", default-features = false, features = [ + "macros", + "runtime-tokio-rustls", + "sqlx-sqlite", +] } scrypt = "0.11.0" serde = { version = "^1.0", features = ["derive"] } serde_json = "1.0" @@ -71,7 +78,7 @@ tracing-test = "0.2.5" [patch.crates-io] lightning = { path = "./rust-lightning/lightning" } lightning-background-processor = { path = "./rust-lightning/lightning-background-processor"} -rgb-lib = { git = "https://github.com/RGB-Tools/rgb-lib", branch = "master" } +rgb-lib = { git = "https://github.com/RGB-Tools/rgb-lib", tag = "0.3.0-beta.6" } [lints.rust.unexpected_cfgs] level = "allow" diff --git a/migration/Cargo.toml b/migration/Cargo.toml new file mode 100644 index 00000000..5d2c3cc9 --- /dev/null +++ b/migration/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rln-migration" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +name = "rln_migration" +path = "src/lib.rs" + +[dependencies] +sea-orm-migration = { version = "1.1.19", default-features = false, features = [ + "cli", + "runtime-tokio-rustls", + "sqlx-postgres", + "sqlx-sqlite", +] } +tokio = { version = "1", features = ["rt", "macros"] } diff --git a/migration/README.md b/migration/README.md new file mode 100644 index 00000000..d5634477 --- /dev/null +++ b/migration/README.md @@ -0,0 +1,57 @@ +# DB migrations + +Every time a change to a DB object or table is needed, a migration has to be +created. + +In rgb-lightning-node we use sea-orm tools to handle the DB and its migrations. + +To generate new migrations the [sea-orm-cli] tool is needed. You should install +the same version that has been previously used. You can find this in the +`src/database/entities/mod.rs` file, where the first line will specify `//! +SeaORM Entity. Generated by sea-orm-codegen `. Install it with: +```sh +cargo install sea-orm-cli --version +``` + +Then, to generate a new migration file, run: +```sh +sea-orm-cli migrate generate +``` + +This command will create a new file where you'll find the `up` and `down` +methods (see `migration/src/m20250127_000001_init_db.rs` for an example). These +methods will be empty and will need to be implemented in order to give +instructions on how to respectively update and revert the new changes. + +Once the migration file is ready, you'll need to run a local postgres DB and +use it to refresh the migration and generate entities with `sea-orm-cli`. This +is accomplished with: +```sh +docker pull postgres:latest + +docker run -p 127.0.0.1:5432:5432/tcp --name migration-postgres \ + -e POSTGRES_PASSWORD=mysecretpassword -d postgres + +DATABASE_URL=postgres://postgres:mysecretpassword@localhost:5432 \ + sea-orm-cli migrate up + +DATABASE_URL=postgres://postgres:mysecretpassword@localhost:5432 \ + sea-orm-cli migrate refresh + +DATABASE_URL=postgres://postgres:mysecretpassword@localhost:5432/postgres \ + sea-orm-cli generate entity -o src/database/entities --expanded-format + +docker rm -f migration-postgres +``` + +The command to generate entities will apply some unwanted changes, for example +it will change the enum fields to integers and will remove some extra `derive`s +that we manually added. Those changes will need to be discarded, so please be +sure to add only the code that is related to the new changes you just applied. +To do this we suggest to first refresh the migration and generate entities with +`sea-orm-cli` on the branch you are about to apply the DB changes on. The +generated diff will only include unwanted changes, so they can be used as a +reference to revert them. + + +[sea-orm-cli]: https://github.com/SeaQL/sea-orm/tree/master/sea-orm-cli diff --git a/migration/src/lib.rs b/migration/src/lib.rs new file mode 100644 index 00000000..540be398 --- /dev/null +++ b/migration/src/lib.rs @@ -0,0 +1,12 @@ +pub use sea_orm_migration::prelude::*; + +mod m20250127_000001_init_db; + +pub struct Migrator; + +#[async_trait::async_trait] +impl MigratorTrait for Migrator { + fn migrations() -> Vec> { + vec![Box::new(m20250127_000001_init_db::Migration)] + } +} diff --git a/migration/src/m20250127_000001_init_db.rs b/migration/src/m20250127_000001_init_db.rs new file mode 100644 index 00000000..257228bb --- /dev/null +++ b/migration/src/m20250127_000001_init_db.rs @@ -0,0 +1,147 @@ +use sea_orm_migration::{prelude::*, schema::*}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(KvStore::Table) + .if_not_exists() + .col(string(KvStore::PrimaryNamespace)) + .col(string(KvStore::SecondaryNamespace)) + .col(string(KvStore::Key)) + .col(blob(KvStore::Value)) + .primary_key( + Index::create() + .col(KvStore::PrimaryNamespace) + .col(KvStore::SecondaryNamespace) + .col(KvStore::Key), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(Config::Table) + .if_not_exists() + .col( + ColumnDef::new(Config::Idx) + .integer() + .not_null() + .primary_key(), + ) + .col(string(Config::EncryptedMnemonic)) + .col(string_null(Config::IndexerUrl)) + .col(string_null(Config::BitcoinNetwork)) + .col(string_null(Config::WalletFingerprint)) + .col(string_null(Config::WalletAccountXpubVanilla)) + .col(string_null(Config::WalletAccountXpubColored)) + .col(string_null(Config::WalletMasterFingerprint)) + .col(big_unsigned(Config::CreatedAt)) + .col(big_unsigned(Config::UpdatedAt)) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(RevokedToken::Table) + .if_not_exists() + .col( + ColumnDef::new(RevokedToken::TokenId) + .string() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(RevokedToken::RevokedAt) + .big_integer() + .not_null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(ChannelPeer::Table) + .if_not_exists() + .col( + ColumnDef::new(ChannelPeer::Pubkey) + .string() + .not_null() + .primary_key(), + ) + .col(ColumnDef::new(ChannelPeer::Address).string().not_null()) + .col( + ColumnDef::new(ChannelPeer::CreatedAt) + .big_integer() + .not_null(), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(ChannelPeer::Table).to_owned()) + .await?; + manager + .drop_table(Table::drop().table(RevokedToken::Table).to_owned()) + .await?; + manager + .drop_table(Table::drop().table(Config::Table).to_owned()) + .await?; + manager + .drop_table(Table::drop().table(KvStore::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum KvStore { + Table, + PrimaryNamespace, + SecondaryNamespace, + Key, + Value, +} + +#[derive(DeriveIden)] +enum Config { + Table, + Idx, + EncryptedMnemonic, + IndexerUrl, + BitcoinNetwork, + WalletFingerprint, + WalletAccountXpubVanilla, + WalletAccountXpubColored, + WalletMasterFingerprint, + CreatedAt, + UpdatedAt, +} + +#[derive(DeriveIden)] +enum RevokedToken { + Table, + TokenId, + RevokedAt, +} + +#[derive(DeriveIden)] +enum ChannelPeer { + Table, + Pubkey, + Address, + CreatedAt, +} diff --git a/migration/src/main.rs b/migration/src/main.rs new file mode 100644 index 00000000..5f9306ae --- /dev/null +++ b/migration/src/main.rs @@ -0,0 +1,6 @@ +use sea_orm_migration::prelude::*; + +#[tokio::main] +async fn main() { + cli::run_cli(rln_migration::Migrator).await; +} diff --git a/rust-lightning b/rust-lightning index 245e4f55..03499296 160000 --- a/rust-lightning +++ b/rust-lightning @@ -1 +1 @@ -Subproject commit 245e4f5517ce3f3678172e31da4aaa91da55b2c1 +Subproject commit 034992967627db48abeab066bf2e28befefe8ab7 diff --git a/src/auth.rs b/src/auth.rs index 65bd3a33..69a5a01d 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,21 +1,12 @@ use axum::{body::Body, extract::State, http::Request, middleware::Next, response::Response}; use biscuit_auth::{macros::authorizer, Biscuit, PublicKey}; -use std::{ - collections::HashSet, - fs, - io::{BufRead, BufReader, Write as IoWrite}, - path::PathBuf, - sync::Arc, -}; -use tempfile::NamedTempFile; +use std::{collections::HashSet, sync::Arc}; use crate::{ error::{APIError, AppError, AuthError}, utils::{hex_str, hex_str_to_vec, AppState}, }; -const REVOKED_TOKENS_FILE: &str = "revoked_tokens.txt"; - const READ_ONLY_OPS: [&str; 23] = [ "/assetbalance", "/assetmetadata", @@ -183,55 +174,12 @@ fn is_token_expired(token: &Biscuit) -> bool { impl AppState { pub(crate) fn revoke_token(&self, token_to_revoke: &Biscuit) -> Result<(), APIError> { let revocation_ids = token_to_revoke.revocation_identifiers(); + let token_id_hexes: Vec = revocation_ids.iter().map(|id| hex_str(id)).collect(); - let file_body = { - let mut revoked = self.revoked_tokens.lock().unwrap(); - for id in revocation_ids { - revoked.insert(id); - } - - let mut updated_list = String::new(); - for token_id in revoked.iter() { - updated_list.push_str(&hex_str(token_id)); - updated_list.push('\n'); - } - updated_list - }; // drop lock - - let path = self.get_revoked_tokens_path(); + self.get_db().add_revoked_tokens(token_id_hexes)?; - // write to a temp file - let dir = path.parent().expect("parent defined"); - let mut tmp = NamedTempFile::new_in(dir).map_err(|e| { - tracing::error!( - "Failed to create temporary file in {}: {}", - dir.display(), - e - ); - APIError::IO(e) - })?; - tmp.as_file_mut() - .write_all(file_body.as_bytes()) - .and_then(|_| tmp.as_file_mut().flush()) - .and_then(|_| tmp.as_file().sync_all()) - .map_err(|e| { - tracing::error!( - "Failed to write/flush/sync temporary revoked-tokens file: {}", - e - ); - APIError::IO(e) - })?; - - // atomically replace the destination file with the synced temp file - tmp.persist(&path).map_err(|persist_err| { - let e = persist_err.error; - tracing::error!( - "Failed to persist temporary file to {}: {}", - path.display(), - e - ); - APIError::IO(e) - })?; + let mut revoked = self.revoked_tokens.lock().unwrap(); + revoked.extend(revocation_ids); Ok(()) } @@ -242,64 +190,15 @@ impl AppState { !revocation_ids.is_disjoint(&*revoked) } - fn get_revoked_tokens_path(&self) -> PathBuf { - self.static_state.storage_dir_path.join(REVOKED_TOKENS_FILE) - } - pub(crate) fn load_revoked_tokens(&self) -> Result>, AppError> { - let path = self.get_revoked_tokens_path(); - - let file = match fs::File::open(&path) { - Ok(f) => f, - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - tracing::info!( - "No revoked tokens file found at {}, starting with empty set", - path.display() - ); - return Ok(HashSet::new()); - } - Err(e) => { - tracing::error!( - "Failed to open revoked tokens file {}: {}", - path.display(), - e - ); - return Err(AppError::IO(e)); - } - }; - - let mut revoked: HashSet> = HashSet::new(); - let reader = BufReader::new(file); - for (lineno, line_res) in reader.lines().enumerate() { - let line = line_res.map_err(|e| { - tracing::error!( - "I/O error while reading {} at line {}: {}", - path.display(), - lineno + 1, - e - ); - AppError::IO(e) - })?; - let s = line.trim(); - if s.is_empty() || s.starts_with('#') { - continue; - } - match hex_str_to_vec(s) { - Some(token_id) => { - revoked.insert(token_id); - } - None => { - tracing::error!( - "Invalid hex string in revoked tokens at {}:{} -> {:?}", - path.display(), - lineno + 1, - s - ); - return Err(AppError::InvalidRevokedTokensFile); - } - } - } - tracing::info!("Loaded {} revoked tokens", revoked.len()); + let db = self.get_db(); + let revoked = db.load_revoked_tokens().map_err(|e| { + tracing::error!("Failed to load revoked tokens from database: {e:?}"); + AppError::IO(std::io::Error::other(format!( + "Failed to load revoked tokens: {e:?}" + ))) + })?; + tracing::info!("Loaded {} revoked tokens from database", revoked.len()); Ok(revoked) } } diff --git a/src/database/entities/channel_peer.rs b/src/database/entities/channel_peer.rs new file mode 100644 index 00000000..d02b2999 --- /dev/null +++ b/src/database/entities/channel_peer.rs @@ -0,0 +1,60 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.20 + +use sea_orm::entity::prelude::*; + +#[derive(Copy, Clone, Default, Debug, DeriveEntity)] +pub struct Entity; + +impl EntityName for Entity { + fn table_name(&self) -> &str { + "channel_peer" + } +} + +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq)] +pub struct Model { + pub pubkey: String, + pub address: String, + pub created_at: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] +pub enum Column { + Pubkey, + Address, + CreatedAt, +} + +#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] +pub enum PrimaryKey { + Pubkey, +} + +impl PrimaryKeyTrait for PrimaryKey { + type ValueType = String; + fn auto_increment() -> bool { + false + } +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl ColumnTrait for Column { + type EntityName = Entity; + fn def(&self) -> ColumnDef { + match self { + Self::Pubkey => ColumnType::String(StringLen::None).def(), + Self::Address => ColumnType::String(StringLen::None).def(), + Self::CreatedAt => ColumnType::BigInteger.def(), + } + } +} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/database/entities/config.rs b/src/database/entities/config.rs new file mode 100644 index 00000000..6a0d7f9f --- /dev/null +++ b/src/database/entities/config.rs @@ -0,0 +1,81 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.20 + +use sea_orm::entity::prelude::*; + +#[derive(Copy, Clone, Default, Debug, DeriveEntity)] +pub struct Entity; + +impl EntityName for Entity { + fn table_name(&self) -> &str { + "config" + } +} + +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq)] +pub struct Model { + pub idx: i32, + pub encrypted_mnemonic: String, + pub indexer_url: Option, + pub bitcoin_network: Option, + pub wallet_fingerprint: Option, + pub wallet_account_xpub_vanilla: Option, + pub wallet_account_xpub_colored: Option, + pub wallet_master_fingerprint: Option, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] +pub enum Column { + Idx, + EncryptedMnemonic, + IndexerUrl, + BitcoinNetwork, + WalletFingerprint, + WalletAccountXpubVanilla, + WalletAccountXpubColored, + WalletMasterFingerprint, + CreatedAt, + UpdatedAt, +} + +#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] +pub enum PrimaryKey { + Idx, +} + +impl PrimaryKeyTrait for PrimaryKey { + type ValueType = i32; + fn auto_increment() -> bool { + false + } +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl ColumnTrait for Column { + type EntityName = Entity; + fn def(&self) -> ColumnDef { + match self { + Self::Idx => ColumnType::Integer.def(), + Self::EncryptedMnemonic => ColumnType::String(StringLen::None).def(), + Self::IndexerUrl => ColumnType::String(StringLen::None).def().null(), + Self::BitcoinNetwork => ColumnType::String(StringLen::None).def().null(), + Self::WalletFingerprint => ColumnType::String(StringLen::None).def().null(), + Self::WalletAccountXpubVanilla => ColumnType::String(StringLen::None).def().null(), + Self::WalletAccountXpubColored => ColumnType::String(StringLen::None).def().null(), + Self::WalletMasterFingerprint => ColumnType::String(StringLen::None).def().null(), + Self::CreatedAt => ColumnType::BigInteger.def(), + Self::UpdatedAt => ColumnType::BigInteger.def(), + } + } +} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/database/entities/kv_store.rs b/src/database/entities/kv_store.rs new file mode 100644 index 00000000..ff33a157 --- /dev/null +++ b/src/database/entities/kv_store.rs @@ -0,0 +1,65 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.20 + +use sea_orm::entity::prelude::*; + +#[derive(Copy, Clone, Default, Debug, DeriveEntity)] +pub struct Entity; + +impl EntityName for Entity { + fn table_name(&self) -> &str { + "kv_store" + } +} + +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq)] +pub struct Model { + pub primary_namespace: String, + pub secondary_namespace: String, + pub key: String, + pub value: Vec, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] +pub enum Column { + PrimaryNamespace, + SecondaryNamespace, + Key, + Value, +} + +#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] +pub enum PrimaryKey { + PrimaryNamespace, + SecondaryNamespace, + Key, +} + +impl PrimaryKeyTrait for PrimaryKey { + type ValueType = (String, String, String); + fn auto_increment() -> bool { + false + } +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl ColumnTrait for Column { + type EntityName = Entity; + fn def(&self) -> ColumnDef { + match self { + Self::PrimaryNamespace => ColumnType::String(StringLen::None).def(), + Self::SecondaryNamespace => ColumnType::String(StringLen::None).def(), + Self::Key => ColumnType::String(StringLen::None).def(), + Self::Value => ColumnType::VarBinary(StringLen::None).def(), + } + } +} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/database/entities/mod.rs b/src/database/entities/mod.rs new file mode 100644 index 00000000..6fbfdc1b --- /dev/null +++ b/src/database/entities/mod.rs @@ -0,0 +1,8 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.20 + +pub mod prelude; + +pub mod channel_peer; +pub mod config; +pub mod kv_store; +pub mod revoked_token; diff --git a/src/database/entities/prelude.rs b/src/database/entities/prelude.rs new file mode 100644 index 00000000..384c36a9 --- /dev/null +++ b/src/database/entities/prelude.rs @@ -0,0 +1,6 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.20 + +pub use super::channel_peer::Entity as ChannelPeer; +pub use super::config::Entity as Config; +pub use super::kv_store::Entity as KvStore; +pub use super::revoked_token::Entity as RevokedToken; diff --git a/src/database/entities/revoked_token.rs b/src/database/entities/revoked_token.rs new file mode 100644 index 00000000..00b64cae --- /dev/null +++ b/src/database/entities/revoked_token.rs @@ -0,0 +1,57 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.20 + +use sea_orm::entity::prelude::*; + +#[derive(Copy, Clone, Default, Debug, DeriveEntity)] +pub struct Entity; + +impl EntityName for Entity { + fn table_name(&self) -> &str { + "revoked_token" + } +} + +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq)] +pub struct Model { + pub token_id: String, + pub revoked_at: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] +pub enum Column { + TokenId, + RevokedAt, +} + +#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] +pub enum PrimaryKey { + TokenId, +} + +impl PrimaryKeyTrait for PrimaryKey { + type ValueType = String; + fn auto_increment() -> bool { + false + } +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl ColumnTrait for Column { + type EntityName = Entity; + fn def(&self) -> ColumnDef { + match self { + Self::TokenId => ColumnType::String(StringLen::None).def(), + Self::RevokedAt => ColumnType::BigInteger.def(), + } + } +} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/database/mod.rs b/src/database/mod.rs new file mode 100644 index 00000000..384a47e7 --- /dev/null +++ b/src/database/mod.rs @@ -0,0 +1 @@ +pub(crate) mod entities; diff --git a/src/disk.rs b/src/disk.rs index 6c6f104e..b8e40330 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -1,39 +1,19 @@ -use bitcoin::secp256k1::PublicKey; use bitcoin::Network; use chrono::Utc; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters}; -use lightning::util::hash_tables::new_hash_map; use lightning::util::logger::{Logger, Record}; -use lightning::util::ser::{Readable, ReadableArgs, Writer}; -use std::collections::HashMap; +use lightning::util::ser::{ReadableArgs, Writer}; use std::fs; use std::fs::File; -use std::io::{BufRead, BufReader}; -use std::net::SocketAddr; +use std::io::BufReader; use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::error::APIError; -use crate::ldk::{ - ChannelIdsMap, InboundPaymentInfoStorage, NetworkGraph, OutboundPaymentInfoStorage, - OutputSpenderTxes, SwapMap, -}; -use crate::utils::{parse_peer_info, LOGS_DIR}; +use crate::ldk::NetworkGraph; +use crate::utils::LOGS_DIR; pub(crate) const LDK_LOGS_FILE: &str = "logs.txt"; -pub(crate) const INBOUND_PAYMENTS_FNAME: &str = "inbound_payments"; -pub(crate) const OUTBOUND_PAYMENTS_FNAME: &str = "outbound_payments"; - -pub(crate) const CHANNEL_PEER_DATA: &str = "channel_peer_data"; - -pub(crate) const OUTPUT_SPENDER_TXES: &str = "output_spender_txes"; - -pub(crate) const CHANNEL_IDS_FNAME: &str = "channel_ids"; - -pub(crate) const MAKER_SWAPS_FNAME: &str = "maker_swaps"; -pub(crate) const TAKER_SWAPS_FNAME: &str = "taker_swaps"; - pub(crate) struct FilesystemLogger { data_dir: PathBuf, } @@ -73,76 +53,6 @@ impl Logger for FilesystemLogger { } } -pub(crate) fn persist_channel_peer( - path: &Path, - pubkey: &PublicKey, - address: &SocketAddr, -) -> Result<(), APIError> { - let pubkey = pubkey.to_string(); - let peer_info = if path.exists() { - let mut updated_peer_info = fs::read_to_string(path)? - .lines() - .filter(|&line| !line.trim().starts_with(&pubkey)) - .map(|line| line.trim()) - .collect::>() - .join("\n"); - updated_peer_info += format!( - "{}{pubkey}@{address}", - if updated_peer_info.is_empty() { - "" - } else { - "\n" - } - ) - .as_str(); - updated_peer_info - } else { - format!("{pubkey}@{address}") - }; - let mut tmp_path = path.to_path_buf(); - tmp_path.set_extension("ptmp"); - fs::write(&tmp_path, peer_info.to_string().as_bytes())?; - fs::rename(tmp_path, path)?; - tracing::info!("persisted peer (pubkey: {pubkey}, addr: {address})"); - Ok(()) -} - -pub(crate) fn delete_channel_peer(path: &Path, pubkey: String) -> Result<(), APIError> { - if path.exists() { - let updated_peer_info = fs::read_to_string(path)? - .lines() - .filter(|&line| !line.trim().starts_with(&pubkey)) - .map(|line| line.trim()) - .collect::>() - .join("\n"); - let mut tmp_path = path.to_path_buf(); - tmp_path.set_extension("dtmp"); - fs::write(&tmp_path, updated_peer_info.to_string().as_bytes())?; - fs::rename(tmp_path, path)?; - } - Ok(()) -} - -pub(crate) fn read_channel_peer_data( - path: &Path, -) -> Result, APIError> { - let mut peer_data = HashMap::new(); - if !path.exists() { - return Ok(HashMap::new()); - } - let file = File::open(path)?; - let reader = BufReader::new(file); - for line in reader.lines() { - match parse_peer_info(line.unwrap()) { - Ok((pubkey, socket_addr)) => { - peer_data.insert(pubkey, socket_addr.expect("saved info with address")); - } - Err(e) => return Err(e), - } - } - Ok(peer_data) -} - pub(crate) fn read_network( path: &Path, network: Network, @@ -156,48 +66,6 @@ pub(crate) fn read_network( NetworkGraph::new(network, logger) } -pub(crate) fn read_inbound_payment_info(path: &Path) -> InboundPaymentInfoStorage { - if let Ok(file) = File::open(path) { - if let Ok(info) = InboundPaymentInfoStorage::read(&mut BufReader::new(file)) { - return info; - } - } - InboundPaymentInfoStorage { - payments: new_hash_map(), - } -} - -pub(crate) fn read_outbound_payment_info(path: &Path) -> OutboundPaymentInfoStorage { - if let Ok(file) = File::open(path) { - if let Ok(info) = OutboundPaymentInfoStorage::read(&mut BufReader::new(file)) { - return info; - } - } - OutboundPaymentInfoStorage { - payments: new_hash_map(), - } -} - -pub(crate) fn read_output_spender_txes(path: &Path) -> OutputSpenderTxes { - if let Ok(file) = File::open(path) { - if let Ok(info) = OutputSpenderTxes::read(&mut BufReader::new(file)) { - return info; - } - } - new_hash_map() -} - -pub(crate) fn read_swaps_info(path: &Path) -> SwapMap { - if let Ok(file) = File::open(path) { - if let Ok(info) = SwapMap::read(&mut BufReader::new(file)) { - return info; - } - } - SwapMap { - swaps: new_hash_map(), - } -} - pub(crate) fn read_scorer( path: &Path, graph: Arc, @@ -212,14 +80,3 @@ pub(crate) fn read_scorer( } ProbabilisticScorer::new(params, graph, logger) } - -pub(crate) fn read_channel_ids_info(path: &Path) -> ChannelIdsMap { - if let Ok(file) = File::open(path) { - if let Ok(info) = ChannelIdsMap::read(&mut BufReader::new(file)) { - return info; - } - } - ChannelIdsMap { - channel_ids: new_hash_map(), - } -} diff --git a/src/error.rs b/src/error.rs index fd32d4af..fb949f33 100644 --- a/src/error.rs +++ b/src/error.rs @@ -71,9 +71,6 @@ pub enum APIError { #[error("Failed to issue asset: {0}")] FailedIssuingAsset(String), - #[error("Unable to create keys seed file {0}: {1}")] - FailedKeysCreation(String, String), - #[error("Failed to open channel: {0}")] FailedOpenChannel(String), @@ -322,6 +319,12 @@ impl APIError { } } +impl From for APIError { + fn from(err: sea_orm::DbErr) -> Self { + APIError::IO(std::io::Error::other(format!("database error: {err}"))) + } +} + impl From for APIError { fn from(err: axum::extract::rejection::JsonRejection) -> Self { APIError::InvalidRequest(err.to_string()) @@ -428,7 +431,6 @@ impl IntoResponse for APIError { APIError::FailedClosingChannel(_) | APIError::FailedInvoiceCreation(_) | APIError::FailedIssuingAsset(_) - | APIError::FailedKeysCreation(_, _) | APIError::FailedOpenChannel(_) | APIError::FailedPayment(_) | APIError::FailedPeerDisconnection(_) @@ -558,9 +560,6 @@ pub enum AppError { #[error("The provided authentication args are invalid")] InvalidAuthenticationArgs, - #[error("The revoked tokens file contains an invalid entry")] - InvalidRevokedTokensFile, - #[error("The provided root public key is invalid")] InvalidRootKey, diff --git a/src/kv_store.rs b/src/kv_store.rs new file mode 100644 index 00000000..b4670aae --- /dev/null +++ b/src/kv_store.rs @@ -0,0 +1,424 @@ +use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; + +use bitcoin::io; +use bitcoin::secp256k1::PublicKey; +use lightning::util::async_poll::AsyncResult; +use lightning::util::persist::{KVStore, KVStoreSync}; +use sea_orm::sea_query::OnConflict; +use sea_orm::{ + ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, TransactionTrait, +}; + +use crate::database::entities::{ + channel_peer, config, kv_store, + prelude::{ChannelPeer, Config, KvStore, RevokedToken}, + revoked_token, +}; +use crate::error::APIError; + +const CONFIG_IDX: i32 = 1; + +/// Drives a future to completion from sync code, using the ambient multi-threaded runtime. +fn block_on(fut: F) -> F::Output +where + F: std::future::Future + Send, + F::Output: Send, +{ + tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) +} + +pub struct SeaOrmKvStore { + connection: Arc, +} + +impl SeaOrmKvStore { + /// Caller must ensure migrations have already been run. + pub fn from_connection(connection: Arc) -> Self { + Self { connection } + } + + fn get_connection(&self) -> &DatabaseConnection { + &self.connection + } + + pub fn add_revoked_tokens(&self, token_id_hexes: Vec) -> Result<(), APIError> { + let now = crate::utils::get_current_timestamp() as i64; + + block_on( + self.connection + .transaction::<_, (), sea_orm::DbErr>(move |txn| { + Box::pin(async move { + for hex in token_id_hexes { + let token = revoked_token::ActiveModel { + token_id: ActiveValue::Set(hex), + revoked_at: ActiveValue::Set(now), + }; + RevokedToken::insert(token) + .on_conflict( + OnConflict::column(revoked_token::Column::TokenId) + .do_nothing() + .to_owned(), + ) + .exec(txn) + .await?; + } + Ok(()) + }) + }), + ) + .map_err(|e| match e { + sea_orm::TransactionError::Connection(err) + | sea_orm::TransactionError::Transaction(err) => APIError::from(err), + })?; + + Ok(()) + } + + pub fn delete_channel_peer(&self, pubkey: &str) -> Result<(), APIError> { + block_on( + ChannelPeer::delete_many() + .filter(channel_peer::Column::Pubkey.eq(pubkey)) + .exec(self.get_connection()), + )?; + + Ok(()) + } + + pub fn get_config(&self) -> Result, APIError> { + Ok(block_on( + Config::find_by_id(CONFIG_IDX).one(self.get_connection()), + )?) + } + + pub fn is_initialized(&self) -> Result { + Ok(self.get_config()?.is_some()) + } + + pub fn load_revoked_tokens(&self) -> Result>, APIError> { + let results = block_on(RevokedToken::find().all(self.get_connection()))?; + + let mut revoked = HashSet::new(); + for record in results { + if let Some(token_bytes) = crate::utils::hex_str_to_vec(&record.token_id) { + revoked.insert(token_bytes); + } + } + + Ok(revoked) + } + + pub fn persist_channel_peer( + &self, + pubkey: &PublicKey, + address: &SocketAddr, + ) -> Result<(), APIError> { + let now = crate::utils::get_current_timestamp() as i64; + + let peer = channel_peer::ActiveModel { + pubkey: ActiveValue::Set(pubkey.to_string()), + address: ActiveValue::Set(address.to_string()), + created_at: ActiveValue::Set(now), + }; + + block_on( + ChannelPeer::insert(peer) + .on_conflict( + OnConflict::column(channel_peer::Column::Pubkey) + .update_column(channel_peer::Column::Address) + .to_owned(), + ) + .exec(self.get_connection()), + )?; + + tracing::info!("persisted peer (pubkey: {pubkey}, addr: {address})"); + Ok(()) + } + + pub fn read_channel_peer_data(&self) -> Result, APIError> { + let results = block_on(ChannelPeer::find().all(self.get_connection()))?; + + let mut peer_data = HashMap::new(); + for record in results { + if let (Ok(pubkey), Ok(address)) = ( + PublicKey::from_str(&record.pubkey), + SocketAddr::from_str(&record.address), + ) { + peer_data.insert(pubkey, address); + } + } + + Ok(peer_data) + } + + pub fn save_mnemonic(&self, encrypted_mnemonic: String) -> Result<(), APIError> { + let now = crate::utils::get_current_timestamp() as i64; + + let row = config::ActiveModel { + idx: ActiveValue::Set(CONFIG_IDX), + encrypted_mnemonic: ActiveValue::Set(encrypted_mnemonic), + indexer_url: ActiveValue::NotSet, + bitcoin_network: ActiveValue::NotSet, + wallet_fingerprint: ActiveValue::NotSet, + wallet_account_xpub_vanilla: ActiveValue::NotSet, + wallet_account_xpub_colored: ActiveValue::NotSet, + wallet_master_fingerprint: ActiveValue::NotSet, + created_at: ActiveValue::Set(now), + updated_at: ActiveValue::Set(now), + }; + + block_on( + Config::insert(row) + .on_conflict( + OnConflict::column(config::Column::Idx) + .update_columns([ + config::Column::EncryptedMnemonic, + config::Column::UpdatedAt, + ]) + .to_owned(), + ) + .exec(self.get_connection()), + )?; + + Ok(()) + } + + fn update_config_field(&self, column: config::Column, value: &str) -> Result<(), APIError> { + let now = crate::utils::get_current_timestamp() as i64; + block_on( + Config::update_many() + .filter(config::Column::Idx.eq(CONFIG_IDX)) + .col_expr(column, value.into()) + .col_expr(config::Column::UpdatedAt, now.into()) + .exec(self.get_connection()), + )?; + Ok(()) + } + + pub fn set_indexer_url(&self, value: &str) -> Result<(), APIError> { + self.update_config_field(config::Column::IndexerUrl, value) + } + + pub fn set_bitcoin_network(&self, value: &str) -> Result<(), APIError> { + self.update_config_field(config::Column::BitcoinNetwork, value) + } + + pub fn set_wallet_fingerprint(&self, value: &str) -> Result<(), APIError> { + self.update_config_field(config::Column::WalletFingerprint, value) + } + + pub fn set_wallet_account_xpub_vanilla(&self, value: &str) -> Result<(), APIError> { + self.update_config_field(config::Column::WalletAccountXpubVanilla, value) + } + + pub fn set_wallet_account_xpub_colored(&self, value: &str) -> Result<(), APIError> { + self.update_config_field(config::Column::WalletAccountXpubColored, value) + } + + pub fn set_wallet_master_fingerprint(&self, value: &str) -> Result<(), APIError> { + self.update_config_field(config::Column::WalletMasterFingerprint, value) + } +} + +impl KVStore for SeaOrmKvStore { + fn read( + &self, + primary_namespace: &str, + secondary_namespace: &str, + key: &str, + ) -> AsyncResult<'static, Vec, io::Error> { + tracing::trace!(primary_namespace, secondary_namespace, key, "KVStore read"); + let conn = Arc::clone(&self.connection); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + Box::pin(async move { + let result = KvStore::find() + .filter(kv_store::Column::PrimaryNamespace.eq(&primary_namespace)) + .filter(kv_store::Column::SecondaryNamespace.eq(&secondary_namespace)) + .filter(kv_store::Column::Key.eq(&key)) + .one(conn.as_ref()) + .await + .map_err(|e| { + tracing::error!(primary_namespace, secondary_namespace, key, error = %e, "KVStore read failed"); + io::Error::new(io::ErrorKind::Other, format!("Database read failed: {e}")) + })?; + + match result { + Some(record) => Ok(record.value), + None => { + tracing::trace!( + primary_namespace, + secondary_namespace, + key, + "KVStore key not found" + ); + Err(io::Error::new(io::ErrorKind::NotFound, "Key not found")) + } + } + }) + } + + fn write( + &self, + primary_namespace: &str, + secondary_namespace: &str, + key: &str, + buf: Vec, + ) -> AsyncResult<'static, (), io::Error> { + tracing::trace!( + primary_namespace, + secondary_namespace, + key, + value_len = buf.len(), + "KVStore write" + ); + let conn = Arc::clone(&self.connection); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + Box::pin(async move { + let model = kv_store::ActiveModel { + primary_namespace: ActiveValue::Set(primary_namespace.clone()), + secondary_namespace: ActiveValue::Set(secondary_namespace.clone()), + key: ActiveValue::Set(key.clone()), + value: ActiveValue::Set(buf), + }; + + KvStore::insert(model) + .on_conflict( + OnConflict::columns([ + kv_store::Column::PrimaryNamespace, + kv_store::Column::SecondaryNamespace, + kv_store::Column::Key, + ]) + .update_column(kv_store::Column::Value) + .to_owned(), + ) + .exec(conn.as_ref()) + .await + .map_err(|e| { + tracing::error!(primary_namespace, secondary_namespace, key, error = %e, "KVStore write failed"); + io::Error::new(io::ErrorKind::Other, format!("Database write failed: {e}")) + })?; + + Ok(()) + }) + } + + fn remove( + &self, + primary_namespace: &str, + secondary_namespace: &str, + key: &str, + lazy: bool, + ) -> AsyncResult<'static, (), io::Error> { + tracing::trace!( + primary_namespace, + secondary_namespace, + key, + lazy, + "KVStore remove" + ); + let conn = Arc::clone(&self.connection); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + Box::pin(async move { + KvStore::delete_many() + .filter(kv_store::Column::PrimaryNamespace.eq(&primary_namespace)) + .filter(kv_store::Column::SecondaryNamespace.eq(&secondary_namespace)) + .filter(kv_store::Column::Key.eq(&key)) + .exec(conn.as_ref()) + .await + .map_err(|e| { + tracing::error!(primary_namespace, secondary_namespace, key, error = %e, "KVStore remove failed"); + io::Error::new(io::ErrorKind::Other, format!("Database delete failed: {e}")) + })?; + + Ok(()) + }) + } + + fn list( + &self, + primary_namespace: &str, + secondary_namespace: &str, + ) -> AsyncResult<'static, Vec, io::Error> { + tracing::trace!(primary_namespace, secondary_namespace, "KVStore list"); + let conn = Arc::clone(&self.connection); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + Box::pin(async move { + let results = KvStore::find() + .filter(kv_store::Column::PrimaryNamespace.eq(&primary_namespace)) + .filter(kv_store::Column::SecondaryNamespace.eq(&secondary_namespace)) + .all(conn.as_ref()) + .await + .map_err(|e| { + tracing::error!(primary_namespace, secondary_namespace, error = %e, "KVStore list failed"); + io::Error::new(io::ErrorKind::Other, format!("Database list failed: {e}")) + })?; + + Ok(results.into_iter().map(|r| r.key).collect()) + }) + } +} + +impl KVStoreSync for SeaOrmKvStore { + fn read( + &self, + primary_namespace: &str, + secondary_namespace: &str, + key: &str, + ) -> Result, io::Error> { + block_on(KVStore::read( + self, + primary_namespace, + secondary_namespace, + key, + )) + } + + fn write( + &self, + primary_namespace: &str, + secondary_namespace: &str, + key: &str, + buf: Vec, + ) -> Result<(), io::Error> { + block_on(KVStore::write( + self, + primary_namespace, + secondary_namespace, + key, + buf, + )) + } + + fn remove( + &self, + primary_namespace: &str, + secondary_namespace: &str, + key: &str, + lazy: bool, + ) -> Result<(), io::Error> { + block_on(KVStore::remove( + self, + primary_namespace, + secondary_namespace, + key, + lazy, + )) + } + + fn list( + &self, + primary_namespace: &str, + secondary_namespace: &str, + ) -> Result, io::Error> { + block_on(KVStore::list(self, primary_namespace, secondary_namespace)) + } +} diff --git a/src/ldk.rs b/src/ldk.rs index 0c2e51e3..be362c30 100644 --- a/src/ldk.rs +++ b/src/ldk.rs @@ -1,3 +1,4 @@ +use crate::kv_store::SeaOrmKvStore; use amplify::{map, s}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::hex::DisplayHex; @@ -23,10 +24,11 @@ use lightning::onion_message::messenger::{ DefaultMessageRouter, OnionMessenger as LdkOnionMessenger, }; use lightning::rgb_utils::{ - get_rgb_channel_info_pending, is_channel_rgb, parse_rgb_payment_info, read_rgb_transfer_info, - update_rgb_channel_amount, write_rgb_channel_info, BITCOIN_NETWORK_FNAME, INDEXER_URL_FNAME, - STATIC_BLINDING, WALLET_ACCOUNT_XPUB_COLORED_FNAME, WALLET_ACCOUNT_XPUB_VANILLA_FNAME, - WALLET_FINGERPRINT_FNAME, WALLET_MASTER_FINGERPRINT_FNAME, + get_rgb_channel_info_pending, is_channel_rgb, update_rgb_channel_amount, RgbKvStoreExt, + RgbPaymentInfo, BITCOIN_NETWORK_FNAME, INDEXER_URL_FNAME, RGB_PAYMENT_INFO_INBOUND_NS, + RGB_PAYMENT_INFO_OUTBOUND_NS, RGB_PRIMARY_NS, STATIC_BLINDING, + WALLET_ACCOUNT_XPUB_COLORED_FNAME, WALLET_ACCOUNT_XPUB_VANILLA_FNAME, WALLET_FINGERPRINT_FNAME, + WALLET_MASTER_FINGERPRINT_FNAME, }; use lightning::routing::gossip; use lightning::routing::gossip::{NodeId, P2PGossipSync}; @@ -39,12 +41,14 @@ use lightning::sign::{ use lightning::types::payment::{PaymentHash, PaymentPreimage}; use lightning::util::config::UserConfig; use lightning::util::hash_tables::hash_map::Entry; -use lightning::util::hash_tables::HashMap as LdkHashMap; +use lightning::util::hash_tables::{new_hash_map, HashMap as LdkHashMap}; use lightning::util::persist::{ - KVStoreSync, MonitorUpdatingPersister, OUTPUT_SWEEPER_PERSISTENCE_KEY, - OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + KVStoreSync, KVStoreSyncWrapper, MonitorUpdatingPersister, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, }; -use lightning::util::ser::{ReadableArgs, Writeable}; +use lightning::util::ser::{Readable, ReadableArgs, Writeable}; use lightning::util::sweep as ldk_sweep; use lightning::{chain, impl_writeable_tlv_based}; use lightning_background_processor::{process_events_async, GossipSync, NO_LIQUIDITY_MANAGER}; @@ -56,7 +60,6 @@ use lightning_block_sync::UnboundedCache; use lightning_dns_resolver::OMDomainResolver; use lightning_invoice::PaymentSecret; use lightning_net_tokio::SocketDescriptor; -use lightning_persister::fs_store::FilesystemStore; use rand::RngCore; use rgb_lib::{ bdk_wallet::keys::{bip39::Mnemonic, DerivableKey, ExtendedKey}, @@ -80,10 +83,9 @@ use std::collections::HashMap; use std::convert::TryInto; use std::fs; use std::hash::{DefaultHasher, Hash, Hasher}; -use std::io::BufReader; use std::net::ToSocketAddrs; use std::net::{SocketAddr, TcpListener}; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; @@ -94,10 +96,17 @@ use tokio::sync::watch::Sender; use tokio::task::JoinHandle; use crate::bitcoind::BitcoindClient; -use crate::disk::{ - self, FilesystemLogger, CHANNEL_IDS_FNAME, CHANNEL_PEER_DATA, INBOUND_PAYMENTS_FNAME, - MAKER_SWAPS_FNAME, OUTBOUND_PAYMENTS_FNAME, OUTPUT_SPENDER_TXES, TAKER_SWAPS_FNAME, -}; +use crate::disk::{self, FilesystemLogger}; + +const INBOUND_PAYMENTS_KEY: &str = "inbound_payments"; +const OUTBOUND_PAYMENTS_KEY: &str = "outbound_payments"; +const CHANNEL_IDS_KEY: &str = "channel_ids"; +const MAKER_SWAPS_KEY: &str = "maker_swaps"; +const TAKER_SWAPS_KEY: &str = "taker_swaps"; +const OUTPUT_SPENDER_TXES_KEY: &str = "output_spender_txes"; +const PSBT_NAMESPACE: &str = "psbt"; +const PENDING_FUNDING_NAMESPACE: &str = "pending_funding"; + use crate::error::APIError; use crate::rgb::{check_rgb_proxy_endpoint, get_rgb_channel_info_optional, RgbLibWalletWrapper}; use crate::routes::{HTLCStatus, SwapStatus, UnlockRequest, DUST_LIMIT_MSAT}; @@ -229,14 +238,14 @@ impl UnlockedAppState { } fn save_maker_swaps(&self, swaps: MutexGuard) { - self.fs_store - .write("", "", MAKER_SWAPS_FNAME, swaps.encode()) + self.kv_store + .write("", "", MAKER_SWAPS_KEY, swaps.encode()) .unwrap(); } fn save_taker_swaps(&self, swaps: MutexGuard) { - self.fs_store - .write("", "", TAKER_SWAPS_FNAME, swaps.encode()) + self.kv_store + .write("", "", TAKER_SWAPS_KEY, swaps.encode()) .unwrap(); } @@ -324,14 +333,14 @@ impl UnlockedAppState { } fn save_inbound_payments(&self, inbound: MutexGuard) { - self.fs_store - .write("", "", INBOUND_PAYMENTS_FNAME, inbound.encode()) + self.kv_store + .write("", "", INBOUND_PAYMENTS_KEY, inbound.encode()) .unwrap(); } fn save_outbound_payments(&self, outbound: MutexGuard) { - self.fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, outbound.encode()) + self.kv_store + .write("", "", OUTBOUND_PAYMENTS_KEY, outbound.encode()) .unwrap(); } @@ -433,8 +442,8 @@ impl UnlockedAppState { } fn save_channel_ids_map(&self, channel_ids: MutexGuard) { - self.fs_store - .write("", "", CHANNEL_IDS_FNAME, channel_ids.encode()) + self.kv_store + .write("", "", CHANNEL_IDS_KEY, channel_ids.encode()) .unwrap(); } } @@ -447,7 +456,7 @@ pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< Arc, Arc< MonitorUpdatingPersister< - Arc, + Arc, Arc, Arc, Arc, @@ -516,7 +525,7 @@ pub(crate) struct RgbOutputSpender { static_state: Arc, rgb_wallet_wrapper: Arc, keys_manager: Arc, - fs_store: Arc, + kv_store: Arc, txes: Arc>, proxy_endpoint: String, } @@ -526,35 +535,62 @@ pub(crate) type OutputSweeper = ldk_sweep::OutputSweeper< Arc, Arc, Arc, - Arc, + KVStoreSyncWrapper>, Arc, Arc, >; -fn find_and_update_rgb_chan_amt(ldk_data_dir: &Path, payment_hash: &PaymentHash, receiver: bool) { +fn find_and_update_rgb_chan_amt( + payment_hash: &PaymentHash, + receiver: bool, + kv_store: &Arc, +) { let payment_hash_str = hex_str(&payment_hash.0); - for entry in fs::read_dir(ldk_data_dir).unwrap() { - let file = entry.unwrap(); - let file_name = file.file_name(); - let file_name_str = file_name.to_string_lossy(); - let mut file_path_no_ext = file.path().clone(); - file_path_no_ext.set_extension(""); - let file_name_str_no_ext = file_path_no_ext.file_name().unwrap().to_string_lossy(); - if file_name_str.contains(&payment_hash_str) && file_name_str_no_ext != payment_hash_str { - let rgb_payment_info = parse_rgb_payment_info(&file.path()); - let channel_id_str = file_name_str_no_ext.replace(&payment_hash_str, ""); - - if rgb_payment_info.swap_payment && receiver != rgb_payment_info.inbound { - continue; - } - let (offered, received) = if receiver { - (0, rgb_payment_info.amount) - } else { - (rgb_payment_info.amount, 0) - }; - update_rgb_channel_amount(&channel_id_str, offered, received, ldk_data_dir, false); - break; + for inbound in [true, false] { + let namespace = if inbound { + RGB_PAYMENT_INFO_INBOUND_NS + } else { + RGB_PAYMENT_INFO_OUTBOUND_NS + }; + + if let Ok(keys) = kv_store.list(RGB_PRIMARY_NS, namespace) { + for key in keys { + // proxy keys have format `{channel_id}{payment_hash}`; bare payment_hash is skipped + if key.contains(&payment_hash_str) && key != payment_hash_str { + if let Ok(data) = kv_store.read(RGB_PRIMARY_NS, namespace, &key) { + let rgb_payment_info: RgbPaymentInfo = match bincode::deserialize(&data) { + Ok(info) => info, + Err(e) => { + tracing::warn!("failed to parse payment info for key {key}: {e}"); + continue; + } + }; + + let channel_id_str = key.replace(&payment_hash_str, ""); + + if rgb_payment_info.swap_payment && receiver != rgb_payment_info.inbound { + continue; + } + + let (offered, received) = if receiver { + (0, rgb_payment_info.amount) + } else { + (rgb_payment_info.amount, 0) + }; + update_rgb_channel_amount( + &channel_id_str, + offered, + received, + false, + kv_store.as_ref(), + ); + return; + } + } + } + } else { + tracing::warn!("failed to list keys in namespace {namespace}"); } } } @@ -592,17 +628,13 @@ fn handle_funding_prepare_err( } } -async fn handle_open_chan_fail( - channel_id: &ChannelId, - static_state: &StaticState, - unlocked_state: Arc, -) { +async fn handle_open_chan_fail(channel_id: &ChannelId, unlocked_state: Arc) { tracing::info!("Handling open channel failure for channel {channel_id}"); - let pending_funding_path = static_state - .ldk_data_dir - .join(format!("pending_funding_{}", channel_id.0.as_hex())); - if let Some((rgb_info, _)) = - get_rgb_channel_info_optional(channel_id, &PathBuf::from(&static_state.ldk_data_dir), true) + let channel_id_str = channel_id.0.as_hex().to_string(); + let kv_store_dyn: Arc = + Arc::clone(&unlocked_state.kv_store) as Arc; + if let Some(rgb_info) = + get_rgb_channel_info_optional(channel_id, true, unlocked_state.kv_store.as_ref()) { if let Some(batch_transfer_idx) = rgb_info.batch_transfer_idx { let unlocked_state_copy = unlocked_state.clone(); @@ -634,35 +666,43 @@ async fn handle_open_chan_fail( } } } - } else if pending_funding_path.exists() { - let funding_txid = fs::read_to_string(&pending_funding_path).unwrap(); - let unlocked_state_copy = unlocked_state.clone(); - let txid_copy = funding_txid.clone(); - let result = tokio::task::spawn_blocking(move || { - unlocked_state_copy.rgb_abort_pending_vanilla_tx(txid_copy) - }) - .await - .unwrap(); - match result { - Ok(()) => { - tracing::info!( - "Aborted pending vanilla tx {} for channel {}", - funding_txid, - channel_id - ); - } - Err(e) => { - tracing::error!( - "Error aborting pending vanilla tx {} for channel {}: {:?}", - funding_txid, - channel_id, - e - ); + } else { + match kv_store_dyn.read(PENDING_FUNDING_NAMESPACE, "", &channel_id_str) { + Ok(bytes) => { + let funding_txid = String::from_utf8(bytes).unwrap(); + let unlocked_state_copy = unlocked_state.clone(); + let txid_copy = funding_txid.clone(); + let result = tokio::task::spawn_blocking(move || { + unlocked_state_copy.rgb_abort_pending_vanilla_tx(txid_copy) + }) + .await + .unwrap(); + match result { + Ok(()) => { + tracing::info!( + "Aborted pending vanilla tx {} for channel {}", + funding_txid, + channel_id + ); + } + Err(e) => { + tracing::error!( + "Error aborting pending vanilla tx {} for channel {}: {:?}", + funding_txid, + channel_id, + e + ); + } + } } + Err(e) if e.kind() == io::ErrorKind::NotFound => {} + Err(e) => panic!("Failed to read pending funding entry: {e}"), } } - if pending_funding_path.exists() { - fs::remove_file(&pending_funding_path).unwrap(); + match kv_store_dyn.remove(PENDING_FUNDING_NAMESPACE, "", &channel_id_str, false) { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::NotFound => {} + Err(e) => panic!("Failed to remove pending funding entry: {e}"), } unlocked_state.delete_channel_id(*channel_id); } @@ -696,14 +736,12 @@ async fn handle_ldk_events( .expect("Lightning funding tx should always be to a SegWit output"); let script_buf = ScriptBuf::from_bytes(addr.to_scriptpubkey()); - let is_colored = is_channel_rgb( - &temporary_channel_id, - &PathBuf::from(&static_state.ldk_data_dir), - ); + let is_colored = + is_channel_rgb(&temporary_channel_id, unlocked_state.kv_store.as_ref()); let (unsigned_psbt, asset_id) = if is_colored { - let (rgb_info, _) = get_rgb_channel_info_pending( + let rgb_info = get_rgb_channel_info_pending( &temporary_channel_id, - &PathBuf::from(&static_state.ldk_data_dir), + unlocked_state.kv_store.as_ref(), ); let channel_rgb_amount = rgb_info.local_rgb_amount + rgb_info.remote_rgb_amount; @@ -757,13 +795,17 @@ async fn handle_ldk_events( ); } Ok((unsigned_psbt, batch_transfer_idx)) => { - if let Some((mut rgb_info, info_path)) = get_rgb_channel_info_optional( + if let Some(mut rgb_info) = get_rgb_channel_info_optional( &temporary_channel_id, - &PathBuf::from(&static_state.ldk_data_dir), true, + unlocked_state.kv_store.as_ref(), ) { rgb_info.batch_transfer_idx = batch_transfer_idx; - write_rgb_channel_info(&info_path, &rgb_info); + unlocked_state.kv_store.write_rgb_channel_info( + &temporary_channel_id.0.as_hex().to_string(), + &rgb_info, + true, + ); } (unsigned_psbt, Some(asset_id)) } @@ -814,15 +856,25 @@ async fn handle_ldk_events( bitcoin::hashes::Hash::as_byte_array(&funding_txid), funding_output_index, ); - let pending_funding_path = static_state - .ldk_data_dir - .join(format!("pending_funding_{}", final_channel_id.0.as_hex())); - fs::write(&pending_funding_path, &funding_txid_str).unwrap(); + unlocked_state + .kv_store + .write( + PENDING_FUNDING_NAMESPACE, + "", + &final_channel_id.0.as_hex().to_string(), + funding_txid_str.as_bytes().to_vec(), + ) + .unwrap(); - let psbt_path = static_state - .ldk_data_dir - .join(format!("psbt_{funding_txid_str}")); - fs::write(psbt_path, psbt.to_string()).unwrap(); + unlocked_state + .kv_store + .write( + PSBT_NAMESPACE, + "", + &funding_txid_str, + psbt.to_string().into_bytes(), + ) + .unwrap(); if let Some(asset_id) = asset_id { let unlocked_state_copy = unlocked_state.clone(); @@ -875,8 +927,7 @@ async fn handle_ldk_events( { tracing::error!( "ERROR: Channel went away before we could fund it. The peer disconnected or refused the channel."); - handle_open_chan_fail(&final_channel_id, &static_state, unlocked_state.clone()) - .await; + handle_open_chan_fail(&final_channel_id, unlocked_state.clone()).await; } } Event::FundingTxBroadcastSafe { .. } => { @@ -968,7 +1019,9 @@ async fn handle_ldk_events( } } - find_and_update_rgb_chan_amt(&static_state.ldk_data_dir, &payment_hash, true); + let kv_store_dyn: Arc = + Arc::clone(&unlocked_state.kv_store) as Arc; + find_and_update_rgb_chan_amt(&payment_hash, true, &kv_store_dyn); if is_maker_swap { unlocked_state.update_maker_swap_status(&payment_hash, SwapStatus::Succeeded); } else { @@ -989,7 +1042,9 @@ async fn handle_ldk_events( payment_id, .. } => { - find_and_update_rgb_chan_amt(&static_state.ldk_data_dir, &payment_hash, false); + let kv_store_dyn: Arc = + Arc::clone(&unlocked_state.kv_store) as Arc; + find_and_update_rgb_chan_amt(&payment_hash, false, &kv_store_dyn); if unlocked_state.is_maker_swap(&payment_hash) { tracing::info!( @@ -1128,8 +1183,8 @@ async fn handle_ldk_events( &next_channel_id_str, outbound_amount_forwarded_rgb, 0, - &static_state.ldk_data_dir, false, + unlocked_state.kv_store.as_ref(), ); } if let Some(inbound_amount_forwarded_rgb) = inbound_amount_forwarded_rgb { @@ -1137,8 +1192,8 @@ async fn handle_ldk_events( &prev_channel_id_str, 0, inbound_amount_forwarded_rgb, - &static_state.ldk_data_dir, false, + unlocked_state.kv_store.as_ref(), ); } @@ -1241,55 +1296,65 @@ async fn handle_ldk_events( unlocked_state.add_channel_id(former_temporary_channel_id.unwrap(), channel_id); let funding_txid = funding_txo.txid.to_string(); - let psbt_path = static_state - .ldk_data_dir - .join(format!("psbt_{funding_txid}")); - if psbt_path.exists() { - let psbt_str = fs::read_to_string(psbt_path).unwrap(); + // stored PSBT means we are the initiator + match unlocked_state + .kv_store + .read(PSBT_NAMESPACE, "", &funding_txid) + { + Ok(psbt_bytes) => { + let psbt_str = String::from_utf8(psbt_bytes).unwrap(); - let state_copy = unlocked_state.clone(); - let psbt_str_copy = psbt_str.clone(); + let state_copy = unlocked_state.clone(); + let psbt_str_copy = psbt_str.clone(); - let is_chan_colored = - is_channel_rgb(&channel_id, &PathBuf::from(&static_state.ldk_data_dir)); - tracing::info!("Initiator of the channel (colored: {})", is_chan_colored); + let is_chan_colored = + is_channel_rgb(&channel_id, unlocked_state.kv_store.as_ref()); + tracing::info!("Initiator of the channel (colored: {})", is_chan_colored); - let _txid = tokio::task::spawn_blocking(move || { - if is_chan_colored { - state_copy.rgb_send_end(psbt_str_copy).map(|r| r.txid) - } else { - state_copy.rgb_send_btc_end(psbt_str_copy) - } - }) - .await - .unwrap() - .map_err(|e| { - tracing::error!("Error completing channel opening: {e:?}"); - ReplayEvent() - })?; - - let pending_funding_path = static_state - .ldk_data_dir - .join(format!("pending_funding_{}", channel_id.0.as_hex())); - fs::remove_file(&pending_funding_path).unwrap(); - } else { - // acceptor - let consignment_path = static_state - .ldk_data_dir - .join(format!("consignment_{funding_txid}")); - if !consignment_path.exists() { - // vanilla channel - return Ok(()); + let _txid = tokio::task::spawn_blocking(move || { + if is_chan_colored { + state_copy.rgb_send_end(psbt_str_copy).map(|r| r.txid) + } else { + state_copy.rgb_send_btc_end(psbt_str_copy) + } + }) + .await + .unwrap() + .map_err(|e| { + tracing::error!("Error completing channel opening: {e:?}"); + ReplayEvent() + })?; + + unlocked_state + .kv_store + .remove( + PENDING_FUNDING_NAMESPACE, + "", + &channel_id.0.as_hex().to_string(), + false, + ) + .unwrap(); } - let consignment = - RgbTransfer::load_file(consignment_path).expect("successful consignment load"); + Err(e) if e.kind() == io::ErrorKind::NotFound => { + // acceptor + let consignment_path = static_state + .ldk_data_dir + .join(format!("consignment_{funding_txid}")); + if !consignment_path.exists() { + // vanilla channel + return Ok(()); + } + let consignment = RgbTransfer::load_file(consignment_path) + .expect("successful consignment load"); - match unlocked_state.rgb_save_new_asset(consignment, funding_txid) { - Ok(_) => {} - Err(e) if e.to_string().contains("UNIQUE constraint failed") => {} - Err(e) => panic!("Failed saving asset: {e}"), + match unlocked_state.rgb_save_new_asset(consignment, funding_txid) { + Ok(_) => {} + Err(e) if e.to_string().contains("UNIQUE constraint failed") => {} + Err(e) => panic!("Failed saving asset: {e}"), + } } + Err(e) => panic!("Failed to read PSBT from KVStore: {e}"), } } Event::ChannelReady { @@ -1332,7 +1397,7 @@ async fn handle_ldk_events( // the ChannelClosed event gets fired also after node crashes/restarts, so it's better // to handle the failure here (regardless what the DiscardFunding event documents) - handle_open_chan_fail(&channel_id, &static_state, unlocked_state.clone()).await; + handle_open_chan_fail(&channel_id, unlocked_state.clone()).await; } Event::DiscardFunding { channel_id, .. } => { tracing::info!( @@ -1343,7 +1408,7 @@ async fn handle_ldk_events( // this will probably do nothing, since the ChannelClosed event will be triggered // before, but in case of splicing this should be the correct place to handle the // failure - handle_open_chan_fail(&channel_id, &static_state, unlocked_state.clone()).await; + handle_open_chan_fail(&channel_id, unlocked_state.clone()).await; } Event::HTLCIntercepted { is_swap, @@ -1366,18 +1431,14 @@ async fn handle_ldk_events( } let get_rgb_info = |channel_id| { - get_rgb_channel_info_optional( - channel_id, - &PathBuf::from(&static_state.ldk_data_dir), - true, - ) - .map(|(rgb_info, _)| { - ( - rgb_info.contract_id, - rgb_info.local_rgb_amount, - rgb_info.remote_rgb_amount, - ) - }) + get_rgb_channel_info_optional(channel_id, true, unlocked_state.kv_store.as_ref()) + .map(|rgb_info| { + ( + rgb_info.contract_id, + rgb_info.local_rgb_amount, + rgb_info.remote_rgb_amount, + ) + }) }; let inbound_channel = unlocked_state @@ -1553,14 +1614,18 @@ impl OutputSpender for RgbOutputSpender { let txid = outpoint.txid; let txid_str = txid.to_string(); - let transfer_info_path = self - .static_state - .ldk_data_dir - .join(format!("{txid_str}_transfer_info")); - if !transfer_info_path.exists() { + let transfer_info_exists = self + .kv_store + .read( + RGB_PRIMARY_NS, + lightning::rgb_utils::RGB_TRANSFER_INFO_NS, + &txid_str, + ) + .is_ok(); + if !transfer_info_exists { continue; - }; - let transfer_info = read_rgb_transfer_info(&transfer_info_path); + } + let transfer_info = self.kv_store.read_rgb_transfer_info(&txid_str); if transfer_info.rgb_amount == 0 { continue; } @@ -1723,8 +1788,8 @@ impl OutputSpender for RgbOutputSpender { } txes.insert(descriptors_hash, spending_tx.clone()); - self.fs_store - .write("", "", OUTPUT_SPENDER_TXES, txes.encode()) + self.kv_store + .write("", "", OUTPUT_SPENDER_TXES_KEY, txes.encode()) .unwrap(); Ok(spending_tx) @@ -1814,13 +1879,16 @@ pub(crate) async fn start_ldk( BitcoinNetwork::Regtest => PROXY_ENDPOINT_LOCAL, } }; - let storage_dir_path = app_state.static_state.storage_dir_path.clone(); - fs::write(storage_dir_path.join(INDEXER_URL_FNAME), indexer_url).expect("able to write"); - fs::write( - storage_dir_path.join(BITCOIN_NETWORK_FNAME), - bitcoin_network.to_string(), - ) - .expect("able to write"); + let kv_store = Arc::new(SeaOrmKvStore::from_connection(Arc::clone( + &static_state.database, + ))); + let kv_store_dyn: Arc = + Arc::clone(&kv_store) as Arc; + kv_store.set_indexer_url(indexer_url)?; + kv_store.write_config(INDEXER_URL_FNAME, indexer_url); + let bitcoin_network_str = bitcoin_network.to_string(); + kv_store.set_bitcoin_network(&bitcoin_network_str)?; + kv_store.write_config(BITCOIN_NETWORK_FNAME, &bitcoin_network_str); // Initialize the FeeEstimator // BitcoindClient implements the FeeEstimator trait, so it'll act as our fee estimator. @@ -1848,18 +1916,18 @@ pub(crate) async fn start_ldk( let cur = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); + let keys_manager = Arc::new(KeysManager::new( &ldk_seed, cur.as_secs(), cur.subsec_nanos(), true, ldk_data_dir_path.clone(), + kv_store_dyn.clone(), )); - // Initialize Persistence - let fs_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone())); let persister = Arc::new(MonitorUpdatingPersister::new( - Arc::clone(&fs_store), + Arc::clone(&kv_store), Arc::clone(&logger), 1000, Arc::clone(&keys_manager), @@ -1927,52 +1995,64 @@ pub(crate) async fn start_ldk( user_config.manually_accept_inbound_channels = true; let mut restarting_node = true; let (channel_manager_blockhash, channel_manager) = { - if let Ok(f) = fs::File::open(ldk_data_dir.join("manager")) { - let mut channel_monitor_references = Vec::new(); - for (_, channel_monitor) in channelmonitors.iter() { - channel_monitor_references.push(channel_monitor); + match kv_store.read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) { + Ok(bytes) => { + let mut channel_monitor_references = Vec::new(); + for (_, channel_monitor) in channelmonitors.iter() { + channel_monitor_references.push(channel_monitor); + } + let read_args = ChannelManagerReadArgs::new( + keys_manager.clone(), + keys_manager.clone(), + keys_manager.clone(), + fee_estimator.clone(), + chain_monitor.clone(), + broadcaster.clone(), + router.clone(), + Arc::clone(&message_router), + logger.clone(), + user_config, + channel_monitor_references, + ldk_data_dir_path.clone(), + Arc::clone(&kv_store) as Arc, + ); + <(BlockHash, ChannelManager)>::read(&mut &bytes[..], read_args).unwrap() + } + Err(e) if e.kind() == io::ErrorKind::NotFound => { + // We're starting a fresh node. + restarting_node = false; + + let polled_best_block = polled_chain_tip.to_best_block(); + let polled_best_block_hash = polled_best_block.block_hash; + let chain_params = ChainParameters { + network, + best_block: polled_best_block, + }; + let fresh_channel_manager = channelmanager::ChannelManager::new( + fee_estimator.clone(), + chain_monitor.clone(), + broadcaster.clone(), + router.clone(), + Arc::clone(&message_router), + logger.clone(), + keys_manager.clone(), + keys_manager.clone(), + keys_manager.clone(), + user_config, + chain_params, + cur.as_secs() as u32, + ldk_data_dir_path.clone(), + Arc::clone(&kv_store) as Arc, + ); + (polled_best_block_hash, fresh_channel_manager) + } + Err(e) => { + panic!("Failed to read channel manager from KVStore: {e}"); } - let read_args = ChannelManagerReadArgs::new( - keys_manager.clone(), - keys_manager.clone(), - keys_manager.clone(), - fee_estimator.clone(), - chain_monitor.clone(), - broadcaster.clone(), - router.clone(), - Arc::clone(&message_router), - logger.clone(), - user_config, - channel_monitor_references, - ldk_data_dir_path.clone(), - ); - <(BlockHash, ChannelManager)>::read(&mut BufReader::new(f), read_args).unwrap() - } else { - // We're starting a fresh node. - restarting_node = false; - - let polled_best_block = polled_chain_tip.to_best_block(); - let polled_best_block_hash = polled_best_block.block_hash; - let chain_params = ChainParameters { - network, - best_block: polled_best_block, - }; - let fresh_channel_manager = channelmanager::ChannelManager::new( - fee_estimator.clone(), - chain_monitor.clone(), - broadcaster.clone(), - router.clone(), - Arc::clone(&message_router), - logger.clone(), - keys_manager.clone(), - keys_manager.clone(), - keys_manager.clone(), - user_config, - chain_params, - cur.as_secs() as u32, - ldk_data_dir_path.clone(), - ); - (polled_best_block_hash, fresh_channel_manager) } }; @@ -2021,32 +2101,18 @@ pub(crate) async fn start_ldk( skip_consistency_check: false, vanilla_sync_lookback: VANILLA_SYNC_LOOKBACK, })?; - fs::write( - static_state.storage_dir_path.join(WALLET_FINGERPRINT_FNAME), - account_xpub_colored.fingerprint().to_string(), - ) - .expect("able to write"); - fs::write( - static_state - .storage_dir_path - .join(WALLET_ACCOUNT_XPUB_COLORED_FNAME), - account_xpub_colored.to_string(), - ) - .expect("able to write"); - fs::write( - static_state - .storage_dir_path - .join(WALLET_ACCOUNT_XPUB_VANILLA_FNAME), - account_xpub_vanilla.to_string(), - ) - .expect("able to write"); - fs::write( - static_state - .storage_dir_path - .join(WALLET_MASTER_FINGERPRINT_FNAME), - master_fingerprint.to_string(), - ) - .expect("able to write"); + let fingerprint = account_xpub_colored.fingerprint().to_string(); + let xpub_colored = account_xpub_colored.to_string(); + let xpub_vanilla = account_xpub_vanilla.to_string(); + let master_fingerprint_str = master_fingerprint.to_string(); + kv_store.set_wallet_fingerprint(&fingerprint)?; + kv_store.write_config(WALLET_FINGERPRINT_FNAME, &fingerprint); + kv_store.set_wallet_account_xpub_colored(&xpub_colored)?; + kv_store.write_config(WALLET_ACCOUNT_XPUB_COLORED_FNAME, &xpub_colored); + kv_store.set_wallet_account_xpub_vanilla(&xpub_vanilla)?; + kv_store.write_config(WALLET_ACCOUNT_XPUB_VANILLA_FNAME, &xpub_vanilla); + kv_store.set_wallet_master_fingerprint(&master_fingerprint_str)?; + kv_store.write_config(WALLET_MASTER_FINGERPRINT_FNAME, &master_fingerprint_str); let rgb_wallet_wrapper = Arc::new(RgbLibWalletWrapper::new( Arc::new(Mutex::new(rgb_wallet)), @@ -2054,18 +2120,21 @@ pub(crate) async fn start_ldk( )); // Initialize the OutputSweeper. - let txes = Arc::new(Mutex::new(disk::read_output_spender_txes( - &ldk_data_dir.join(OUTPUT_SPENDER_TXES), - ))); + let txes: OutputSpenderTxes = match kv_store.read("", "", OUTPUT_SPENDER_TXES_KEY) { + Ok(bytes) => OutputSpenderTxes::read(&mut &bytes[..]).unwrap_or_else(|_| new_hash_map()), + Err(e) if e.kind() == io::ErrorKind::NotFound => new_hash_map(), + Err(e) => panic!("Failed to read output spender txes from KVStore: {e}"), + }; + let txes = Arc::new(Mutex::new(txes)); let rgb_output_spender = Arc::new(RgbOutputSpender { static_state: static_state.clone(), rgb_wallet_wrapper: rgb_wallet_wrapper.clone(), keys_manager: keys_manager.clone(), - fs_store: fs_store.clone(), + kv_store: kv_store.clone(), txes, proxy_endpoint: proxy_endpoint.to_string(), }); - let (sweeper_best_block, output_sweeper) = match fs_store.read( + let (sweeper_best_block, output_sweeper) = match kv_store.read( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, @@ -2078,7 +2147,7 @@ pub(crate) async fn start_ldk( None, rgb_output_spender, rgb_wallet_wrapper.clone(), - fs_store.clone(), + KVStoreSyncWrapper(kv_store.clone()), logger.clone(), ); (channel_manager.current_best_block(), sweeper) @@ -2090,7 +2159,7 @@ pub(crate) async fn start_ldk( None, rgb_output_spender.clone(), rgb_wallet_wrapper.clone(), - fs_store.clone(), + KVStoreSyncWrapper(kv_store.clone()), logger.clone(), ); let mut reader = io::Cursor::new(&mut bytes); @@ -2285,12 +2354,32 @@ pub(crate) async fn start_ldk( } }); - let inbound_payments = Arc::new(Mutex::new(disk::read_inbound_payment_info( - &ldk_data_dir.join(INBOUND_PAYMENTS_FNAME), - ))); - let outbound_payments = Arc::new(Mutex::new(disk::read_outbound_payment_info( - &ldk_data_dir.join(OUTBOUND_PAYMENTS_FNAME), - ))); + let inbound_payments = Arc::new(Mutex::new({ + match kv_store.read("", "", INBOUND_PAYMENTS_KEY) { + Ok(bytes) => InboundPaymentInfoStorage::read(&mut &bytes[..]).unwrap_or_else(|_| { + InboundPaymentInfoStorage { + payments: new_hash_map(), + } + }), + Err(e) if e.kind() == io::ErrorKind::NotFound => InboundPaymentInfoStorage { + payments: new_hash_map(), + }, + Err(e) => panic!("Failed to read inbound payments from KVStore: {e}"), + } + })); + let outbound_payments = Arc::new(Mutex::new({ + match kv_store.read("", "", OUTBOUND_PAYMENTS_KEY) { + Ok(bytes) => OutboundPaymentInfoStorage::read(&mut &bytes[..]).unwrap_or_else(|_| { + OutboundPaymentInfoStorage { + payments: new_hash_map(), + } + }), + Err(e) if e.kind() == io::ErrorKind::NotFound => OutboundPaymentInfoStorage { + payments: new_hash_map(), + }, + Err(e) => panic!("Failed to read outbound payments from KVStore: {e}"), + } + })); let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&broadcaster), @@ -2300,20 +2389,42 @@ pub(crate) async fn start_ldk( )); // Persist ChannelManager and NetworkGraph - let persister = Arc::new(FilesystemStore::new(ldk_data_dir_path.clone())); - - // Read swaps info - let maker_swaps = Arc::new(Mutex::new(disk::read_swaps_info( - &ldk_data_dir.join(MAKER_SWAPS_FNAME), - ))); - let taker_swaps = Arc::new(Mutex::new(disk::read_swaps_info( - &ldk_data_dir.join(TAKER_SWAPS_FNAME), - ))); - - // Read channel IDs info - let channel_ids_map = Arc::new(Mutex::new(disk::read_channel_ids_info( - &ldk_data_dir.join(CHANNEL_IDS_FNAME), - ))); + let persister = KVStoreSyncWrapper(Arc::clone(&kv_store)); + + let maker_swaps = Arc::new(Mutex::new({ + match kv_store.read("", "", MAKER_SWAPS_KEY) { + Ok(bytes) => SwapMap::read(&mut &bytes[..]).unwrap_or_else(|_| SwapMap { + swaps: new_hash_map(), + }), + Err(e) if e.kind() == io::ErrorKind::NotFound => SwapMap { + swaps: new_hash_map(), + }, + Err(e) => panic!("Failed to read maker swaps from KVStore: {e}"), + } + })); + let taker_swaps = Arc::new(Mutex::new({ + match kv_store.read("", "", TAKER_SWAPS_KEY) { + Ok(bytes) => SwapMap::read(&mut &bytes[..]).unwrap_or_else(|_| SwapMap { + swaps: new_hash_map(), + }), + Err(e) if e.kind() == io::ErrorKind::NotFound => SwapMap { + swaps: new_hash_map(), + }, + Err(e) => panic!("Failed to read taker swaps from KVStore: {e}"), + } + })); + + let channel_ids_map = Arc::new(Mutex::new({ + match kv_store.read("", "", CHANNEL_IDS_KEY) { + Ok(bytes) => ChannelIdsMap::read(&mut &bytes[..]).unwrap_or_else(|_| ChannelIdsMap { + channel_ids: new_hash_map(), + }), + Err(e) if e.kind() == io::ErrorKind::NotFound => ChannelIdsMap { + channel_ids: new_hash_map(), + }, + Err(e) => panic!("Failed to read channel IDs from KVStore: {e}"), + } + })); let unlocked_state = Arc::new(UnlockedAppState { channel_manager: Arc::clone(&channel_manager), @@ -2324,7 +2435,7 @@ pub(crate) async fn start_ldk( onion_messenger: onion_messenger.clone(), outbound_payments, peer_manager: Arc::clone(&peer_manager), - fs_store: Arc::clone(&fs_store), + kv_store: Arc::clone(&kv_store), bump_tx_event_handler, rgb_wallet_wrapper, maker_swaps, @@ -2392,14 +2503,15 @@ pub(crate) async fn start_ldk( // Regularly reconnect to channel peers. let connect_cm = Arc::clone(&channel_manager); let connect_pm = Arc::clone(&peer_manager); - let peer_data_path = ldk_data_dir.join(CHANNEL_PEER_DATA); + let connect_db = Arc::clone(&static_state.database); let stop_connect = Arc::clone(&stop_processing); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(1)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { interval.tick().await; - match disk::read_channel_peer_data(&peer_data_path) { + let db = SeaOrmKvStore::from_connection(Arc::clone(&connect_db)); + match db.read_channel_peer_data() { Ok(info) => { for node_id in connect_cm .list_channels() @@ -2420,7 +2532,7 @@ pub(crate) async fn start_ldk( } } Err(e) => tracing::error!( - "ERROR: errored reading channel peer info from disk: {:?}", + "ERROR: errored reading channel peer info from database: {:?}", e ), } diff --git a/src/main.rs b/src/main.rs index 4fcc5bbd..fc79ebbc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,10 @@ mod args; mod auth; mod backup; mod bitcoind; +mod database; mod disk; mod error; +mod kv_store; mod ldk; mod rgb; mod routes; diff --git a/src/rgb.rs b/src/rgb.rs index 86d0b927..7f65b6fe 100644 --- a/src/rgb.rs +++ b/src/rgb.rs @@ -7,11 +7,10 @@ use bitcoin::{Address, Network, OutPoint, Transaction, TxOut, WPubkeyHash}; use hex::DisplayHex; use lightning::events::bump_transaction::{Utxo, WalletSource}; use lightning::ln::types::ChannelId; -use lightning::rgb_utils::{ - get_rgb_channel_info_path, is_channel_rgb, parse_rgb_channel_info, RgbInfo, -}; +use lightning::rgb_utils::{RgbInfo, RgbKvStoreExt}; use lightning::sign::ChangeDestinationSource; use lightning::util::async_poll::AsyncResult; +use lightning::util::persist::KVStoreSync; use rgb_lib::{ bdk_wallet::SignOptions, bitcoin::psbt::Psbt as BitcoinPsbt, @@ -836,14 +835,11 @@ pub(crate) async fn check_rgb_proxy_endpoint(proxy_endpoint: &str) -> Result<(), pub(crate) fn get_rgb_channel_info_optional( channel_id: &ChannelId, - ldk_data_dir: &Path, pending: bool, -) -> Option<(RgbInfo, PathBuf)> { - if !is_channel_rgb(channel_id, ldk_data_dir) { - return None; - } - let info_file_path = - get_rgb_channel_info_path(&channel_id.0.as_hex().to_string(), ldk_data_dir, pending); - let rgb_info = parse_rgb_channel_info(&info_file_path); - Some((rgb_info, info_file_path)) + kv_store: &dyn KVStoreSync, +) -> Option { + let channel_id_str = channel_id.0.as_hex().to_string(); + kv_store + .read_rgb_channel_info(&channel_id_str, pending) + .ok() } diff --git a/src/routes.rs b/src/routes.rs index ae6c8ed0..9932ff4b 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -13,14 +13,12 @@ use hex::DisplayHex; use lightning::ln::{channelmanager::OptionalOfferPaymentParams, types::ChannelId}; use lightning::offers::offer::{self, Offer}; use lightning::onion_message::messenger::Destination; -use lightning::rgb_utils::{ - get_rgb_channel_info_path, get_rgb_payment_info_path, parse_rgb_channel_info, - parse_rgb_payment_info, -}; +use lightning::rgb_utils::RgbKvStoreExt; use lightning::routing::gossip::RoutingFees; use lightning::routing::router::{Path as LnPath, Route, RouteHint, RouteHintHop}; use lightning::sign::EntropySource; use lightning::util::config::ChannelConfig; +use lightning::util::persist::KVStoreSync; use lightning::{chain::channelmonitor::Balance, impl_writeable_tlv_based_enum}; use lightning::{ ln::channel_state::ChannelShutdownState, onion_message::messenger::MessageSendInstructions, @@ -32,7 +30,7 @@ use lightning::{ }; use lightning::{ ln::channelmanager::{PaymentId, RecipientOnionFields, Retry}, - rgb_utils::{write_rgb_channel_info, write_rgb_payment_info_file, RgbInfo}, + rgb_utils::{write_rgb_payment_info, RgbInfo}, routing::{ gossip::NodeId, router::{PaymentParameters, RouteParameters}, @@ -63,12 +61,7 @@ use rgb_lib::{ }; use serde::{Deserialize, Serialize}; use std::{ - collections::HashMap, - net::ToSocketAddrs, - path::{Path, PathBuf}, - str::FromStr, - sync::Arc, - time::Duration, + collections::HashMap, net::ToSocketAddrs, path::Path, str::FromStr, sync::Arc, time::Duration, }; use tokio::{ fs::File, @@ -80,7 +73,7 @@ use crate::ldk::{start_ldk, stop_ldk, LdkBackgroundServices, MIN_CHANNEL_CONFIRM use crate::swap::{SwapData, SwapInfo, SwapString}; use crate::utils::{ check_already_initialized, check_channel_id, check_password_strength, check_password_validity, - encrypt_and_save_mnemonic, get_max_local_rgb_amount, get_mnemonic_path, get_route, hex_str, + encrypt_and_save_mnemonic, get_max_local_rgb_amount, get_route, hex_str, hex_str_to_compressed_pubkey, hex_str_to_vec, UnlockedAppState, UserOnionMessageContents, }; use crate::{ @@ -88,7 +81,6 @@ use crate::{ rgb::{check_rgb_proxy_endpoint, get_rgb_channel_info_optional}, }; use crate::{ - disk::{self, CHANNEL_PEER_DATA}, error::APIError, ldk::{PaymentInfo, UTXO_SIZE_SAT}, utils::{ @@ -1425,15 +1417,16 @@ pub(crate) async fn asset_balance( let mut offchain_outbound = 0; let mut offchain_inbound = 0; for chan_info in unlocked_state.channel_manager.list_channels() { - let info_file_path = get_rgb_channel_info_path( - &chan_info.channel_id.0.as_hex().to_string(), - &state.static_state.ldk_data_dir, - false, - ); - if !info_file_path.exists() { - continue; - } - let rgb_info = parse_rgb_channel_info(&info_file_path); + let channel_id_str = chan_info.channel_id.0.as_hex().to_string(); + + let rgb_info = match unlocked_state + .kv_store + .read_rgb_channel_info(&channel_id_str, false) + { + Ok(info) => info, + Err(_) => continue, + }; + if rgb_info.contract_id == contract_id { offchain_outbound += rgb_info.local_rgb_amount; offchain_inbound += rgb_info.remote_rgb_amount; @@ -1484,8 +1477,7 @@ pub(crate) async fn backup( no_cancel(async move { let _guard = state.check_locked().await?; - let _mnemonic = - check_password_validity(&payload.password, &state.static_state.storage_dir_path)?; + let _mnemonic = check_password_validity(&payload.password, &state.static_state.database)?; do_backup( &state.static_state.storage_dir_path, @@ -1532,12 +1524,12 @@ pub(crate) async fn change_password( check_password_strength(payload.new_password.clone())?; let mnemonic = - check_password_validity(&payload.old_password, &state.static_state.storage_dir_path)?; + check_password_validity(&payload.old_password, &state.static_state.database)?; encrypt_and_save_mnemonic( payload.new_password, mnemonic.to_string(), - &get_mnemonic_path(&state.static_state.storage_dir_path), + &state.static_state.database, )?; Ok(Json(EmptyResponse {})) @@ -1653,11 +1645,8 @@ pub(crate) async fn connect_peer( if let Some(peer_addr) = peer_addr { connect_peer_if_necessary(peer_pubkey, peer_addr, unlocked_state.peer_manager.clone()) .await?; - disk::persist_channel_peer( - &state.static_state.ldk_data_dir.join(CHANNEL_PEER_DATA), - &peer_pubkey, - &peer_addr, - )?; + let db = state.get_db(); + db.persist_channel_peer(&peer_pubkey, &peer_addr)?; } else { return Err(APIError::InvalidPeerInfo(s!( "incorrectly formatted peer info. Should be formatted as: `pubkey@host:port`" @@ -1758,10 +1747,8 @@ pub(crate) async fn disconnect_peer( } } - disk::delete_channel_peer( - &state.static_state.ldk_data_dir.join(CHANNEL_PEER_DATA), - payload.peer_pubkey, - )?; + let db = state.get_db(); + db.delete_channel_peer(&payload.peer_pubkey)?; //check the pubkey matches a valid connected peer if unlocked_state @@ -1877,15 +1864,12 @@ pub(crate) async fn get_payment( for (payment_hash, payment_info) in &inbound_payments { if payment_hash == &requested_ph { - let rgb_payment_info_path_inbound = - get_rgb_payment_info_path(payment_hash, &state.static_state.ldk_data_dir, true); - - let (asset_amount, asset_id) = if rgb_payment_info_path_inbound.exists() { - let info = parse_rgb_payment_info(&rgb_payment_info_path_inbound); - (Some(info.amount), Some(info.contract_id.to_string())) - } else { - (None, None) - }; + let (asset_amount, asset_id) = unlocked_state + .kv_store + .read_rgb_payment_info(payment_hash, true) + .ok() + .map(|info| (Some(info.amount), Some(info.contract_id.to_string()))) + .unwrap_or((None, None)); return Ok(Json(GetPaymentResponse { payment: Payment { @@ -1907,15 +1891,12 @@ pub(crate) async fn get_payment( for (payment_id, payment_info) in &outbound_payments { let payment_hash = &PaymentHash(payment_id.0); if payment_hash == &requested_ph { - let rgb_payment_info_path_outbound = - get_rgb_payment_info_path(payment_hash, &state.static_state.ldk_data_dir, false); - - let (asset_amount, asset_id) = if rgb_payment_info_path_outbound.exists() { - let info = parse_rgb_payment_info(&rgb_payment_info_path_outbound); - (Some(info.amount), Some(info.contract_id.to_string())) - } else { - (None, None) - }; + let (asset_amount, asset_id) = unlocked_state + .kv_store + .read_rgb_payment_info(payment_hash, false) + .ok() + .map(|info| (Some(info.amount), Some(info.contract_id.to_string()))) + .unwrap_or((None, None)); return Ok(Json(GetPaymentResponse { payment: Payment { @@ -2035,8 +2016,7 @@ pub(crate) async fn init( check_password_strength(payload.password.clone())?; - let mnemonic_path = get_mnemonic_path(&state.static_state.storage_dir_path); - check_already_initialized(&mnemonic_path)?; + check_already_initialized(&state.static_state.database)?; let mnemonic = match payload.mnemonic { Some(mnemonic) => Mnemonic::from_str(&mnemonic) @@ -2045,7 +2025,11 @@ pub(crate) async fn init( None => generate_keys(state.static_state.network, WitnessVersion::Taproot).mnemonic, }; - encrypt_and_save_mnemonic(payload.password, mnemonic.clone(), &mnemonic_path)?; + encrypt_and_save_mnemonic( + payload.password, + mnemonic.clone(), + &state.static_state.database, + )?; Ok(Json(InitResponse { mnemonic })) }) @@ -2251,13 +2235,13 @@ pub(crate) async fn keysend( }, )?; if let Some((contract_id, rgb_amount)) = rgb_payment { - write_rgb_payment_info_file( - &PathBuf::from(&state.static_state.ldk_data_dir), + write_rgb_payment_info( &payment_hash, contract_id, rgb_amount, false, false, + &(Arc::clone(&unlocked_state.kv_store) as Arc), ); } @@ -2309,15 +2293,16 @@ pub(crate) async fn list_assets( let mut offchain_balances = HashMap::new(); for chan_info in unlocked_state.channel_manager.list_channels() { - let info_file_path = get_rgb_channel_info_path( - &chan_info.channel_id.0.as_hex().to_string(), - &state.static_state.ldk_data_dir, - false, - ); - if !info_file_path.exists() { - continue; - } - let rgb_info = parse_rgb_channel_info(&info_file_path); + let channel_id_str = chan_info.channel_id.0.as_hex().to_string(); + + let rgb_info = match unlocked_state + .kv_store + .read_rgb_channel_info(&channel_id_str, false) + { + Ok(info) => info, + Err(_) => continue, + }; + offchain_balances .entry(rgb_info.contract_id.to_string()) .and_modify(|(offchain_outbound, offchain_inbound)| { @@ -2445,17 +2430,15 @@ pub(crate) async fn list_channels( channel.short_channel_id = Some(id); } - let info_file_path = get_rgb_channel_info_path( - &chan_info.channel_id.0.as_hex().to_string(), - &state.static_state.ldk_data_dir, - false, - ); - if info_file_path.exists() { - let rgb_info = parse_rgb_channel_info(&info_file_path); + let channel_id_str = chan_info.channel_id.0.as_hex().to_string(); + if let Ok(rgb_info) = unlocked_state + .kv_store + .read_rgb_channel_info(&channel_id_str, false) + { channel.asset_id = Some(rgb_info.contract_id.to_string()); channel.asset_local_amount = Some(rgb_info.local_rgb_amount); channel.asset_remote_amount = Some(rgb_info.remote_rgb_amount); - }; + } channels.push(channel); } @@ -2474,14 +2457,12 @@ pub(crate) async fn list_payments( let mut payments = vec![]; for (payment_hash, payment_info) in &inbound_payments { - let rgb_payment_info_path_inbound = - get_rgb_payment_info_path(payment_hash, &state.static_state.ldk_data_dir, true); - - let (asset_amount, asset_id) = if rgb_payment_info_path_inbound.exists() { - let info = parse_rgb_payment_info(&rgb_payment_info_path_inbound); - (Some(info.amount), Some(info.contract_id.to_string())) - } else { - (None, None) + let (asset_amount, asset_id) = match unlocked_state + .kv_store + .read_rgb_payment_info(payment_hash, true) + { + Ok(info) => (Some(info.amount), Some(info.contract_id.to_string())), + Err(_) => (None, None), }; payments.push(Payment { @@ -2501,14 +2482,12 @@ pub(crate) async fn list_payments( for (payment_id, payment_info) in &outbound_payments { let payment_hash = &PaymentHash(payment_id.0); - let rgb_payment_info_path_outbound = - get_rgb_payment_info_path(payment_hash, &state.static_state.ldk_data_dir, false); - - let (asset_amount, asset_id) = if rgb_payment_info_path_outbound.exists() { - let info = parse_rgb_payment_info(&rgb_payment_info_path_outbound); - (Some(info.amount), Some(info.contract_id.to_string())) - } else { - (None, None) + let (asset_amount, asset_id) = match unlocked_state + .kv_store + .read_rgb_payment_info(payment_hash, false) + { + Ok(info) => (Some(info.amount), Some(info.contract_id.to_string())), + Err(_) => (None, None), }; payments.push(Payment { @@ -2836,13 +2815,11 @@ pub(crate) async fn maker_execute( .filter(|details| { match get_rgb_channel_info_optional( &details.channel_id, - &state.static_state.ldk_data_dir, false, + unlocked_state.kv_store.as_ref(), ) { _ if swap_info.is_from_btc() => true, - Some((rgb_info, _)) if Some(rgb_info.contract_id) == swap_info.from_asset => { - true - } + Some(rgb_info) if Some(rgb_info.contract_id) == swap_info.from_asset => true, _ => false, } }) @@ -2869,7 +2846,7 @@ pub(crate) async fn maker_execute( let first_leg = get_route( &unlocked_state.channel_manager, &unlocked_state.router, - &state.static_state.ldk_data_dir, + unlocked_state.kv_store.as_ref(), unlocked_state.channel_manager.get_our_node_id(), taker_pk, if swap_info.is_to_btc() { @@ -2887,7 +2864,7 @@ pub(crate) async fn maker_execute( let second_leg = get_route( &unlocked_state.channel_manager, &unlocked_state.router, - &state.static_state.ldk_data_dir, + unlocked_state.kv_store.as_ref(), taker_pk, unlocked_state.channel_manager.get_our_node_id(), if swap_info.is_to_btc() || swap_info.is_asset_asset() { @@ -2971,13 +2948,13 @@ pub(crate) async fn maker_execute( }; if swap_info.is_to_asset() { - write_rgb_payment_info_file( - &state.static_state.ldk_data_dir, + write_rgb_payment_info( &swapstring.payment_hash, swap_info.to_asset.unwrap(), swap_info.qty_to, true, false, + &(Arc::clone(&unlocked_state.kv_store) as Arc), ); } @@ -3064,8 +3041,8 @@ pub(crate) async fn maker_init( if let Some(to_asset) = to_asset { let max_balance = get_max_local_rgb_amount( to_asset, - &state.static_state.ldk_data_dir, unlocked_state.channel_manager.list_channels().iter(), + unlocked_state.kv_store.as_ref(), ); if swap_info.qty_to > max_balance { return Err(APIError::InsufficientAssets); @@ -3243,7 +3220,7 @@ pub(crate) async fn open_channel( let (peer_pubkey, mut peer_addr) = parse_peer_info(payload.peer_pubkey_and_opt_addr.to_string())?; - let peer_data_path = state.static_state.ldk_data_dir.join(CHANNEL_PEER_DATA); + let db = state.get_db(); if peer_addr.is_none() { if let Some(peer) = unlocked_state.peer_manager.peer_by_node_id(&peer_pubkey) { if let Some(socket_address) = peer.socket_address { @@ -3255,7 +3232,7 @@ pub(crate) async fn open_channel( } } if peer_addr.is_none() { - let peer_info = disk::read_channel_peer_data(&peer_data_path)?; + let peer_info = db.read_channel_peer_data()?; for (pubkey, addr) in peer_info.into_iter() { if pubkey == peer_pubkey { peer_addr = Some(addr); @@ -3266,7 +3243,7 @@ pub(crate) async fn open_channel( if let Some(peer_addr) = peer_addr { connect_peer_if_necessary(peer_pubkey, peer_addr, unlocked_state.peer_manager.clone()) .await?; - disk::persist_channel_peer(&peer_data_path, &peer_pubkey, &peer_addr)?; + db.persist_channel_peer(&peer_pubkey, &peer_addr)?; } else { return Err(APIError::InvalidPeerInfo(s!( "cannot find the address for the provided pubkey" @@ -3364,22 +3341,12 @@ pub(crate) async fn open_channel( remote_rgb_amount: push_amount, batch_transfer_idx: None, }; - write_rgb_channel_info( - &get_rgb_channel_info_path( - &temporary_channel_id, - &state.static_state.ldk_data_dir, - true, - ), - &rgb_info, - ); - write_rgb_channel_info( - &get_rgb_channel_info_path( - &temporary_channel_id, - &state.static_state.ldk_data_dir, - false, - ), - &rgb_info, - ); + unlocked_state + .kv_store + .write_rgb_channel_info(&temporary_channel_id, &rgb_info, true); + unlocked_state + .kv_store + .write_rgb_channel_info(&temporary_channel_id, &rgb_info, false); } Ok(Json(OpenChannelResponse { @@ -3464,8 +3431,7 @@ pub(crate) async fn restore( no_cancel(async move { let _unlocked_state = state.check_locked().await?; - let mnemonic_path = get_mnemonic_path(&state.static_state.storage_dir_path); - check_already_initialized(&mnemonic_path)?; + check_already_initialized(&state.static_state.database)?; restore_backup( Path::new(&payload.backup_path), @@ -3473,8 +3439,7 @@ pub(crate) async fn restore( &state.static_state.storage_dir_path, )?; - let _mnemonic = - check_password_validity(&payload.password, &state.static_state.storage_dir_path)?; + let _mnemonic = check_password_validity(&payload.password, &state.static_state.database)?; Ok(Json(EmptyResponse {})) }) @@ -3760,13 +3725,13 @@ pub(crate) async fn send_payment( )?; let payment_hash = PaymentHash(invoice.payment_hash().to_byte_array()); if let Some((contract_id, rgb_amount)) = rgb_payment { - write_rgb_payment_info_file( - &PathBuf::from(&state.static_state.ldk_data_dir), + write_rgb_payment_info( &payment_hash, contract_id, rgb_amount, false, false, + &(Arc::clone(&unlocked_state.kv_store) as Arc), ); } @@ -3903,8 +3868,8 @@ pub(crate) async fn taker( if let Some(from_asset) = swapstring.swap_info.from_asset { let max_balance = get_max_local_rgb_amount( from_asset, - &state.static_state.ldk_data_dir, unlocked_state.channel_manager.list_channels().iter(), + unlocked_state.kv_store.as_ref(), ); if swapstring.swap_info.qty_from > max_balance { return Err(APIError::InsufficientAssets); @@ -3938,16 +3903,14 @@ pub(crate) async fn unlock( } } - let mnemonic = match check_password_validity( - &payload.password, - &state.static_state.storage_dir_path, - ) { - Ok(mnemonic) => mnemonic, - Err(e) => { - state.update_changing_state(false); - return Err(e); - } - }; + let mnemonic = + match check_password_validity(&payload.password, &state.static_state.database) { + Ok(mnemonic) => mnemonic, + Err(e) => { + state.update_changing_state(false); + return Err(e); + } + }; tracing::debug!("Starting LDK..."); let (new_ldk_background_services, new_unlocked_app_state) = diff --git a/src/test/authentication.rs b/src/test/authentication.rs index b914182c..1491444c 100644 --- a/src/test/authentication.rs +++ b/src/test/authentication.rs @@ -201,6 +201,52 @@ async fn authentication() { .unwrap(); check_unauthorized(res).await; + // revoked token remains rejected after node restart (DB persistence check) + let res = reqwest::Client::new() + .post(format!("http://{node_address}/shutdown")) + .bearer_auth(&admin_token) + .send() + .await + .unwrap(); + check_response_is_ok(res).await; + let t_0 = OffsetDateTime::now_utc(); + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if TcpListener::bind(node_address).await.is_ok() { + break; + } + if (OffsetDateTime::now_utc() - t_0).as_seconds_f32() > 10.0 { + panic!("node socket not becoming available") + } + } + let node_address = start_daemon( + &test_dir_node1, + NODE1_PEER_PORT, + Some(root_public_key), + true, + ) + .await; + let payload = unlock_req(password); + let res = reqwest::Client::new() + .post(format!("http://{node_address}/unlock")) + .json(&payload) + .bearer_auth(&admin_token) + .send() + .await + .unwrap(); + check_response_is_ok(res) + .await + .json::() + .await + .unwrap(); + let res = reqwest::Client::new() + .get(format!("http://{node_address}/nodeinfo")) + .bearer_auth(&user_token) + .send() + .await + .unwrap(); + assert_eq!(res.status(), reqwest::StatusCode::UNAUTHORIZED); + // with no token no API can be called let res = reqwest::Client::new() .get(format!("http://{node_address}/nodeinfo")) diff --git a/src/utils.rs b/src/utils.rs index 34a0c046..4ce5b099 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,4 @@ +use crate::kv_store::SeaOrmKvStore; use amplify::s; use bitcoin::io; use bitcoin::secp256k1::PublicKey; @@ -11,15 +12,16 @@ use lightning::routing::router::{ use lightning::{ onion_message::packet::OnionMessageContents, sign::KeysManager, + util::persist::KVStoreSync, util::ser::{Writeable, Writer}, }; -use lightning_persister::fs_store::FilesystemStore; use magic_crypt::{new_magic_crypt, MagicCryptTrait}; use rgb_lib::{bdk_wallet::keys::bip39::Mnemonic, BitcoinNetwork, ContractId}; +use rln_migration::{Migrator, MigratorTrait}; +use sea_orm::{ConnectOptions, Database, DatabaseConnection}; use std::{ collections::HashSet, fmt::Write, - fs, net::{SocketAddr, TcpStream, ToSocketAddrs}, path::Path, path::PathBuf, @@ -66,6 +68,10 @@ pub(crate) struct AppState { } impl AppState { + pub(crate) fn get_db(&self) -> SeaOrmKvStore { + SeaOrmKvStore::from_connection(Arc::clone(&self.static_state.database)) + } + pub(crate) fn get_changing_state(&self) -> MutexGuard<'_, bool> { self.changing_state.lock().unwrap() } @@ -90,6 +96,7 @@ pub(crate) struct StaticState { pub(crate) ldk_data_dir: PathBuf, pub(crate) logger: Arc, pub(crate) max_media_upload_size_mb: u16, + pub(crate) database: Arc, } pub(crate) struct UnlockedAppState { @@ -101,7 +108,7 @@ pub(crate) struct UnlockedAppState { pub(crate) onion_messenger: Arc, pub(crate) outbound_payments: Arc>, pub(crate) peer_manager: Arc, - pub(crate) fs_store: Arc, + pub(crate) kv_store: Arc, pub(crate) bump_tx_event_handler: Arc, pub(crate) maker_swaps: Arc>, pub(crate) taker_swaps: Arc>, @@ -155,8 +162,9 @@ impl Writeable for UserOnionMessageContents { } } -pub(crate) fn check_already_initialized(mnemonic_path: &Path) -> Result<(), APIError> { - if mnemonic_path.exists() { +pub(crate) fn check_already_initialized(database: &DatabaseConnection) -> Result<(), APIError> { + let db = SeaOrmKvStore::from_connection(Arc::new(database.clone())); + if db.is_initialized()? { return Err(APIError::AlreadyInitialized); } Ok(()) @@ -173,13 +181,13 @@ pub(crate) fn check_password_strength(password: String) -> Result<(), APIError> pub(crate) fn check_password_validity( password: &str, - storage_dir_path: &Path, + database: &DatabaseConnection, ) -> Result { - let mnemonic_path = get_mnemonic_path(storage_dir_path); - if let Ok(encrypted_mnemonic) = fs::read_to_string(mnemonic_path) { + let db = SeaOrmKvStore::from_connection(Arc::new(database.clone())); + if let Some(config) = db.get_config()? { let mcrypt = new_magic_crypt!(password, 256); let mnemonic_str = mcrypt - .decrypt_base64_to_string(encrypted_mnemonic) + .decrypt_base64_to_string(config.encrypted_mnemonic) .map_err(|_| APIError::WrongPassword)?; Ok(Mnemonic::from_str(&mnemonic_str).expect("valid mnemonic")) } else { @@ -205,27 +213,37 @@ pub(crate) fn check_port_is_available(port: u16) -> Result<(), AppError> { Ok(()) } -pub(crate) fn get_mnemonic_path(storage_dir_path: &Path) -> PathBuf { - storage_dir_path.join("mnemonic") +pub(crate) fn get_db_path(storage_dir_path: &Path) -> PathBuf { + storage_dir_path.join("rln_db") +} + +#[cfg(not(target_os = "windows"))] +pub(crate) fn adjust_canonicalization>(p: P) -> String { + p.as_ref().display().to_string() +} + +#[cfg(target_os = "windows")] +pub(crate) fn adjust_canonicalization>(p: P) -> String { + const VERBATIM_PREFIX: &str = r#"\\?\"#; + let p = p.as_ref().display().to_string(); + if let Some(stripped) = p.strip_prefix(VERBATIM_PREFIX) { + stripped.to_string() + } else { + p + } } pub(crate) fn encrypt_and_save_mnemonic( password: String, mnemonic: String, - mnemonic_path: &Path, + database: &DatabaseConnection, ) -> Result<(), APIError> { let mcrypt = new_magic_crypt!(password, 256); let encrypted_mnemonic = mcrypt.encrypt_str_to_base64(mnemonic); - match fs::write(mnemonic_path, encrypted_mnemonic) { - Ok(()) => { - tracing::info!("Created a new wallet"); - Ok(()) - } - Err(e) => Err(APIError::FailedKeysCreation( - mnemonic_path.to_string_lossy().to_string(), - e.to_string(), - )), - } + let db = SeaOrmKvStore::from_connection(Arc::new(database.clone())); + db.save_mnemonic(encrypted_mnemonic)?; + tracing::info!("Created a new wallet"); + Ok(()) } pub(crate) async fn connect_peer_if_necessary( @@ -349,6 +367,27 @@ pub(crate) async fn start_daemon(args: &UserArgs) -> Result, AppEr let ldk_data_dir = args.storage_dir_path.join(LDK_DIR); let logger = Arc::new(FilesystemLogger::new(ldk_data_dir.clone())); + let db_path = get_db_path(&args.storage_dir_path); + let connection_string = format!("sqlite:{}?mode=rwc", adjust_canonicalization(&db_path)); + let mut opt = ConnectOptions::new(connection_string); + opt.max_connections(8) + .min_connections(1) + .connect_timeout(Duration::from_secs(8)) + .idle_timeout(Duration::from_secs(5 * 60)) + .max_lifetime(Duration::from_secs(60 * 60)); + + let database = Database::connect(opt).await.map_err(|e| { + AppError::IO(std::io::Error::other(format!( + "Database connection failed: {e}" + ))) + })?; + + Migrator::up(&database, None) + .await + .map_err(|e| AppError::IO(std::io::Error::other(format!("Migration failed: {e}"))))?; + + tracing::info!(db_path = %db_path.display(), "Shared database initialized"); + let cancel_token = CancellationToken::new(); let static_state = Arc::new(StaticState { @@ -358,6 +397,7 @@ pub(crate) async fn start_daemon(args: &UserArgs) -> Result, AppEr ldk_data_dir, logger, max_media_upload_size_mb: args.max_media_upload_size_mb, + database: Arc::new(database), }); let app_state = Arc::new(AppState { @@ -370,7 +410,6 @@ pub(crate) async fn start_daemon(args: &UserArgs) -> Result, AppEr revoked_tokens: Arc::new(Mutex::new(HashSet::new())), }); - // Load revoked tokens from file if authentication is enabled if app_state.root_public_key.is_some() { let loaded_tokens = app_state.load_revoked_tokens()?; *app_state.revoked_tokens.lock().unwrap() = loaded_tokens; @@ -388,13 +427,13 @@ pub(crate) fn get_current_timestamp() -> u64 { pub(crate) fn get_max_local_rgb_amount<'r>( contract_id: ContractId, - ldk_data_dir_path: &Path, channels: impl Iterator, + kv_store: &dyn KVStoreSync, ) -> u64 { let mut max_balance = 0; for chan_info in channels { - if let Some((rgb_info, _)) = - get_rgb_channel_info_optional(&chan_info.channel_id, ldk_data_dir_path, false) + if let Some(rgb_info) = + get_rgb_channel_info_optional(&chan_info.channel_id, false, kv_store) { if rgb_info.contract_id == contract_id && rgb_info.local_rgb_amount > max_balance { max_balance = rgb_info.local_rgb_amount; @@ -409,7 +448,7 @@ pub(crate) fn get_max_local_rgb_amount<'r>( pub(crate) fn get_route( channel_manager: &crate::ldk::ChannelManager, router: &crate::ldk::Router, - ldk_data_dir_path: &Path, + kv_store: &dyn KVStoreSync, start: PublicKey, dest: PublicKey, final_value_msat: Option, @@ -424,8 +463,8 @@ pub(crate) fn get_route( .iter() .filter(|channel| match rgb_payment { Some((contract_id, _)) => { - get_rgb_channel_info_optional(&channel.channel_id, ldk_data_dir_path, false) - .is_some_and(|(rgb_info, _)| rgb_info.contract_id == contract_id) + get_rgb_channel_info_optional(&channel.channel_id, false, kv_store) + .is_some_and(|rgb_info| rgb_info.contract_id == contract_id) } None => true, })