Make FindHeaders and FindHashes run concurrently with state updates (#4826)

And speed up the contains_block_hash state query.
This commit is contained in:
teor 2022-07-27 06:26:17 +10:00 committed by GitHub
parent 15a55ee3f2
commit 9b2185ad3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 419 additions and 281 deletions

View File

@ -18,7 +18,6 @@
use std::{
convert,
future::Future,
ops::{RangeBounds, RangeInclusive},
pin::Pin,
sync::Arc,
task::{Context, Poll},
@ -34,7 +33,7 @@ use tracing::{instrument, Instrument, Span};
use tower::buffer::Buffer;
use zebra_chain::{
block::{self, CountedHeader, Height},
block::{self, CountedHeader},
diagnostic::CodeTimer,
parameters::{Network, NetworkUpgrade},
transparent,
@ -489,15 +488,15 @@ impl StateService {
}
/// Return true if `hash` is in the current best chain.
#[allow(dead_code)]
pub fn best_chain_contains(&self, hash: block::Hash) -> bool {
self.best_height_by_hash(hash).is_some()
read::chain_contains_hash(self.mem.best_chain(), self.disk.db(), hash)
}
/// Return the height for the block at `hash`, if `hash` is in the best chain.
#[allow(dead_code)]
pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
self.mem
.best_height_by_hash(hash)
.or_else(|| self.disk.db().height(hash))
read::height_by_hash(self.mem.best_chain(), self.disk.db(), hash)
}
/// Return the height for the block at `hash` in any chain.
@ -539,262 +538,6 @@ impl StateService {
}
}
/// Find the first hash that's in the peer's `known_blocks` and the local best chain.
///
/// Returns `None` if:
/// * there is no matching hash in the best chain, or
/// * the state is empty.
fn find_best_chain_intersection(&self, known_blocks: Vec<block::Hash>) -> Option<block::Hash> {
// We can get a block locator request before we have downloaded the genesis block
self.best_tip()?;
known_blocks
.iter()
.find(|&&hash| self.best_chain_contains(hash))
.cloned()
}
/// Returns a range of [`Height`]s in the best chain,
/// starting after the `intersection` hash on the best chain.
///
/// See [`Self::find_best_chain_hashes()`] for details.
fn find_best_chain_height_range(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> impl RangeBounds<u32> + Iterator<Item = u32> {
#[allow(clippy::reversed_empty_ranges)]
const EMPTY_RANGE: RangeInclusive<u32> = 1..=0;
assert!(max_len > 0, "max_len must be at least 1");
// We can get a block locator request before we have downloaded the genesis block
let chain_tip_height = if let Some((height, _)) = self.best_tip() {
height
} else {
tracing::debug!(
response_len = ?0,
"responding to peer GetBlocks or GetHeaders with empty state",
);
return EMPTY_RANGE;
};
// Find the intersection height
let intersection_height = match intersection {
Some(intersection_hash) => match self.best_height_by_hash(intersection_hash) {
Some(intersection_height) => Some(intersection_height),
// A recently committed block dropped the intersection we previously found
None => {
info!(
?intersection,
?stop,
?max_len,
"state found intersection but then dropped it, ignoring request",
);
return EMPTY_RANGE;
}
},
// There is no intersection
None => None,
};
// Now find the start and maximum heights
let (start_height, max_height) = match intersection_height {
// start after the intersection_height, and return max_len hashes or headers
Some(intersection_height) => (
Height(intersection_height.0 + 1),
Height(intersection_height.0 + max_len),
),
// start at genesis, and return max_len hashes or headers
None => (Height(0), Height(max_len - 1)),
};
let stop_height = stop.and_then(|hash| self.best_height_by_hash(hash));
// Compute the final height, making sure it is:
// * at or below our chain tip, and
// * at or below the height of the stop hash.
let final_height = std::cmp::min(max_height, chain_tip_height);
let final_height = stop_height
.map(|stop_height| std::cmp::min(final_height, stop_height))
.unwrap_or(final_height);
// TODO: implement Step for Height, when Step stabilises
// https://github.com/rust-lang/rust/issues/42168
let height_range = start_height.0..=final_height.0;
let response_len = height_range.clone().into_iter().count();
tracing::debug!(
?start_height,
?final_height,
?response_len,
?chain_tip_height,
?stop_height,
?intersection_height,
?intersection,
?stop,
?max_len,
"responding to peer GetBlocks or GetHeaders",
);
// Check the function implements the Find protocol
assert!(
response_len <= max_len.try_into().expect("fits in usize"),
"a Find response must not exceed the maximum response length",
);
height_range
}
/// Returns a list of [`block::Hash`]es in the best chain,
/// following the `intersection` with the best chain.
///
///
/// See [`Self::find_best_chain_hashes()`] for details.
fn collect_best_chain_hashes(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<block::Hash> {
let height_range = self.find_best_chain_height_range(intersection, stop, max_len);
// All the hashes should be in the best chain.
// If they are not, we don't want to return them.
let hashes: Vec<block::Hash> = height_range.into_iter().map_while(|height| {
let hash = self.best_hash(Height(height));
// A recently committed block dropped the intersection we previously found
if hash.is_none() {
info!(
?intersection,
?stop,
?max_len,
"state found height range, but then partially dropped it, returning partial response",
);
}
tracing::trace!(
?hash,
?height,
?intersection,
?stop,
?max_len,
"adding hash to peer Find response",
);
hash
}).collect();
// Check the function implements the Find protocol
assert!(
intersection
.map(|hash| !hashes.contains(&hash))
.unwrap_or(true),
"the list must not contain the intersection hash",
);
if let (Some(stop), Some((_, hashes_except_last))) = (stop, hashes.split_last()) {
assert!(
!hashes_except_last.contains(&stop),
"if the stop hash is in the list, it must be the final hash",
);
}
hashes
}
/// Returns a list of [`block::Header`]s in the best chain,
/// following the `intersection` with the best chain.
///
/// See [`Self::find_best_chain_hashes()`] for details.
fn collect_best_chain_headers(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<Arc<block::Header>> {
let height_range = self.find_best_chain_height_range(intersection, stop, max_len);
// We don't check that this function implements the Find protocol,
// because fetching extra hashes (or re-calculating hashes) is expensive.
// (This was one of the most expensive and longest-running functions in the state.)
// Save a copy of the non-finalized chain state
// (but the finalized state is still concurrently mutable).
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();
// All the headers should be in the best chain.
// If they are not, we don't want to return them.
height_range.into_iter().map_while(|height| {
let header = read::block_header(best_chain.clone(), &db, Height(height).into());
// A recently committed block dropped the intersection we previously found
if header.is_none() {
info!(
?intersection,
?stop,
?max_len,
"state found height range, but then partially dropped it, returning partial response",
);
}
tracing::trace!(
?height,
?intersection,
?stop,
?max_len,
"adding header to peer Find response",
);
header
}).collect()
}
/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
/// Returns a list of hashes that follow that intersection, from the best chain.
///
/// Starts from the first matching hash in the best chain, ignoring all other hashes in
/// `known_blocks`. If there is no matching hash in the best chain, starts from the genesis
/// hash.
///
/// Includes finalized and non-finalized blocks.
///
/// Stops the list of hashes after:
/// * adding the best tip,
/// * adding the `stop` hash to the list, if it is in the best chain, or
/// * adding 500 hashes to the list.
///
/// Returns an empty list if the state is empty,
/// and a partial or empty list if the found heights are concurrently modified.
pub fn find_best_chain_hashes(
&self,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<block::Hash> {
let intersection = self.find_best_chain_intersection(known_blocks);
self.collect_best_chain_hashes(intersection, stop, max_len)
}
/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
/// Returns a list of headers that follow that intersection, from the best chain.
///
/// See [`Self::find_best_chain_hashes()`] for details.
pub fn find_best_chain_headers(
&self,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<Arc<block::Header>> {
let intersection = self.find_best_chain_intersection(known_blocks);
self.collect_best_chain_headers(intersection, stop, max_len)
}
/// Assert some assumptions about the prepared `block` before it is validated.
fn assert_block_can_be_validated(&self, block: &PreparedBlock) {
// required by validate_and_commit, moved here to make testing easier
@ -1113,14 +856,32 @@ impl Service<Request> for StateService {
let timer = CodeTimer::start();
// TODO: move this work into the future, like Block and Transaction?
let res =
self.find_best_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS);
// Prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();
// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "FindBlockHashes");
// # Performance
//
// Allow other async tasks to make progress while the block is being read from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let res = read::find_chain_hashes(
best_chain,
&db,
known_blocks,
stop,
MAX_FIND_BLOCK_HASHES_RESULTS,
);
async move { Ok(Response::BlockHashes(res)) }.boxed()
// The work is done in the future.
timer.finish(module_path!(), line!(), "FindBlockHashes");
Ok(Response::BlockHashes(res))
})
})
.map(|join_result| join_result.expect("panic in Request::Block"))
.boxed()
}
Request::FindBlockHeaders { known_blocks, stop } => {
metrics::counter!(
@ -1143,19 +904,30 @@ impl Service<Request> for StateService {
let timer = CodeTimer::start();
// TODO: move this work into the future, like Block and Transaction?
let res = self.find_best_chain_headers(known_blocks, stop, max_len);
// Prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();
// The work is all done, the future just returns the result.
timer.finish(module_path!(), line!(), "FindBlockHeaders");
async move {
Ok(Response::BlockHeaders(
res.into_iter()
// # Performance
//
// Allow other async tasks to make progress while the block is being read from disk.
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let res =
read::find_chain_headers(best_chain, &db, known_blocks, stop, max_len);
let res = res
.into_iter()
.map(|header| CountedHeader { header })
.collect(),
))
}
.collect();
// The work is done in the future.
timer.finish(module_path!(), line!(), "FindBlockHeaders");
Ok(Response::BlockHeaders(res))
})
})
.map(|join_result| join_result.expect("panic in Request::Block"))
.boxed()
}
}

