consensus,state: document cancellation contracts for services

This change explicitly documents cancellation contracts for our Tower services,
and tries to correct a bug in the implementation of the CheckpointVerifier,
which duplicates information from the state service but did not ensure that it
would be kept in sync.
This commit is contained in:
Henry de Valence 2020-11-12 11:43:17 -08:00
parent d5d17a9a71
commit a3ab589d89
5 changed files with 89 additions and 46 deletions

View File

@ -87,10 +87,18 @@ where
}
}
/// Return a block verification service, using `config`, `network` and
/// `state_service`.
/// Initialize a block verification service.
///
/// The consensus configuration is specified by `config`, and the Zcash network
/// to verify blocks for is specified by `network`.
///
/// The block verification service asynchronously performs semantic verification
/// checks. Blocks that pass semantic verification are submitted to the supplied
/// `state_service` for contextual verification before being committed to the chain.
///
/// This function should only be called once for a particular state service.
///
/// Dropped requests are cancelled on a best-effort basis, but may continue to be processed.
#[instrument(skip(state_service))]
pub async fn init<S>(
config: Config,

View File

@ -98,8 +98,6 @@ where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
// Inputs
//
/// The checkpoint list for this verifier.
checkpoint_list: CheckpointList,
@ -109,8 +107,6 @@ where
/// The underlying state service, possibly wrapped in other services.
state_service: S,
// Queued Blocks
//
/// A queue of unverified blocks.
///
/// Contains a list of unverified blocks at each block height. In most cases,
@ -127,9 +123,6 @@ where
verifier_progress: Progress<block::Height>,
}
/// The CheckpointVerifier implementation.
///
/// Contains non-service utility functions for CheckpointVerifiers.
impl<S> CheckpointVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
@ -386,8 +379,7 @@ where
height,
verified_height: previous_height,
});
// TODO: reduce to trace level once the AlreadyVerified bug is fixed
tracing::info!(?e);
tracing::trace!(?e);
e?;
}
InitialTip(_) | PreviousCheckpoint(_) => {}
@ -580,7 +572,7 @@ where
valid_qblock
}
/// Check all the blocks in the current checkpoint range.
/// Try to verify from the previous checkpoint to a target checkpoint.
///
/// Send `Ok` for the blocks that are in the chain, and `Err` for side-chain
/// blocks.
@ -830,34 +822,43 @@ where
return async { Err(VerifyCheckpointError::Finished) }.boxed();
}
// Queue the block for verification, until we receive all the blocks for
// the current checkpoint range.
let rx = self.queue_block(block.clone());
// Try to verify from the previous checkpoint to a target checkpoint.
//
// If there are multiple checkpoints in the target range, and one of
// the ranges is invalid, we'll try again with a smaller target range
// on the next call(). Failures always reject a block, so we know
// there will be at least one more call().
//
// We don't retry with a smaller range on failure, because failures
// should be rare.
self.process_checkpoint_range();
metrics::gauge!("checkpoint.queued_slots", self.queued.len() as i64);
// Because the checkpoint verifier duplicates state from the state
// service (it tracks which checkpoints have been verified), we must
// commit blocks transactionally on a per-checkpoint basis. Otherwise,
// the checkpoint verifier's state could desync from the underlying
// state service. Among other problems, this could cause the checkpoint
// verifier to reject blocks not already in the state as
// already-verified.
//
// To commit blocks transactionally on a per-checkpoint basis, we must
// commit all verified blocks in a checkpoint range, regardless of
// whether or not the response futures for each block were dropped.
//
// We accomplish this by spawning a new task containing the
// commit-if-verified logic. This task will always execute, except if
// the program is interrupted, in which case there is no longer a
// checkpoint verifier to keep in sync with the state.
let mut state_service = self.state_service.clone();
async move {
let commit_finalized_block = tokio::spawn(async move {
let hash = rx
.await
.expect("CheckpointVerifier does not leave dangling receivers")?;
// Once we get a verified hash, we must commit it to the chain state
// as a finalized block, or exit the program, so .expect rather than
// propagate errors from the state service.
match state_service
.ready_and()
.await
.map_err(VerifyCheckpointError::CommitFinalized)?
.expect("Verified checkpoints must be committed transactionally")
.call(zs::Request::CommitFinalizedBlock { block })
.await
.map_err(VerifyCheckpointError::CommitFinalized)?
.expect("Verified checkpoints must be committed transactionally")
{
zs::Response::Committed(committed_hash) => {
assert_eq!(committed_hash, hash, "state must commit correct hash");
@ -865,6 +866,12 @@ where
}
_ => unreachable!("wrong response for CommitFinalizedBlock"),
}
});
async move {
commit_finalized_block
.await
.expect("commit_finalized_block should not panic")
}
.boxed()
}

