Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 106 additions & 66 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
"blockchain.relayfee" => json_rpc_res!(request, 0.00001),
"blockchain.scripthash.get_balance" => {
let script_hash = get_arg!(request, sha256::Hash, 0);
let balance = self.address_cache.get_address_balance(&script_hash);
let balance = self
.address_cache
.get_address_balance(&script_hash)
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could just import Error::Blockchain, and if possible, it would be better to add automatic handling using the impl_error_from trait here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error::Blockchain takes a boxed trait object so impl_error_from wont work directly since it doesnt box. also the error type here is generic. do you have anything specific in mind or is just the import fine?

let result = json!({
"confirmed": balance,
"unconfirmed": 0
Expand All @@ -319,32 +322,35 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
"blockchain.scripthash.get_history" => {
let script_hash = get_arg!(request, sha256::Hash, 0);
self.address_cache
let transactions = self
.address_cache
.get_address_history(&script_hash)
.map(|transactions| {
let res = Self::process_history(&transactions);
json_rpc_res!(request, res)
})
.unwrap_or_else(|| {
Ok(json!({
"jsonrpc": "2.0",
"result": [],
"id": request.id
}))
})
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?;
let res = Self::process_history(&transactions);
json_rpc_res!(request, res)
}
"blockchain.scripthash.get_mempool" => json_rpc_res!(request, []),
"blockchain.scripthash.listunspent" => {
let hash = get_arg!(request, sha256::Hash, 0);
let utxos = self.address_cache.get_address_utxos(&hash);
if utxos.is_none() {
let utxos = self
.address_cache
.get_address_utxos(&hash)
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?;
if utxos.is_empty() {
return json_rpc_res!(request, []);
}
let mut final_utxos = Vec::new();
for (utxo, prevout) in utxos.unwrap().into_iter() {
let height = self.address_cache.get_height(&prevout.txid).unwrap();

let position = self.address_cache.get_position(&prevout.txid).unwrap();
for (utxo, prevout) in utxos.into_iter() {
let height = self
.address_cache
.get_height(&prevout.txid)
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?
.unwrap_or(0);
let position = self
.address_cache
.get_position(&prevout.txid)
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?
.unwrap_or(0);

final_utxos.push(json!({
"height": height,
Expand All @@ -362,7 +368,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {

let history = self.address_cache.get_address_history(&hash);
match history {
Some(transactions) if !transactions.is_empty() => {
Ok(transactions) if !transactions.is_empty() => {
let res = get_status(transactions);
json_rpc_res!(request, res)
}
Expand All @@ -382,8 +388,14 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
let script = get_arg!(request, ScriptBuf, 0);
let hash = get_spk_hash(&script);

if !self.address_cache.is_address_cached(&hash) {
self.address_cache.cache_address(script.clone());
if !self
.address_cache
.is_address_cached(&hash)
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?
{
self.address_cache
.cache_address(script.clone())
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?;
self.addresses_to_scan.push(script);
let res = json!({
"confirmed": 0,
Expand All @@ -392,7 +404,10 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
return json_rpc_res!(request, res);
}

let balance = self.address_cache.get_address_balance(&hash);
let balance = self
.address_cache
.get_address_balance(&hash)
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?;
let result = json!({
"confirmed": balance,
"unconfirmed": 0
Expand All @@ -403,25 +418,24 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
let script = get_arg!(request, ScriptBuf, 0);
let hash = get_spk_hash(&script);

if !self.address_cache.is_address_cached(&hash) {
self.address_cache.cache_address(script.clone());
if !self
.address_cache
.is_address_cached(&hash)
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?
{
self.address_cache
.cache_address(script.clone())
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?;
self.addresses_to_scan.push(script);
return json_rpc_res!(request, null);
}

self.address_cache
let transactions = self
.address_cache
.get_address_history(&hash)
.map(|transactions| {
let res = Self::process_history(&transactions);
json_rpc_res!(request, res)
})
.unwrap_or_else(|| {
Ok(json!({
"jsonrpc": "2.0",
"result": null,
"id": request.id
}))
})
.map_err(|e| super::error::Error::Blockchain(Box::new(e)))?;
let res = Self::process_history(&transactions);
json_rpc_res!(request, res)
}
"blockchain.scriptpubkey.subscribe" => {
let script = get_arg!(request, ScriptBuf, 0);
Expand All @@ -430,14 +444,14 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {

let history = self.address_cache.get_address_history(&hash);
match history {
Some(transactions) if !transactions.is_empty() => {
Ok(transactions) if !transactions.is_empty() => {
let res = get_status(transactions);
json_rpc_res!(request, res)
}
Some(_) => {
Ok(_) => {
json_rpc_res!(request, null)
}
None => {
Err(_) => {
self.addresses_to_scan.push(script);
json_rpc_res!(request, null)
}
Expand Down Expand Up @@ -468,19 +482,27 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
return Err(super::error::Error::Mempool(Box::new(e)));
};

let updated = self
.address_cache
.cache_mempool_transaction(&tx)
.into_iter()
.map(|spend| (tx.clone(), spend))
.collect::<Vec<_>>();
let updated = match self.address_cache.cache_mempool_transaction(&tx) {
Ok(coins) => coins
.into_iter()
.map(|spend| (tx.clone(), spend))
.collect::<Vec<_>>(),
Err(e) => {
error!("Error caching mempool transaction: {e}");
vec![]
}
};

self.wallet_notify(&updated);
json_rpc_res!(request, txid)
}
"blockchain.transaction.get" => {
let tx_id = get_arg!(request, Txid, 0);
let tx = self.address_cache.get_cached_transaction(&tx_id);
let tx = self
.address_cache
.get_cached_transaction(&tx_id)
.ok()
.flatten();
if let Some(tx) = tx {
return json_rpc_res!(request, tx);
}
Expand All @@ -489,8 +511,8 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
"blockchain.transaction.get_merkle" => {
let tx_id = get_arg!(request, Txid, 0);
let proof = self.address_cache.get_merkle_proof(&tx_id);
let height = self.address_cache.get_height(&tx_id);
let proof = self.address_cache.get_merkle_proof(&tx_id).ok().flatten();
let height = self.address_cache.get_height(&tx_id).ok().flatten();
if let Some(proof) = proof {
let result = json!({
"merkle": proof.hashes,
Expand Down Expand Up @@ -595,7 +617,9 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}

self.addresses_to_scan.iter().for_each(|address| {
self.address_cache.cache_address(address.clone());
if let Err(e) = self.address_cache.cache_address(address.clone()) {
error!("Could not cache address: {e}");
}
});

info!("Catching up with addresses {:?}", self.addresses_to_scan);
Expand Down Expand Up @@ -702,10 +726,12 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}]
});

let current_height = self.address_cache.get_cache_height();
let current_height = self.address_cache.get_cache_height().unwrap_or(0);

if (!self.chain.is_in_ibd() || height % 1000 == 0) && (height > current_height) {
self.address_cache.bump_height(height);
if let Err(e) = self.address_cache.bump_height(height) {
error!("Could not update cache height: {e}");
}
}

if self.chain.get_height().unwrap() == height {
Expand All @@ -717,7 +743,13 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
}

let transactions = self.address_cache.block_process(&block, height);
let transactions = match self.address_cache.block_process(&block, height) {
Ok(txs) => txs,
Err(e) => {
error!("Error processing block at height {height}: {e}");
vec![]
}
};

self.wallet_notify(&transactions);
}
Expand Down Expand Up @@ -813,9 +845,15 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
for (_, out) in transactions {
let hash = get_spk_hash(&out.script_pubkey);
if let Some(client) = self.client_addresses.get(&hash) {
let history = self.address_cache.get_address_history(&hash);
let history = match self.address_cache.get_address_history(&hash) {
Ok(h) => h,
Err(e) => {
error!("{e}");
continue;
}
};

let status_hash = get_status(history.unwrap());
let status_hash = get_status(history);
let notify = json!({
"jsonrpc": "2.0",
"method": "blockchain.scripthash.subscribe",
Expand Down Expand Up @@ -1022,20 +1060,22 @@ mod test {
fn get_test_cache() -> Arc<AddressCache<KvDatabase>> {
let test_id: u32 = rand::random();
let cache = KvDatabase::new(format!("./tmp-db/{test_id}.floresta")).unwrap();
let cache = AddressCache::new(cache);
let cache = AddressCache::new(cache).unwrap();

// Inserting test transactions in the wallet
let (transaction, proof) = get_test_transaction();
cache.cache_transaction(
&transaction,
118511,
transaction.output[0].value.to_sat(),
proof,
1,
0,
false,
get_spk_hash(&transaction.output[0].script_pubkey),
);
cache
.cache_transaction(
&transaction,
118511,
transaction.output[0].value.to_sat(),
proof,
1,
0,
false,
get_spk_hash(&transaction.output[0].script_pubkey),
)
.unwrap();

Arc::new(cache)
}
Expand Down
7 changes: 5 additions & 2 deletions crates/floresta-node/src/florestad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ impl Florestad {
let database = KvDatabase::new(self.config.data_dir.clone())
.map_err(FlorestadError::CouldNotOpenKvDatabase)?;

let wallet = AddressCache::new(database);
let wallet =
AddressCache::new(database).map_err(FlorestadError::CouldNotInitializeWallet)?;

wallet
.setup()
Expand Down Expand Up @@ -744,7 +745,9 @@ impl Florestad {
}

for address in self.get_addresses()? {
wallet.cache_address(address);
wallet
.cache_address(address)
.map_err(|e| FlorestadError::CouldNotSetupWallet(e.to_string()))?;
}

info!("Wallet setup completed!");
Expand Down
27 changes: 18 additions & 9 deletions crates/floresta-node/src/json_rpc/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use miniscript::descriptor::checksum;
use serde_json::json;
use serde_json::Value;
use tracing::debug;
use tracing::error;

use super::res::GetBlockchainInfoRes;
use super::res::GetTxOutProof;
Expand Down Expand Up @@ -52,6 +53,8 @@ impl<Blockchain: RpcChain> RpcImpl<Blockchain> {
let height = self
.wallet
.get_height(txid)
.ok()
.flatten()
.ok_or(JsonRpcError::TxNotFound)?;
let blockhash = self.chain.get_block_hash(height).unwrap();
self.chain
Expand Down Expand Up @@ -476,12 +479,14 @@ impl<Blockchain: RpcChain> RpcImpl<Blockchain> {
_include_mempool: bool,
) -> Result<Option<GetTxOut>, JsonRpcError> {
let res = match (
self.wallet.get_transaction(&txid),
self.wallet.get_height(&txid),
self.wallet.get_utxo(&OutPoint {
txid,
vout: outpoint,
}),
self.wallet.get_transaction(&txid).ok().flatten(),
self.wallet.get_height(&txid).ok().flatten(),
self.wallet
.get_utxo(&OutPoint {
txid,
vout: outpoint,
})
.ok(),
) {
(Some(cached_tx), Some(height), Some(txout)) => {
let is_coinbase = cached_tx.tx.is_coinbase();
Expand Down Expand Up @@ -599,7 +604,7 @@ impl<Blockchain: RpcChain> RpcImpl<Blockchain> {
script: ScriptBuf,
height: u32,
) -> Result<Value, JsonRpcError> {
if let Some(txout) = self.wallet.get_utxo(&OutPoint { txid, vout }) {
if let Ok(txout) = self.wallet.get_utxo(&OutPoint { txid, vout }) {
return Ok(serde_json::to_value(txout).unwrap());
}

Expand All @@ -613,7 +618,9 @@ impl<Blockchain: RpcChain> RpcImpl<Blockchain> {
return Err(JsonRpcError::NoBlockFilters);
};

self.wallet.cache_address(script.clone());
if let Err(e) = self.wallet.cache_address(script.clone()) {
error!("Could not cache address: {e}");
}
let filter_key = script.to_bytes();
let candidates = cfilters
.match_any(
Expand Down Expand Up @@ -642,7 +649,9 @@ impl<Blockchain: RpcChain> RpcImpl<Blockchain> {
return Err(JsonRpcError::BlockNotFound);
};

self.wallet.block_process(&candidate, height);
if let Err(e) = self.wallet.block_process(&candidate, height) {
error!("Error processing block at height {height}: {e}");
}
}

let val = match self.get_tx_out(txid, vout, false)? {
Expand Down
Loading
Loading