View File

@ -73,6 +73,14 @@ impl ZebraDb {
self.db.zs_get(&hash_by_height, &height)
}
/// Returns `true` if `hash` is present in the finalized state.
#[allow(clippy::unwrap_in_result)]
pub fn contains_hash(&self, hash: block::Hash) -> bool {
let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
self.db.zs_contains(&height_by_hash, &hash)
}
/// Returns the height of the given block if it exists.
#[allow(clippy::unwrap_in_result)]
pub fn height(&self, hash: block::Hash) -> Option<block::Height> {

View File

@ -457,6 +457,18 @@ impl Chain {
.get(tx_loc.index.as_usize())
}
/// Returns the [`block::Hash`] for `height`, if it exists in this chain.
pub fn hash_by_height(&self, height: Height) -> Option<block::Hash> {
let hash = self.blocks.get(&height)?.hash;
Some(hash)
}
/// Returns the [`Height`] for `hash`, if it exists in this chain.
pub fn height_by_hash(&self, hash: block::Hash) -> Option<Height> {
self.height_by_hash.get(&hash).cloned()
}
/// Returns the non-finalized tip block hash and height.
#[allow(dead_code)]
pub fn non_finalized_tip(&self) -> (block::Hash, block::Height) {

View File

@ -11,7 +11,7 @@
//! [5]: super::Chain
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
ops::RangeInclusive,
ops::{RangeBounds, RangeInclusive},
sync::Arc,
};
@ -50,6 +50,8 @@ const FINALIZED_ADDRESS_INDEX_RETRIES: usize = 3;
/// so they are not included in any address indexes.
pub const ADDRESS_HEIGHTS_FULL_RANGE: RangeInclusive<Height> = Height(1)..=Height::MAX;
// Blocks and Transactions
/// Returns the [`Block`] with [`block::Hash`](zebra_chain::block::Hash) or
/// [`Height`], if it exists in the non-finalized `chain` or finalized `db`.
pub(crate) fn block<C>(
@ -108,7 +110,7 @@ pub(crate) fn transaction<C>(
chain: Option<C>,
db: &ZebraDb,
hash: transaction::Hash,
) -> Option<(Arc<Transaction>, block::Height)>
) -> Option<(Arc<Transaction>, Height)>
where
C: AsRef<Chain>,
{
@ -131,6 +133,344 @@ where
.or_else(|| db.transaction(hash))
}
// FindBlockHeaders / FindBlockHashes
/// Returns the tip of `chain`.
/// If there is no chain, returns the tip of `db`.
pub(crate) fn tip_height<C>(chain: Option<C>, db: &ZebraDb) -> Option<Height>
where
C: AsRef<Chain>,
{
chain
.map(|chain| chain.as_ref().non_finalized_tip_height())
.or_else(|| db.finalized_tip_height())
}
/// Return the height for the block at `hash`, if `hash` is in the chain.
pub fn height_by_hash<C>(chain: Option<C>, db: &ZebraDb, hash: block::Hash) -> Option<Height>
where
C: AsRef<Chain>,
{
chain
.and_then(|chain| chain.as_ref().height_by_hash(hash))
.or_else(|| db.height(hash))
}
/// Return the hash for the block at `height`, if `height` is in the chain.
pub fn hash_by_height<C>(chain: Option<C>, db: &ZebraDb, height: Height) -> Option<block::Hash>
where
C: AsRef<Chain>,
{
chain
.and_then(|chain| chain.as_ref().hash_by_height(height))
.or_else(|| db.hash(height))
}
/// Return true if `hash` is in the chain.
pub fn chain_contains_hash<C>(chain: Option<C>, db: &ZebraDb, hash: block::Hash) -> bool
where
C: AsRef<Chain>,
{
chain
.map(|chain| chain.as_ref().height_by_hash.contains_key(&hash))
.unwrap_or(false)
|| db.contains_hash(hash)
}
/// Find the first hash that's in the peer's `known_blocks` and the chain.
///
/// Returns `None` if:
/// * there is no matching hash in the chain, or
/// * the state is empty.
fn find_chain_intersection<C>(
chain: Option<C>,
db: &ZebraDb,
known_blocks: Vec<block::Hash>,
) -> Option<block::Hash>
where
C: AsRef<Chain>,
{
// We can get a block locator request before we have downloaded the genesis block
if chain.is_none() && db.is_empty() {
return None;
}
let chain = chain.as_ref();
known_blocks
.iter()
.find(|&&hash| chain_contains_hash(chain, db, hash))
.cloned()
}
/// Returns a range of [`Height`]s in the chain,
/// starting after the `intersection` hash on the chain.
///
/// See [`find_chain_hashes()`] for details.
fn find_chain_height_range<C>(
chain: Option<C>,
db: &ZebraDb,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> impl RangeBounds<u32> + Iterator<Item = u32>
where
C: AsRef<Chain>,
{
#[allow(clippy::reversed_empty_ranges)]
const EMPTY_RANGE: RangeInclusive<u32> = 1..=0;
assert!(max_len > 0, "max_len must be at least 1");
let chain = chain.as_ref();
// We can get a block locator request before we have downloaded the genesis block
let chain_tip_height = if let Some(height) = tip_height(chain, db) {
height
} else {
tracing::debug!(
response_len = ?0,
"responding to peer GetBlocks or GetHeaders with empty state",
);
return EMPTY_RANGE;
};
// Find the intersection height
let intersection_height = match intersection {
Some(intersection_hash) => match height_by_hash(chain, db, intersection_hash) {
Some(intersection_height) => Some(intersection_height),
// A recently committed block dropped the intersection we previously found
None => {
info!(
?intersection,
?stop,
?max_len,
"state found intersection but then dropped it, ignoring request",
);
return EMPTY_RANGE;
}
},
// There is no intersection
None => None,
};
// Now find the start and maximum heights
let (start_height, max_height) = match intersection_height {
// start after the intersection_height, and return max_len hashes or headers
Some(intersection_height) => (
Height(intersection_height.0 + 1),
Height(intersection_height.0 + max_len),
),
// start at genesis, and return max_len hashes or headers
None => (Height(0), Height(max_len - 1)),
};
let stop_height = stop.and_then(|hash| height_by_hash(chain, db, hash));
// Compute the final height, making sure it is:
// * at or below our chain tip, and
// * at or below the height of the stop hash.
let final_height = std::cmp::min(max_height, chain_tip_height);
let final_height = stop_height
.map(|stop_height| std::cmp::min(final_height, stop_height))
.unwrap_or(final_height);
// TODO: implement Step for Height, when Step stabilises
// https://github.com/rust-lang/rust/issues/42168
let height_range = start_height.0..=final_height.0;
let response_len = height_range.clone().into_iter().count();
tracing::debug!(
?start_height,
?final_height,
?response_len,
?chain_tip_height,
?stop_height,
?intersection_height,
?intersection,
?stop,
?max_len,
"responding to peer GetBlocks or GetHeaders",
);
// Check the function implements the Find protocol
assert!(
response_len <= max_len.try_into().expect("fits in usize"),
"a Find response must not exceed the maximum response length",
);
height_range
}
/// Returns a list of [`block::Hash`]es in the chain,
/// following the `intersection` with the chain.
///
///
/// See [`find_chain_hashes()`] for details.
fn collect_chain_hashes<C>(
chain: Option<C>,
db: &ZebraDb,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<block::Hash>
where
C: AsRef<Chain>,
{
let chain = chain.as_ref();
let height_range = find_chain_height_range(chain, db, intersection, stop, max_len);
// All the hashes should be in the chain.
// If they are not, we don't want to return them.
let hashes: Vec<block::Hash> = height_range.into_iter().map_while(|height| {
let hash = hash_by_height(chain, db, Height(height));
// A recently committed block dropped the intersection we previously found.
if hash.is_none() {
info!(
?intersection,
?stop,
?max_len,
"state found height range, but then partially dropped it, returning partial response",
);
}
tracing::trace!(
?hash,
?height,
?intersection,
?stop,
?max_len,
"adding hash to peer Find response",
);
hash
}).collect();
// Check the function implements the Find protocol
assert!(
intersection
.map(|hash| !hashes.contains(&hash))
.unwrap_or(true),
"the list must not contain the intersection hash",
);
if let (Some(stop), Some((_, hashes_except_last))) = (stop, hashes.split_last()) {
assert!(
!hashes_except_last.contains(&stop),
"if the stop hash is in the list, it must be the final hash",
);
}
hashes
}
/// Returns a list of [`block::Header`]s in the chain,
/// following the `intersection` with the chain.
///
/// See [`find_chain_hashes()`] for details.
fn collect_chain_headers<C>(
chain: Option<C>,
db: &ZebraDb,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<Arc<block::Header>>
where
C: AsRef<Chain>,
{
let chain = chain.as_ref();
let height_range = find_chain_height_range(chain, db, intersection, stop, max_len);
// We don't check that this function implements the Find protocol,
// because fetching extra hashes (or re-calculating hashes) is expensive.
// (This was one of the most expensive and longest-running functions in the state.)
// All the headers should be in the chain.
// If they are not, we don't want to return them.
height_range.into_iter().map_while(|height| {
let header = block_header(chain, db, Height(height).into());
// A recently committed block dropped the intersection we previously found
if header.is_none() {
info!(
?intersection,
?stop,
?max_len,
"state found height range, but then partially dropped it, returning partial response",
);
}
tracing::trace!(
?height,
?intersection,
?stop,
?max_len,
"adding header to peer Find response",
);
header
}).collect()
}
/// Finds the first hash that's in the peer's `known_blocks` and the chain.
/// Returns a list of hashes that follow that intersection, from the chain.
///
/// Starts from the first matching hash in the chain, ignoring all other hashes in
/// `known_blocks`. If there is no matching hash in the chain, starts from the genesis
/// hash.
///
/// Includes finalized and non-finalized blocks.
///
/// Stops the list of hashes after:
/// * adding the tip,
/// * adding the `stop` hash to the list, if it is in the chain, or
/// * adding 500 hashes to the list.
///
/// Returns an empty list if the state is empty,
/// and a partial or empty list if the found heights are concurrently modified.
pub fn find_chain_hashes<C>(
chain: Option<C>,
db: &ZebraDb,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<block::Hash>
where
C: AsRef<Chain>,
{
let chain = chain.as_ref();
let intersection = find_chain_intersection(chain, db, known_blocks);
collect_chain_hashes(chain, db, intersection, stop, max_len)
}
/// Finds the first hash that's in the peer's `known_blocks` and the chain.
/// Returns a list of headers that follow that intersection, from the chain.
///
/// See [`find_chain_hashes()`] for details.
pub fn find_chain_headers<C>(
chain: Option<C>,
db: &ZebraDb,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: u32,
) -> Vec<Arc<block::Header>>
where
C: AsRef<Chain>,
{
let chain = chain.as_ref();
let intersection = find_chain_intersection(chain, db, known_blocks);
collect_chain_headers(chain, db, intersection, stop, max_len)
}
// Note Commitment Trees
/// Returns the Sapling
/// [`NoteCommitmentTree`](sapling::tree::NoteCommitmentTree) specified by a
/// hash or height, if it exists in the non-finalized `chain` or finalized `db`.
@ -181,6 +521,8 @@ where
.or_else(|| db.orchard_tree(hash_or_height))
}
// Address Balance
/// Returns the total transparent balance for the supplied [`transparent::Address`]es.
///
/// If the addresses do not exist in the non-finalized `chain` or finalized `db`, returns zero.
@ -301,6 +643,8 @@ fn apply_balance_change(
balance?.constrain()
}
// Address UTXOs
/// Returns the unspent transparent outputs (UTXOs) for the supplied [`transparent::Address`]es,
/// in chain order; and the transaction IDs for the transactions containing those UTXOs.
///
@ -633,6 +977,8 @@ where
.collect()
}
// Address TX IDs
/// Returns the transaction IDs that sent or received funds from the supplied [`transparent::Address`]es,
/// within `query_height_range`, in chain order.
///