2. change(state): Run AwaitUtxo read requests without shared mutable chain state (#5107)

* Move AwaitUtxos next to the other shared writeable state requests

* Rename ReadResponse::Utxos to ReadResponse::AddressUtxos

```sh
fastmod Utxos AddressUtxos zebra*
```

* Rename an out_point variable to outpoint for consistency

* Rename transparent_utxos to address_utxos

```sh
fastmod transparent_utxos address_utxos zebra*
```

* Run AwaitUtxo without accessing shared mutable chain state

* Fix some incorrect comments

* Explain why some concurrent reads are ok

* Add a TODO

* Stop using self.mem in AwaitUtxo requests

* Update state service module documentation

* Move the QueuedBlock type into the queued_blocks module

* Explain how spent UTXOs are treated by the state

* Clarify how cached Chains impact state read requests

And move repeated comments to the module header.

* fastmod ChainUtxo BestChainUtxo zebra*

* Add an AnyChainUtxo request

* Make AwaitUtxo non-blocking
This commit is contained in:
teor 2022-09-16 14:13:26 +10:00 committed by GitHub
parent 726f732640
commit 36a549ee3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 547 additions and 194 deletions

View File

@ -916,7 +916,7 @@ where
data: None,
})?;
let utxos = match response {
zebra_state::ReadResponse::Utxos(utxos) => utxos,
zebra_state::ReadResponse::AddressUtxos(utxos) => utxos,
_ => unreachable!("unmatched response to a UtxosByAddresses request"),
};

View File

