change(state): Write finalized blocks to the state in a separate thread, to avoid network and RPC hangs (#5134)

* Add a new block commit task and channels, that don't do anything yet

* Add last_block_hash_sent to the state service, to avoid database accesses

* Update last_block_hash_sent regardless of commit errors

* Rename a field to StateService.max_queued_finalized_height

* Commit finalized blocks to the state in a separate task

* Check for panics in the block write task

* Wait for the block commit task in tests, and check for errors

* Always run a proptest that sleeps once

* Add extra debugging to state shutdowns

* Work around a RocksDB shutdown bug

* Close the finalized block channel when we're finished with it

* Only reset state queue once per error

* Update some TODOs

* Add a module doc comment

* Drop channels and check for closed channels in the block commit task

* Close state channels and tasks on drop

* Remove some duplicate fields across StateService and ReadStateService

* Try tweaking the shutdown steps

* Update and clarify some comments

* Clarify another comment

* Don't try to cancel RocksDB background work on drop

* Fix up some comments

* Remove some duplicate code

* Remove redundant workarounds for shutdown issues

* Remove a redundant channel close in the block commit task

* Remove a mistaken `!force` shutdown condition

* Remove duplicate force-shutdown code and explain it better

* Improve RPC error logging

* Wait for chain tip updates in the RPC tests

* Wait 2 seconds for chain tip updates before skipping them

* Remove an unnecessary block_in_place()

* Fix some test error messages that were changed by earlier fixes

* Expand some comments, fix typos

Co-authored-by: Marek <mail@marek.onl>

* Actually drop children of failed blocks

* Explain why we drop descendants of failed blocks

* Clarify a comment

* Wait for chain tip updates in a failing test on macOS

* Clean duplicate finalized blocks when the non-finalized state activates

* Send an error when receiving a duplicate finalized block

* Update checkpoint block behaviour, document its consensus rule

* Wait for chain tip changes in inbound_block_height_lookahead_limit test

* Wait for the genesis block to commit in the fake peer set mempool tests

* Disable unreliable mempool verification check in the send transaction test

* Appease rustfmt

* Use clear_finalized_block_queue() everywhere that blocks are dropped

* Document how Finalized and NonFinalized clones are different

* Use the same check as commit_finalized() for finalized block heights

Co-authored-by: Marek <mail@marek.onl>

Co-authored-by: Marek <mail@marek.onl>
teor 2022-09-29 02:09:56 +10:00 committed by GitHub
parent 55e5a13fc8
commit 343c5e68d4
25 changed files with 929 additions and 284 deletions
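
The change at the heart of this commit: block commits now happen on a dedicated writer thread, so the async state service never blocks a tokio worker thread on database writes. Below is a minimal sketch of that pattern under simplified assumptions; the types and names (`QueuedBlock`, `spawn_block_writer`, `queue_block`) are illustrative placeholders, not Zebra's actual code.

```rust
// Sketch only: an async service queues blocks on an unbounded channel and
// returns a oneshot receiver for the result, while a dedicated OS thread
// drains the channel and commits each block.
use tokio::sync::{mpsc, oneshot};

type BlockHash = [u8; 32];
type CommitResult = Result<BlockHash, String>;
type QueuedBlock = (Vec<u8>, oneshot::Sender<CommitResult>);

/// Spawns the writer thread. It exits when every sender has been dropped.
fn spawn_block_writer(
    mut block_rx: mpsc::UnboundedReceiver<QueuedBlock>,
) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        // blocking_recv() parks this OS thread, not a tokio worker thread.
        while let Some((block_bytes, rsp_tx)) = block_rx.blocking_recv() {
            // Commit `block_bytes` to the database here (elided in this sketch).
            let result: CommitResult = Ok([0u8; 32]);
            // The caller may have given up on this block, so ignore send errors.
            let _ = rsp_tx.send(result);
        }
    })
}

/// Queues a block without blocking, and returns a receiver for the commit result.
fn queue_block(
    block_tx: &mpsc::UnboundedSender<QueuedBlock>,
    block_bytes: Vec<u8>,
) -> oneshot::Receiver<CommitResult> {
    let (rsp_tx, rsp_rx) = oneshot::channel();
    // Sending on an unbounded channel never blocks or awaits.
    let _ = block_tx.send((block_bytes, rsp_tx));
    rsp_rx
}
```

Dropping the channel senders is also how the service tells the writer thread to exit, which is why the `Drop` implementations in the diff below take and drop their senders before waiting on the thread handle.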

View File

@ -872,7 +872,7 @@ where
hashes
.iter()
.map(|(tx_loc, tx_id)| {
// TODO: downgrade to debug, because there's nothing the user can do
// Check that the returned transactions are in chain order.
assert!(
*tx_loc > last_tx_location,
"Transactions were not in chain order:\n\
@ -931,7 +931,7 @@ where
let satoshis = u64::from(utxo_data.3.value);
let output_location = *utxo_data.2;
// TODO: downgrade to debug, because there's nothing the user can do
// Check that the returned UTXOs are in chain order.
assert!(
output_location > last_output_location,
"UTXOs were not in chain order:\n\
@ -1272,17 +1272,19 @@ impl GetRawTransaction {
/// Check if provided height range is valid for address indexes.
fn check_height_range(start: Height, end: Height, chain_height: Height) -> Result<()> {
if start == Height(0) || end == Height(0) {
return Err(Error::invalid_params(
"Start and end are expected to be greater than zero",
));
return Err(Error::invalid_params(format!(
"start {start:?} and end {end:?} must both be greater than zero"
)));
}
if end < start {
return Err(Error::invalid_params(
"End value is expected to be greater than or equal to start",
));
if start > end {
return Err(Error::invalid_params(format!(
"start {start:?} must be less than or equal to end {end:?}"
)));
}
if start > chain_height || end > chain_height {
return Err(Error::invalid_params("Start or end is outside chain range"));
return Err(Error::invalid_params(format!(
"start {start:?} and end {end:?} must both be less than or equal to the chain tip {chain_height:?}"
)));
}
Ok(())

View File

@ -395,7 +395,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
.unwrap_err();
assert_eq!(
error.message,
"End value is expected to be greater than or equal to start".to_string()
"start Height(2) must be less than or equal to end Height(1)".to_string()
);
// call the method with start equal zero
@ -411,7 +411,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
.unwrap_err();
assert_eq!(
error.message,
"Start and end are expected to be greater than zero".to_string()
"start Height(0) and end Height(1) must both be greater than zero".to_string()
);
// call the method outside the chain tip height
@ -427,7 +427,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
.unwrap_err();
assert_eq!(
error.message,
"Start or end is outside chain range".to_string()
"start Height(1) and end Height(11) must both be less than or equal to the chain tip Height(10)".to_string()
);
mempool.expect_no_requests().await;

View File

@ -17,6 +17,8 @@ use crate::{
/// Mocks computation done during semantic validation
pub trait Prepare {
/// Runs block semantic validation computation, and returns the result.
/// Test-only method.
fn prepare(self) -> PreparedBlock;
}

View File

@ -16,7 +16,8 @@
extern crate tracing;
#[cfg(any(test, feature = "proptest-impl"))]
mod arbitrary;
pub mod arbitrary;
mod config;
pub mod constants;
mod error;
@ -39,7 +40,7 @@ pub use service::{
#[cfg(any(test, feature = "proptest-impl"))]
pub use service::{
arbitrary::populated_state,
arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT},
chain_tip::{ChainTipBlock, ChainTipSender},
init_test, init_test_services,
};

View File

@ -381,20 +381,44 @@ pub enum Request {
/// documentation for details.
CommitBlock(PreparedBlock),
/// Commit a finalized block to the state, skipping all validation.
/// Commit a checkpointed block to the state, skipping most block validation.
///
/// This is exposed for use in checkpointing, which produces finalized
/// blocks. It is the caller's responsibility to ensure that the block is
/// valid and final. This request can be made out-of-order; the state service
/// will queue it until its parent is ready.
/// semantically valid and final. This request can be made out-of-order;
/// the state service will queue it until its parent is ready.
///
/// Returns [`Response::Committed`] with the hash of the newly committed
/// block, or an error.
///
/// This request cannot be cancelled once submitted; dropping the response
/// future will have no effect on whether it is eventually processed.
/// Duplicate requests should not be made, because it is the caller's
/// responsibility to ensure that each block is valid and final.
/// Duplicate requests will replace the older duplicate, and return an error
/// in its response future.
///
/// # Note
///
/// Finalized and non-finalized blocks are an internal Zebra implementation detail.
/// There is no difference between these blocks on the network, or in Zebra's
/// network or syncer implementations.
///
/// # Consensus
///
/// Checkpointing is allowed under the Zcash "social consensus" rules.
/// Zebra checkpoints both settled network upgrades, and blocks past the rollback limit.
/// (By the time a Zebra release is tagged, its final checkpoint is typically hours or days old.)
///
/// > A network upgrade is settled on a given network when there is a social consensus
/// > that it has activated with a given activation block hash. A full validator that
/// > potentially risks Mainnet funds or displays Mainnet transaction information to a user
/// > MUST do so only for a block chain that includes the activation block of the most
/// > recent settled network upgrade, with the corresponding activation block hash.
/// > ...
/// > A full validator MAY impose a limit on the number of blocks it will “roll back”
/// > when switching from one best valid block chain to another that is not a descendent.
/// > For `zcashd` and `zebra` this limit is 100 blocks.
///
/// <https://zips.z.cash/protocol/protocol.pdf#blockchain>
///
/// # Correctness
///
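
As a usage illustration only (not part of this commit), a caller might drive this request through the tower `Service` interface roughly as sketched below; the helper function and its simplified bounds are assumptions, while `Request::CommitFinalizedBlock`, `Response::Committed`, `FinalizedBlock`, and `BoxError` appear elsewhere in this diff.

```rust
// Sketch only: submit a checkpointed block and wait for its commit result.
use tower::{Service, ServiceExt};
use zebra_state::{BoxError, FinalizedBlock, Request, Response};

async fn commit_checkpointed_block<S>(
    state: &mut S,
    finalized: FinalizedBlock,
) -> Result<zebra_chain::block::Hash, BoxError>
where
    S: Service<Request, Response = Response, Error = BoxError>,
{
    // The state queues the block until its parent has been committed,
    // so these requests can be made out of order.
    let response = state
        .ready()
        .await?
        .call(Request::CommitFinalizedBlock(finalized))
        .await?;

    match response {
        Response::Committed(hash) => Ok(hash),
        // Any other variant would be a state service bug for this request type.
        _ => Err("unexpected response to CommitFinalizedBlock".into()),
    }
}
```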

View File

@ -19,6 +19,7 @@ use std::{
convert,
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
time::{Duration, Instant},
};
@ -65,6 +66,7 @@ mod non_finalized_state;
mod pending_utxos;
mod queued_blocks;
pub(crate) mod read;
mod write;
#[cfg(any(test, feature = "proptest-impl"))]
pub mod arbitrary;
@ -74,7 +76,7 @@ mod tests;
pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation};
use self::queued_blocks::QueuedFinalized;
use self::queued_blocks::{QueuedFinalized, QueuedNonFinalized};
/// A read-write service for Zebra's cached blockchain state.
///
@ -126,6 +128,43 @@ pub(crate) struct StateService {
// and block write task share ownership of the database.
pub(crate) disk: FinalizedState,
/// A channel to send blocks to the `block_write_task`,
/// so they can be written to the [`NonFinalizedState`].
//
// TODO: actually send blocks on this channel
non_finalized_block_write_sender:
Option<tokio::sync::mpsc::UnboundedSender<QueuedNonFinalized>>,
/// A channel to send blocks to the `block_write_task`,
/// so they can be written to the [`FinalizedState`].
///
/// This sender is dropped after the state has finished sending all the checkpointed blocks,
/// and the lowest non-finalized block arrives.
finalized_block_write_sender: Option<tokio::sync::mpsc::UnboundedSender<QueuedFinalized>>,
/// The [`block::Hash`] of the most recent block sent on
/// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
///
/// On startup, this is:
/// - the finalized tip, if there are stored blocks, or
/// - the genesis block's parent hash, if the database is empty.
///
/// If `invalid_block_reset_receiver` gets a reset, this is:
/// - the hash of the last valid committed block (the parent of the invalid block).
//
// TODO:
// - turn this into an IndexMap containing recent non-finalized block hashes and heights
// (they are all potential tips)
// - remove block hashes once their heights are strictly less than the finalized tip
last_block_hash_sent: block::Hash,
/// If an invalid block is sent on `finalized_block_write_sender`
/// or `non_finalized_block_write_sender`,
/// this channel gets the [`block::Hash`] of the valid tip.
//
// TODO: add tests for finalized and non-finalized resets (#2654)
invalid_block_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
// Pending UTXO Request Tracking
//
/// The set of outpoints with pending requests for their associated transparent::Output.
@ -134,15 +173,19 @@ pub(crate) struct StateService {
/// Instant tracking the last time `pending_utxos` was pruned.
last_prune: Instant,
// Concurrently Readable State
// Updating Concurrently Readable State
//
/// A sender channel used to update the current best chain tip for
/// [`LatestChainTip`] and [`ChainTipChange`].
chain_tip_sender: ChainTipSender,
//
// TODO: remove this copy of the chain tip sender, and get rid of the mutex in the block write task
chain_tip_sender: Arc<Mutex<ChainTipSender>>,
/// A sender channel used to update the recent non-finalized state for the [`ReadStateService`].
non_finalized_state_sender: watch::Sender<NonFinalizedState>,
// Concurrently Readable State
//
/// A cloneable [`ReadStateService`], used to answer concurrent read requests.
///
/// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
@ -154,7 +197,9 @@ pub(crate) struct StateService {
///
/// Set to `f64::NAN` if `queued_finalized_blocks` is empty, because grafana shows NaNs
/// as a break in the graph.
max_queued_height: f64,
//
// TODO: add a similar metric for `queued_non_finalized_blocks`
max_queued_finalized_height: f64,
}
/// A read-only service for accessing Zebra's cached blockchain state.
@ -177,7 +222,7 @@ pub struct ReadStateService {
// Shared Concurrently Readable State
//
/// A watch channel for a recent [`NonFinalizedState`].
/// A watch channel with a cached copy of the [`NonFinalizedState`].
///
/// This state is only updated between requests,
/// so it might include some block data that is also on `disk`.
@ -191,6 +236,63 @@ pub struct ReadStateService {
/// This chain is updated concurrently with requests,
/// so it might include some block data that is also in `best_mem`.
db: ZebraDb,
/// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
/// once the queues have received all their parent blocks.
///
/// Used to check for panics when writing blocks.
block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}
impl Drop for StateService {
fn drop(&mut self) {
// The state service owns the state, tasks, and channels,
// so dropping it should shut down everything.
// Close the channels (non-blocking)
// This makes the block write thread exit the next time it checks the channels.
// We want to do this here so we get any errors or panics from the block write task before it shuts down.
self.invalid_block_reset_receiver.close();
std::mem::drop(self.finalized_block_write_sender.take());
std::mem::drop(self.non_finalized_block_write_sender.take());
self.clear_finalized_block_queue("dropping the state: dropped unused queued block");
// Then drop self.read_service, which checks the block write task for panics,
// and tries to shut down the database.
}
}
impl Drop for ReadStateService {
fn drop(&mut self) {
// The read state service shares the state,
// so dropping it should check if we can shut down.
if let Some(block_write_task) = self.block_write_task.take() {
if let Ok(block_write_task_handle) = Arc::try_unwrap(block_write_task) {
// We're the last database user, so we can tell it to shut down (blocking):
// - flushes the database to disk, and
// - drops the database, which cleans up any database tasks correctly.
self.db.shutdown(true);
// We are the last state with a reference to this thread, so we can
// wait until the block write task finishes, then check for panics (blocking).
// (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
info!("waiting for the block write task to finish");
if let Err(thread_panic) = block_write_task_handle.join() {
std::panic::resume_unwind(thread_panic);
} else {
info!("shutting down the state without waiting for the block write task");
}
}
} else {
// Even if we're not the last database user, try shutting it down.
//
// TODO: rename this to try_shutdown()?
self.db.shutdown(false);
}
}
}
impl StateService {
@ -205,12 +307,12 @@ impl StateService {
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
let timer = CodeTimer::start();
let disk = FinalizedState::new(&config, network);
let finalized_state = FinalizedState::new(&config, network);
timer.finish(module_path!(), line!(), "opening finalized state database");
let timer = CodeTimer::start();
let initial_tip = disk
.db()
let initial_tip = finalized_state
.db
.tip_block()
.map(FinalizedBlock::from)
.map(ChainTipBlock::from);
@ -219,26 +321,56 @@ impl StateService {
let timer = CodeTimer::start();
let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
ChainTipSender::new(initial_tip, network);
let chain_tip_sender = Arc::new(Mutex::new(chain_tip_sender));
let mem = NonFinalizedState::new(network);
let non_finalized_state = NonFinalizedState::new(network);
let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk);
// Security: The number of blocks in these channels is limited by
// the syncer and inbound lookahead limits.
let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
tokio::sync::mpsc::unbounded_channel();
let (finalized_block_write_sender, finalized_block_write_receiver) =
tokio::sync::mpsc::unbounded_channel();
let (invalid_block_reset_sender, invalid_block_reset_receiver) =
tokio::sync::mpsc::unbounded_channel();
let finalized_state_for_writing = finalized_state.clone();
let chain_tip_sender_for_writing = chain_tip_sender.clone();
let block_write_task = std::thread::spawn(move || {
write::write_blocks_from_channels(
finalized_block_write_receiver,
non_finalized_block_write_receiver,
finalized_state_for_writing,
invalid_block_reset_sender,
chain_tip_sender_for_writing,
)
});
let block_write_task = Arc::new(block_write_task);
let (read_service, non_finalized_state_sender) =
ReadStateService::new(&finalized_state, block_write_task);
let queued_non_finalized_blocks = QueuedBlocks::default();
let pending_utxos = PendingUtxos::default();
let last_block_hash_sent = finalized_state.db.finalized_tip_hash();
let state = Self {
network,
queued_non_finalized_blocks,
queued_finalized_blocks: HashMap::new(),
mem,
disk,
mem: non_finalized_state,
disk: finalized_state,
non_finalized_block_write_sender: Some(non_finalized_block_write_sender),
finalized_block_write_sender: Some(finalized_block_write_sender),
last_block_hash_sent,
invalid_block_reset_receiver,
pending_utxos,
last_prune: Instant::now(),
chain_tip_sender,
non_finalized_state_sender,
read_service: read_service.clone(),
max_queued_height: f64::NAN,
max_queued_finalized_height: f64::NAN,
};
timer.finish(module_path!(), line!(), "initializing state service");
@ -256,7 +388,7 @@ impl StateService {
state.network,
MAX_LEGACY_CHAIN_BLOCKS,
) {
let legacy_db_path = state.disk.path().to_path_buf();
let legacy_db_path = state.read_service.db.path().to_path_buf();
panic!(
"Cached state contains a legacy chain.\n\
An outdated Zebra version did not know about a recent network upgrade,\n\
@ -275,75 +407,147 @@ impl StateService {
}
/// Queue a finalized block for verification and storage in the finalized state.
///
/// Returns a channel receiver that provides the result of the block commit.
fn queue_and_commit_finalized(
&mut self,
finalized: FinalizedBlock,
) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
// # Correctness & Performance
//
// This method must not block, access the database, or perform CPU-intensive tasks,
// because it is called directly from the tokio executor's Future threads.
let queued_prev_hash = finalized.block.header.previous_block_hash;
let queued_height = finalized.height;
let (rsp_tx, rsp_rx) = oneshot::channel();
let queued = (finalized, rsp_tx);
// TODO: move this code into the state block commit task:
// - queue_and_commit_finalized()'s commit_finalized() call becomes a send to the block commit channel
// - run commit_finalized() in the state block commit task
// - run the metrics update in queue_and_commit_finalized() in the block commit task
// - run the set_finalized_tip() in this function in the state block commit task
// - move all that code to the inner service
let tip_block = self
.drain_queue_and_commit_finalized((finalized, rsp_tx))
.map(ChainTipBlock::from);
self.chain_tip_sender.set_finalized_tip(tip_block);
rsp_rx
}
/// Queue a finalized block to be committed to the state.
///
/// After queueing a finalized block, this method checks whether the newly
/// queued block (and any of its descendants) can be committed to the state.
///
/// Returns the highest finalized tip block committed from the queue,
/// or `None` if no blocks were committed in this call.
/// (Use `tip_block` to get the finalized tip, regardless of when it was committed.)
pub fn drain_queue_and_commit_finalized(
&mut self,
queued: QueuedFinalized,
) -> Option<FinalizedBlock> {
let mut highest_queue_commit = None;
let prev_hash = queued.0.block.header.previous_block_hash;
let height = queued.0.height;
self.queued_finalized_blocks.insert(prev_hash, queued);
while let Some(queued_block) = self
.queued_finalized_blocks
.remove(&self.disk.db().finalized_tip_hash())
{
if let Ok(finalized) = self.disk.commit_finalized(queued_block) {
highest_queue_commit = Some(finalized);
} else {
// the last block in the queue failed, so we can't commit the next block
break;
if self.finalized_block_write_sender.is_some() {
// We're still committing finalized blocks
if let Some(duplicate_queued) = self
.queued_finalized_blocks
.insert(queued_prev_hash, queued)
{
Self::send_finalized_block_error(
duplicate_queued,
"dropping older finalized block: got newer duplicate block",
);
}
self.drain_queue_and_commit_finalized();
} else {
// We've finished committing finalized blocks, so drop any repeated queued blocks,
// and return an error.
//
// TODO: track the latest sent height, and drop any blocks under that height
// every time we send some blocks (like QueuedNonFinalizedBlocks)
Self::send_finalized_block_error(
queued,
"already finished committing finalized blocks: dropped duplicate block, \
block is already committed to the state",
);
self.clear_finalized_block_queue(
"already finished committing finalized blocks: dropped duplicate block, \
block is already committed to the state",
);
}
if self.queued_finalized_blocks.is_empty() {
self.max_queued_height = f64::NAN;
} else if self.max_queued_height.is_nan() || self.max_queued_height < height.0 as f64 {
self.max_queued_finalized_height = f64::NAN;
} else if self.max_queued_finalized_height.is_nan()
|| self.max_queued_finalized_height < queued_height.0 as f64
{
// if there are still blocks in the queue, then either:
// - the new block was lower than the old maximum, and there was a gap before it,
// so the maximum is still the same (and we skip this code), or
// - the new block is higher than the old maximum, and there is at least one gap
// between the finalized tip and the new maximum
self.max_queued_height = height.0 as f64;
self.max_queued_finalized_height = queued_height.0 as f64;
}
metrics::gauge!("state.checkpoint.queued.max.height", self.max_queued_height);
metrics::gauge!(
"state.checkpoint.queued.max.height",
self.max_queued_finalized_height
);
metrics::gauge!(
"state.checkpoint.queued.block.count",
self.queued_finalized_blocks.len() as f64,
);
highest_queue_commit
rsp_rx
}
/// Finds queued finalized blocks to be committed to the state in order,
/// removes them from the queue, and sends them to the block commit task.
///
/// After queueing a finalized block, this method checks whether the newly
/// queued block (and any of its descendants) can be committed to the state.
///
/// Returns an error if the block commit channel has been closed.
pub fn drain_queue_and_commit_finalized(&mut self) {
use tokio::sync::mpsc::error::{SendError, TryRecvError};
// # Correctness & Performance
//
// This method must not block, access the database, or perform CPU-intensive tasks,
// because it is called directly from the tokio executor's Future threads.
// If a block failed, we need to start again from a valid tip.
match self.invalid_block_reset_receiver.try_recv() {
Ok(reset_tip_hash) => self.last_block_hash_sent = reset_tip_hash,
Err(TryRecvError::Disconnected) => {
info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
return;
}
// There are no errors, so we can just use the last block hash we sent
Err(TryRecvError::Empty) => {}
}
while let Some(queued_block) = self
.queued_finalized_blocks
.remove(&self.last_block_hash_sent)
{
self.last_block_hash_sent = queued_block.0.hash;
// If we've finished sending finalized blocks, ignore any repeated blocks.
// (Blocks can be repeated after a syncer reset.)
if let Some(finalized_block_write_sender) = &self.finalized_block_write_sender {
let send_result = finalized_block_write_sender.send(queued_block);
// If the receiver is closed, we can't send any more blocks.
if let Err(SendError(queued)) = send_result {
// If Zebra is shutting down, drop blocks and return an error.
Self::send_finalized_block_error(
queued,
"block commit task exited. Is Zebra shutting down?",
);
self.clear_finalized_block_queue(
"block commit task exited. Is Zebra shutting down?",
);
};
}
}
}
/// Drops all queued finalized blocks, and sends an error on their result channels.
fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
for (_hash, queued) in self.queued_finalized_blocks.drain() {
Self::send_finalized_block_error(queued, error.clone());
}
}
/// Send an error on a `QueuedFinalized` block's result channel, and drop the block
fn send_finalized_block_error(queued: QueuedFinalized, error: impl Into<BoxError>) {
let (finalized, rsp_tx) = queued;
// The block sender might have already given up on this block,
// so ignore any channel send errors.
let _ = rsp_tx.send(Err(error.into()));
std::mem::drop(finalized);
}
/// Queue a non finalized block for verification and check if any queued
@ -362,7 +566,7 @@ impl StateService {
let parent_hash = prepared.block.header.previous_block_hash;
if self.mem.any_chain_contains(&prepared.hash)
|| self.disk.db().hash(prepared.height).is_some()
|| self.read_service.db.hash(prepared.height).is_some()
{
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Err("block is already committed to the state".into()));
@ -386,6 +590,31 @@ impl StateService {
rsp_rx
};
// We've finished sending finalized blocks when:
// - we've sent the finalized block for the last checkpoint, and
// - it has been successfully written to disk.
//
// We detect the last checkpoint by looking for non-finalized blocks
// that are children of the last block we sent.
//
// TODO: configure the state with the last checkpoint hash instead?
if self.finalized_block_write_sender.is_some()
&& self
.queued_non_finalized_blocks
.has_queued_children(self.last_block_hash_sent)
&& self.read_service.db.finalized_tip_hash() == self.last_block_hash_sent
{
// Tell the block write task to stop committing finalized blocks,
// and move on to committing non-finalized blocks.
std::mem::drop(self.finalized_block_write_sender.take());
// We've finished committing finalized blocks, so drop any repeated queued blocks.
self.clear_finalized_block_queue(
"already finished committing finalized blocks: dropped duplicate block, \
block is already committed to the state",
);
}
// TODO: avoid a temporary verification failure that can happen
// if the first non-finalized block arrives before the last finalized block is committed
// (#5125)
@ -411,7 +640,7 @@ impl StateService {
);
}
let finalized_tip_height = self.disk.db().finalized_tip_height().expect(
let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state",
);
self.queued_non_finalized_blocks
@ -447,6 +676,9 @@ impl StateService {
/// non-finalized state is empty.
///
/// [1]: non_finalized_state::Chain
//
// TODO: remove this clippy allow when we remove self.chain_tip_sender
#[allow(clippy::unwrap_in_result)]
#[instrument(level = "debug", skip(self))]
fn update_latest_chain_channels(&mut self) -> Option<block::Height> {
let best_chain = self.mem.best_chain();
@ -459,7 +691,10 @@ impl StateService {
// If the final receiver was just dropped, ignore the error.
let _ = self.non_finalized_state_sender.send(self.mem.clone());
self.chain_tip_sender.set_best_non_finalized_tip(tip_block);
self.chain_tip_sender
.lock()
.expect("unexpected panic in block commit task or state")
.set_best_non_finalized_tip(tip_block);
tip_block_height
}
@ -471,10 +706,10 @@ impl StateService {
self.check_contextual_validity(&prepared)?;
let parent_hash = prepared.block.header.previous_block_hash;
if self.disk.db().finalized_tip_hash() == parent_hash {
self.mem.commit_new_chain(prepared, self.disk.db())?;
if self.disk.db.finalized_tip_hash() == parent_hash {
self.mem.commit_new_chain(prepared, &self.disk.db)?;
} else {
self.mem.commit_block(prepared, self.disk.db())?;
self.mem.commit_block(prepared, &self.disk.db)?;
}
Ok(())
@ -482,7 +717,7 @@ impl StateService {
/// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
self.mem.any_chain_contains(hash) || &self.disk.db().finalized_tip_hash() == hash
self.mem.any_chain_contains(hash) || &self.read_service.db.finalized_tip_hash() == hash
}
/// Attempt to validate and commit all queued blocks whose parents have
@ -547,25 +782,25 @@ impl StateService {
check::block_is_valid_for_recent_chain(
prepared,
self.network,
self.disk.db().finalized_tip_height(),
self.disk.db.finalized_tip_height(),
relevant_chain,
)?;
check::nullifier::no_duplicates_in_finalized_chain(prepared, self.disk.db())?;
check::nullifier::no_duplicates_in_finalized_chain(prepared, &self.disk.db)?;
Ok(())
}
/// Return the tip of the current best chain.
pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
self.mem.best_tip().or_else(|| self.disk.db().tip())
self.mem.best_tip().or_else(|| self.read_service.db.tip())
}
/// Return the height for the block at `hash` in any chain.
pub fn any_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
self.mem
.any_height_by_hash(hash)
.or_else(|| self.disk.db().height(hash))
.or_else(|| self.read_service.db.height(hash))
}
/// Return an iterator over the relevant chain of the block identified by
@ -593,18 +828,23 @@ impl StateService {
}
impl ReadStateService {
/// Creates a new read-only state service, using the provided finalized state.
/// Creates a new read-only state service, using the provided finalized state and
/// block write task handle.
///
/// Returns the newly created service,
/// and a watch channel for updating the shared recent non-finalized chain.
pub(crate) fn new(disk: &FinalizedState) -> (Self, watch::Sender<NonFinalizedState>) {
pub(crate) fn new(
finalized_state: &FinalizedState,
block_write_task: Arc<std::thread::JoinHandle<()>>,
) -> (Self, watch::Sender<NonFinalizedState>) {
let (non_finalized_state_sender, non_finalized_state_receiver) =
watch::channel(NonFinalizedState::new(disk.network()));
watch::channel(NonFinalizedState::new(finalized_state.network()));
let read_service = Self {
network: disk.network(),
db: disk.db().clone(),
network: finalized_state.network(),
db: finalized_state.db.clone(),
non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
block_write_task: Some(block_write_task),
};
tracing::info!("created new read-only state service");
@ -619,7 +859,11 @@ impl Service<Request> for StateService {
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Check for panics in the block write task
let poll = self.read_service.poll_ready(cx);
// Prune outdated UTXO requests
let now = Instant::now();
if self.last_prune + Self::PRUNE_INTERVAL < now {
@ -646,7 +890,7 @@ impl Service<Request> for StateService {
}
}
Poll::Ready(Ok(()))
poll
}
#[instrument(name = "state", skip(self, req))]
@ -679,6 +923,10 @@ impl Service<Request> for StateService {
span.in_scope(|| self.queue_and_commit_non_finalized(prepared))
});
// TODO:
// - check for panics in the block write task here,
// as well as in poll_ready()
// The work is all done, the future just waits on a channel for the result
timer.finish(module_path!(), line!(), "CommitBlock");
@ -700,7 +948,7 @@ impl Service<Request> for StateService {
}
// Uses queued_finalized_blocks and pending_utxos in the StateService.
// Accesses shared writeable state in the StateService and ZebraDb.
// Accesses shared writeable state in the StateService.
Request::CommitFinalizedBlock(finalized) => {
let timer = CodeTimer::start();
@ -716,14 +964,13 @@ impl Service<Request> for StateService {
// # Performance
//
// Allow other async tasks to make progress while blocks are being verified
// and written to disk.
//
// See the note in `CommitBlock` for more details.
let span = Span::current();
let rsp_rx = tokio::task::block_in_place(move || {
span.in_scope(|| self.queue_and_commit_finalized(finalized))
});
// This method doesn't block, access the database, or perform CPU-intensive tasks,
// so we can run it directly in the tokio executor's Future threads.
let rsp_rx = self.queue_and_commit_finalized(finalized);
// TODO:
// - check for panics in the block write task here,
// as well as in poll_ready()
// The work is all done, the future just waits on a channel for the result
timer.finish(module_path!(), line!(), "CommitFinalizedBlock");
@ -847,6 +1094,27 @@ impl Service<ReadRequest> for ReadStateService {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Check for panics in the block write task
let block_write_task = self.block_write_task.take();
if let Some(block_write_task) = block_write_task {
if block_write_task.is_finished() {
match Arc::try_unwrap(block_write_task) {
// We are the last state with a reference to this task, so we can propagate any panics
Ok(block_write_task_handle) => {
if let Err(thread_panic) = block_write_task_handle.join() {
std::panic::resume_unwind(thread_panic);
}
}
// We're not the last state, so we need to put it back
Err(arc_block_write_task) => self.block_write_task = Some(arc_block_write_task),
}
} else {
// It hasn't finished, so we need to put it back
self.block_write_task = Some(block_write_task);
}
}
Poll::Ready(Ok(()))
}

View File

@ -1,6 +1,6 @@
//! Arbitrary data generation and test setup for Zebra's state.
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use futures::{stream::FuturesUnordered, StreamExt};
use proptest::{
@ -9,11 +9,12 @@ use proptest::{
strategy::{NewTree, ValueTree},
test_runner::TestRunner,
};
use tokio::time::timeout;
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use zebra_chain::{
block::Block,
fmt::SummaryDebug,
fmt::{humantime_seconds, SummaryDebug},
history_tree::HistoryTree,
parameters::{Network, NetworkUpgrade},
LedgerState,
@ -27,6 +28,9 @@ use crate::{
pub use zebra_chain::block::arbitrary::MAX_PARTIAL_CHAIN_BLOCKS;
/// How long we wait for chain tip updates before skipping them.
pub const CHAIN_TIP_UPDATE_WAIT_LIMIT: Duration = Duration::from_secs(2);
#[derive(Debug)]
pub struct PreparedChainTree {
chain: Arc<SummaryDebug<Vec<PreparedBlock>>>,
@ -197,7 +201,7 @@ pub async fn populated_state(
.into_iter()
.map(|block| Request::CommitFinalizedBlock(block.into()));
let (state, read_state, latest_chain_tip, chain_tip_change) =
let (state, read_state, latest_chain_tip, mut chain_tip_change) =
StateService::new(Config::ephemeral(), network);
let mut state = Buffer::new(BoxService::new(state), 1);
@ -209,7 +213,24 @@ pub async fn populated_state(
}
while let Some(rsp) = responses.next().await {
rsp.expect("blocks should commit just fine");
// Wait for the block result and the chain tip update,
// which both happen in a separate thread from this one.
rsp.expect("unexpected block commit failure");
// Wait for the chain tip update
if let Err(timeout_error) = timeout(
CHAIN_TIP_UPDATE_WAIT_LIMIT,
chain_tip_change.wait_for_tip_change(),
)
.await
.map(|change_result| change_result.expect("unexpected chain tip update failure"))
{
info!(
timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
?timeout_error,
"timeout waiting for chain tip change after committing block"
);
}
}
(state, read_state, latest_chain_tip, chain_tip_change)

View File

@ -49,7 +49,7 @@ impl Iter<'_> {
IterState::Finished => unreachable!(),
};
if let Some(block) = service.disk.db().block(hash_or_height) {
if let Some(block) = service.read_service.db.block(hash_or_height) {
let height = block
.coinbase_height()
.expect("valid blocks have a coinbase height");

View File

@ -17,7 +17,6 @@
use std::{
io::{stderr, stdout, Write},
path::Path,
sync::Arc,
};
@ -46,8 +45,11 @@ pub(super) use zebra_db::ZebraDb;
/// The finalized part of the chain state, stored in the db.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so finalized state instances are cloneable. When the final clone is dropped,
/// the database is closed.
/// so clones of the finalized state represent the same database instance.
/// When the final clone is dropped, the database is closed.
///
/// This is different from `NonFinalizedState::clone()`,
/// which returns an independent copy of the chains.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FinalizedState {
// Configuration
@ -72,7 +74,7 @@ pub struct FinalizedState {
/// `rocksdb` allows reads and writes via a shared reference,
/// so this database object can be freely cloned.
/// The last instance that is dropped will close the underlying database.
db: ZebraDb,
pub db: ZebraDb,
}
impl FinalizedState {
@ -134,29 +136,19 @@ impl FinalizedState {
self.network
}
/// Returns the `Path` where the files used by this database are located.
pub fn path(&self) -> &Path {
self.db.path()
}
/// Returns a reference to the inner database instance.
pub(crate) fn db(&self) -> &ZebraDb {
&self.db
}
/// Commit a finalized block to the state.
///
/// It's the caller's responsibility to ensure that blocks are committed in
/// order.
pub fn commit_finalized(
&mut self,
queued_block: QueuedFinalized,
) -> Result<FinalizedBlock, ()> {
let (finalized, rsp_tx) = queued_block;
ordered_block: QueuedFinalized,
) -> Result<FinalizedBlock, BoxError> {
let (finalized, rsp_tx) = ordered_block;
let result =
self.commit_finalized_direct(finalized.clone().into(), "CommitFinalized request");
let block_result = if result.is_ok() {
if result.is_ok() {
metrics::counter!("state.checkpoint.finalized.block.count", 1);
metrics::gauge!(
"state.checkpoint.finalized.block.height",
@ -171,21 +163,23 @@ impl FinalizedState {
finalized.height.0 as f64,
);
metrics::counter!("zcash.chain.verified.block.total", 1);
Ok(finalized)
} else {
metrics::counter!("state.checkpoint.error.block.count", 1);
metrics::gauge!(
"state.checkpoint.error.block.height",
finalized.height.0 as f64,
);
Err(())
};
let _ = rsp_tx.send(result.map_err(Into::into));
// Some io errors can't be cloned, so we format them instead.
let owned_result = result
.as_ref()
.map(|_hash| finalized)
.map_err(|error| format!("{:?}", error).into());
block_result
let _ = rsp_tx.send(result);
owned_result
}
/// Immediately commit a `finalized` block to the finalized state.

View File

@ -14,7 +14,7 @@ impl Deref for FinalizedState {
type Target = ZebraDb;
fn deref(&self) -> &Self::Target {
self.db()
&self.db
}
}

View File

@ -643,23 +643,49 @@ impl DiskDb {
/// It should only be used in debugging or test code, immediately before a manual shutdown.
///
/// TODO: make private after the stop height check has moved to the syncer (#3442)
/// move shutting down the database to a blocking thread (#2188),
/// and remove `force` and the manual flush
/// move shutting down the database to a blocking thread (#2188)
pub(crate) fn shutdown(&mut self, force: bool) {
// Prevent a race condition where another thread clones the Arc,
// right after we've checked we're the only holder of the Arc.
// # Correctness
//
// There is still a small race window after the guard is dropped,
// but if the race happens, it will only cause database errors during shutdown.
let clone_prevention_guard = Arc::get_mut(&mut self.db);
// If we're the only owner of the shared database instance,
// then there are no other threads that can increase the strong or weak count.
//
// ## Implementation Requirements
//
// This function and all functions that it calls should avoid cloning the shared database
// instance. If they do, they must drop it before:
// - shutting down database threads, or
// - deleting database files.
let shared_database_owners = Arc::strong_count(&self.db) + Arc::weak_count(&self.db);
if clone_prevention_guard.is_none() && !force {
debug!(
"dropping cloned DiskDb, \
but keeping shared database until the last reference is dropped",
);
if shared_database_owners > 1 {
let path = self.path();
return;
let mut ephemeral_note = "";
if force {
if self.ephemeral {
ephemeral_note = " and removing ephemeral files";
}
info!(
?path,
"forcing shutdown{} of a state database with multiple active instances",
ephemeral_note,
);
} else {
if self.ephemeral {
ephemeral_note = " and files";
}
debug!(
?path,
"dropping DiskDb clone, \
but keeping shared database instance{} until the last reference is dropped",
ephemeral_note,
);
return;
}
}
self.assert_default_cf_is_empty();
@ -670,17 +696,29 @@ impl DiskDb {
// - the database flushes regularly anyway
// - Zebra commits each block in a database transaction, any incomplete blocks get rolled back
// - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually
info!("flushing database to disk");
self.db.flush().expect("flush is successful");
let path = self.path();
info!(?path, "flushing database to disk");
self.db
.flush()
.expect("unexpected failure flushing SST data to disk");
self.db
.flush_wal(true)
.expect("unexpected failure flushing WAL data to disk");
// But we should call `cancel_all_background_work` before Zebra exits.
// If we don't, we see these kinds of errors:
// We'd like to call `cancel_all_background_work()` before Zebra exits,
// but when we call it, we get memory, thread, or C++ errors when the process exits.
// (This seems to be a bug in RocksDB: cancel_all_background_work() should wait until
// all the threads have cleaned up.)
//
// We see these kinds of errors:
// ```
// pthread lock: Invalid argument
// pure virtual method called
// terminate called without an active exception
// pthread destroy mutex: Device or resource busy
// Aborted (core dumped)
// signal: 6, SIGABRT: process abort signal
// signal: 11, SIGSEGV: invalid memory reference
// ```
//
// The RocksDB wiki says:
@ -690,8 +728,8 @@ impl DiskDb {
// > You can speed up the waiting by calling CancelAllBackgroundWork().
//
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
info!("stopping background database tasks");
self.db.cancel_all_background_work(true);
//info!(?path, "stopping background database tasks");
//self.db.cancel_all_background_work(true);
// We'd like to drop the database before deleting its files,
// because that closes the column families and the database correctly.
@ -705,57 +743,52 @@ impl DiskDb {
//
// https://github.com/facebook/rocksdb/wiki/Known-Issues
//
// But our current code doesn't seem to cause any issues.
// We might want to explicitly drop the database as part of graceful shutdown (#1678).
self.delete_ephemeral(force);
// But this implementation doesn't seem to cause any issues,
// and the RocksDB Drop implementation handles any cleanup.
self.delete_ephemeral();
}
/// If the database is `ephemeral`, delete it.
///
/// If `force` is true, clean up regardless of any shared references.
/// `force` can cause errors accessing the database from other shared references.
/// It should only be used in debugging or test code, immediately before a manual shutdown.
fn delete_ephemeral(&mut self, force: bool) {
/// If the database is `ephemeral`, delete its files.
fn delete_ephemeral(&mut self) {
// # Correctness
//
// This function and all functions that it calls should avoid cloning the shared database
// instance. See `shutdown()` for details.
if !self.ephemeral {
return;
}
// Prevent a race condition where another thread clones the Arc,
// right after we've checked we're the only holder of the Arc.
//
// There is still a small race window after the guard is dropped,
// but if the race happens, it will only cause database errors during shutdown.
let clone_prevention_guard = Arc::get_mut(&mut self.db);
if clone_prevention_guard.is_none() && !force {
debug!(
"dropping cloned DiskDb, \
but keeping shared database files until the last reference is dropped",
);
return;
}
let path = self.path();
info!(cache_path = ?path, "removing temporary database files");
info!(?path, "removing temporary database files");
// We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
// but the Zcash blockchain might not fit in memory. So we just
// delete the database files instead.
//
// We'd like to call `DB::destroy` here, but calling destroy on a
// We'd also like to call `DB::destroy` here, but calling destroy on a
// live DB is undefined behaviour:
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
//
// So we assume that all the database files are under `path`, and
// delete them using standard filesystem APIs. Deleting open files
// might cause errors on non-Unix platforms, so we ignore the result.
// (The OS will delete them eventually anyway.)
let res = std::fs::remove_dir_all(path);
// (The OS will delete them eventually anyway, if they are in a temporary directory.)
let result = std::fs::remove_dir_all(path);
// TODO: downgrade to debug once bugs like #2905 are fixed
// but leave any errors at "info" level
info!(?res, "removed temporary database files");
if result.is_err() {
info!(
?result,
?path,
"removing temporary database files caused an error",
);
} else {
debug!(
?result,
?path,
"successfully removed temporary database files",
);
}
}
/// Check that the "default" column family is empty.
@ -764,6 +797,11 @@ impl DiskDb {
///
/// If Zebra has a bug where it is storing data in the wrong column family.
fn assert_default_cf_is_empty(&self) {
// # Correctness
//
// This function and all functions that it calls should avoid cloning the shared database
// instance. See `shutdown()` for details.
if let Some(default_cf) = self.cf_handle("default") {
assert!(
self.zs_is_empty(&default_cf),
@ -775,6 +813,9 @@ impl DiskDb {
impl Drop for DiskDb {
fn drop(&mut self) {
let path = self.path();
debug!(?path, "dropping DiskDb instance");
self.shutdown(false);
}
}

View File

@ -30,7 +30,13 @@ mod tests;
pub(crate) use chain::Chain;
/// The state of the chains in memory, including queued blocks.
#[derive(Debug, Clone)]
///
/// Clones of the non-finalized state contain independent copies of the chains.
/// This is different from `FinalizedState::clone()`,
/// which returns a shared reference to the database.
///
/// Most chain data is clone-on-write using [`Arc`].
#[derive(Clone, Debug)]
pub struct NonFinalizedState {
/// Verified, non-finalized chains, in ascending order.
///

View File

@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::future::Future;
//! Pending UTXO tracker for [`AwaitUtxo` requests](crate::Request::AwaitUtxo).
use std::{collections::HashMap, future::Future};
use tokio::sync::broadcast;

View File

@ -77,6 +77,12 @@ impl QueuedBlocks {
self.update_metrics();
}
/// Returns `true` if there are any queued children of `parent_hash`.
#[instrument(skip(self), fields(%parent_hash))]
pub fn has_queued_children(&self, parent_hash: block::Hash) -> bool {
self.by_parent.contains_key(&parent_hash)
}
/// Dequeue and return all blocks that were waiting for the arrival of
/// `parent`.
#[instrument(skip(self), fields(%parent_hash))]

View File

@ -2,8 +2,9 @@
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! The block write task commits blocks to the finalized state before updating
//! `chain` with a cached copy of the best non-finalized chain from
//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:

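The module doc above (repeated for each read submodule below) describes an overlap: a block may be in both the cached non-finalized `chain` and the finalized database. As a minimal sketch under placeholder types (not Zebra's API), a read that tolerates this overlap simply prefers the cached chain and falls back to the database:

```rust
// Placeholder types, illustration only. If a block is in both places, the two
// copies are identical, so preferring the newer cached chain is always correct.
use std::collections::BTreeMap;

struct CachedChain {
    blocks_by_height: BTreeMap<u32, Vec<u8>>,
}

struct FinalizedDb {
    blocks_by_height: BTreeMap<u32, Vec<u8>>,
}

fn block_at_height(
    chain: Option<&CachedChain>,
    db: &FinalizedDb,
    height: u32,
) -> Option<Vec<u8>> {
    chain
        .and_then(|chain| chain.blocks_by_height.get(&height).cloned())
        .or_else(|| db.blocks_by_height.get(&height).cloned())
}
```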
View File

@ -2,8 +2,9 @@
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! The block write task commits blocks to the finalized state before updating
//! `chain` with a cached copy of the best non-finalized chain from
//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:

View File

@ -2,8 +2,9 @@
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! The block write task commits blocks to the finalized state before updating
//! `chain` with a cached copy of the best non-finalized chain from
//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:

View File

@ -2,8 +2,9 @@
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` or `non_finalized_state` from the latest chains. Then it can
//! The block write task commits blocks to the finalized state before updating
//! `chain` or `non_finalized_state` with a cached copy of the non-finalized chains
//! in `NonFinalizedState.chain_set`. Then the block commit task can
//! commit additional blocks to the finalized state after we've cloned the
//! `chain` or `non_finalized_state`.
//!

View File

@ -2,8 +2,9 @@
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! The block write task commits blocks to the finalized state before updating
//! `chain` with a cached copy of the best non-finalized chain from
//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:

View File

@ -2,8 +2,9 @@
//!
//! In the functions in this module:
//!
//! The StateService commits blocks to the finalized state before updating
//! `chain` from the latest chain. Then it can commit additional blocks to
//! The block write task commits blocks to the finalized state before updating
//! `chain` with a cached copy of the best non-finalized chain from
//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to
//! the finalized state after we've cloned the `chain`.
//!
//! This means that some blocks can be in both:

View File

@ -2,7 +2,7 @@
//!
//! TODO: move these tests into tests::vectors and tests::prop modules.
use std::{env, sync::Arc};
use std::{env, sync::Arc, time::Duration};
use tower::{buffer::Buffer, util::BoxService};
@ -386,59 +386,6 @@ proptest! {
prop_assert_eq!(response, Ok(()));
}
/// Test that the best tip height is updated accordingly.
///
/// 1. Generate a finalized chain and some non-finalized blocks.
/// 2. Check that initially the best tip height is empty.
/// 3. Commit the finalized blocks and check that the best tip height is updated accordingly.
/// 4. Commit the non-finalized blocks and check that the best tip height is also updated
/// accordingly.
#[test]
fn chain_tip_sender_is_updated(
(network, finalized_blocks, non_finalized_blocks)
in continuous_empty_blocks_from_test_vectors(),
) {
let _init_guard = zebra_test::init();
let (mut state_service, _read_only_state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network);
prop_assert_eq!(latest_chain_tip.best_tip_height(), None);
prop_assert_eq!(chain_tip_change.last_tip_change(), None);
for block in finalized_blocks {
let expected_block = block.clone();
let expected_action = if expected_block.height <= block::Height(1) {
// 0: reset by both initialization and the Genesis network upgrade
// 1: reset by the BeforeOverwinter network upgrade
TipAction::reset_with(expected_block.clone().into())
} else {
TipAction::grow_with(expected_block.clone().into())
};
state_service.queue_and_commit_finalized(block);
prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height));
prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action));
}
for block in non_finalized_blocks {
let expected_block = block.clone();
let expected_action = if expected_block.height == block::Height(1) {
// 1: reset by the BeforeOverwinter network upgrade
TipAction::reset_with(expected_block.clone().into())
} else {
TipAction::grow_with(expected_block.clone().into())
};
state_service.queue_and_commit_non_finalized(block);
prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height));
prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action));
}
}
/// Test that the value pool is updated accordingly.
///
/// 1. Generate a finalized chain and some non-finalized blocks.
@ -476,7 +423,10 @@ proptest! {
expected_finalized_value_pool += *block_value_pool;
}
state_service.queue_and_commit_finalized(block.clone());
let result_receiver = state_service.queue_and_commit_finalized(block.clone());
let result = result_receiver.blocking_recv();
prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result);
prop_assert_eq!(
state_service.disk.finalized_value_pool(),
@ -499,7 +449,10 @@ proptest! {
let block_value_pool = &block.block.chain_value_pool_change(&transparent::utxos_from_ordered_utxos(utxos))?;
expected_non_finalized_value_pool += *block_value_pool;
state_service.queue_and_commit_non_finalized(block.clone());
let result_receiver = state_service.queue_and_commit_non_finalized(block.clone());
let result = result_receiver.blocking_recv();
prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result);
prop_assert_eq!(
state_service.mem.best_chain().unwrap().chain_value_pools,
@ -518,6 +471,80 @@ proptest! {
}
}
// This test sleeps for every block, so we only ever want to run it once
proptest! {
#![proptest_config(
proptest::test_runner::Config::with_cases(1)
)]
/// Test that the best tip height is updated accordingly.
///
/// 1. Generate a finalized chain and some non-finalized blocks.
/// 2. Check that initially the best tip height is empty.
/// 3. Commit the finalized blocks and check that the best tip height is updated accordingly.
/// 4. Commit the non-finalized blocks and check that the best tip height is also updated
/// accordingly.
#[test]
fn chain_tip_sender_is_updated(
(network, finalized_blocks, non_finalized_blocks)
in continuous_empty_blocks_from_test_vectors(),
) {
let _init_guard = zebra_test::init();
let (mut state_service, _read_only_state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network);
prop_assert_eq!(latest_chain_tip.best_tip_height(), None);
prop_assert_eq!(chain_tip_change.last_tip_change(), None);
for block in finalized_blocks {
let expected_block = block.clone();
let expected_action = if expected_block.height <= block::Height(1) {
// 0: reset by both initialization and the Genesis network upgrade
// 1: reset by the BeforeOverwinter network upgrade
TipAction::reset_with(expected_block.clone().into())
} else {
TipAction::grow_with(expected_block.clone().into())
};
let result_receiver = state_service.queue_and_commit_finalized(block);
let result = result_receiver.blocking_recv();
prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result);
// Wait for the channels to be updated by the block commit task.
// TODO: add a blocking method on ChainTipChange
std::thread::sleep(Duration::from_secs(1));
prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height));
prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action));
}
for block in non_finalized_blocks {
let expected_block = block.clone();
let expected_action = if expected_block.height == block::Height(1) {
// 1: reset by the BeforeOverwinter network upgrade
TipAction::reset_with(expected_block.clone().into())
} else {
TipAction::grow_with(expected_block.clone().into())
};
let result_receiver = state_service.queue_and_commit_non_finalized(block);
let result = result_receiver.blocking_recv();
prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result);
// Wait for the channels to be updated by the block commit task.
// TODO: add a blocking method on ChainTipChange
std::thread::sleep(Duration::from_secs(1));
prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height));
prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action));
}
}
}
/// Test strategy to generate a chain split in two from the test vectors.
///
/// Selects either the mainnet or testnet chain test vector and randomly splits the chain in two

View File

@ -0,0 +1,138 @@
//! Writing blocks to the finalized and non-finalized states.
use std::sync::{Arc, Mutex};
use zebra_chain::block::{self, Height};
use crate::service::{
finalized_state::FinalizedState,
queued_blocks::{QueuedFinalized, QueuedNonFinalized},
ChainTipBlock, ChainTipSender,
};
/// Reads blocks from the channels, writes them to the `finalized_state`,
/// and updates the `chain_tip_sender`.
///
/// TODO: pass the non-finalized state and associated update channel to this function
#[instrument(skip(
finalized_block_write_receiver,
non_finalized_block_write_receiver,
invalid_block_reset_sender,
chain_tip_sender
))]
pub fn write_blocks_from_channels(
mut finalized_block_write_receiver: tokio::sync::mpsc::UnboundedReceiver<QueuedFinalized>,
mut non_finalized_block_write_receiver: tokio::sync::mpsc::UnboundedReceiver<
QueuedNonFinalized,
>,
mut finalized_state: FinalizedState,
invalid_block_reset_sender: tokio::sync::mpsc::UnboundedSender<block::Hash>,
chain_tip_sender: Arc<Mutex<ChainTipSender>>,
) {
// Write all the finalized blocks sent by the state,
// until the state closes the finalized block channel's sender.
while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
// TODO: split these checks into separate functions
if invalid_block_reset_sender.is_closed() {
info!("StateService closed the block reset channel. Is Zebra shutting down?");
return;
}
// Discard any children of invalid blocks in the channel
//
// `commit_finalized()` requires blocks in height order.
// So if there has been a block commit error,
// we need to drop all the descendants of that block,
// until we receive a block at the required next height.
let next_valid_height = finalized_state
.db
.finalized_tip_height()
.map(|height| (height + 1).expect("committed heights are valid"))
.unwrap_or(Height(0));
if ordered_block.0.height != next_valid_height {
debug!(
?next_valid_height,
invalid_height = ?ordered_block.0.height,
invalid_hash = ?ordered_block.0.hash,
"got a block that was the wrong height. \
Assuming a parent block failed, and dropping this block",
);
// We don't want to send a reset here, because it could overwrite a valid sent hash
std::mem::drop(ordered_block);
continue;
}
// Try committing the block
match finalized_state.commit_finalized(ordered_block) {
Ok(finalized) => {
let tip_block = ChainTipBlock::from(finalized);
// TODO: update the chain tip sender with non-finalized blocks in this function,
// and get rid of the mutex
chain_tip_sender
.lock()
.expect("unexpected panic in block commit task or state")
.set_finalized_tip(tip_block);
}
Err(error) => {
let finalized_tip = finalized_state.db.tip();
// The last block in the queue failed, so we can't commit the next block.
// Instead, we need to reset the state queue,
// and discard any children of the invalid block in the channel.
info!(
?error,
last_valid_height = ?finalized_tip.map(|tip| tip.0),
last_valid_hash = ?finalized_tip.map(|tip| tip.1),
"committing a block to the finalized state failed, resetting state queue",
);
let send_result =
invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
if send_result.is_err() {
info!("StateService closed the block reset channel. Is Zebra shutting down?");
return;
}
}
}
}
// Do this check even if the channel got closed before any finalized blocks were sent.
// This can happen if we're past the finalized tip.
if invalid_block_reset_sender.is_closed() {
info!("StateService closed the block reset channel. Is Zebra shutting down?");
return;
}
// Write all the non-finalized blocks sent by the state, until Zebra shuts down.
while let Some(_block) = non_finalized_block_write_receiver.blocking_recv() {
if invalid_block_reset_sender.is_closed() {
info!("StateService closed the block reset channel. Is Zebra shutting down?");
return;
}
// TODO:
// - read from the channel
// - commit blocks to the non-finalized state
// - if there are any ready, commit blocks to the finalized state
// - handle errors by sending a reset with all the block hashes in the non-finalized state, and the finalized tip
// - update the chain tip sender and cached non-finalized state
error!("handle non-finalized block writes here");
}
// We're finished receiving non-finalized blocks from the state.
//
// TODO:
// - make the task an object, and do this in the drop impl?
// - does the drop order matter here?
non_finalized_block_write_receiver.close();
std::mem::drop(non_finalized_block_write_receiver);
// We're done writing to the finalized state, so we can force it to shut down.
finalized_state.db.shutdown(true);
std::mem::drop(finalized_state);
}
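For context, a caller moves the receivers, the finalized state, and the shared chain tip sender into a dedicated thread, because `blocking_recv()` panics when called from inside the async runtime. A minimal sketch under those assumptions (the wiring and function name below are illustrative, not the actual StateService code):

// Sketch only: wiring up the block write task. The channel names and this
// helper are assumptions for illustration; the real setup lives in the StateService.
fn spawn_block_write_task(
    finalized_state: FinalizedState,
    chain_tip_sender: ChainTipSender,
) -> (
    tokio::sync::mpsc::UnboundedSender<QueuedFinalized>,
    tokio::sync::mpsc::UnboundedSender<QueuedNonFinalized>,
    tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
    std::thread::JoinHandle<()>,
) {
    let (finalized_tx, finalized_rx) = tokio::sync::mpsc::unbounded_channel();
    let (non_finalized_tx, non_finalized_rx) = tokio::sync::mpsc::unbounded_channel();
    // On commit errors, the task sends back the last valid finalized tip hash,
    // so the service can reset its queue of pending blocks.
    let (invalid_block_reset_tx, invalid_block_reset_rx) = tokio::sync::mpsc::unbounded_channel();

    let chain_tip_sender = std::sync::Arc::new(std::sync::Mutex::new(chain_tip_sender));

    // `blocking_recv()` must not run on the async runtime,
    // so the task gets its own OS thread.
    let join_handle = std::thread::spawn(move || {
        write_blocks_from_channels(
            finalized_rx,
            non_finalized_rx,
            finalized_state,
            invalid_block_reset_tx,
            chain_tip_sender,
        )
    });

    (finalized_tx, non_finalized_tx, invalid_block_reset_rx, join_handle)
}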

@ -10,13 +10,14 @@ use std::{
};
use futures::FutureExt;
use tokio::{sync::oneshot, task::JoinHandle};
use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt};
use tracing::Span;
use zebra_chain::{
amount::Amount,
block::Block,
fmt::humantime_seconds,
parameters::Network::{self, *},
serialization::ZcashDeserializeInto,
transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx},
@ -24,7 +25,7 @@ use zebra_chain::{
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
use zebra_network::{AddressBook, InventoryResponse, Request, Response};
use zebra_node_services::mempool;
use zebra_state::Config as StateConfig;
use zebra_state::{ChainTipChange, Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT};
use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
@ -59,6 +60,7 @@ async fn mempool_requests_for_transactions() {
_mock_tx_verifier,
mut peer_set,
_state_guard,
_chain_tip_change,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(true).await;
@ -142,6 +144,7 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
mut tx_verifier,
mut peer_set,
_state_guard,
_chain_tip_change,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
@ -236,6 +239,7 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
mut tx_verifier,
mut peer_set,
_state_guard,
_chain_tip_change,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
@ -342,6 +346,7 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
mut tx_verifier,
mut peer_set,
state_service,
_chain_tip_change,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
@ -638,6 +643,7 @@ async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
mut tx_verifier,
mut peer_set,
state_service,
mut chain_tip_change,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
@ -658,7 +664,20 @@ async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
.await
.respond(Response::Blocks(vec![Available(block)]));
// TODO: check that the block is queued in the checkpoint verifier
// Wait for the chain tip update
if let Err(timeout_error) = timeout(
CHAIN_TIP_UPDATE_WAIT_LIMIT,
chain_tip_change.wait_for_tip_change(),
)
.await
.map(|change_result| change_result.expect("unexpected chain tip update failure"))
{
info!(
timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
?timeout_error,
"timeout waiting for chain tip change after committing block"
);
}
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
@ -729,6 +748,7 @@ async fn setup(
MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>,
MockService<Request, Response, PanicAssertion>,
Buffer<BoxService<zebra_state::Request, zebra_state::Response, BoxError>, zebra_state::Request>,
ChainTipChange,
JoinHandle<Result<(), BlockGossipError>>,
JoinHandle<Result<(), BoxError>>,
) {
@ -744,7 +764,7 @@ async fn setup(
);
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, _read_only_state_service, latest_chain_tip, chain_tip_change) =
let (state, _read_only_state_service, latest_chain_tip, mut chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
@ -786,6 +806,21 @@ async fn setup(
.unwrap();
committed_blocks.push(genesis_block);
// Wait for the chain tip update
if let Err(timeout_error) = timeout(
CHAIN_TIP_UPDATE_WAIT_LIMIT,
chain_tip_change.wait_for_tip_change(),
)
.await
.map(|change_result| change_result.expect("unexpected chain tip update failure"))
{
info!(
timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
?timeout_error,
"timeout waiting for chain tip change after committing block"
);
}
// Also push block 1.
// Block one activates a network upgrade, and the mempool is cleared at it,
// so all our tests start after this event.
@ -801,6 +836,8 @@ async fn setup(
.unwrap();
committed_blocks.push(block_one);
// Don't wait for the chain tip update here; we wait for AdvertiseBlock below
let (mut mempool_service, transaction_receiver) = Mempool::new(
&MempoolConfig::default(),
buffered_peer_set.clone(),
@ -845,7 +882,7 @@ async fn setup(
let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
chain_tip_change.clone(),
peer_set.clone(),
));
@ -873,6 +910,7 @@ async fn setup(
mock_tx_verifier,
peer_set,
state_service,
chain_tip_change,
sync_gossip_task_handle,
tx_gossip_task_handle,
)
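The two wait-for-chain-tip blocks above (and the similar ones in the mempool tests below) repeat the same timeout-and-log pattern, which could be factored into a small helper; a sketch, with the helper name as an assumption:

// Possible shared helper for the repeated waits; the name is illustrative.
async fn wait_for_chain_tip_update(chain_tip_change: &mut ChainTipChange) {
    if let Err(timeout_error) = timeout(
        CHAIN_TIP_UPDATE_WAIT_LIMIT,
        chain_tip_change.wait_for_tip_change(),
    )
    .await
    .map(|change_result| change_result.expect("unexpected chain tip update failure"))
    {
        info!(
            timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
            ?timeout_error,
            "timeout waiting for chain tip change after committing block"
        );
    }
}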

@ -3,20 +3,23 @@
use std::{collections::HashSet, sync::Arc};
use color_eyre::Report;
use tokio::time;
use tokio::time::{self, timeout};
use tower::{ServiceBuilder, ServiceExt};
use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserializeInto};
use zebra_chain::{
block::Block, fmt::humantime_seconds, parameters::Network, serialization::ZcashDeserializeInto,
};
use zebra_consensus::transaction as tx;
use zebra_state::Config as StateConfig;
use zebra_state::{Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT};
use zebra_test::mock_service::{MockService, PanicAssertion};
use super::UnboxMempoolError;
use crate::components::{
mempool::{self, storage::tests::unmined_transactions_in_blocks, *},
sync::RecentSyncLengths,
};
use super::UnboxMempoolError;
/// A [`MockService`] representing the network service.
type MockPeerSet = MockService<zn::Request, zn::Response, PanicAssertion>;
@ -51,7 +54,7 @@ async fn mempool_service_basic_single() -> Result<(), Report> {
// inserted except one (the genesis block transaction).
let cost_limit = more_transactions.iter().map(|tx| tx.cost()).sum();
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
let (mut service, _peer_set, _state_service, _chain_tip_change, _tx_verifier, mut recent_syncs) =
setup(network, cost_limit).await;
// Enable the mempool
@ -198,7 +201,7 @@ async fn mempool_queue_single() -> Result<(), Report> {
.map(|tx| tx.cost())
.sum();
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
let (mut service, _peer_set, _state_service, _chain_tip_change, _tx_verifier, mut recent_syncs) =
setup(network, cost_limit).await;
// Enable the mempool
@ -272,7 +275,7 @@ async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
let (mut service, _peer_set, _state_service, _chain_tip_change, _tx_verifier, mut recent_syncs) =
setup(network, u64::MAX).await;
// get the genesis block transactions from the Zcash blockchain.
@ -387,8 +390,14 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network, u64::MAX).await;
let (
mut mempool,
_peer_set,
mut state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
) = setup(network, u64::MAX).await;
// Enable the mempool
mempool.enable(&mut recent_syncs).await;
@ -480,8 +489,14 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
// Using the mainnet for now
let network = Network::Mainnet;
let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network, u64::MAX).await;
let (
mut mempool,
_peer_set,
mut state_service,
mut chain_tip_change,
_tx_verifier,
mut recent_syncs,
) = setup(network, u64::MAX).await;
// Enable the mempool
mempool.enable(&mut recent_syncs).await;
@ -501,6 +516,21 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
.await
.unwrap();
// Wait for the chain tip update
if let Err(timeout_error) = timeout(
CHAIN_TIP_UPDATE_WAIT_LIMIT,
chain_tip_change.wait_for_tip_change(),
)
.await
.map(|change_result| change_result.expect("unexpected chain tip update failure"))
{
info!(
timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
?timeout_error,
"timeout waiting for chain tip change after committing block"
);
}
// Queue transaction from block 2 for download
let txid = block2.transactions[0].unmined_id();
let response = mempool
@ -533,6 +563,21 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
.await
.unwrap();
// Wait for the chain tip update
if let Err(timeout_error) = timeout(
CHAIN_TIP_UPDATE_WAIT_LIMIT,
chain_tip_change.wait_for_tip_change(),
)
.await
.map(|change_result| change_result.expect("unexpected chain tip update failure"))
{
info!(
timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
?timeout_error,
"timeout waiting for chain tip change after committing block"
);
}
// Query the mempool to make it poll chain_tip_change
mempool.dummy_call().await;
@ -548,8 +593,14 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut mempool, _peer_set, _state_service, mut tx_verifier, mut recent_syncs) =
setup(network, u64::MAX).await;
let (
mut mempool,
_peer_set,
_state_service,
_chain_tip_change,
mut tx_verifier,
mut recent_syncs,
) = setup(network, u64::MAX).await;
// Get transactions to use in the test
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
@ -617,8 +668,14 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut mempool, mut peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network, u64::MAX).await;
let (
mut mempool,
mut peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
) = setup(network, u64::MAX).await;
// Get transactions to use in the test
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
@ -688,6 +745,7 @@ async fn setup(
Mempool,
MockPeerSet,
StateService,
ChainTipChange,
MockTxVerifier,
RecentSyncLengths,
) {
@ -712,8 +770,15 @@ async fn setup(
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
sync_status,
latest_chain_tip,
chain_tip_change,
chain_tip_change.clone(),
);
(mempool, peer_set, state_service, tx_verifier, recent_syncs)
(
mempool,
peer_set,
state_service,
chain_tip_change,
tx_verifier,
recent_syncs,
)
}

@ -176,8 +176,13 @@ pub async fn run() -> Result<()> {
assert_eq!(response, expected_response);
}
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
// The timing of verification logs is unreliable, so we've disabled this check for now.
//
// TODO: when lightwalletd starts returning transactions again:
// re-enable this check, find a better way to check, or delete this commented-out check
//
//tracing::info!("waiting for mempool to verify some transactions...");
//zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
let mut transactions_stream = rpc_client