View File

@ -14,6 +14,18 @@ use super::super::types::Nonce;
/// possible [`Response`s](super::Response) it can generate; it is fine (and
/// recommended!) to match on the expected responses and treat the others as
/// `unreachable!()`, since their return indicates a bug in the network code.
///
/// # Cancellations
///
/// The peer set handles cancelled requests (i.e., requests where the future
/// returned by `Service::call` is dropped before it resolves) on a best-effort
/// basis. Requests are routed to a particular peer connection, and then
/// translated into Zcash protocol messages and sent over the network. If a
/// request is cancelled after it is submitted but before it is processed by a
/// peer connection, no messages will be sent. Otherwise, if it is cancelled
/// while waiting for a response, the peer connection resets its state and makes
/// a best-effort attempt to ignore any messages responsive to the cancelled
/// request, subject to limitations in the underlying Zcash protocol.
#[derive(Clone, Debug)]
pub enum Request {
/// Requests additional peers from the server.

View File

@ -53,11 +53,18 @@ pub enum Request {
/// Performs contextual validation of the given block, committing it to the
/// state if successful.
///
/// Returns [`Response::Committed`] with the hash of the newly
/// committed block, or an error.
/// It is the caller's responsibility to perform semantic validation. This
/// request can be made out-of-order; the state service will queue it until
/// its parent is ready.
///
/// This request can be made out-of-order; the state service will buffer it
/// until its parent is ready.
/// Returns [`Response::Committed`] with the hash of the block when it is
/// committed to the state, or an error if the block fails contextual
/// validation or has already been committed to the state.
///
/// This request cannot be cancelled once submitted; dropping the response
/// future will have no effect on whether it is eventually processed. A
/// request to commit a block which has been queued internally but not yet
/// committed will fail the older request and replace it with the newer request.
CommitBlock {
/// The block to commit to the state.
block: Arc<Block>,
@ -66,15 +73,20 @@ pub enum Request {
// sapling_anchor: sapling::tree::Root,
},
/// Commit a finalized block to the state, skipping contextual validation.
/// Commit a finalized block to the state, skipping all validation.
///
/// This is exposed for use in checkpointing, which produces finalized
/// blocks.
/// blocks. It is the caller's responsibility to ensure that the block is
/// valid and final. This request can be made out-of-order; the state service
/// will queue it until its parent is ready.
///
/// Returns [`Response::Committed`] with the hash of the newly
/// committed block, or an error.
/// Returns [`Response::Committed`] with the hash of the newly committed
/// block, or an error.
///
/// This request can be made out-of-order; the state service will buffer it
/// until its parent is ready.
/// This request cannot be cancelled once submitted; dropping the response
/// future will have no effect on whether it is eventually processed.
/// Duplicate requests should not be made, because it is the caller's
/// responsibility to ensure that each block is valid and final.
CommitFinalizedBlock {
/// The block to commit to the state.
block: Arc<Block>,
@ -124,8 +136,14 @@ pub enum Request {
/// [`block::Height`] using `.into()`.
Block(HashOrHeight),
/// Request a UTXO identified by the given Outpoint in any chain.
/// Request a UTXO identified by the given Outpoint, waiting until it becomes
/// available if it is unknown.
///
/// Returns UTXOs fron any chain, including side-chains.
/// This request is purely informational, and there are no guarantees about
/// whether the UTXO remains unspent or is on the best chain. Its purpose is
/// to allow asynchronous script verification.
///
/// Code making this request should apply a timeout layer to the service to
/// handle missing UTXOs.
AwaitUtxo(transparent::OutPoint),
}

View File

@ -92,19 +92,17 @@ impl StateService {
if self.contains_committed_block(&block) {
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Err("duplicate block".into()));
let _ = rsp_tx.send(Err("block is already committed to the state".into()));
return rsp_rx;
}
// The queue of blocks maintained by this service acts as a pipeline for
// blocks waiting for contextual verification. We lazily flush the
// pipeline here by handling duplicate requests to verify an existing
// queued block. We handle those duplicate requests by replacing the old
// channel with the new one and sending an error over the old channel.
// Request::CommitBlock contract: a request to commit a block which has
// been queued but not yet committed to the state fails the older
// request and replaces it with the newer request.
let rsp_rx = if let Some(queued_block) = self.queued_blocks.get_mut(&hash) {
let (mut rsp_tx, rsp_rx) = oneshot::channel();
std::mem::swap(&mut queued_block.rsp_tx, &mut rsp_tx);
let _ = rsp_tx.send(Err("duplicate block".into()));
let _ = rsp_tx.send(Err("replaced by newer request".into()));
rsp_rx
} else {
let (rsp_tx, rsp_rx) = oneshot::channel();