From 00eee8652e14b1e381258916b54e617380114b1f Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 14 Sep 2022 10:35:37 +1000 Subject: [PATCH] 1. change(state): Run most StateService read requests without shared mutable chain state (#5132) * Add TODOs for finalized blocks, non-finalized blocks, and reads * Document how state requests use shared state * Add a ReadStateService to the StateService And cleanup service struct fields. * Redirect Block and Transaction Requests to the ReadStateService * Put AddressBalance in a consistent enum order * Turn a repeated comment into documentation * Tidy doc links * Run Tip requests concurrently * Remove some redundant timers * Run Depth requests concurrently * Run BlockLocator requests concurrently * Move BlockLocator tests * Run FindBlockHashes requests concurrently * Run FindBlockHeaders requests concurrently * Use a constant in documentation Co-authored-by: Marek * Link that constant correctly * Expand block_locator() documentation * Clarify the difference between tower::Buffers and the state's ordered queues * Explain block locators better Co-authored-by: Marek --- zebra-state/src/constants.rs | 19 + zebra-state/src/lib.rs | 1 - zebra-state/src/request.rs | 132 +++- zebra-state/src/response.rs | 81 ++- zebra-state/src/service.rs | 606 +++++++++++------- .../service/finalized_state/zebra_db/block.rs | 3 +- .../src/service/non_finalized_state.rs | 2 + .../src/service/non_finalized_state/chain.rs | 7 +- zebra-state/src/service/read.rs | 4 +- zebra-state/src/service/read/find.rs | 122 +++- zebra-state/src/service/read/find/tests.rs | 3 + .../src/service/read/find/tests/vectors.rs | 65 ++ zebra-state/src/tests.rs | 65 +- zebra-state/src/util.rs | 28 - 14 files changed, 744 insertions(+), 394 deletions(-) create mode 100644 zebra-state/src/service/read/find/tests.rs create mode 100644 zebra-state/src/service/read/find/tests/vectors.rs delete mode 100644 zebra-state/src/util.rs diff --git a/zebra-state/src/constants.rs 
b/zebra-state/src/constants.rs index f4cf6a431..1b3f75f7f 100644 --- a/zebra-state/src/constants.rs +++ b/zebra-state/src/constants.rs @@ -24,6 +24,25 @@ pub const DATABASE_FORMAT_VERSION: u32 = 25; /// before we assume we are on a pre-NU5 legacy chain. pub const MAX_LEGACY_CHAIN_BLOCKS: usize = 1000; +/// The maximum number of block hashes allowed in `getblocks` responses in the Zcash network protocol. +pub const MAX_FIND_BLOCK_HASHES_RESULTS: u32 = 500; + +/// The maximum number of block headers allowed in `getheaders` responses in the Zcash network protocol. +const MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_PROTOCOL: u32 = 160; + +/// The maximum number of block headers sent by Zebra in `getheaders` responses. +/// +/// Older versions of Zcashd will blindly request more block headers as long as it +/// got 160 block headers in response to a previous query, +/// _even if those headers are already known_. +/// +/// To avoid this behavior, return slightly fewer than the maximum, +/// so `zcashd` thinks it has reached our chain tip. +/// +/// <https://github.com/bitcoin/bitcoin/pull/4468/files#r17026905> +pub const MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA: u32 = + MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_PROTOCOL - 2; + use lazy_static::lazy_static; use regex::Regex; diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index 86d87d9ae..7b81d554f 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -23,7 +23,6 @@ mod error; mod request; mod response; mod service; -mod util; #[cfg(test)] mod tests; diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index e40db4f53..c507d2c07 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -19,10 +19,13 @@ use zebra_chain::{ value_balance::{ValueBalance, ValueBalanceError}, }; -/// Allow *only* this unused import, so that rustdoc link resolution +/// Allow *only* these unused imports, so that rustdoc link resolution /// will work with inline links. 
#[allow(unused_imports)] -use crate::Response; +use crate::{ + constants::{MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA}, + ReadResponse, Response, +}; /// Identify a block by hash or height. /// @@ -408,7 +411,8 @@ pub enum Request { /// * [`Response::Depth(None)`](Response::Depth) otherwise. Depth(block::Hash), - /// Returns [`Response::Tip`] with the current best chain tip. + /// Returns [`Response::Tip(Option<(Height, block::Hash)>)`](Response::Tip) + /// with the current best chain tip. Tip, /// Computes a block locator object based on the current best chain. @@ -487,7 +491,7 @@ pub enum Request { /// Stops the list of headers after: /// * adding the best tip, /// * adding the header matching the `stop` hash to the list, if it is in the best chain, or - /// * adding 160 headers to the list. + /// * adding [`MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA`] headers to the list. /// /// Returns an empty list if the state is empty. /// @@ -507,12 +511,24 @@ pub enum Request { /// A read-only query about the chain state, via the /// [`ReadStateService`](crate::service::ReadStateService). pub enum ReadRequest { + /// Returns [`ReadResponse::Tip(Option<(Height, block::Hash)>)`](ReadResponse::Tip) + /// with the current best chain tip. + Tip, + + /// Computes the depth in the current best chain of the block identified by the given hash. + /// + /// Returns + /// + /// * [`ReadResponse::Depth(Some(depth))`](ReadResponse::Depth) if the block is in the best chain; + /// * [`ReadResponse::Depth(None)`](ReadResponse::Depth) otherwise. + Depth(block::Hash), + /// Looks up a block by hash or height in the current best chain. /// /// Returns /// - /// * [`Response::Block(Some(Arc))`](Response::Block) if the block is in the best chain; - /// * [`Response::Block(None)`](Response::Block) otherwise. 
+ /// * [`ReadResponse::Block(Some(Arc))`](ReadResponse::Block) if the block is in the best chain; + /// * [`ReadResponse::Block(None)`](ReadResponse::Block) otherwise. /// /// Note: the [`HashOrHeight`] can be constructed from a [`block::Hash`] or /// [`block::Height`] using `.into()`. @@ -522,15 +538,66 @@ pub enum ReadRequest { /// /// Returns /// - /// * [`Response::Transaction(Some(Arc))`](Response::Transaction) if the transaction is in the best chain; - /// * [`Response::Transaction(None)`](Response::Transaction) otherwise. + /// * [`ReadResponse::Transaction(Some(Arc))`](ReadResponse::Transaction) if the transaction is in the best chain; + /// * [`ReadResponse::Transaction(None)`](ReadResponse::Transaction) otherwise. Transaction(transaction::Hash), - /// Looks up the balance of a set of transparent addresses. + /// Computes a block locator object based on the current best chain. /// - /// Returns an [`Amount`](zebra_chain::amount::Amount) with the total - /// balance of the set of addresses. - AddressBalance(HashSet), + /// Returns [`ReadResponse::BlockLocator`] with hashes starting + /// from the best chain tip, and following the chain of previous + /// hashes. The first hash is the best chain tip. The last hash is + /// the tip of the finalized portion of the state. Block locators + /// are not continuous - some intermediate hashes might be skipped. + /// + /// If the state is empty, the block locator is also empty. + BlockLocator, + + /// Finds the first hash that's in the peer's `known_blocks` and the local best chain. + /// Returns a list of hashes that follow that intersection, from the best chain. + /// + /// If there is no matching hash in the best chain, starts from the genesis hash. + /// + /// Stops the list of hashes after: + /// * adding the best tip, + /// * adding the `stop` hash to the list, if it is in the best chain, or + /// * adding [`MAX_FIND_BLOCK_HASHES_RESULTS`] hashes to the list. 
+ /// + /// Returns an empty list if the state is empty. + /// + /// Returns + /// + /// [`ReadResponse::BlockHashes(Vec)`](ReadResponse::BlockHashes). + /// See + FindBlockHashes { + /// Hashes of known blocks, ordered from highest height to lowest height. + known_blocks: Vec, + /// Optionally, the last block hash to request. + stop: Option, + }, + + /// Finds the first hash that's in the peer's `known_blocks` and the local best chain. + /// Returns a list of headers that follow that intersection, from the best chain. + /// + /// If there is no matching hash in the best chain, starts from the genesis header. + /// + /// Stops the list of headers after: + /// * adding the best tip, + /// * adding the header matching the `stop` hash to the list, if it is in the best chain, or + /// * adding [`MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA`] headers to the list. + /// + /// Returns an empty list if the state is empty. + /// + /// Returns + /// + /// [`ReadResponse::BlockHeaders(Vec)`](ReadResponse::BlockHeaders). + /// See + FindBlockHeaders { + /// Hashes of known blocks, ordered from highest height to lowest height. + known_blocks: Vec, + /// Optionally, the hash of the last header to request. + stop: Option, + }, /// Looks up a Sapling note commitment tree either by a hash or height. /// @@ -550,13 +617,19 @@ pub enum ReadRequest { /// * [`ReadResponse::OrchardTree(None)`](crate::ReadResponse::OrchardTree) otherwise. OrchardTree(HashOrHeight), + /// Looks up the balance of a set of transparent addresses. + /// + /// Returns an [`Amount`](zebra_chain::amount::Amount) with the total + /// balance of the set of addresses. + AddressBalance(HashSet), + /// Looks up transaction hashes that were sent or received from addresses, /// in an inclusive blockchain height range. /// /// Returns /// - /// * A set of transaction hashes. - /// * An empty vector if no transactions were found for the given arguments. + /// * An ordered, unique map of transaction locations and hashes. 
+ /// * An empty map if no transactions were found for the given arguments. /// /// Returned txids are in the order they appear in blocks, /// which ensures that they are topologically sorted @@ -574,3 +647,34 @@ pub enum ReadRequest { /// Returns a type with found utxos and transaction information. UtxosByAddresses(HashSet), } + +/// Conversion from read-write [`Request`]s to read-only [`ReadRequest`]s. +/// +/// Used to dispatch read requests concurrently from the [`StateService`](crate::service::StateService). +impl TryFrom<Request> for ReadRequest { + type Error = &'static str; + + fn try_from(request: Request) -> Result<Self, Self::Error> { + match request { + Request::Tip => Ok(ReadRequest::Tip), + Request::Depth(hash) => Ok(ReadRequest::Depth(hash)), + + Request::Block(hash_or_height) => Ok(ReadRequest::Block(hash_or_height)), + Request::Transaction(tx_hash) => Ok(ReadRequest::Transaction(tx_hash)), + + Request::AwaitUtxo(_) => Err("ReadService does not track pending UTXOs"), + + Request::BlockLocator => Ok(ReadRequest::BlockLocator), + Request::FindBlockHashes { known_blocks, stop } => { + Ok(ReadRequest::FindBlockHashes { known_blocks, stop }) + } + Request::FindBlockHeaders { known_blocks, stop } => { + Ok(ReadRequest::FindBlockHeaders { known_blocks, stop }) + } + + Request::CommitBlock(_) | Request::CommitFinalizedBlock(_) => { + Err("ReadService does not write blocks") + } + } + } +} diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index 7c1eae40b..f79338166 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -10,18 +10,15 @@ use zebra_chain::{ transparent, }; -// Allow *only* this unused import, so that rustdoc link resolution +// Allow *only* these unused imports, so that rustdoc link resolution // will work with inline links. 
#[allow(unused_imports)] -use crate::Request; +use crate::{ReadRequest, Request}; use crate::{service::read::AddressUtxos, TransactionLocation}; #[derive(Clone, Debug, PartialEq, Eq)] -/// A response to a [`StateService`][1] [`Request`][2]. -/// -/// [1]: crate::service::StateService -/// [2]: crate::Request +/// A response to a [`StateService`](crate::service::StateService) [`Request`]. pub enum Response { /// Response to [`Request::CommitBlock`] indicating that a block was /// successfully committed to the state. @@ -57,37 +54,71 @@ pub enum Response { /// [`ReadStateService`](crate::service::ReadStateService)'s /// [`ReadRequest`](crate::ReadRequest). pub enum ReadResponse { - /// Response to [`ReadRequest::Block`](crate::ReadRequest::Block) with the - /// specified block. + /// Response to [`ReadRequest::Tip`] with the current best chain tip. + Tip(Option<(block::Height, block::Hash)>), + + /// Response to [`ReadRequest::Depth`] with the depth of the specified block. + Depth(Option), + + /// Response to [`ReadRequest::Block`] with the specified block. Block(Option>), - /// Response to - /// [`ReadRequest::Transaction`](crate::ReadRequest::Transaction) with the - /// specified transaction. + /// Response to [`ReadRequest::Transaction`] with the specified transaction. Transaction(Option<(Arc, block::Height)>), - /// Response to - /// [`ReadRequest::SaplingTree`](crate::ReadRequest::SaplingTree) with the - /// specified Sapling note commitment tree. + /// Response to [`ReadRequest::BlockLocator`] with a block locator object. + BlockLocator(Vec), + + /// The response to a `FindBlockHashes` request. + BlockHashes(Vec), + + /// The response to a `FindBlockHeaders` request. + BlockHeaders(Vec), + + /// Response to [`ReadRequest::SaplingTree`] with the specified Sapling note commitment tree. SaplingTree(Option>), - /// Response to - /// [`ReadRequest::OrchardTree`](crate::ReadRequest::OrchardTree) with the - /// specified Orchard note commitment tree. 
+ /// Response to [`ReadRequest::OrchardTree`] with the specified Orchard note commitment tree. OrchardTree(Option>), - /// Response to - /// [`ReadRequest::AddressBalance`](crate::ReadRequest::AddressBalance) with - /// the total balance of the addresses. + /// Response to [`ReadRequest::AddressBalance`] with the total balance of the addresses. AddressBalance(Amount), - /// Response to - /// [`ReadRequest::TransactionIdsByAddresses`](crate::ReadRequest::TransactionIdsByAddresses) + /// Response to [`ReadRequest::TransactionIdsByAddresses`] /// with the obtained transaction ids, in the order they appear in blocks. AddressesTransactionIds(BTreeMap), - /// Response to - /// [`ReadRequest::UtxosByAddresses`](crate::ReadRequest::UtxosByAddresses) - /// with found utxos and transaction data. + /// Response to [`ReadRequest::UtxosByAddresses`] with found utxos and transaction data. Utxos(AddressUtxos), } + +/// Conversion from read-only [`ReadResponse`]s to read-write [`Response`]s. +/// +/// Used to return read requests concurrently from the [`StateService`](crate::service::StateService). 
+impl TryFrom<ReadResponse> for Response { + type Error = &'static str; + + fn try_from(response: ReadResponse) -> Result<Self, Self::Error> { + match response { + ReadResponse::Tip(height_and_hash) => Ok(Response::Tip(height_and_hash)), + ReadResponse::Depth(depth) => Ok(Response::Depth(depth)), + + ReadResponse::Block(block) => Ok(Response::Block(block)), + ReadResponse::Transaction(tx_and_height) => { + Ok(Response::Transaction(tx_and_height.map(|(tx, _height)| tx))) + } + + ReadResponse::BlockLocator(hashes) => Ok(Response::BlockLocator(hashes)), + ReadResponse::BlockHashes(hashes) => Ok(Response::BlockHashes(hashes)), + ReadResponse::BlockHeaders(headers) => Ok(Response::BlockHeaders(headers)), + + ReadResponse::SaplingTree(_) => Err("there is no corresponding Response for this ReadResponse"), + ReadResponse::OrchardTree(_) => Err("there is no corresponding Response for this ReadResponse"), + + ReadResponse::AddressBalance(_) => Err("there is no corresponding Response for this ReadResponse"), + ReadResponse::AddressesTransactionIds(_) => Err("there is no corresponding Response for this ReadResponse"), + // TODO: Rename to AddressUtxos + ReadResponse::Utxos(_) => Err("there is no corresponding Response for this ReadResponse"), + } + } +} diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 7740fdda3..4a29f612a 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -26,7 +26,7 @@ use std::{ use futures::future::FutureExt; use tokio::sync::{oneshot, watch}; -use tower::{util::BoxService, Service}; +use tower::{util::BoxService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; #[cfg(any(test, feature = "proptest-impl"))] @@ -40,6 +40,7 @@ use zebra_chain::{ }; use crate::{ + constants::{MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA}, service::{ chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip}, finalized_state::{FinalizedState, ZebraDb}, @@ -89,40 +90,55 @@ pub type QueuedFinalized = ( /// Zebra stores the single best chain in the finalized state, /// and re-loads it from disk when restarted. 
/// -/// Requests to this service are processed in series, -/// so read requests wait for all queued write requests to complete, -/// then return their answers. +/// Read requests to this service are buffered, then processed concurrently. +/// Block write requests are buffered, then queued, then processed in order by a separate task. /// -/// This behaviour is implicitly used by Zebra's syncer, -/// to delay the next ObtainTips until all queued blocks have been committed. +/// Most state users can get faster read responses using the [`ReadStateService`], +/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests. /// -/// But most state users can ignore any queued blocks, and get faster read responses -/// using the [`ReadStateService`]. +/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`]. +/// They can read the latest block directly, without queueing any requests. #[derive(Debug)] pub(crate) struct StateService { + // Configuration + // + /// The configured Zcash network. + network: Network, + + // Exclusively Writeable State + // /// The finalized chain state, including its on-disk database. pub(crate) disk: FinalizedState, /// The non-finalized chain state, including its in-memory chain forks. mem: NonFinalizedState, - /// The configured Zcash network. - network: Network, - + // Queued Non-Finalized Blocks + // /// Blocks awaiting their parent blocks for contextual verification. queued_blocks: QueuedBlocks, + // Pending UTXO Request Tracking + // /// The set of outpoints with pending requests for their associated transparent::Output. pending_utxos: PendingUtxos, /// Instant tracking the last time `pending_utxos` was pruned. last_prune: Instant, - /// A sender channel for the current best chain tip. + // Concurrently Readable State + // + /// A sender channel used to update the current best chain tip for + /// [`LatestChainTip`] and [`ChainTipChange`]. 
chain_tip_sender: ChainTipSender, - /// A sender channel for the current best non-finalized chain. + /// A sender channel used to update the current best non-finalized chain for [`ReadStateService`]. best_chain_sender: watch::Sender>>, + + /// A cloneable [`ReadStateService`], used to answer concurrent read requests. + /// + /// TODO: move concurrent read requests to [`ReadRequest`], and remove `read_service`. + read_service: ReadStateService, } /// A read-only service for accessing Zebra's cached blockchain state. @@ -135,9 +151,16 @@ pub(crate) struct StateService { /// ignoring any blocks queued by the read-write [`StateService`]. /// /// This quick response behavior is better for most state users. -#[allow(dead_code)] +/// It allows other async tasks to make progress while concurrently reading data from disk. #[derive(Clone, Debug)] pub struct ReadStateService { + // Configuration + // + /// The configured Zcash network. + network: Network, + + // Shared Concurrently Readable State + // /// The shared inner on-disk database for the finalized state. /// /// RocksDB allows reads and writes via a shared reference, @@ -152,9 +175,6 @@ pub struct ReadStateService { /// This chain is only updated between requests, /// so it might include some block data that is also on `disk`. best_chain_receiver: WatchReceiver>>, - - /// The configured Zcash network. 
- network: Network, } impl StateService { @@ -185,20 +205,21 @@ impl StateService { let mem = NonFinalizedState::new(network); - let (read_only_service, best_chain_sender) = ReadStateService::new(&disk); + let (read_service, best_chain_sender) = ReadStateService::new(&disk); let queued_blocks = QueuedBlocks::default(); let pending_utxos = PendingUtxos::default(); let state = Self { + network, disk, mem, queued_blocks, pending_utxos, - network, last_prune: Instant::now(), chain_tip_sender, best_chain_sender, + read_service: read_service.clone(), }; timer.finish(module_path!(), line!(), "initializing state service"); @@ -230,7 +251,7 @@ impl StateService { tracing::info!("cached state consensus branch is valid: no legacy chain found"); timer.finish(module_path!(), line!(), "legacy chain check"); - (state, read_only_service, latest_chain_tip, chain_tip_change) + (state, read_service, latest_chain_tip, chain_tip_change) } /// Queue a finalized block for verification and storage in the finalized state. 
@@ -240,10 +261,17 @@ impl StateService { ) -> oneshot::Receiver> { let (rsp_tx, rsp_rx) = oneshot::channel(); + // TODO: move this code into the state block commit task: + // - queue_and_commit_finalized()'s commit_finalized() call becomes a send to the block commit channel + // - run commit_finalized() in the state block commit task + // - run the metrics update in queue_and_commit_finalized() in the block commit task + // - run the set_finalized_tip() in this function in the state block commit task + // - move all that code to the inner service let tip_block = self .disk .queue_and_commit_finalized((finalized, rsp_tx)) .map(ChainTipBlock::from); + self.chain_tip_sender.set_finalized_tip(tip_block); rsp_rx @@ -292,6 +320,11 @@ impl StateService { return rsp_rx; } + // TODO: move this code into the state block commit task: + // - process_queued()'s validate_and_commit() call becomes a send to the block commit channel + // - run validate_and_commit() in the state block commit task + // - run all the rest of the code in this function in the state block commit task + // - move all that code to the inner service self.process_queued(parent_hash); while self.mem.best_chain_len() > crate::constants::MAX_BLOCK_REORG_HEIGHT { @@ -449,57 +482,11 @@ impl StateService { Ok(()) } - /// Create a block locator for the current best chain. - fn block_locator(&self) -> Option> { - let tip_height = self.best_tip()?.0; - - let heights = crate::util::block_locator_heights(tip_height); - let mut hashes = Vec::with_capacity(heights.len()); - - for height in heights { - if let Some(hash) = self.best_hash(height) { - hashes.push(hash); - } - } - - Some(hashes) - } - /// Return the tip of the current best chain. pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> { self.mem.best_tip().or_else(|| self.disk.db().tip()) } - /// Return the depth of block `hash` in the current best chain. 
- pub fn best_depth(&self, hash: block::Hash) -> Option { - let tip = self.best_tip()?.0; - let height = self - .mem - .best_height_by_hash(hash) - .or_else(|| self.disk.db().height(hash))?; - - Some(tip.0 - height.0) - } - - /// Return the hash for the block at `height` in the current best chain. - pub fn best_hash(&self, height: block::Height) -> Option { - self.mem - .best_hash(height) - .or_else(|| self.disk.db().hash(height)) - } - - /// Return true if `hash` is in the current best chain. - #[allow(dead_code)] - pub fn best_chain_contains(&self, hash: block::Hash) -> bool { - read::chain_contains_hash(self.mem.best_chain(), self.disk.db(), hash) - } - - /// Return the height for the block at `hash`, if `hash` is in the best chain. - #[allow(dead_code)] - pub fn best_height_by_hash(&self, hash: block::Hash) -> Option { - read::height_by_hash(self.mem.best_chain(), self.disk.db(), hash) - } - /// Return the height for the block at `hash` in any chain. pub fn any_height_by_hash(&self, hash: block::Hash) -> Option { self.mem @@ -559,15 +546,15 @@ impl ReadStateService { pub(crate) fn new(disk: &FinalizedState) -> (Self, watch::Sender>>) { let (best_chain_sender, best_chain_receiver) = watch::channel(None); - let read_only_service = Self { + let read_service = Self { + network: disk.network(), db: disk.db().clone(), best_chain_receiver: WatchReceiver::new(best_chain_receiver), - network: disk.network(), }; tracing::info!("created new read-only state service"); - (read_only_service, best_chain_sender) + (read_service, best_chain_sender) } } @@ -610,6 +597,8 @@ impl Service for StateService { #[instrument(name = "state", skip(self, req))] fn call(&mut self, req: Request) -> Self::Future { match req { + // Uses queued_blocks and pending_utxos in the StateService + // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb. 
Request::CommitBlock(prepared) => { metrics::counter!( "state.requests", @@ -659,6 +648,9 @@ impl Service for StateService { .instrument(span) .boxed() } + + // Uses queued_by_prev_hash in the FinalizedState and pending_utxos in the StateService. + // Accesses shared writeable state in the StateService, FinalizedState, and ZebraDb. Request::CommitFinalizedBlock(finalized) => { metrics::counter!( "state.requests", @@ -711,7 +703,11 @@ impl Service for StateService { .instrument(span) .boxed() } - Request::Depth(hash) => { + + // TODO: add a name() method to Request, and combine all the read requests + // + // Runs concurrently using the ReadStateService + Request::Depth(_) => { metrics::counter!( "state.requests", 1, @@ -719,19 +715,23 @@ impl Service for StateService { "type" => "depth", ); - let timer = CodeTimer::start(); + // Redirect the request to the concurrent ReadStateService + let read_service = self.read_service.clone(); - // TODO: move this work into the future, like Block and Transaction? - // move disk reads to a blocking thread (#2188) - let rsp = Ok(Response::Depth(self.best_depth(hash))); + async move { + let req = req + .try_into() + .expect("ReadRequest conversion should not fail"); - // The work is all done, the future just returns the result. - timer.finish(module_path!(), line!(), "Depth"); + let rsp = read_service.oneshot(req).await?; + let rsp = rsp.try_into().expect("Response conversion should not fail"); - async move { rsp }.boxed() + Ok(rsp) + } + .boxed() } - // TODO: consider spawning small reads into blocking tasks, - // because the database can do large cleanups during small reads. 
+ + // Runs concurrently using the ReadStateService Request::Tip => { metrics::counter!( "state.requests", @@ -740,17 +740,23 @@ impl Service for StateService { "type" => "tip", ); - let timer = CodeTimer::start(); + // Redirect the request to the concurrent ReadStateService + let read_service = self.read_service.clone(); - // TODO: move this work into the future, like Block and Transaction? - // move disk reads to a blocking thread (#2188) - let rsp = Ok(Response::Tip(self.best_tip())); + async move { + let req = req + .try_into() + .expect("ReadRequest conversion should not fail"); - // The work is all done, the future just returns the result. - timer.finish(module_path!(), line!(), "Tip"); + let rsp = read_service.oneshot(req).await?; + let rsp = rsp.try_into().expect("Response conversion should not fail"); - async move { rsp }.boxed() + Ok(rsp) + } + .boxed() } + + // Runs concurrently using the ReadStateService Request::BlockLocator => { metrics::counter!( "state.requests", @@ -759,20 +765,24 @@ impl Service for StateService { "type" => "block_locator", ); - let timer = CodeTimer::start(); + // Redirect the request to the concurrent ReadStateService + let read_service = self.read_service.clone(); - // TODO: move this work into the future, like Block and Transaction? - // move disk reads to a blocking thread (#2188) - let rsp = Ok(Response::BlockLocator( - self.block_locator().unwrap_or_default(), - )); + async move { + let req = req + .try_into() + .expect("ReadRequest conversion should not fail"); - // The work is all done, the future just returns the result. 
- timer.finish(module_path!(), line!(), "BlockLocator"); + let rsp = read_service.oneshot(req).await?; + let rsp = rsp.try_into().expect("Response conversion should not fail"); - async move { rsp }.boxed() + Ok(rsp) + } + .boxed() } - Request::Transaction(hash) => { + + // Runs concurrently using the ReadStateService + Request::Transaction(_) => { metrics::counter!( "state.requests", 1, @@ -780,30 +790,24 @@ impl Service for StateService { "type" => "transaction", ); - let timer = CodeTimer::start(); + // Redirect the request to the concurrent ReadStateService + let read_service = self.read_service.clone(); - // Prepare data for concurrent execution - let best_chain = self.mem.best_chain().cloned(); - let db = self.disk.db().clone(); + async move { + let req = req + .try_into() + .expect("ReadRequest conversion should not fail"); - // # Performance - // - // Allow other async tasks to make progress while the transaction is being read from disk. - let span = Span::current(); - tokio::task::spawn_blocking(move || { - span.in_scope(|| { - let rsp = read::transaction(best_chain, &db, hash); + let rsp = read_service.oneshot(req).await?; + let rsp = rsp.try_into().expect("Response conversion should not fail"); - // The work is done in the future. 
- timer.finish(module_path!(), line!(), "Transaction"); - - Ok(Response::Transaction(rsp.map(|(tx, _height)| tx))) - }) - }) - .map(|join_result| join_result.expect("panic in Request::Transaction")) + Ok(rsp) + } .boxed() } - Request::Block(hash_or_height) => { + + // Runs concurrently using the ReadStateService + Request::Block(_) => { metrics::counter!( "state.requests", 1, @@ -811,29 +815,24 @@ impl Service for StateService { "type" => "block", ); - let timer = CodeTimer::start(); + // Redirect the request to the concurrent ReadStateService + let read_service = self.read_service.clone(); - // Prepare data for concurrent execution - let best_chain = self.mem.best_chain().cloned(); - let db = self.disk.db().clone(); + async move { + let req = req + .try_into() + .expect("ReadRequest conversion should not fail"); - // # Performance - // - // Allow other async tasks to make progress while the block is being read from disk. - let span = Span::current(); - tokio::task::spawn_blocking(move || { - span.in_scope(move || { - let rsp = read::block(best_chain, &db, hash_or_height); + let rsp = read_service.oneshot(req).await?; + let rsp = rsp.try_into().expect("Response conversion should not fail"); - // The work is done in the future. - timer.finish(module_path!(), line!(), "Block"); - - Ok(Response::Block(rsp)) - }) - }) - .map(|join_result| join_result.expect("panic in Request::Block")) + Ok(rsp) + } .boxed() } + + // Uses pending_utxos and queued_blocks in the StateService. + // Accesses shared writeable state in the StateService. Request::AwaitUtxo(outpoint) => { metrics::counter!( "state.requests", @@ -857,7 +856,9 @@ impl Service for StateService { fut.instrument(span).boxed() } - Request::FindBlockHashes { known_blocks, stop } => { + + // Runs concurrently using the ReadStateService + Request::FindBlockHashes { .. 
} => { metrics::counter!( "state.requests", 1, @@ -865,38 +866,24 @@ impl Service for StateService { "type" => "find_block_hashes", ); - const MAX_FIND_BLOCK_HASHES_RESULTS: u32 = 500; + // Redirect the request to the concurrent ReadStateService + let read_service = self.read_service.clone(); - let timer = CodeTimer::start(); + async move { + let req = req + .try_into() + .expect("ReadRequest conversion should not fail"); - // Prepare data for concurrent execution - let best_chain = self.mem.best_chain().cloned(); - let db = self.disk.db().clone(); + let rsp = read_service.oneshot(req).await?; + let rsp = rsp.try_into().expect("Response conversion should not fail"); - // # Performance - // - // Allow other async tasks to make progress while the block is being read from disk. - let span = Span::current(); - tokio::task::spawn_blocking(move || { - span.in_scope(move || { - let res = read::find_chain_hashes( - best_chain, - &db, - known_blocks, - stop, - MAX_FIND_BLOCK_HASHES_RESULTS, - ); - - // The work is done in the future. - timer.finish(module_path!(), line!(), "FindBlockHashes"); - - Ok(Response::BlockHashes(res)) - }) - }) - .map(|join_result| join_result.expect("panic in Request::Block")) + Ok(rsp) + } .boxed() } - Request::FindBlockHeaders { known_blocks, stop } => { + + // Runs concurrently using the ReadStateService + Request::FindBlockHeaders { .. } => { metrics::counter!( "state.requests", 1, @@ -904,43 +891,19 @@ impl Service for StateService { "type" => "find_block_headers", ); - // Before we spawn the future, get a consistent set of chain hashes from the state. + // Redirect the request to the concurrent ReadStateService + let read_service = self.read_service.clone(); - const MAX_FIND_BLOCK_HEADERS_RESULTS: u32 = 160; - // Zcashd will blindly request more block headers as long as it - // got 160 block headers in response to a previous query, EVEN - // IF THOSE HEADERS ARE ALREADY KNOWN. 
To dodge this behavior, - // return slightly fewer than the maximum, to get it to go away. - // - // https://github.com/bitcoin/bitcoin/pull/4468/files#r17026905 - let max_len = MAX_FIND_BLOCK_HEADERS_RESULTS - 2; + async move { + let req = req + .try_into() + .expect("ReadRequest conversion should not fail"); - let timer = CodeTimer::start(); + let rsp = read_service.oneshot(req).await?; + let rsp = rsp.try_into().expect("Response conversion should not fail"); - // Prepare data for concurrent execution - let best_chain = self.mem.best_chain().cloned(); - let db = self.disk.db().clone(); - - // # Performance - // - // Allow other async tasks to make progress while the block is being read from disk. - let span = Span::current(); - tokio::task::spawn_blocking(move || { - span.in_scope(move || { - let res = - read::find_chain_headers(best_chain, &db, known_blocks, stop, max_len); - let res = res - .into_iter() - .map(|header| CountedHeader { header }) - .collect(); - - // The work is done in the future. - timer.finish(module_path!(), line!(), "FindBlockHeaders"); - - Ok(Response::BlockHeaders(res)) - }) - }) - .map(|join_result| join_result.expect("panic in Request::Block")) + Ok(rsp) + } .boxed() } } @@ -960,6 +923,66 @@ impl Service for ReadStateService { #[instrument(name = "read_state", skip(self))] fn call(&mut self, req: ReadRequest) -> Self::Future { match req { + // Used by the StateService. + ReadRequest::Tip => { + metrics::counter!( + "state.requests", + 1, + "service" => "read_state", + "type" => "tip", + ); + + let timer = CodeTimer::start(); + + let state = self.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let tip = state + .best_chain_receiver + .with_watch_data(|best_chain| read::tip(best_chain, &state.db)); + + // The work is done in the future. 
+ timer.finish(module_path!(), line!(), "ReadRequest::Tip"); + + Ok(ReadResponse::Tip(tip)) + }) + }) + .map(|join_result| join_result.expect("panic in ReadRequest::Tip")) + .boxed() + } + + // Used by the StateService. + ReadRequest::Depth(hash) => { + metrics::counter!( + "state.requests", + 1, + "service" => "read_state", + "type" => "depth", + ); + + let timer = CodeTimer::start(); + + let state = self.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let depth = state + .best_chain_receiver + .with_watch_data(|best_chain| read::depth(best_chain, &state.db, hash)); + + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::Depth"); + + Ok(ReadResponse::Depth(depth)) + }) + }) + .map(|join_result| join_result.expect("panic in ReadRequest::Tip")) + .boxed() + } + // Used by get_block RPC. ReadRequest::Block(hash_or_height) => { metrics::counter!( @@ -973,9 +996,6 @@ impl Service for ReadStateService { let state = self.clone(); - // # Performance - // - // Allow other async tasks to make progress while concurrently reading blocks from disk. let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { @@ -1006,9 +1026,6 @@ impl Service for ReadStateService { let state = self.clone(); - // # Performance - // - // Allow other async tasks to make progress while concurrently reading transactions from disk. let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { @@ -1027,6 +1044,118 @@ impl Service for ReadStateService { .boxed() } + // Used by the StateService. 
+ ReadRequest::BlockLocator => { + metrics::counter!( + "state.requests", + 1, + "service" => "read_state", + "type" => "block_locator", + ); + + let timer = CodeTimer::start(); + + let state = self.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let block_locator = + state.best_chain_receiver.with_watch_data(|best_chain| { + read::block_locator(best_chain, &state.db) + }); + + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator"); + + Ok(ReadResponse::BlockLocator( + block_locator.unwrap_or_default(), + )) + }) + }) + .map(|join_result| join_result.expect("panic in ReadRequest::Tip")) + .boxed() + } + + // Used by the StateService. + ReadRequest::FindBlockHashes { known_blocks, stop } => { + metrics::counter!( + "state.requests", + 1, + "service" => "read_state", + "type" => "find_block_hashes", + ); + + let timer = CodeTimer::start(); + + let state = self.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let block_hashes = + state.best_chain_receiver.with_watch_data(|best_chain| { + read::find_chain_hashes( + best_chain, + &state.db, + known_blocks, + stop, + MAX_FIND_BLOCK_HASHES_RESULTS, + ) + }); + + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes"); + + Ok(ReadResponse::BlockHashes(block_hashes)) + }) + }) + .map(|join_result| join_result.expect("panic in ReadRequest::Tip")) + .boxed() + } + + // Used by the StateService. 
+ ReadRequest::FindBlockHeaders { known_blocks, stop } => { + metrics::counter!( + "state.requests", + 1, + "service" => "read_state", + "type" => "find_block_headers", + ); + + let timer = CodeTimer::start(); + + let state = self.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let block_headers = + state.best_chain_receiver.with_watch_data(|best_chain| { + read::find_chain_headers( + best_chain, + &state.db, + known_blocks, + stop, + MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA, + ) + }); + + let block_headers = block_headers + .into_iter() + .map(|header| CountedHeader { header }) + .collect(); + + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders"); + + Ok(ReadResponse::BlockHeaders(block_headers)) + }) + }) + .map(|join_result| join_result.expect("panic in ReadRequest::Tip")) + .boxed() + } + ReadRequest::SaplingTree(hash_or_height) => { metrics::counter!( "state.requests", @@ -1039,9 +1168,6 @@ impl Service for ReadStateService { let state = self.clone(); - // # Performance - // - // Allow other async tasks to make progress while concurrently reading trees from disk. let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { @@ -1072,9 +1198,6 @@ impl Service for ReadStateService { let state = self.clone(); - // # Performance - // - // Allow other async tasks to make progress while concurrently reading trees from disk. let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { @@ -1093,6 +1216,36 @@ impl Service for ReadStateService { .boxed() } + // For the get_address_balance RPC. 
+ ReadRequest::AddressBalance(addresses) => { + metrics::counter!( + "state.requests", + 1, + "service" => "read_state", + "type" => "address_balance", + ); + + let timer = CodeTimer::start(); + + let state = self.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let balance = state.best_chain_receiver.with_watch_data(|best_chain| { + read::transparent_balance(best_chain, &state.db, addresses) + })?; + + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance"); + + Ok(ReadResponse::AddressBalance(balance)) + }) + }) + .map(|join_result| join_result.expect("panic in ReadRequest::AddressBalance")) + .boxed() + } + // For the get_address_tx_ids RPC. ReadRequest::TransactionIdsByAddresses { addresses, @@ -1109,9 +1262,6 @@ impl Service for ReadStateService { let state = self.clone(); - // # Performance - // - // Allow other async tasks to make progress while concurrently reading transaction IDs from disk. let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { @@ -1135,39 +1285,6 @@ impl Service for ReadStateService { .boxed() } - // For the get_address_balance RPC. - ReadRequest::AddressBalance(addresses) => { - metrics::counter!( - "state.requests", - 1, - "service" => "read_state", - "type" => "address_balance", - ); - - let timer = CodeTimer::start(); - - let state = self.clone(); - - // # Performance - // - // Allow other async tasks to make progress while concurrently reading balances from disk. - let span = Span::current(); - tokio::task::spawn_blocking(move || { - span.in_scope(move || { - let balance = state.best_chain_receiver.with_watch_data(|best_chain| { - read::transparent_balance(best_chain, &state.db, addresses) - })?; - - // The work is done in the future. 
- timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance"); - - Ok(ReadResponse::AddressBalance(balance)) - }) - }) - .map(|join_result| join_result.expect("panic in ReadRequest::AddressBalance")) - .boxed() - } - // For the get_address_utxos RPC. ReadRequest::UtxosByAddresses(addresses) => { metrics::counter!( @@ -1181,9 +1298,6 @@ impl Service for ReadStateService { let state = self.clone(); - // # Performance - // - // Allow other async tasks to make progress while concurrently reading UTXOs from disk. let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index 72ce15f89..6e092758a 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -59,7 +59,8 @@ impl ZebraDb { /// Returns the tip height and hash, if there is one. // - // TODO: move this method to the tip section + // TODO: rename to finalized_tip() + // move this method to the tip section #[allow(clippy::unwrap_in_result)] pub fn tip(&self) -> Option<(block::Height, block::Hash)> { let hash_by_height = self.db.cf_handle("hash_by_height").unwrap(); diff --git a/zebra-state/src/service/non_finalized_state.rs b/zebra-state/src/service/non_finalized_state.rs index c1540b247..8c5e3644f 100644 --- a/zebra-state/src/service/non_finalized_state.rs +++ b/zebra-state/src/service/non_finalized_state.rs @@ -366,6 +366,7 @@ impl NonFinalizedState { } /// Returns the hash for a given `block::Height` if it is present in the best chain. + #[allow(dead_code)] pub fn best_hash(&self, height: block::Height) -> Option { self.best_chain()? .blocks @@ -391,6 +392,7 @@ impl NonFinalizedState { } /// Returns the height of `hash` in the best chain. 
+ #[allow(dead_code)] pub fn best_height_by_hash(&self, hash: block::Hash) -> Option { let best_chain = self.best_chain()?; let height = *best_chain.height_by_hash.get(&hash)?; diff --git a/zebra-state/src/service/non_finalized_state/chain.rs b/zebra-state/src/service/non_finalized_state/chain.rs index 214b2d6aa..c04d45176 100644 --- a/zebra-state/src/service/non_finalized_state/chain.rs +++ b/zebra-state/src/service/non_finalized_state/chain.rs @@ -483,12 +483,11 @@ impl Chain { self.height_by_hash.get(&hash).cloned() } - /// Returns the non-finalized tip block hash and height. - #[allow(dead_code)] - pub fn non_finalized_tip(&self) -> (block::Hash, block::Height) { + /// Returns the non-finalized tip block height and hash. + pub fn non_finalized_tip(&self) -> (Height, block::Hash) { ( - self.non_finalized_tip_hash(), self.non_finalized_tip_height(), + self.non_finalized_tip_hash(), ) } diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index 9a068d62a..2649dc86a 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -25,7 +25,7 @@ pub use address::{ }; pub use block::{block, block_header, transaction}; pub use find::{ - chain_contains_hash, find_chain_hashes, find_chain_headers, hash_by_height, height_by_hash, - tip_height, + block_locator, chain_contains_hash, depth, find_chain_hashes, find_chain_headers, + hash_by_height, height_by_hash, tip, tip_height, }; pub use tree::{orchard_tree, sapling_tree}; diff --git a/zebra-state/src/service/read/find.rs b/zebra-state/src/service/read/find.rs index 262c69a48..b3e9855d4 100644 --- a/zebra-state/src/service/read/find.rs +++ b/zebra-state/src/service/read/find.rs @@ -1,28 +1,66 @@ //! Finding and reading block hashes and headers, in response to peer requests. 
use std::{ + iter, ops::{RangeBounds, RangeInclusive}, sync::Arc, }; use zebra_chain::block::{self, Height}; -use crate::service::{ - finalized_state::ZebraDb, non_finalized_state::Chain, read::block::block_header, +use crate::{ + constants, + service::{finalized_state::ZebraDb, non_finalized_state::Chain, read::block::block_header}, }; +#[cfg(test)] +mod tests; + /// Returns the tip of `chain`. /// If there is no chain, returns the tip of `db`. +pub fn tip(chain: Option, db: &ZebraDb) -> Option<(Height, block::Hash)> +where + C: AsRef, +{ + chain + .map(|chain| chain.as_ref().non_finalized_tip()) + .or_else(|| db.tip()) +} + +/// Returns the tip [`Height`] of `chain`. +/// If there is no chain, returns the tip of `db`. pub fn tip_height(chain: Option, db: &ZebraDb) -> Option where C: AsRef, { - chain - .map(|chain| chain.as_ref().non_finalized_tip_height()) - .or_else(|| db.finalized_tip_height()) + tip(chain, db).map(|(height, _hash)| height) } -/// Return the height for the block at `hash`, if `hash` is in the chain. +/// Returns the tip [`block::Hash`] of `chain`. +/// If there is no chain, returns the tip of `db`. +#[allow(dead_code)] +pub fn tip_hash(chain: Option, db: &ZebraDb) -> Option +where + C: AsRef, +{ + tip(chain, db).map(|(_height, hash)| hash) +} + +/// Return the depth of block `hash` from the chain tip. +/// Searches `chain` for `hash`, then searches `db`. +pub fn depth(chain: Option, db: &ZebraDb, hash: block::Hash) -> Option +where + C: AsRef, +{ + let chain = chain.as_ref(); + + let tip = tip_height(chain, db)?; + let height = height_by_hash(chain, db, hash)?; + + Some(tip.0 - height.0) +} + +/// Return the height for the block at `hash`, if `hash` is in `chain` or `db`. pub fn height_by_hash(chain: Option, db: &ZebraDb, hash: block::Hash) -> Option where C: AsRef, @@ -32,7 +70,7 @@ where .or_else(|| db.height(hash)) } -/// Return the hash for the block at `height`, if `height` is in the chain. 
+/// Return the hash for the block at `height`, if `height` is in `chain` or `db`. pub fn hash_by_height(chain: Option, db: &ZebraDb, height: Height) -> Option where C: AsRef, @@ -42,7 +80,7 @@ where .or_else(|| db.hash(height)) } -/// Return true if `hash` is in the chain. +/// Return true if `hash` is in `chain` or `db`. pub fn chain_contains_hash(chain: Option, db: &ZebraDb, hash: block::Hash) -> bool where C: AsRef, @@ -53,7 +91,70 @@ where || db.contains_hash(hash) } -/// Find the first hash that's in the peer's `known_blocks` and the chain. +/// Create a block locator from `chain` and `db`. +/// +/// A block locator is used to efficiently find an intersection of two node's chains. +/// It contains a list of block hashes at decreasing heights, skipping some blocks, +/// so that any intersection can be located, no matter how long or different the chains are. +pub fn block_locator(chain: Option, db: &ZebraDb) -> Option> +where + C: AsRef, +{ + let chain = chain.as_ref(); + + let tip_height = tip_height(chain, db)?; + + let heights = block_locator_heights(tip_height); + let mut hashes = Vec::with_capacity(heights.len()); + + for height in heights { + if let Some(hash) = hash_by_height(chain, db, height) { + hashes.push(hash); + } + } + + Some(hashes) +} + +/// Get the heights of the blocks for constructing a block_locator list. +/// +/// Zebra uses a decreasing list of block heights, starting at the tip, and skipping some heights. +/// See [`block_locator()`] for details. +pub fn block_locator_heights(tip_height: block::Height) -> Vec { + // The initial height in the returned `vec` is the tip height, + // and the final height is `MAX_BLOCK_REORG_HEIGHT` below the tip. + // + // The initial distance between heights is 1, and it doubles between each subsequent height. + // So the number of returned heights is approximately `log_2(MAX_BLOCK_REORG_HEIGHT)`. + + // Limit the maximum locator depth. 
+ let min_locator_height = tip_height + .0 + .saturating_sub(constants::MAX_BLOCK_REORG_HEIGHT); + + // Create an exponentially decreasing set of heights. + let exponential_locators = iter::successors(Some(1u32), |h| h.checked_mul(2)) + .flat_map(move |step| tip_height.0.checked_sub(step)); + + // Start at the tip, add decreasing heights, and end MAX_BLOCK_REORG_HEIGHT below the tip. + let locators = iter::once(tip_height.0) + .chain(exponential_locators) + .take_while(move |&height| height > min_locator_height) + .chain(iter::once(min_locator_height)) + .map(block::Height) + .collect(); + + tracing::debug!( + ?tip_height, + ?min_locator_height, + ?locators, + "created block locator" + ); + + locators +} + +/// Find the first hash that's in the peer's `known_blocks`, and in `chain` or `db`. /// /// Returns `None` if: /// * there is no matching hash in the chain, or @@ -183,7 +284,6 @@ where /// Returns a list of [`block::Hash`]es in the chain, /// following the `intersection` with the chain. /// -/// /// See [`find_chain_hashes()`] for details. fn collect_chain_hashes( chain: Option, @@ -305,7 +405,7 @@ where /// Stops the list of hashes after: /// * adding the tip, /// * adding the `stop` hash to the list, if it is in the chain, or -/// * adding 500 hashes to the list. +/// * adding `max_len` hashes to the list. /// /// Returns an empty list if the state is empty, /// and a partial or empty list if the found heights are concurrently modified. diff --git a/zebra-state/src/service/read/find/tests.rs b/zebra-state/src/service/read/find/tests.rs new file mode 100644 index 000000000..50aadb532 --- /dev/null +++ b/zebra-state/src/service/read/find/tests.rs @@ -0,0 +1,3 @@ +//! Tests for concurrent "find" read requests. 
+ +mod vectors; diff --git a/zebra-state/src/service/read/find/tests/vectors.rs b/zebra-state/src/service/read/find/tests/vectors.rs new file mode 100644 index 000000000..1069fe5e4 --- /dev/null +++ b/zebra-state/src/service/read/find/tests/vectors.rs @@ -0,0 +1,65 @@ +//! Fixed test vectors for "find" read requests. + +use zebra_chain::block::Height; + +use crate::{constants, service::read::find::block_locator_heights}; + +/// Block heights, and the expected minimum block locator height +static BLOCK_LOCATOR_CASES: &[(u32, u32)] = &[ + (0, 0), + (1, 0), + (10, 0), + (98, 0), + (99, 0), + (100, 1), + (101, 2), + (1000, 901), + (10000, 9901), +]; + +/// Check that the block locator heights are sensible. +#[test] +fn test_block_locator_heights() { + let _init_guard = zebra_test::init(); + + for (height, min_height) in BLOCK_LOCATOR_CASES.iter().cloned() { + let locator = block_locator_heights(Height(height)); + + assert!(!locator.is_empty(), "locators must not be empty"); + if (height - min_height) > 1 { + assert!( + locator.len() > 2, + "non-trivial locators must have some intermediate heights" + ); + } + + assert_eq!( + locator[0], + Height(height), + "locators must start with the tip height" + ); + + // Check that the locator is sorted, and that it has no duplicates + // TODO: replace with dedup() and is_sorted_by() when sorting stabilises. 
+ assert!(locator.windows(2).all(|v| match v { + [a, b] => a.0 > b.0, + _ => unreachable!("windows returns exact sized slices"), + })); + + let final_height = locator[locator.len() - 1]; + assert_eq!( + final_height, + Height(min_height), + "locators must end with the specified final height" + ); + assert!( + height - final_height.0 <= constants::MAX_BLOCK_REORG_HEIGHT, + "locator for {} must not be more than the maximum reorg height {} below the tip, \ + but {} is {} blocks below the tip", + height, + constants::MAX_BLOCK_REORG_HEIGHT, + final_height.0, + height - final_height.0 + ); + } +} diff --git a/zebra-state/src/tests.rs b/zebra-state/src/tests.rs index 6bae162cf..725f659ae 100644 --- a/zebra-state/src/tests.rs +++ b/zebra-state/src/tests.rs @@ -3,15 +3,13 @@ use std::{mem, sync::Arc}; use zebra_chain::{ - block::{self, Block}, + block::Block, transaction::Transaction, transparent, work::difficulty::ExpandedDifficulty, work::difficulty::{Work, U256}, }; -use super::*; - pub mod setup; /// Helper trait for constructing "valid" looking chains of blocks @@ -77,23 +75,10 @@ fn work_to_expanded(work: U256) -> ExpandedDifficulty { ExpandedDifficulty::from(expanded) } -/// Block heights, and the expected minimum block locator height -static BLOCK_LOCATOR_CASES: &[(u32, u32)] = &[ - (0, 0), - (1, 0), - (10, 0), - (98, 0), - (99, 0), - (100, 1), - (101, 2), - (1000, 901), - (10000, 9901), -]; - -use proptest::prelude::*; - #[test] fn round_trip_work_expanded() { + use proptest::prelude::*; + let _init_guard = zebra_test::init(); proptest!(|(work_before in any::())| { @@ -103,47 +88,3 @@ fn round_trip_work_expanded() { prop_assert_eq!(work_before, work_after); }); } - -/// Check that the block locator heights are sensible. 
-#[test] -fn test_block_locator_heights() { - let _init_guard = zebra_test::init(); - - for (height, min_height) in BLOCK_LOCATOR_CASES.iter().cloned() { - let locator = util::block_locator_heights(block::Height(height)); - - assert!(!locator.is_empty(), "locators must not be empty"); - if (height - min_height) > 1 { - assert!( - locator.len() > 2, - "non-trivial locators must have some intermediate heights" - ); - } - - assert_eq!( - locator[0], - block::Height(height), - "locators must start with the tip height" - ); - - // Check that the locator is sorted, and that it has no duplicates - // TODO: replace with dedup() and is_sorted_by() when sorting stabilises. - assert!(locator.windows(2).all(|v| match v { - [a, b] => a.0 > b.0, - _ => unreachable!("windows returns exact sized slices"), - })); - - let final_height = locator[locator.len() - 1]; - assert_eq!( - final_height, - block::Height(min_height), - "locators must end with the specified final height" - ); - assert!(height - final_height.0 <= constants::MAX_BLOCK_REORG_HEIGHT, - "locator for {} must not be more than the maximum reorg height {} below the tip, but {} is {} blocks below the tip", - height, - constants::MAX_BLOCK_REORG_HEIGHT, - final_height.0, - height - final_height.0); - } -} diff --git a/zebra-state/src/util.rs b/zebra-state/src/util.rs deleted file mode 100644 index 6ee848ac2..000000000 --- a/zebra-state/src/util.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::iter; -use zebra_chain::block; - -use crate::constants; - -/// Get the heights of the blocks for constructing a block_locator list -pub fn block_locator_heights(tip_height: block::Height) -> Vec { - // Stop at the reorg limit, or the genesis block. 
- let min_locator_height = tip_height - .0 - .saturating_sub(constants::MAX_BLOCK_REORG_HEIGHT); - let locators = iter::successors(Some(1u32), |h| h.checked_mul(2)) - .flat_map(move |step| tip_height.0.checked_sub(step)); - let locators = iter::once(tip_height.0) - .chain(locators) - .take_while(move |&height| height > min_locator_height) - .chain(iter::once(min_locator_height)) - .map(block::Height); - - let locators = locators.collect(); - tracing::debug!( - ?tip_height, - ?min_locator_height, - ?locators, - "created block locator" - ); - locators -}