2. add(log): Log when state requests take a long time (#4815)

* Fix clippy::let_and_return

* Increase lightwalletd test timeouts for zebrad slowness

* Add a `zebrad_update_sync()` test, that update syncs Zebra without lightwalletd

* Run the zebrad-update-sync test in CI

* Add extra zebrad time to workaround lightwalletd bugs

* Add a CodeTimer diagnostic struct for long-running code

* Time state init and each state request, log when it takes too long

* Add code locations to execution timers

* Instrument state futures and functions with tracing spans

* Only log each code timer once

* Make displayed times shorter
This commit is contained in:
teor 2022-07-26 08:33:00 +10:00 committed by GitHub
parent a9fcde3ebf
commit ed553a9eca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 348 additions and 51 deletions

View File

@ -56,6 +56,7 @@ humantime = "2.1.0"
displaydoc = "0.2.3" displaydoc = "0.2.3"
static_assertions = "1.1.0" static_assertions = "1.1.0"
thiserror = "1.0.31" thiserror = "1.0.31"
tracing = "0.1.31"
# Serialization # Serialization
hex = { version = "0.4.3", features = ["serde"] } hex = { version = "0.4.3", features = ["serde"] }

View File

@ -0,0 +1,120 @@
//! Tracing the execution time of functions.
//!
//! TODO: also trace polling time for futures, using a `Future` wrapper
use std::time::{Duration, Instant};
use crate::fmt::duration_short;
/// The default minimum info-level message time.
pub const DEFAULT_MIN_INFO_TIME: Duration = Duration::from_secs(5);
/// The default minimum warning message time.
pub const DEFAULT_MIN_WARN_TIME: Duration = Duration::from_secs(20);
/// A guard that logs code execution time when dropped.
#[derive(Debug)]
pub struct CodeTimer {
/// The time that the code started executing.
start: Instant,
/// The minimum duration for info-level messages.
min_info_time: Duration,
/// The minimum duration for warning messages.
min_warn_time: Duration,
/// `true` if this timer has finished.
has_finished: bool,
}
impl CodeTimer {
/// Start timing the execution of a function, method, or other code region.
///
/// Returns a guard that finishes timing the code when dropped,
/// or when [`CodeTimer::finish()`] is called.
#[track_caller]
pub fn start() -> Self {
let start = Instant::now();
trace!(?start, "starting code timer");
Self {
start,
min_info_time: DEFAULT_MIN_INFO_TIME,
min_warn_time: DEFAULT_MIN_WARN_TIME,
has_finished: false,
}
}
/// Finish timing the execution of a function, method, or other code region.
#[track_caller]
pub fn finish<S>(
mut self,
module_path: &'static str,
line: u32,
description: impl Into<Option<S>>,
) where
S: ToString,
{
self.finish_inner(Some(module_path), Some(line), description);
}
/// Finish timing the execution of a function, method, or other code region.
///
/// This private method can be called from [`CodeTimer::finish()`] or `drop()`.
#[track_caller]
fn finish_inner<S>(
&mut self,
module_path: impl Into<Option<&'static str>>,
line: impl Into<Option<u32>>,
description: impl Into<Option<S>>,
) where
S: ToString,
{
if self.has_finished {
return;
}
self.has_finished = true;
let execution = self.start.elapsed();
let execution_time = duration_short(execution);
let module_path = module_path.into();
let line = line.into();
let description = description
.into()
.map(|desc| desc.to_string() + " ")
.unwrap_or_default();
if execution >= self.min_warn_time {
warn!(
?execution_time,
?module_path,
?line,
"{description}code took a long time to execute",
);
} else if execution >= self.min_info_time {
info!(
?execution_time,
?module_path,
?line,
"{description}code took longer than expected to execute",
);
} else {
trace!(
?execution_time,
?module_path,
?line,
"{description}code timer finished",
);
}
}
}
impl Drop for CodeTimer {
#[track_caller]
fn drop(&mut self) {
self.finish_inner(None, None, "(dropped, cancelled, or aborted)")
}
}

View File

@ -8,7 +8,8 @@ use proptest::prelude::*;
use proptest_derive::Arbitrary; use proptest_derive::Arbitrary;
pub mod time; pub mod time;
pub use time::{humantime_milliseconds, humantime_seconds};
pub use time::{duration_short, humantime_milliseconds, humantime_seconds};
/// Wrapper to override `Debug`, redirecting it to only output the type's name. /// Wrapper to override `Debug`, redirecting it to only output the type's name.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]

View File

@ -2,6 +2,22 @@
use std::time::Duration; use std::time::Duration;
/// The minimum amount of time displayed with only seconds (no milliseconds).
pub const MIN_SECONDS_ONLY_TIME: Duration = Duration::from_secs(5);
/// Returns a human-friendly formatted string for the whole number of seconds in `duration`.
pub fn duration_short(duration: impl Into<Duration>) -> String {
let duration = duration.into();
if duration >= MIN_SECONDS_ONLY_TIME {
humantime_seconds(duration)
} else {
humantime_milliseconds(duration)
}
}
// TODO: rename these functions to duration_*
/// Returns a human-friendly formatted string for the whole number of seconds in `duration`. /// Returns a human-friendly formatted string for the whole number of seconds in `duration`.
pub fn humantime_seconds(duration: impl Into<Duration>) -> String { pub fn humantime_seconds(duration: impl Into<Duration>) -> String {
let duration = duration.into(); let duration = duration.into();

View File

@ -9,15 +9,19 @@
// Required by bitvec! macro // Required by bitvec! macro
#![recursion_limit = "256"] #![recursion_limit = "256"]
#[macro_use]
extern crate bitflags;
#[macro_use] #[macro_use]
extern crate serde; extern crate serde;
#[macro_use] #[macro_use]
extern crate bitflags; extern crate tracing;
pub mod amount; pub mod amount;
pub mod block; pub mod block;
pub mod chain_tip; pub mod chain_tip;
pub mod diagnostic;
pub mod fmt; pub mod fmt;
pub mod history_tree; pub mod history_tree;
pub mod orchard; pub mod orchard;

View File

@ -27,13 +27,14 @@ use std::{
use futures::future::FutureExt; use futures::future::FutureExt;
use tokio::sync::{oneshot, watch}; use tokio::sync::{oneshot, watch};
use tower::{util::BoxService, Service}; use tower::{util::BoxService, Service};
use tracing::instrument; use tracing::{instrument, Instrument, Span};
#[cfg(any(test, feature = "proptest-impl"))] #[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer; use tower::buffer::Buffer;
use zebra_chain::{ use zebra_chain::{
block, block,
diagnostic::CodeTimer,
parameters::{Network, NetworkUpgrade}, parameters::{Network, NetworkUpgrade},
transparent, transparent,
}; };
@ -166,12 +167,19 @@ impl StateService {
config: Config, config: Config,
network: Network, network: Network,
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) { ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
let timer = CodeTimer::start();
let disk = FinalizedState::new(&config, network); let disk = FinalizedState::new(&config, network);
timer.finish(module_path!(), line!(), "opening finalized state database");
let timer = CodeTimer::start();
let initial_tip = disk let initial_tip = disk
.db() .db()
.tip_block() .tip_block()
.map(FinalizedBlock::from) .map(FinalizedBlock::from)
.map(ChainTipBlock::from); .map(ChainTipBlock::from);
timer.finish(module_path!(), line!(), "fetching database tip");
let timer = CodeTimer::start();
let (chain_tip_sender, latest_chain_tip, chain_tip_change) = let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
ChainTipSender::new(initial_tip, network); ChainTipSender::new(initial_tip, network);
@ -192,8 +200,11 @@ impl StateService {
chain_tip_sender, chain_tip_sender,
best_chain_sender, best_chain_sender,
}; };
timer.finish(module_path!(), line!(), "initializing state service");
tracing::info!("starting legacy chain check"); tracing::info!("starting legacy chain check");
let timer = CodeTimer::start();
if let Some(tip) = state.best_tip() { if let Some(tip) = state.best_tip() {
if let Some(nu5_activation_height) = NetworkUpgrade::Nu5.activation_height(network) { if let Some(nu5_activation_height) = NetworkUpgrade::Nu5.activation_height(network) {
if check::legacy_chain( if check::legacy_chain(
@ -216,6 +227,7 @@ impl StateService {
} }
} }
tracing::info!("no legacy chain found"); tracing::info!("no legacy chain found");
timer.finish(module_path!(), line!(), "legacy chain check");
(state, read_only_service, latest_chain_tip, chain_tip_change) (state, read_only_service, latest_chain_tip, chain_tip_change)
} }
@ -754,6 +766,8 @@ impl Service<Request> for StateService {
"type" => "commit_block", "type" => "commit_block",
); );
let timer = CodeTimer::start();
self.assert_block_can_be_validated(&prepared); self.assert_block_can_be_validated(&prepared);
self.pending_utxos self.pending_utxos
@ -768,10 +782,16 @@ impl Service<Request> for StateService {
// Since each block is spawned into its own task, // Since each block is spawned into its own task,
// there shouldn't be any other code running in the same task, // there shouldn't be any other code running in the same task,
// so we don't need to worry about blocking it: // so we don't need to worry about blocking it:
// https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html# // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
let rsp_rx = let span = Span::current();
tokio::task::block_in_place(|| self.queue_and_commit_non_finalized(prepared)); let rsp_rx = tokio::task::block_in_place(move || {
span.in_scope(|| self.queue_and_commit_non_finalized(prepared))
});
// The work is all done, the future just waits on a channel for the result
timer.finish(module_path!(), line!(), "CommitBlock");
let span = Span::current();
async move { async move {
rsp_rx rsp_rx
.await .await
@ -784,6 +804,7 @@ impl Service<Request> for StateService {
.map(Response::Committed) .map(Response::Committed)
.map_err(Into::into) .map_err(Into::into)
} }
.instrument(span)
.boxed() .boxed()
} }
Request::CommitFinalizedBlock(finalized) => { Request::CommitFinalizedBlock(finalized) => {
@ -794,6 +815,8 @@ impl Service<Request> for StateService {
"type" => "commit_finalized_block", "type" => "commit_finalized_block",
); );
let timer = CodeTimer::start();
self.pending_utxos.check_against(&finalized.new_outputs); self.pending_utxos.check_against(&finalized.new_outputs);
// # Performance // # Performance
@ -802,9 +825,15 @@ impl Service<Request> for StateService {
// and written to disk. // and written to disk.
// //
// See the note in `CommitBlock` for more details. // See the note in `CommitBlock` for more details.
let rsp_rx = let span = Span::current();
tokio::task::block_in_place(|| self.queue_and_commit_finalized(finalized)); let rsp_rx = tokio::task::block_in_place(move || {
span.in_scope(|| self.queue_and_commit_finalized(finalized))
});
// The work is all done, the future just waits on a channel for the result
timer.finish(module_path!(), line!(), "CommitFinalizedBlock");
let span = Span::current();
async move { async move {
rsp_rx rsp_rx
.await .await
@ -819,6 +848,7 @@ impl Service<Request> for StateService {
.map(Response::Committed) .map(Response::Committed)
.map_err(Into::into) .map_err(Into::into)
} }
.instrument(span)
.boxed() .boxed()
} }
Request::Depth(hash) => { Request::Depth(hash) => {
@ -829,7 +859,14 @@ impl Service<Request> for StateService {
"type" => "depth", "type" => "depth",
); );
let timer = CodeTimer::start();
// TODO: move this work into the future, like Block and Transaction?
let rsp = Ok(Response::Depth(self.best_depth(hash))); let rsp = Ok(Response::Depth(self.best_depth(hash)));
// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "Depth");
async move { rsp }.boxed() async move { rsp }.boxed()
} }
// TODO: consider spawning small reads into blocking tasks, // TODO: consider spawning small reads into blocking tasks,
@ -842,7 +879,14 @@ impl Service<Request> for StateService {
"type" => "tip", "type" => "tip",
); );
let timer = CodeTimer::start();
// TODO: move this work into the future, like Block and Transaction?
let rsp = Ok(Response::Tip(self.best_tip())); let rsp = Ok(Response::Tip(self.best_tip()));
// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "Tip");
async move { rsp }.boxed() async move { rsp }.boxed()
} }
Request::BlockLocator => { Request::BlockLocator => {
@ -853,9 +897,16 @@ impl Service<Request> for StateService {
"type" => "block_locator", "type" => "block_locator",
); );
let timer = CodeTimer::start();
// TODO: move this work into the future, like Block and Transaction?
let rsp = Ok(Response::BlockLocator( let rsp = Ok(Response::BlockLocator(
self.block_locator().unwrap_or_default(), self.block_locator().unwrap_or_default(),
)); ));
// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "BlockLocator");
async move { rsp }.boxed() async move { rsp }.boxed()
} }
Request::Transaction(hash) => { Request::Transaction(hash) => {
@ -866,6 +917,8 @@ impl Service<Request> for StateService {
"type" => "transaction", "type" => "transaction",
); );
let timer = CodeTimer::start();
// Prepare data for concurrent execution // Prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned(); let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone(); let db = self.disk.db().clone();
@ -873,10 +926,16 @@ impl Service<Request> for StateService {
// # Performance // # Performance
// //
// Allow other async tasks to make progress while the transaction is being read from disk. // Allow other async tasks to make progress while the transaction is being read from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let rsp = read::transaction(best_chain, &db, hash); span.in_scope(|| {
let rsp = read::transaction(best_chain, &db, hash);
Ok(Response::Transaction(rsp.map(|(tx, _height)| tx))) // The work is done in the future.
timer.finish(module_path!(), line!(), "Transaction");
Ok(Response::Transaction(rsp.map(|(tx, _height)| tx)))
})
}) })
.map(|join_result| join_result.expect("panic in Request::Transaction")) .map(|join_result| join_result.expect("panic in Request::Transaction"))
.boxed() .boxed()
@ -889,6 +948,8 @@ impl Service<Request> for StateService {
"type" => "block", "type" => "block",
); );
let timer = CodeTimer::start();
// Prepare data for concurrent execution // Prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned(); let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone(); let db = self.disk.db().clone();
@ -896,10 +957,16 @@ impl Service<Request> for StateService {
// # Performance // # Performance
// //
// Allow other async tasks to make progress while the block is being read from disk. // Allow other async tasks to make progress while the block is being read from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let rsp = read::block(best_chain, &db, hash_or_height); span.in_scope(move || {
let rsp = read::block(best_chain, &db, hash_or_height);
Ok(Response::Block(rsp)) // The work is done in the future.
timer.finish(module_path!(), line!(), "Block");
Ok(Response::Block(rsp))
})
}) })
.map(|join_result| join_result.expect("panic in Request::Block")) .map(|join_result| join_result.expect("panic in Request::Block"))
.boxed() .boxed()
@ -912,13 +979,19 @@ impl Service<Request> for StateService {
"type" => "await_utxo", "type" => "await_utxo",
); );
let timer = CodeTimer::start();
let span = Span::current();
let fut = self.pending_utxos.queue(outpoint); let fut = self.pending_utxos.queue(outpoint);
if let Some(utxo) = self.any_utxo(&outpoint) { if let Some(utxo) = self.any_utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo); self.pending_utxos.respond(&outpoint, utxo);
} }
fut.boxed() // The future waits on a channel for a response.
timer.finish(module_path!(), line!(), "AwaitUtxo");
fut.instrument(span).boxed()
} }
Request::FindBlockHashes { known_blocks, stop } => { Request::FindBlockHashes { known_blocks, stop } => {
metrics::counter!( metrics::counter!(
@ -929,8 +1002,16 @@ impl Service<Request> for StateService {
); );
const MAX_FIND_BLOCK_HASHES_RESULTS: usize = 500; const MAX_FIND_BLOCK_HASHES_RESULTS: usize = 500;
let timer = CodeTimer::start();
// TODO: move this work into the future, like Block and Transaction?
let res = let res =
self.find_best_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS); self.find_best_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS);
// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "FindBlockHashes");
async move { Ok(Response::BlockHashes(res)) }.boxed() async move { Ok(Response::BlockHashes(res)) }.boxed()
} }
Request::FindBlockHeaders { known_blocks, stop } => { Request::FindBlockHeaders { known_blocks, stop } => {
@ -951,6 +1032,11 @@ impl Service<Request> for StateService {
// //
// https://github.com/bitcoin/bitcoin/pull/4468/files#r17026905 // https://github.com/bitcoin/bitcoin/pull/4468/files#r17026905
let count = MAX_FIND_BLOCK_HEADERS_RESULTS - 2; let count = MAX_FIND_BLOCK_HEADERS_RESULTS - 2;
let timer = CodeTimer::start();
// TODO: move this work into the future, like Block and Transaction?
// return heights instead, to improve lookup performance?
let res = self.find_best_chain_hashes(known_blocks, stop, count); let res = self.find_best_chain_hashes(known_blocks, stop, count);
// And prepare data for concurrent execution // And prepare data for concurrent execution
@ -961,18 +1047,25 @@ impl Service<Request> for StateService {
// //
// Now we have the chain hashes, we can read the headers concurrently, // Now we have the chain hashes, we can read the headers concurrently,
// which allows other async tasks to make progress while data is being read from disk. // which allows other async tasks to make progress while data is being read from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let res = res span.in_scope(move || {
.iter() let res = res
.map(|&hash| { .iter()
let header = read::block_header(best_chain.clone(), &db, hash.into()) .map(|&hash| {
.expect("block header for found hash is in the best chain"); let header =
read::block_header(best_chain.clone(), &db, hash.into())
.expect("block header for found hash is in the best chain");
block::CountedHeader { header } block::CountedHeader { header }
}) })
.collect(); .collect();
Ok(Response::BlockHeaders(res)) // Some of the work is done in the future.
timer.finish(module_path!(), line!(), "FindBlockHeaders");
Ok(Response::BlockHeaders(res))
})
}) })
.map(|join_result| join_result.expect("panic in Request::FindBlockHeaders")) .map(|join_result| join_result.expect("panic in Request::FindBlockHeaders"))
.boxed() .boxed()
@ -1003,17 +1096,25 @@ impl Service<ReadRequest> for ReadStateService {
"type" => "block", "type" => "block",
); );
let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
// # Performance // # Performance
// //
// Allow other async tasks to make progress while concurrently reading blocks from disk. // Allow other async tasks to make progress while concurrently reading blocks from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let block = state.best_chain_receiver.with_watch_data(|best_chain| { span.in_scope(move || {
read::block(best_chain, &state.db, hash_or_height) let block = state.best_chain_receiver.with_watch_data(|best_chain| {
}); read::block(best_chain, &state.db, hash_or_height)
});
Ok(ReadResponse::Block(block)) // The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::Block");
Ok(ReadResponse::Block(block))
})
}) })
.map(|join_result| join_result.expect("panic in ReadRequest::Block")) .map(|join_result| join_result.expect("panic in ReadRequest::Block"))
.boxed() .boxed()
@ -1028,18 +1129,26 @@ impl Service<ReadRequest> for ReadStateService {
"type" => "transaction", "type" => "transaction",
); );
let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
// # Performance // # Performance
// //
// Allow other async tasks to make progress while concurrently reading transactions from disk. // Allow other async tasks to make progress while concurrently reading transactions from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let transaction_and_height = span.in_scope(move || {
state.best_chain_receiver.with_watch_data(|best_chain| { let transaction_and_height =
read::transaction(best_chain, &state.db, hash) state.best_chain_receiver.with_watch_data(|best_chain| {
}); read::transaction(best_chain, &state.db, hash)
});
Ok(ReadResponse::Transaction(transaction_and_height)) // The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
Ok(ReadResponse::Transaction(transaction_and_height))
})
}) })
.map(|join_result| join_result.expect("panic in ReadRequest::Transaction")) .map(|join_result| join_result.expect("panic in ReadRequest::Transaction"))
.boxed() .boxed()
@ -1053,17 +1162,26 @@ impl Service<ReadRequest> for ReadStateService {
"type" => "sapling_tree", "type" => "sapling_tree",
); );
let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
// # Performance // # Performance
// //
// Allow other async tasks to make progress while concurrently reading trees from disk. // Allow other async tasks to make progress while concurrently reading trees from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let sapling_tree = state.best_chain_receiver.with_watch_data(|best_chain| { span.in_scope(move || {
read::sapling_tree(best_chain, &state.db, hash_or_height) let sapling_tree =
}); state.best_chain_receiver.with_watch_data(|best_chain| {
read::sapling_tree(best_chain, &state.db, hash_or_height)
});
Ok(ReadResponse::SaplingTree(sapling_tree)) // The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
Ok(ReadResponse::SaplingTree(sapling_tree))
})
}) })
.map(|join_result| join_result.expect("panic in ReadRequest::SaplingTree")) .map(|join_result| join_result.expect("panic in ReadRequest::SaplingTree"))
.boxed() .boxed()
@ -1077,17 +1195,26 @@ impl Service<ReadRequest> for ReadStateService {
"type" => "orchard_tree", "type" => "orchard_tree",
); );
let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
// # Performance // # Performance
// //
// Allow other async tasks to make progress while concurrently reading trees from disk. // Allow other async tasks to make progress while concurrently reading trees from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let orchard_tree = state.best_chain_receiver.with_watch_data(|best_chain| { span.in_scope(move || {
read::orchard_tree(best_chain, &state.db, hash_or_height) let orchard_tree =
}); state.best_chain_receiver.with_watch_data(|best_chain| {
read::orchard_tree(best_chain, &state.db, hash_or_height)
});
Ok(ReadResponse::OrchardTree(orchard_tree)) // The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
Ok(ReadResponse::OrchardTree(orchard_tree))
})
}) })
.map(|join_result| join_result.expect("panic in ReadRequest::OrchardTree")) .map(|join_result| join_result.expect("panic in ReadRequest::OrchardTree"))
.boxed() .boxed()
@ -1105,17 +1232,29 @@ impl Service<ReadRequest> for ReadStateService {
"type" => "transaction_ids_by_addresses", "type" => "transaction_ids_by_addresses",
); );
let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
// # Performance // # Performance
// //
// Allow other async tasks to make progress while concurrently reading transaction IDs from disk. // Allow other async tasks to make progress while concurrently reading transaction IDs from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let tx_ids = state.best_chain_receiver.with_watch_data(|best_chain| { span.in_scope(move || {
read::transparent_tx_ids(best_chain, &state.db, addresses, height_range) let tx_ids = state.best_chain_receiver.with_watch_data(|best_chain| {
}); read::transparent_tx_ids(best_chain, &state.db, addresses, height_range)
});
tx_ids.map(ReadResponse::AddressesTransactionIds) // The work is done in the future.
timer.finish(
module_path!(),
line!(),
"ReadRequest::TransactionIdsByAddresses",
);
tx_ids.map(ReadResponse::AddressesTransactionIds)
})
}) })
.map(|join_result| { .map(|join_result| {
join_result.expect("panic in ReadRequest::TransactionIdsByAddresses") join_result.expect("panic in ReadRequest::TransactionIdsByAddresses")
@ -1132,17 +1271,25 @@ impl Service<ReadRequest> for ReadStateService {
"type" => "address_balance", "type" => "address_balance",
); );
let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
// # Performance // # Performance
// //
// Allow other async tasks to make progress while concurrently reading balances from disk. // Allow other async tasks to make progress while concurrently reading balances from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let balance = state.best_chain_receiver.with_watch_data(|best_chain| { span.in_scope(move || {
read::transparent_balance(best_chain, &state.db, addresses) let balance = state.best_chain_receiver.with_watch_data(|best_chain| {
})?; read::transparent_balance(best_chain, &state.db, addresses)
})?;
Ok(ReadResponse::AddressBalance(balance)) // The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
Ok(ReadResponse::AddressBalance(balance))
})
}) })
.map(|join_result| join_result.expect("panic in ReadRequest::AddressBalance")) .map(|join_result| join_result.expect("panic in ReadRequest::AddressBalance"))
.boxed() .boxed()
@ -1157,17 +1304,25 @@ impl Service<ReadRequest> for ReadStateService {
"type" => "utxos_by_addresses", "type" => "utxos_by_addresses",
); );
let timer = CodeTimer::start();
let state = self.clone(); let state = self.clone();
// # Performance // # Performance
// //
// Allow other async tasks to make progress while concurrently reading UTXOs from disk. // Allow other async tasks to make progress while concurrently reading UTXOs from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let utxos = state.best_chain_receiver.with_watch_data(|best_chain| { span.in_scope(move || {
read::transparent_utxos(state.network, best_chain, &state.db, addresses) let utxos = state.best_chain_receiver.with_watch_data(|best_chain| {
}); read::transparent_utxos(state.network, best_chain, &state.db, addresses)
});
utxos.map(ReadResponse::Utxos) // The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
utxos.map(ReadResponse::Utxos)
})
}) })
.map(|join_result| join_result.expect("panic in ReadRequest::UtxosByAddresses")) .map(|join_result| join_result.expect("panic in ReadRequest::UtxosByAddresses"))
.boxed() .boxed()