Stop reading all the blocks for every FindHashes and FindHeaders request (#4825)

But don't spawn that work concurrently, yet.
This commit is contained in:
teor 2022-07-27 06:21:15 +10:00 committed by GitHub
parent ed553a9eca
commit 15a55ee3f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 181 additions and 93 deletions

View File

@ -18,6 +18,7 @@
use std::{
convert,
future::Future,
ops::{RangeBounds, RangeInclusive},
pin::Pin,
sync::Arc,
task::{Context, Poll},
@ -33,7 +34,7 @@ use tracing::{instrument, Instrument, Span};
use tower::buffer::Buffer;
use zebra_chain::{
block,
block::{self, CountedHeader, Height},
diagnostic::CodeTimer,
parameters::{Network, NetworkUpgrade},
transparent,
@ -553,24 +554,19 @@ impl StateService {
.cloned()
}
/// Returns a list of block hashes in the best chain, following the `intersection` with the best
/// chain. If there is no intersection with the best chain, starts from the genesis hash.
/// Returns a range of [`Height`]s in the best chain,
/// starting after the `intersection` hash on the best chain.
///
/// Includes finalized and non-finalized blocks.
///
/// Stops the list of hashes after:
/// * adding the best tip,
/// * adding the `stop` hash to the list, if it is in the best chain, or
/// * adding `max_len` hashes to the list.
///
/// Returns an empty list if the state is empty,
/// or if the `intersection` is the best chain tip.
pub fn collect_best_chain_hashes(
/// See [`Self::find_best_chain_hashes()`] for details.
fn find_best_chain_height_range(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: usize,
) -> Vec<block::Hash> {
max_len: u32,
) -> impl RangeBounds<u32> + Iterator<Item = u32> {
#[allow(clippy::reversed_empty_ranges)]
const EMPTY_RANGE: RangeInclusive<u32> = 1..=0;
assert!(max_len > 0, "max_len must be at least 1");
// We can get a block locator request before we have downloaded the genesis block
@ -582,25 +578,38 @@ impl StateService {
"responding to peer GetBlocks or GetHeaders with empty state",
);
return Vec::new();
return EMPTY_RANGE;
};
let intersection_height = intersection.map(|hash| {
self.best_height_by_hash(hash)
.expect("the intersection hash must be in the best chain")
});
let max_len_height = if let Some(intersection_height) = intersection_height {
let max_len = i32::try_from(max_len).expect("max_len fits in i32");
// Find the intersection height
let intersection_height = match intersection {
Some(intersection_hash) => match self.best_height_by_hash(intersection_hash) {
Some(intersection_height) => Some(intersection_height),
// start after the intersection_height, and return max_len hashes
(intersection_height + max_len)
.expect("the Find response height does not exceed Height::MAX")
} else {
let max_len = u32::try_from(max_len).expect("max_len fits in u32");
let max_height = block::Height(max_len);
// A recently committed block dropped the intersection we previously found
None => {
info!(
?intersection,
?stop,
?max_len,
"state found intersection but then dropped it, ignoring request",
);
return EMPTY_RANGE;
}
},
// There is no intersection
None => None,
};
// start at genesis, and return max_len hashes
(max_height - 1).expect("max_len is at least 1, and does not exceed Height::MAX + 1")
// Now find the start and maximum heights
let (start_height, max_height) = match intersection_height {
// start after the intersection_height, and return max_len hashes or headers
Some(intersection_height) => (
Height(intersection_height.0 + 1),
Height(intersection_height.0 + max_len),
),
// start at genesis, and return max_len hashes or headers
None => (Height(0), Height(max_len - 1)),
};
let stop_height = stop.and_then(|hash| self.best_height_by_hash(hash));
@ -608,58 +617,142 @@ impl StateService {
// Compute the final height, making sure it is:
// * at or below our chain tip, and
// * at or below the height of the stop hash.
let final_height = std::cmp::min(max_len_height, chain_tip_height);
let final_height = std::cmp::min(max_height, chain_tip_height);
let final_height = stop_height
.map(|stop_height| std::cmp::min(final_height, stop_height))
.unwrap_or(final_height);
let final_hash = self
.best_hash(final_height)
.expect("final height must have a hash");
// We can use an "any chain" method here, because `final_hash` is in the best chain
let mut res: Vec<_> = self
.any_ancestor_blocks(final_hash)
.map(|block| block.hash())
.take_while(|&hash| Some(hash) != intersection)
.inspect(|hash| {
tracing::trace!(
?hash,
height = ?self.best_height_by_hash(*hash)
.expect("if hash is in the state then it should have an associated height"),
"adding hash to peer Find response",
)
})
.collect();
res.reverse();
// TODO: implement Step for Height, when Step stabilises
// https://github.com/rust-lang/rust/issues/42168
let height_range = start_height.0..=final_height.0;
let response_len = height_range.clone().into_iter().count();
tracing::debug!(
?start_height,
?final_height,
response_len = ?res.len(),
?response_len,
?chain_tip_height,
?stop_height,
?intersection_height,
?intersection,
?stop,
?max_len,
"responding to peer GetBlocks or GetHeaders",
);
// Check the function implements the Find protocol
assert!(
res.len() <= max_len,
"a Find response must not exceed the maximum response length"
response_len <= max_len.try_into().expect("fits in usize"),
"a Find response must not exceed the maximum response length",
);
height_range
}
/// Returns a list of [`block::Hash`]es in the best chain,
/// following the `intersection` with the best chain.
///
///
/// See [`Self::find_best_chain_hashes()`] for details.
fn collect_best_chain_hashes(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<block::Hash> {
let height_range = self.find_best_chain_height_range(intersection, stop, max_len);
// All the hashes should be in the best chain.
// If they are not, we don't want to return them.
let hashes: Vec<block::Hash> = height_range.into_iter().map_while(|height| {
let hash = self.best_hash(Height(height));
// A recently committed block dropped the intersection we previously found
if hash.is_none() {
info!(
?intersection,
?stop,
?max_len,
"state found height range, but then partially dropped it, returning partial response",
);
}
tracing::trace!(
?hash,
?height,
?intersection,
?stop,
?max_len,
"adding hash to peer Find response",
);
hash
}).collect();
// Check the function implements the Find protocol
assert!(
intersection
.map(|hash| !res.contains(&hash))
.map(|hash| !hashes.contains(&hash))
.unwrap_or(true),
"the list must not contain the intersection hash"
"the list must not contain the intersection hash",
);
if let (Some(stop), Some((_, res_except_last))) = (stop, res.split_last()) {
if let (Some(stop), Some((_, hashes_except_last))) = (stop, hashes.split_last()) {
assert!(
!res_except_last.contains(&stop),
"if the stop hash is in the list, it must be the final hash"
!hashes_except_last.contains(&stop),
"if the stop hash is in the list, it must be the final hash",
);
}
res
hashes
}
/// Returns a list of [`block::Header`]s in the best chain,
/// following the `intersection` with the best chain.
///
/// See [`Self::find_best_chain_hashes()`] for details.
fn collect_best_chain_headers(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<Arc<block::Header>> {
let height_range = self.find_best_chain_height_range(intersection, stop, max_len);
// We don't check that this function implements the Find protocol,
// because fetching extra hashes (or re-calculating hashes) is expensive.
// (This was one of the most expensive and longest-running functions in the state.)
// Save a copy of the non-finalized chain state
// (but the finalized state is still concurrently mutable).
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();
// All the headers should be in the best chain.
// If they are not, we don't want to return them.
height_range.into_iter().map_while(|height| {
let header = read::block_header(best_chain.clone(), &db, Height(height).into());
// A recently committed block dropped the intersection we previously found
if header.is_none() {
info!(
?intersection,
?stop,
?max_len,
"state found height range, but then partially dropped it, returning partial response",
);
}
tracing::trace!(
?height,
?intersection,
?stop,
?max_len,
"adding header to peer Find response",
);
header
}).collect()
}
/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
@ -676,17 +769,32 @@ impl StateService {
/// * adding the `stop` hash to the list, if it is in the best chain, or
/// * adding 500 hashes to the list.
///
/// Returns an empty list if the state is empty.
/// Returns an empty list if the state is empty,
/// and a partial or empty list if the found heights are concurrently modified.
pub fn find_best_chain_hashes(
&self,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: usize,
max_len: u32,
) -> Vec<block::Hash> {
let intersection = self.find_best_chain_intersection(known_blocks);
self.collect_best_chain_hashes(intersection, stop, max_len)
}
/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
/// Returns a list of headers that follow that intersection, from the best chain.
///
/// See [`Self::find_best_chain_hashes()`] for details.
pub fn find_best_chain_headers(
&self,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<Arc<block::Header>> {
let intersection = self.find_best_chain_intersection(known_blocks);
self.collect_best_chain_headers(intersection, stop, max_len)
}
/// Assert some assumptions about the prepared `block` before it is validated.
fn assert_block_can_be_validated(&self, block: &PreparedBlock) {
// required by validate_and_commit, moved here to make testing easier
@ -1001,7 +1109,7 @@ impl Service<Request> for StateService {
"type" => "find_block_hashes",
);
const MAX_FIND_BLOCK_HASHES_RESULTS: usize = 500;
const MAX_FIND_BLOCK_HASHES_RESULTS: u32 = 500;
let timer = CodeTimer::start();
@ -1024,50 +1132,30 @@ impl Service<Request> for StateService {
// Before we spawn the future, get a consistent set of chain hashes from the state.
const MAX_FIND_BLOCK_HEADERS_RESULTS: usize = 160;
const MAX_FIND_BLOCK_HEADERS_RESULTS: u32 = 160;
// Zcashd will blindly request more block headers as long as it
// got 160 block headers in response to a previous query, EVEN
// IF THOSE HEADERS ARE ALREADY KNOWN. To dodge this behavior,
// return slightly fewer than the maximum, to get it to go away.
//
// https://github.com/bitcoin/bitcoin/pull/4468/files#r17026905
let count = MAX_FIND_BLOCK_HEADERS_RESULTS - 2;
let max_len = MAX_FIND_BLOCK_HEADERS_RESULTS - 2;
let timer = CodeTimer::start();
// TODO: move this work into the future, like Block and Transaction?
// return heights instead, to improve lookup performance?
let res = self.find_best_chain_hashes(known_blocks, stop, count);
let res = self.find_best_chain_headers(known_blocks, stop, max_len);
// And prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();
// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "FindBlockHeaders");
// # Performance
//
// Now we have the chain hashes, we can read the headers concurrently,
// which allows other async tasks to make progress while data is being read from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let res = res
.iter()
.map(|&hash| {
let header =
read::block_header(best_chain.clone(), &db, hash.into())
.expect("block header for found hash is in the best chain");
block::CountedHeader { header }
})
.collect();
// Some of the work is done in the future.
timer.finish(module_path!(), line!(), "FindBlockHeaders");
Ok(Response::BlockHeaders(res))
})
})
.map(|join_result| join_result.expect("panic in Request::FindBlockHeaders"))
async move {
Ok(Response::BlockHeaders(
res.into_iter()
.map(|header| CountedHeader { header })
.collect(),
))
}
.boxed()
}
}