@ -445,9 +445,11 @@ pub enum Request {
/// [`block::Height`] using `.into()`.
Block(HashOrHeight),
/// Request a UTXO identified by the given
/// [`OutPoint`](transparent::OutPoint), waiting until it becomes available
/// if it is unknown.
/// Request a UTXO identified by the given [`OutPoint`](transparent::OutPoint),
/// waiting until it becomes available if it is unknown.
///
/// Checks the finalized chain, all non-finalized chains, queued unverified blocks,
/// and any blocks that arrive at the state after the request future has been created.
///
/// This request is purely informational, and there are no guarantees about
/// whether the UTXO remains unspent or is on the best chain, or any chain.
@ -458,6 +460,8 @@ pub enum Request {
/// UTXO requests should be wrapped in a timeout, so that
/// out-of-order and invalid requests do not hang indefinitely. See the [`crate`]
/// documentation for details.
///
/// Outdated requests are pruned on a regular basis.
AwaitUtxo(transparent::OutPoint),
/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
@ -542,6 +546,24 @@ pub enum ReadRequest {
/// * [`ReadResponse::Transaction(None)`](ReadResponse::Transaction) otherwise.
Transaction(transaction::Hash),
/// Looks up a UTXO identified by the given [`OutPoint`](transparent::OutPoint),
/// returning `None` immediately if it is unknown.
///
/// Checks verified blocks in the finalized chain and the _best_ non-finalized chain.
///
/// This request is purely informational, there is no guarantee that
/// the UTXO remains unspent in the best chain.
BestChainUtxo(transparent::OutPoint),
/// Looks up a UTXO identified by the given [`OutPoint`](transparent::OutPoint),
/// returning `None` immediately if it is unknown.
///
/// Checks verified blocks in the finalized chain and _all_ non-finalized chains.
///
/// This request is purely informational, there is no guarantee that
/// the UTXO remains unspent in the best chain.
AnyChainUtxo(transparent::OutPoint),
/// Computes a block locator object based on the current best chain.
///
/// Returns [`ReadResponse::BlockLocator`] with hashes starting
@ -662,8 +684,6 @@ impl TryFrom<Request> for ReadRequest {
Request::Block(hash_or_height) => Ok(ReadRequest::Block(hash_or_height)),
Request::Transaction(tx_hash) => Ok(ReadRequest::Transaction(tx_hash)),
Request::AwaitUtxo(_) => unimplemented!("use StoredUtxo here"),
Request::BlockLocator => Ok(ReadRequest::BlockLocator),
Request::FindBlockHashes { known_blocks, stop } => {
Ok(ReadRequest::FindBlockHashes { known_blocks, stop })
@ -675,6 +695,10 @@ impl TryFrom<Request> for ReadRequest {
Request::CommitBlock(_) | Request::CommitFinalizedBlock(_) => {
Err("ReadService does not write blocks")
}
Request::AwaitUtxo(_) => Err("ReadService does not track pending UTXOs. \
Manually convert the request to ReadRequest::AnyChainUtxo, \
and handle pending UTXOs"),
}
}
}

View File

@ -39,7 +39,8 @@ pub enum Response {
/// Response to [`Request::Block`] with the specified block.
Block(Option<Arc<Block>>),
/// The response to a `AwaitUtxo` request.
/// The response to a `AwaitUtxo` request, from any non-finalized chains, finalized chain,
/// pending unverified blocks, or blocks received after the request was sent.
Utxo(transparent::Utxo),
/// The response to a `FindBlockHashes` request.
@ -75,6 +76,20 @@ pub enum ReadResponse {
/// The response to a `FindBlockHeaders` request.
BlockHeaders(Vec<block::CountedHeader>),
/// The response to a `BestChainUtxo` request, from verified blocks in the
/// _best_ non-finalized chain, or the finalized chain.
///
/// This response is purely informational, there is no guarantee that
/// the UTXO remains unspent in the best chain.
BestChainUtxo(Option<transparent::Utxo>),
/// The response to an `AnyChainUtxo` request, from verified blocks in
/// _any_ non-finalized chain, or the finalized chain.
///
/// This response is purely informational, there is no guarantee that
/// the UTXO remains unspent in the best chain.
AnyChainUtxo(Option<transparent::Utxo>),
/// Response to [`ReadRequest::SaplingTree`] with the specified Sapling note commitment tree.
SaplingTree(Option<Arc<sapling::tree::NoteCommitmentTree>>),
@ -89,7 +104,7 @@ pub enum ReadResponse {
AddressesTransactionIds(BTreeMap<TransactionLocation, transaction::Hash>),
/// Response to [`ReadRequest::UtxosByAddresses`] with found utxos and transaction data.
Utxos(AddressUtxos),
AddressUtxos(AddressUtxos),
}
/// Conversion from read-only [`ReadResponse`]s to read-write [`Response`]s.
@ -108,17 +123,21 @@ impl TryFrom<ReadResponse> for Response {
Ok(Response::Transaction(tx_and_height.map(|(tx, _height)| tx)))
}
ReadResponse::AnyChainUtxo(_) => Err("ReadService does not track pending UTXOs. \
Manually unwrap the response, and handle pending UTXOs."),
ReadResponse::BlockLocator(hashes) => Ok(Response::BlockLocator(hashes)),
ReadResponse::BlockHashes(hashes) => Ok(Response::BlockHashes(hashes)),
ReadResponse::BlockHeaders(headers) => Ok(Response::BlockHeaders(headers)),
ReadResponse::SaplingTree(_) => unimplemented!(),
ReadResponse::OrchardTree(_) => unimplemented!(),
ReadResponse::AddressBalance(_) => unimplemented!(),
ReadResponse::AddressesTransactionIds(_) => unimplemented!(),
// TODO: Rename to AddressUtxos
ReadResponse::Utxos(_) => unimplemented!(),
ReadResponse::BestChainUtxo(_)
| ReadResponse::SaplingTree(_)
| ReadResponse::OrchardTree(_)
| ReadResponse::AddressBalance(_)
| ReadResponse::AddressesTransactionIds(_)
| ReadResponse::AddressUtxos(_) => {
Err("there is no corresponding Response for this ReadResponse")
}
}
}
}

View File

@ -1,13 +1,12 @@
//! [`tower::Service`]s for Zebra's cached chain state.
//!
//! Zebra provides cached state access via two main services:
//! - [`StateService`]: a read-write service that waits for queued blocks.
//! - [`StateService`]: a read-write service that writes blocks to the state,
//! and redirects most read requests to the [`ReadStateService`].
//! - [`ReadStateService`]: a read-only service that answers from the most
//! recent committed block.
//!
//! Most users should prefer [`ReadStateService`], unless they need to wait for
//! verified blocks to be committed. (For example, the syncer and mempool
//! tasks.)
//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
//!
//! Zebra also provides access to the best chain tip via:
//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
@ -19,7 +18,6 @@ use std::{
convert,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
@ -36,7 +34,6 @@ use zebra_chain::{
block::{self, CountedHeader},
diagnostic::CodeTimer,
parameters::{Network, NetworkUpgrade},
transparent,
};
use crate::{
@ -44,7 +41,7 @@ use crate::{
service::{
chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
finalized_state::{FinalizedState, ZebraDb},
non_finalized_state::{Chain, NonFinalizedState, QueuedBlocks},
non_finalized_state::{NonFinalizedState, QueuedBlocks},
pending_utxos::PendingUtxos,
watch_receiver::WatchReceiver,
},
@ -71,10 +68,6 @@ mod tests;
pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation};
pub type QueuedBlock = (
PreparedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>,
);
pub type QueuedFinalized = (
FinalizedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>,
@ -113,9 +106,10 @@ pub(crate) struct StateService {
/// The non-finalized chain state, including its in-memory chain forks.
mem: NonFinalizedState,
// Queued Non-Finalized Blocks
// Queued Blocks
//
/// Blocks awaiting their parent blocks for contextual verification.
/// Blocks for the [`NonFinalizedState`], which are awaiting their parent blocks
/// before they can do contextual verification.
queued_blocks: QueuedBlocks,
// Pending UTXO Request Tracking
@ -132,12 +126,12 @@ pub(crate) struct StateService {
/// [`LatestChainTip`] and [`ChainTipChange`].
chain_tip_sender: ChainTipSender,
/// A sender channel used to update the current best non-finalized chain for [`ReadStateService`].
best_chain_sender: watch::Sender<Option<Arc<Chain>>>,
/// A sender channel used to update the recent non-finalized state for the [`ReadStateService`].
non_finalized_state_sender: watch::Sender<NonFinalizedState>,
/// A cloneable [`ReadStateService`], used to answer concurrent read requests.
///
/// TODO: move concurrent read requests to [`ReadRequest`], and remove `read_service`.
/// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
read_service: ReadStateService,
}
@ -170,11 +164,11 @@ pub struct ReadStateService {
/// so it might include some block data that is also in `best_mem`.
db: ZebraDb,
/// A watch channel for the current best in-memory chain.
/// A watch channel for a recent [`NonFinalizedState`].
///
/// This chain is only updated between requests,
/// This state is only updated between requests,
/// so it might include some block data that is also on `disk`.
best_chain_receiver: WatchReceiver<Option<Arc<Chain>>>,
non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
}
impl StateService {
@ -205,7 +199,7 @@ impl StateService {
let mem = NonFinalizedState::new(network);
let (read_service, best_chain_sender) = ReadStateService::new(&disk);
let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk);
let queued_blocks = QueuedBlocks::default();
let pending_utxos = PendingUtxos::default();
@ -218,7 +212,7 @@ impl StateService {
pending_utxos,
last_prune: Instant::now(),
chain_tip_sender,
best_chain_sender,
non_finalized_state_sender,
read_service: read_service.clone(),
};
timer.finish(module_path!(), line!(), "initializing state service");
@ -315,6 +309,9 @@ impl StateService {
rsp_rx
};
// TODO: avoid a temporary verification failure that can happen
// if the first non-finalized block arrives before the last finalized block is committed
// (#5125)
if !self.can_fork_chain_at(&parent_hash) {
tracing::trace!("unready to verify, returning early");
return rsp_rx;
@ -364,7 +361,7 @@ impl StateService {
rsp_rx
}
/// Update the [`LatestChainTip`], [`ChainTipChange`], and `best_chain_sender`
/// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender`
/// channels with the latest non-finalized [`ChainTipBlock`] and
/// [`Chain`][1].
///
@ -381,11 +378,8 @@ impl StateService {
.map(ChainTipBlock::from);
let tip_block_height = tip_block.as_ref().map(|block| block.height);
// The RPC service uses the ReadStateService, but it is not turned on by default.
if self.best_chain_sender.receiver_count() > 0 {
// If the final receiver was just dropped, ignore the error.
let _ = self.best_chain_sender.send(best_chain.cloned());
}
let _ = self.non_finalized_state_sender.send(self.mem.clone());
self.chain_tip_sender.set_best_non_finalized_tip(tip_block);
@ -462,7 +456,7 @@ impl StateService {
/// network, based on the committed finalized and non-finalized state.
///
/// Note: some additional contextual validity checks are performed by the
/// non-finalized [`Chain`].
/// non-finalized [`Chain`](non_finalized_state::Chain).
fn check_contextual_validity(
&mut self,
prepared: &PreparedBlock,
@ -494,26 +488,6 @@ impl StateService {
.or_else(|| self.disk.db().height(hash))
}
/// Return the [`transparent::Utxo`] pointed to by `outpoint`, if it exists
/// in any chain, or in any pending block.
///
/// Some of the returned UTXOs may be invalid, because:
/// - they are not in the best chain, or
/// - their block fails contextual validation.
pub fn any_utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
// We ignore any UTXOs in FinalizedState.queued_by_prev_hash,
// because it is only used during checkpoint verification.
self.mem
.any_utxo(outpoint)
.or_else(|| self.queued_blocks.utxo(outpoint))
.or_else(|| {
self.disk
.db()
.utxo(outpoint)
.map(|ordered_utxo| ordered_utxo.utxo)
})
}
/// Return an iterator over the relevant chain of the block identified by
/// `hash`, in order from the largest height to the genesis block.
///
@ -542,19 +516,20 @@ impl ReadStateService {
/// Creates a new read-only state service, using the provided finalized state.
///
/// Returns the newly created service,
/// and a watch channel for updating its best non-finalized chain.
pub(crate) fn new(disk: &FinalizedState) -> (Self, watch::Sender<Option<Arc<Chain>>>) {
let (best_chain_sender, best_chain_receiver) = watch::channel(None);
/// and a watch channel for updating the shared recent non-finalized chain.
pub(crate) fn new(disk: &FinalizedState) -> (Self, watch::Sender<NonFinalizedState>) {
let (non_finalized_state_sender, non_finalized_state_receiver) =
watch::channel(NonFinalizedState::new(disk.network()));
let read_service = Self {
network: disk.network(),
db: disk.db().clone(),
best_chain_receiver: WatchReceiver::new(best_chain_receiver),
non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
};
tracing::info!("created new read-only state service");
(read_service, best_chain_sender)
(read_service, non_finalized_state_sender)
}
}
@ -704,7 +679,80 @@ impl Service<Request> for StateService {
.boxed()
}
// TODO: add a name() method to Request, and combine all the read requests
// Uses pending_utxos and queued_blocks in the StateService.
// If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
Request::AwaitUtxo(outpoint) => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "await_utxo",
);
let timer = CodeTimer::start();
// Prepare the AwaitUtxo future from PendingUxtos.
let response_fut = self.pending_utxos.queue(outpoint);
// Only instrument `response_fut`, the ReadStateService already
// instruments its requests with the same span.
let span = Span::current();
let response_fut = response_fut.instrument(span).boxed();
// Check the non-finalized block queue outside the returned future,
// so we can access mutable state fields.
if let Some(utxo) = self.queued_blocks.utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo);
// We're finished, the returned future gets the UTXO from the respond() channel.
timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");
return response_fut;
}
// We ignore any UTXOs in FinalizedState.queued_by_prev_hash,
// because it is only used during checkpoint verification.
//
// This creates a rare race condition, but it doesn't seem to happen much in practice.
// See #5126 for details.
// Manually send a request to the ReadStateService,
// to get UTXOs from any non-finalized chain or the finalized chain.
let read_service = self.read_service.clone();
// Run the request in an async block, so we can await the response.
async move {
let req = ReadRequest::AnyChainUtxo(outpoint);
let rsp = read_service.oneshot(req).await?;
// Optional TODO:
// - make pending_utxos.respond() async using a channel,
// so we can respond to all waiting requests here
//
// This change is not required for correctness, because:
// - any waiting requests should have returned when the block was sent to the state
// - otherwise, the request returns immediately if:
// - the block is in the non-finalized queue, or
// - the block is in any non-finalized chain or the finalized state
//
// And if the block is in the finalized queue,
// that's rare enough that a retry is ok.
if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
// We got a UTXO, so we replace the response future with the result own.
timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
return Ok(Response::Utxo(utxo));
}
// We're finished, but the returned future is waiting on the respond() channel.
timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
response_fut.await
}
.boxed()
}
// TODO: add a name() method to Request, and combine all the generic read requests
//
// Runs concurrently using the ReadStateService
Request::Depth(_) => {
@ -831,32 +879,6 @@ impl Service<Request> for StateService {
.boxed()
}
// Uses pending_utxos and queued_blocks in the StateService.
// Accesses shared writeable state in the StateService.
Request::AwaitUtxo(outpoint) => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "await_utxo",
);
let timer = CodeTimer::start();
let span = Span::current();
let fut = self.pending_utxos.queue(outpoint);
// TODO: move disk reads (in `any_utxo()`) to a blocking thread (#2188)
if let Some(utxo) = self.any_utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo);
}
// The future waits on a channel for a response.
timer.finish(module_path!(), line!(), "AwaitUtxo");
fut.instrument(span).boxed()
}
// Runs concurrently using the ReadStateService
Request::FindBlockHashes { .. } => {
metrics::counter!(
@ -939,9 +961,11 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let tip = state
.best_chain_receiver
.with_watch_data(|best_chain| read::tip(best_chain, &state.db));
let tip = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::tip(non_finalized_state.best_chain(), &state.db)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::Tip");
@ -969,9 +993,11 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let depth = state
.best_chain_receiver
.with_watch_data(|best_chain| read::depth(best_chain, &state.db, hash));
let depth = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::depth(non_finalized_state.best_chain(), &state.db, hash)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::Depth");
@ -983,7 +1009,7 @@ impl Service<ReadRequest> for ReadStateService {
.boxed()
}
// Used by get_block RPC.
// Used by get_block RPC and the StateService.
ReadRequest::Block(hash_or_height) => {
metrics::counter!(
"state.requests",
@ -999,9 +1025,15 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let block = state.best_chain_receiver.with_watch_data(|best_chain| {
read::block(best_chain, &state.db, hash_or_height)
});
let block = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::block(
non_finalized_state.best_chain(),
&state.db,
hash_or_height,
)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::Block");
@ -1013,7 +1045,7 @@ impl Service<ReadRequest> for ReadStateService {
.boxed()
}
// For the get_raw_transaction RPC.
// For the get_raw_transaction RPC and the StateService.
ReadRequest::Transaction(hash) => {
metrics::counter!(
"state.requests",
@ -1029,9 +1061,10 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let transaction_and_height =
state.best_chain_receiver.with_watch_data(|best_chain| {
read::transaction(best_chain, &state.db, hash)
let transaction_and_height = state
.non_finalized_state_receiver
.with_watch_data(|non_finalized_state| {
read::transaction(non_finalized_state.best_chain(), &state.db, hash)
});
// The work is done in the future.
@ -1044,6 +1077,70 @@ impl Service<ReadRequest> for ReadStateService {
.boxed()
}
// Currently unused.
ReadRequest::BestChainUtxo(outpoint) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "best_chain_utxo",
);
let timer = CodeTimer::start();
let state = self.clone();
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let utxo = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::utxo(non_finalized_state.best_chain(), &state.db, outpoint)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::BestChainUtxo");
Ok(ReadResponse::BestChainUtxo(utxo))
})
})
.map(|join_result| join_result.expect("panic in ReadRequest::BestChainUtxo"))
.boxed()
}
// Manually used by the StateService to implement part of AwaitUtxo.
ReadRequest::AnyChainUtxo(outpoint) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "any_chain_utxo",
);
let timer = CodeTimer::start();
let state = self.clone();
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let utxo = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::any_utxo(non_finalized_state, &state.db, outpoint)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
Ok(ReadResponse::AnyChainUtxo(utxo))
})
})
.map(|join_result| join_result.expect("panic in ReadRequest::AnyChainUtxo"))
.boxed()
}
// Used by the StateService.
ReadRequest::BlockLocator => {
metrics::counter!(
@ -1060,10 +1157,11 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let block_locator =
state.best_chain_receiver.with_watch_data(|best_chain| {
read::block_locator(best_chain, &state.db)
});
let block_locator = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::block_locator(non_finalized_state.best_chain(), &state.db)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
@ -1093,16 +1191,17 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let block_hashes =
state.best_chain_receiver.with_watch_data(|best_chain| {
let block_hashes = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::find_chain_hashes(
best_chain,
non_finalized_state.best_chain(),
&state.db,
known_blocks,
stop,
MAX_FIND_BLOCK_HASHES_RESULTS,
)
});
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
@ -1130,16 +1229,17 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let block_headers =
state.best_chain_receiver.with_watch_data(|best_chain| {
let block_headers = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::find_chain_headers(
best_chain,
non_finalized_state.best_chain(),
&state.db,
known_blocks,
stop,
MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA,
)
});
},
);
let block_headers = block_headers
.into_iter()
@ -1171,10 +1271,15 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let sapling_tree =
state.best_chain_receiver.with_watch_data(|best_chain| {
read::sapling_tree(best_chain, &state.db, hash_or_height)
});
let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::sapling_tree(
non_finalized_state.best_chain(),
&state.db,
hash_or_height,
)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
@ -1201,10 +1306,15 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let orchard_tree =
state.best_chain_receiver.with_watch_data(|best_chain| {
read::orchard_tree(best_chain, &state.db, hash_or_height)
});
let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::orchard_tree(
non_finalized_state.best_chain(),
&state.db,
hash_or_height,
)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
@ -1232,9 +1342,15 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let balance = state.best_chain_receiver.with_watch_data(|best_chain| {
read::transparent_balance(best_chain, &state.db, addresses)
})?;
let balance = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::transparent_balance(
non_finalized_state.best_chain().cloned(),
&state.db,
addresses,
)
},
)?;
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
@ -1265,9 +1381,16 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let tx_ids = state.best_chain_receiver.with_watch_data(|best_chain| {
read::transparent_tx_ids(best_chain, &state.db, addresses, height_range)
});
let tx_ids = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::transparent_tx_ids(
non_finalized_state.best_chain(),
&state.db,
addresses,
height_range,
)
},
);
// The work is done in the future.
timer.finish(
@ -1301,14 +1424,21 @@ impl Service<ReadRequest> for ReadStateService {
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let utxos = state.best_chain_receiver.with_watch_data(|best_chain| {
read::transparent_utxos(state.network, best_chain, &state.db, addresses)
});
let utxos = state.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::address_utxos(
state.network,
non_finalized_state.best_chain(),
&state.db,
addresses,
)
},
);
// The work is done in the future.
timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
utxos.map(ReadResponse::Utxos)
utxos.map(ReadResponse::AddressUtxos)
})
})
.map(|join_result| join_result.expect("panic in ReadRequest::UtxosByAddresses"))

View File

@ -9,9 +9,13 @@ use crate::{
ValidateContextError,
};
// Tidy up some doc links
#[allow(unused_imports)]
use crate::service;
/// Reject double-spends of nullifers:
/// - one from this [`PreparedBlock`], and the other already committed to the
/// [`FinalizedState`](super::super::FinalizedState).
/// [`FinalizedState`](service::FinalizedState).
///
/// (Duplicate non-finalized nullifiers are rejected during the chain update,
/// see [`add_to_non_finalized_chain_unique`] for details.)
@ -80,7 +84,7 @@ pub(crate) fn no_duplicates_in_finalized_chain(
/// [2]: zebra_chain::sapling::Spend
/// [3]: zebra_chain::orchard::Action
/// [4]: zebra_chain::block::Block
/// [5]: super::super::Chain
/// [5]: service::non_finalized_state::Chain
#[tracing::instrument(skip(chain_nullifiers, shielded_data_nullifiers))]
pub(crate) fn add_to_non_finalized_chain_unique<'block, NullifierT>(
chain_nullifiers: &mut HashSet<NullifierT>,
@ -124,7 +128,7 @@ where
/// [`add_to_non_finalized_chain_unique`], so this shielded data should be the
/// only shielded data that added this nullifier to this [`Chain`][1].
///
/// [1]: super::super::Chain
/// [1]: service::non_finalized_state::Chain
#[tracing::instrument(skip(chain_nullifiers, shielded_data_nullifiers))]
pub(crate) fn remove_from_non_finalized_chain<'block, NullifierT>(
chain_nullifiers: &mut HashSet<NullifierT>,

View File

@ -313,7 +313,7 @@ impl ZebraDb {
///
/// Specifically, a block in the partial chain must be a child block of the finalized tip.
/// (But the child block does not have to be the partial chain root.)
pub fn partial_finalized_transparent_utxos(
pub fn partial_finalized_address_utxos(
&self,
addresses: &HashSet<transparent::Address>,
) -> BTreeMap<OutputLocation, transparent::Output> {

View File

@ -340,14 +340,13 @@ impl NonFinalizedState {
/// Returns the [`transparent::Utxo`] pointed to by the given
/// [`transparent::OutPoint`] if it is present in any chain.
///
/// UTXOs are returned regardless of whether they have been spent.
pub fn any_utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
for chain in self.chain_set.iter().rev() {
if let Some(utxo) = chain.created_utxos.get(outpoint) {
return Some(utxo.utxo.clone());
}
}
None
self.chain_set
.iter()
.rev()
.find_map(|chain| chain.created_utxo(outpoint))
}
/// Returns the `block` with the given hash in any chain.

View File

@ -646,11 +646,23 @@ impl Chain {
/// and removed from the relevant chain(s).
pub fn unspent_utxos(&self) -> HashMap<transparent::OutPoint, transparent::OrderedUtxo> {
let mut unspent_utxos = self.created_utxos.clone();
unspent_utxos.retain(|out_point, _utxo| !self.spent_utxos.contains(out_point));
unspent_utxos.retain(|outpoint, _utxo| !self.spent_utxos.contains(outpoint));
unspent_utxos
}
/// Returns the [`transparent::Utxo`] pointed to by the given
/// [`transparent::OutPoint`] if it was created by this chain.
///
/// UTXOs are returned regardless of whether they have been spent.
pub fn created_utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
if let Some(utxo) = self.created_utxos.get(outpoint) {
return Some(utxo.utxo.clone());
}
None
}
// Address index queries
/// Returns the transparent transfers for `addresses` in this non-finalized chain.

View File

@ -1,12 +1,22 @@
//! Queued blocks that are awaiting their parent block for verification.
use std::{
collections::{BTreeMap, HashMap, HashSet},
mem,
};
use tokio::sync::oneshot;
use tracing::instrument;
use zebra_chain::{block, transparent};
use crate::service::QueuedBlock;
use crate::{BoxError, PreparedBlock};
/// A queued non-finalized block, and its corresponding [`Result`] channel.
pub type QueuedBlock = (
PreparedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>,
);
/// A queue of blocks, awaiting the arrival of parent blocks.
#[derive(Debug, Default)]
@ -27,6 +37,7 @@ impl QueuedBlocks {
/// # Panics
///
/// - if a block with the same `block::Hash` has already been queued.
#[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
pub fn queue(&mut self, new: QueuedBlock) {
let new_hash = new.0.hash;
let new_height = new.0.height;
@ -94,6 +105,7 @@ impl QueuedBlocks {
/// Remove all queued blocks whose height is less than or equal to the given
/// `finalized_tip_height`.
#[instrument(skip(self))]
pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) {
// split_off returns the values _greater than or equal to_ the key. What
// we need is the keys that are less than or equal to
@ -165,11 +177,13 @@ impl QueuedBlocks {
}
/// Try to look up this UTXO in any queued block.
#[instrument(skip(self))]
pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
self.known_utxos.get(outpoint).cloned()
}
}
// TODO: move these tests into their own `tests/vectors.rs` module
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@ -4,11 +4,15 @@
//! best [`Chain`][5] in the [`NonFinalizedState`][3], and the database in the
//! [`FinalizedState`][4].
//!
//! [1]: super::StateService
//! [2]: super::ReadStateService
//! [3]: super::non_finalized_state::NonFinalizedState
//! [4]: super::finalized_state::FinalizedState
//! [5]: super::Chain
//! [1]: service::StateService
//! [2]: service::ReadStateService
//! [3]: service::non_finalized_state::NonFinalizedState
//! [4]: service::finalized_state::FinalizedState
//! [5]: service::non_finalized_state::Chain
// Tidy up some doc links
#[allow(unused_imports)]
use crate::service;
pub mod address;
pub mod block;
@ -21,9 +25,9 @@ mod tests;
pub use address::{
balance::transparent_balance,
tx_id::transparent_tx_ids,
utxo::{transparent_utxos, AddressUtxos, ADDRESS_HEIGHTS_FULL_RANGE},
utxo::{address_utxos, AddressUtxos, ADDRESS_HEIGHTS_FULL_RANGE},
};
pub use block::{block, block_header, transaction};
pub use block::{any_utxo, block, block_header, transaction, utxo};
pub use find::{
block_locator, chain_contains_hash, depth, find_chain_hashes, find_chain_headers,
hash_by_height, height_by_hash, tip, tip_height,

View File

@ -1,4 +1,14 @@
//! Reading address balances.
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:
//! - the cached [`Chain`], and
//! - the shared finalized [`ZebraDb`] reference.
use std::{collections::HashSet, sync::Arc};
@ -91,8 +101,7 @@ fn chain_transparent_balance_change(
) -> Amount<NegativeAllowed> {
// # Correctness
//
// The StateService commits blocks to the finalized state before updating the latest chain,
// and it can commit additional blocks after we've cloned this `chain` variable.
// Find the balance adjustment that corrects for overlapping finalized and non-finalized blocks.
// Check if the finalized and non-finalized states match
let required_chain_root = finalized_tip

View File

@ -1,4 +1,14 @@
//! Reading address transaction IDs.
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:
//! - the cached [`Chain`], and
//! - the shared finalized [`ZebraDb`] reference.
use std::{
collections::{BTreeMap, HashSet},
@ -134,10 +144,7 @@ where
// # Correctness
//
// The StateService commits blocks to the finalized state before updating the latest chain,
// and it can commit additional blocks after we've cloned this `chain` variable.
//
// But we can compensate for addresses with mismatching blocks,
// We can compensate for addresses with mismatching blocks,
// by adding the overlapping non-finalized transaction IDs.
//
// If there is only one address, mismatches aren't possible,

View File

@ -1,4 +1,14 @@
//! Transparent address index UTXO queries.
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:
//! - the cached [`Chain`], and
//! - the shared finalized [`ZebraDb`] reference.
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
@ -83,7 +93,7 @@ impl AddressUtxos {
///
/// If the addresses do not exist in the non-finalized `chain` or finalized `db`,
/// returns an empty list.
pub fn transparent_utxos<C>(
pub fn address_utxos<C>(
network: Network,
chain: Option<C>,
db: &ZebraDb,
@ -100,7 +110,7 @@ where
for attempt in 0..=FINALIZED_ADDRESS_INDEX_RETRIES {
debug!(?attempt, ?address_count, "starting address UTXO query");
let (finalized_utxos, finalized_tip_range) = finalized_transparent_utxos(db, &addresses);
let (finalized_utxos, finalized_tip_range) = finalized_address_utxos(db, &addresses);
debug!(
finalized_utxo_count = ?finalized_utxos.len(),
@ -162,7 +172,7 @@ where
/// If the addresses do not exist in the finalized `db`, returns an empty list.
//
// TODO: turn the return type into a struct?
fn finalized_transparent_utxos(
fn finalized_address_utxos(
db: &ZebraDb,
addresses: &HashSet<transparent::Address>,
) -> (
@ -176,7 +186,7 @@ fn finalized_transparent_utxos(
// Check if the finalized state changed while we were querying it
let start_finalized_tip = db.finalized_tip_height();
let finalized_utxos = db.partial_finalized_transparent_utxos(addresses);
let finalized_utxos = db.partial_finalized_address_utxos(addresses);
let end_finalized_tip = db.finalized_tip_height();
@ -234,10 +244,7 @@ where
// # Correctness
//
// The StateService commits blocks to the finalized state before updating the latest chain,
// and it can commit additional blocks after we've cloned this `chain` variable.
//
// But we can compensate for deleted UTXOs by applying the overlapping non-finalized UTXO changes.
// We can compensate for deleted UTXOs by applying the overlapping non-finalized UTXO changes.
// Check if the finalized and non-finalized states match or overlap
let required_min_non_finalized_root = finalized_tip_range.start().0 + 1;

View File

@ -1,14 +1,29 @@
//! Shared block, header, and transaction reading code.
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` or `non_finalized_state` from the latest chains. Then it can
//! commit additional blocks to the finalized state after we've cloned the
//! `chain` or `non_finalized_state`.
//!
//! This means that some blocks can be in both:
//! - the cached [`Chain`] or [`NonFinalizedState`], and
//! - the shared finalized [`ZebraDb`] reference.
use std::sync::Arc;
use zebra_chain::{
block::{self, Block, Height},
transaction::{self, Transaction},
transparent::{self, Utxo},
};
use crate::{
service::{finalized_state::ZebraDb, non_finalized_state::Chain},
service::{
finalized_state::ZebraDb,
non_finalized_state::{Chain, NonFinalizedState},
},
HashOrHeight,
};
@ -20,10 +35,6 @@ where
{
// # Correctness
//
// The StateService commits blocks to the finalized state before updating
// the latest chain, and it can commit additional blocks after we've cloned
// this `chain` variable.
//
// Since blocks are the same in the finalized and non-finalized state, we
// check the most efficient alternative first. (`chain` is always in memory,
// but `db` stores blocks on disk, with a memory cache.)
@ -46,10 +57,6 @@ where
{
// # Correctness
//
// The StateService commits blocks to the finalized state before updating
// the latest chain, and it can commit additional blocks after we've cloned
// this `chain` variable.
//
// Since blocks are the same in the finalized and non-finalized state, we
// check the most efficient alternative first. (`chain` is always in memory,
// but `db` stores blocks on disk, with a memory cache.)
@ -72,10 +79,6 @@ where
{
// # Correctness
//
// The StateService commits blocks to the finalized state before updating
// the latest chain, and it can commit additional blocks after we've cloned
// this `chain` variable.
//
// Since transactions are the same in the finalized and non-finalized state,
// we check the most efficient alternative first. (`chain` is always in
// memory, but `db` stores transactions on disk, with a memory cache.)
@ -88,3 +91,53 @@ where
})
.or_else(|| db.transaction(hash))
}
/// Returns the [`Utxo`] for [`transparent::OutPoint`], if it exists in the
/// non-finalized `chain` or finalized `db`.
///
/// Non-finalized UTXOs are returned regardless of whether they have been spent.
///
/// Finalized UTXOs are only returned if they are unspent in the finalized chain.
/// They may have been spent in the non-finalized chain,
/// but this function returns them without checking for non-finalized spends,
/// because we don't know which non-finalized chain will be committed to the finalized state.
pub fn utxo<C>(chain: Option<C>, db: &ZebraDb, outpoint: transparent::OutPoint) -> Option<Utxo>
where
C: AsRef<Chain>,
{
// # Correctness
//
// Since UTXOs are the same in the finalized and non-finalized state,
// we check the most efficient alternative first. (`chain` is always in
// memory, but `db` stores transactions on disk, with a memory cache.)
chain
.and_then(|chain| chain.as_ref().created_utxo(&outpoint))
.or_else(|| db.utxo(&outpoint).map(|utxo| utxo.utxo))
}
/// Returns the [`Utxo`] for [`transparent::OutPoint`], if it exists in any chain
/// in the `non_finalized_state`, or in the finalized `db`.
///
/// Non-finalized UTXOs are returned regardless of whether they have been spent.
///
/// Finalized UTXOs are only returned if they are unspent in the finalized chain.
/// They may have been spent in one or more non-finalized chains,
/// but this function returns them without checking for non-finalized spends,
/// because we don't know which non-finalized chain the request belongs to.
///
/// UTXO spends are checked once the block reaches the non-finalized state,
/// by [`check::utxo::transparent_spend()`](crate::service::check::utxo::transparent_spend).
pub fn any_utxo(
non_finalized_state: NonFinalizedState,
db: &ZebraDb,
outpoint: transparent::OutPoint,
) -> Option<Utxo> {
// # Correctness
//
// Since UTXOs are the same in the finalized and non-finalized state,
// we check the most efficient alternative first. (`non_finalized_state` is always in
// memory, but `db` stores transactions on disk, with a memory cache.)
non_finalized_state
.any_utxo(&outpoint)
.or_else(|| db.utxo(&outpoint).map(|utxo| utxo.utxo))
}

View File

@ -1,4 +1,14 @@
//! Finding and reading block hashes and headers, in response to peer requests.
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:
//! - the cached [`Chain`], and
//! - the shared finalized [`ZebraDb`] reference.
use std::{
iter,
@ -22,6 +32,12 @@ pub fn tip<C>(chain: Option<C>, db: &ZebraDb) -> Option<(Height, block::Hash)>
where
C: AsRef<Chain>,
{
// # Correctness
//
// If there is an overlap between the non-finalized and finalized states,
// where the finalized tip is above the non-finalized tip,
// Zebra is receiving a lot of blocks, or this request has been delayed for a long time,
// so it is acceptable to return either tip.
chain
.map(|chain| chain.as_ref().non_finalized_tip())
.or_else(|| db.tip())
@ -54,6 +70,11 @@ where
{
let chain = chain.as_ref();
// # Correctness
//
// It is ok to do this lookup in two different calls. Finalized state updates
// can only add overlapping blocks, and hashes are unique.
let tip = tip_height(chain, db)?;
let height = height_by_hash(chain, db, hash)?;
@ -65,6 +86,10 @@ pub fn height_by_hash<C>(chain: Option<C>, db: &ZebraDb, hash: block::Hash) -> O
where
C: AsRef<Chain>,
{
// # Correctness
//
// Finalized state updates can only add overlapping blocks, and hashes are unique.
chain
.and_then(|chain| chain.as_ref().height_by_hash(hash))
.or_else(|| db.height(hash))
@ -75,6 +100,16 @@ pub fn hash_by_height<C>(chain: Option<C>, db: &ZebraDb, height: Height) -> Opti
where
C: AsRef<Chain>,
{
// # Correctness
//
// Finalized state updates can only add overlapping blocks, and heights are unique
// in the current `chain`.
//
// If there is an overlap between the non-finalized and finalized states,
// where the finalized tip is above the non-finalized tip,
// Zebra is receiving a lot of blocks, or this request has been delayed for a long time,
// so it is acceptable to return hashes from either chain.
chain
.and_then(|chain| chain.as_ref().hash_by_height(height))
.or_else(|| db.hash(height))
@ -85,6 +120,15 @@ pub fn chain_contains_hash<C>(chain: Option<C>, db: &ZebraDb, hash: block::Hash)
where
C: AsRef<Chain>,
{
// # Correctness
//
// Finalized state updates can only add overlapping blocks, and hashes are unique.
//
// If there is an overlap between the non-finalized and finalized states,
// where the finalized tip is above the non-finalized tip,
// Zebra is receiving a lot of blocks, or this request has been delayed for a long time,
// so it is acceptable to return hashes from either chain.
chain
.map(|chain| chain.as_ref().height_by_hash.contains_key(&hash))
.unwrap_or(false)
@ -102,6 +146,19 @@ where
{
let chain = chain.as_ref();
// # Correctness
//
// It is ok to do these lookups using multiple database calls. Finalized state updates
// can only add overlapping blocks, and hashes are unique.
//
// If there is an overlap between the non-finalized and finalized states,
// where the finalized tip is above the non-finalized tip,
// Zebra is receiving a lot of blocks, or this request has been delayed for a long time,
// so it is acceptable to return a set of hashes from multiple chains.
//
// Multiple heights can not map to the same hash, even in different chains,
// because the block height is covered by the block hash,
// via the transaction merkle tree commitments.
let tip_height = tip_height(chain, db)?;
let heights = block_locator_heights(tip_height);
@ -419,6 +476,10 @@ pub fn find_chain_hashes<C>(
where
C: AsRef<Chain>,
{
// # Correctness
//
// See the note in `block_locator()`.
let chain = chain.as_ref();
let intersection = find_chain_intersection(chain, db, known_blocks);
@ -439,6 +500,14 @@ pub fn find_chain_headers<C>(
where
C: AsRef<Chain>,
{
// # Correctness
//
// Headers are looked up by their hashes using a unique mapping,
// so it is not possible for multiple hashes to look up the same header,
// even across different chains.
//
// See also the note in `block_locator()`.
let chain = chain.as_ref();
let intersection = find_chain_intersection(chain, db, known_blocks);

View File

@ -1,4 +1,14 @@
//! Reading note commitment trees.
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:
//! - the cached [`Chain`], and
//! - the shared finalized [`ZebraDb`] reference.
use std::sync::Arc;
@ -22,10 +32,6 @@ where
{
// # Correctness
//
// The StateService commits blocks to the finalized state before updating
// the latest chain, and it can commit additional blocks after we've cloned
// this `chain` variable.
//
// Since sapling treestates are the same in the finalized and non-finalized
// state, we check the most efficient alternative first. (`chain` is always
// in memory, but `db` stores blocks on disk, with a memory cache.)
@ -47,10 +53,6 @@ where
{
// # Correctness
//
// The StateService commits blocks to the finalized state before updating
// the latest chain, and it can commit additional blocks after we've cloned
// this `chain` variable.
//
// Since orchard treestates are the same in the finalized and non-finalized
// state, we check the most efficient alternative first. (`chain` is always
// in memory, but `db` stores blocks on disk, with a memory cache.)