5. refactor(state): split the state service into modules (#3778)

* Move the legacy chain check to the `check` module

And move `populated_state` to the `arbitrary` module.

* Cleanup imports

* Document the state service struct

* Split state block iter into its own module
This commit is contained in:
teor 2022-03-11 06:40:48 +10:00 committed by GitHub
parent b6a0fcc44c
commit 86b3315d8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 222 additions and 192 deletions

View File

@ -40,8 +40,9 @@ pub use service::{
#[cfg(any(test, feature = "proptest-impl"))]
pub use service::{
arbitrary::populated_state,
chain_tip::{ChainTipBlock, ChainTipSender},
init_test, populated_state,
init_test,
};
pub(crate) use request::ContextuallyValidBlock;

View File

@ -7,9 +7,9 @@ use std::{
time::{Duration, Instant},
};
use futures::{future::FutureExt, stream::FuturesUnordered};
use futures::future::FutureExt;
use tokio::sync::oneshot;
use tower::{util::BoxService, Service, ServiceExt};
use tower::{util::BoxService, Service};
use tracing::instrument;
#[cfg(any(test, feature = "proptest-impl"))]
@ -24,18 +24,23 @@ use zebra_chain::{
};
use crate::{
constants, request::HashOrHeight, service::chain_tip::ChainTipBlock, BoxError, CloneError,
request::HashOrHeight, service::chain_tip::ChainTipBlock, BoxError, CloneError,
CommitBlockError, Config, FinalizedBlock, PreparedBlock, Request, Response,
ValidateContextError,
};
use self::{
chain_tip::{ChainTipChange, ChainTipSender, LatestChainTip},
finalized_state::FinalizedState,
non_finalized_state::{NonFinalizedState, QueuedBlocks},
pending_utxos::PendingUtxos,
};
pub mod block_iter;
pub mod chain_tip;
pub(crate) mod check;
mod finalized_state;
mod non_finalized_state;
mod pending_utxos;
@ -46,8 +51,6 @@ pub mod arbitrary;
#[cfg(test)]
mod tests;
use self::{finalized_state::FinalizedState, pending_utxos::PendingUtxos};
pub type QueuedBlock = (
PreparedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>,
@ -57,6 +60,15 @@ pub type QueuedFinalized = (
oneshot::Sender<Result<block::Hash, BoxError>>,
);
/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// Zebra allows chain forks in the non-finalized state,
/// stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
/// Zebra stores the single best chain in the finalized state,
/// and re-loads it from disk when restarted.
pub(crate) struct StateService {
/// Holds data relating to finalized chain state.
pub(crate) disk: FinalizedState,
@ -103,7 +115,7 @@ impl StateService {
tracing::info!("starting legacy chain check");
if let Some(tip) = state.best_tip() {
if let Some(nu5_activation_height) = NetworkUpgrade::Nu5.activation_height(network) {
if legacy_chain_check(
if check::legacy_chain(
nu5_activation_height,
state.any_ancestor_blocks(tip.1),
state.network,
@ -401,10 +413,10 @@ impl StateService {
///
/// The block identified by `hash` is included in the chain of blocks yielded
/// by the iterator. `hash` can come from any chain.
pub fn any_ancestor_blocks(&self, hash: block::Hash) -> Iter<'_> {
Iter {
pub fn any_ancestor_blocks(&self, hash: block::Hash) -> block_iter::Iter<'_> {
block_iter::Iter {
service: self,
state: IterState::NonFinalized(hash),
state: block_iter::IterState::NonFinalized(hash),
}
}
@ -569,98 +581,6 @@ impl StateService {
}
}
pub(crate) struct Iter<'a> {
service: &'a StateService,
state: IterState,
}
enum IterState {
NonFinalized(block::Hash),
Finalized(block::Height),
Finished,
}
impl Iter<'_> {
fn next_non_finalized_block(&mut self) -> Option<Arc<Block>> {
let Iter { service, state } = self;
let hash = match state {
IterState::NonFinalized(hash) => *hash,
IterState::Finalized(_) | IterState::Finished => unreachable!(),
};
if let Some(block) = service.mem.any_block_by_hash(hash) {
let hash = block.header.previous_block_hash;
self.state = IterState::NonFinalized(hash);
Some(block)
} else {
None
}
}
fn next_finalized_block(&mut self) -> Option<Arc<Block>> {
let Iter { service, state } = self;
let hash_or_height: HashOrHeight = match *state {
IterState::Finalized(height) => height.into(),
IterState::NonFinalized(hash) => hash.into(),
IterState::Finished => unreachable!(),
};
if let Some(block) = service.disk.block(hash_or_height) {
let height = block
.coinbase_height()
.expect("valid blocks have a coinbase height");
if let Some(next_height) = height - 1 {
self.state = IterState::Finalized(next_height);
} else {
self.state = IterState::Finished;
}
Some(block)
} else {
self.state = IterState::Finished;
None
}
}
}
impl Iterator for Iter<'_> {
type Item = Arc<Block>;
fn next(&mut self) -> Option<Self::Item> {
match self.state {
IterState::NonFinalized(_) => self
.next_non_finalized_block()
.or_else(|| self.next_finalized_block()),
IterState::Finalized(_) => self.next_finalized_block(),
IterState::Finished => None,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.len();
(len, Some(len))
}
}
impl std::iter::FusedIterator for Iter<'_> {}
impl ExactSizeIterator for Iter<'_> {
fn len(&self) -> usize {
match self.state {
IterState::NonFinalized(hash) => self
.service
.any_height_by_hash(hash)
.map(|height| (height.0 + 1) as _)
.unwrap_or(0),
IterState::Finalized(height) => (height.0 + 1) as _,
IterState::Finished => 0,
}
}
}
impl Service<Request> for StateService {
type Response = Response;
type Error = BoxError;
@ -842,81 +762,3 @@ pub fn init_test(network: Network) -> Buffer<BoxService<Request, Response, BoxEr
Buffer::new(BoxService::new(state_service), 1)
}
/// Initialize a state service with blocks.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn populated_state(
blocks: impl IntoIterator<Item = Arc<Block>>,
network: Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
let requests = blocks
.into_iter()
.map(|block| Request::CommitFinalizedBlock(block.into()));
let mut state = init_test(network);
let mut responses = FuturesUnordered::new();
for request in requests {
let rsp = state.ready().await.unwrap().call(request);
responses.push(rsp);
}
use futures::StreamExt;
while let Some(rsp) = responses.next().await {
rsp.expect("blocks should commit just fine");
}
state
}
/// Check if zebra is following a legacy chain and return an error if so.
fn legacy_chain_check<I>(
nu5_activation_height: block::Height,
ancestors: I,
network: Network,
) -> Result<(), BoxError>
where
I: Iterator<Item = Arc<Block>>,
{
for (count, block) in ancestors.enumerate() {
// Stop checking if the chain reaches Canopy. We won't find any more V5 transactions,
// so the rest of our checks are useless.
//
// If the cached tip is close to NU5 activation, but there aren't any V5 transactions in the
// chain yet, we could reach MAX_BLOCKS_TO_CHECK in Canopy, and incorrectly return an error.
if block
.coinbase_height()
.expect("valid blocks have coinbase heights")
< nu5_activation_height
{
return Ok(());
}
// If we are past our NU5 activation height, but there are no V5 transactions in recent blocks,
// the Zebra instance that verified those blocks had no NU5 activation height.
if count >= constants::MAX_LEGACY_CHAIN_BLOCKS {
return Err("giving up after checking too many blocks".into());
}
// If a transaction `network_upgrade` field is different from the network upgrade calculated
// using our activation heights, the Zebra instance that verified those blocks had different
// network upgrade heights.
block
.check_transaction_network_upgrade_consistency(network)
.map_err(|_| "inconsistent network upgrade found in transaction")?;
// If we find at least one transaction with a valid `network_upgrade` field, the Zebra instance that
// verified those blocks used the same network upgrade heights. (Up to this point in the chain.)
let has_network_upgrade = block
.transactions
.iter()
.find_map(|trans| trans.network_upgrade())
.is_some();
if has_network_upgrade {
return Ok(());
}
}
Ok(())
}

View File

@ -1,20 +1,27 @@
//! Arbitrary data generation and test setup for Zebra's state.
use std::sync::Arc;
use futures::{stream::FuturesUnordered, StreamExt};
use proptest::{
num::usize::BinarySearch,
prelude::*,
strategy::{NewTree, ValueTree},
test_runner::TestRunner,
};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use zebra_chain::{
block::Block, fmt::SummaryDebug, history_tree::HistoryTree, parameters::NetworkUpgrade,
block::Block,
fmt::SummaryDebug,
history_tree::HistoryTree,
parameters::{Network, NetworkUpgrade},
LedgerState,
};
use crate::arbitrary::Prepare;
use super::*;
use crate::{
arbitrary::Prepare, init_test, service::check, BoxError, PreparedBlock, Request, Response,
};
pub use zebra_chain::block::arbitrary::MAX_PARTIAL_CHAIN_BLOCKS;
@ -158,3 +165,28 @@ impl Strategy for PreparedChain {
})
}
}
/// Initialize a state service with blocks.
pub async fn populated_state(
blocks: impl IntoIterator<Item = Arc<Block>>,
network: Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
let requests = blocks
.into_iter()
.map(|block| Request::CommitFinalizedBlock(block.into()));
let mut state = init_test(network);
let mut responses = FuturesUnordered::new();
for request in requests {
let rsp = state.ready().await.unwrap().call(request);
responses.push(rsp);
}
while let Some(rsp) = responses.next().await {
rsp.expect("blocks should commit just fine");
}
state
}

View File

@ -0,0 +1,103 @@
//! Iterators for blocks in the non-finalized and finalized state.
use std::sync::Arc;
use zebra_chain::block::{self, Block};
use crate::{service::StateService, HashOrHeight};
/// Iterator for state blocks.
///
/// Starts at any block in any non-finalized or finalized chain,
/// and iterates in reverse height order. (Towards the genesis block.)
pub(crate) struct Iter<'a> {
pub(super) service: &'a StateService,
pub(super) state: IterState,
}
pub(super) enum IterState {
NonFinalized(block::Hash),
Finalized(block::Height),
Finished,
}
impl Iter<'_> {
fn next_non_finalized_block(&mut self) -> Option<Arc<Block>> {
let Iter { service, state } = self;
let hash = match state {
IterState::NonFinalized(hash) => *hash,
IterState::Finalized(_) | IterState::Finished => unreachable!(),
};
if let Some(block) = service.mem.any_block_by_hash(hash) {
let hash = block.header.previous_block_hash;
self.state = IterState::NonFinalized(hash);
Some(block)
} else {
None
}
}
fn next_finalized_block(&mut self) -> Option<Arc<Block>> {
let Iter { service, state } = self;
let hash_or_height: HashOrHeight = match *state {
IterState::Finalized(height) => height.into(),
IterState::NonFinalized(hash) => hash.into(),
IterState::Finished => unreachable!(),
};
if let Some(block) = service.disk.block(hash_or_height) {
let height = block
.coinbase_height()
.expect("valid blocks have a coinbase height");
if let Some(next_height) = height - 1 {
self.state = IterState::Finalized(next_height);
} else {
self.state = IterState::Finished;
}
Some(block)
} else {
self.state = IterState::Finished;
None
}
}
}
impl Iterator for Iter<'_> {
type Item = Arc<Block>;
fn next(&mut self) -> Option<Self::Item> {
match self.state {
IterState::NonFinalized(_) => self
.next_non_finalized_block()
.or_else(|| self.next_finalized_block()),
IterState::Finalized(_) => self.next_finalized_block(),
IterState::Finished => None,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.len();
(len, Some(len))
}
}
impl std::iter::FusedIterator for Iter<'_> {}
impl ExactSizeIterator for Iter<'_> {
fn len(&self) -> usize {
match self.state {
IterState::NonFinalized(hash) => self
.service
.any_height_by_hash(hash)
.map(|height| (height.0 + 1) as _)
.unwrap_or(0),
IterState::Finalized(height) => (height.0 + 1) as _,
IterState::Finished => 0,
}
}
}

View File

@ -12,12 +12,11 @@ use zebra_chain::{
work::difficulty::CompactDifficulty,
};
use crate::{FinalizedBlock, PreparedBlock, ValidateContextError};
use crate::{constants, BoxError, FinalizedBlock, PreparedBlock, ValidateContextError};
// use self as check
use super::check;
use difficulty::{AdjustedDifficulty, POW_MEDIAN_BLOCK_SPAN};
pub(crate) mod anchors;
pub(crate) mod difficulty;
pub(crate) mod nullifier;
@ -26,6 +25,8 @@ pub(crate) mod utxo;
#[cfg(test)]
mod tests;
use difficulty::{AdjustedDifficulty, POW_MEDIAN_BLOCK_SPAN};
/// Check that the `prepared` block is contextually valid for `network`, based
/// on the `finalized_tip_height` and `relevant_chain`.
///
@ -305,3 +306,54 @@ fn difficulty_threshold_is_valid(
Ok(())
}
/// Check if zebra is following a legacy chain and return an error if so.
pub(crate) fn legacy_chain<I>(
nu5_activation_height: block::Height,
ancestors: I,
network: Network,
) -> Result<(), BoxError>
where
I: Iterator<Item = Arc<Block>>,
{
for (count, block) in ancestors.enumerate() {
// Stop checking if the chain reaches Canopy. We won't find any more V5 transactions,
// so the rest of our checks are useless.
//
// If the cached tip is close to NU5 activation, but there aren't any V5 transactions in the
// chain yet, we could reach MAX_BLOCKS_TO_CHECK in Canopy, and incorrectly return an error.
if block
.coinbase_height()
.expect("valid blocks have coinbase heights")
< nu5_activation_height
{
return Ok(());
}
// If we are past our NU5 activation height, but there are no V5 transactions in recent blocks,
// the Zebra instance that verified those blocks had no NU5 activation height.
if count >= constants::MAX_LEGACY_CHAIN_BLOCKS {
return Err("giving up after checking too many blocks".into());
}
// If a transaction `network_upgrade` field is different from the network upgrade calculated
// using our activation heights, the Zebra instance that verified those blocks had different
// network upgrade heights.
block
.check_transaction_network_upgrade_consistency(network)
.map_err(|_| "inconsistent network upgrade found in transaction")?;
// If we find at least one transaction with a valid `network_upgrade` field, the Zebra instance that
// verified those blocks used the same network upgrade heights. (Up to this point in the chain.)
let has_network_upgrade = block
.transactions
.iter()
.find_map(|trans| trans.network_upgrade())
.is_some();
if has_network_upgrade {
return Ok(());
}
}
Ok(())
}

View File

@ -16,7 +16,7 @@ use zebra_test::{prelude::*, transcript::Transcript};
use crate::{
arbitrary::Prepare,
constants, init_test,
service::{chain_tip::TipAction, populated_state, StateService},
service::{arbitrary::populated_state, chain_tip::TipAction, StateService},
tests::setup::{partial_nu5_chain_strategy, transaction_v4_from_coinbase},
BoxError, Config, FinalizedBlock, PreparedBlock, Request, Response,
};
@ -305,7 +305,7 @@ proptest! {
fn some_block_less_than_network_upgrade(
(network, nu_activation_height, chain) in partial_nu5_chain_strategy(4, true, UNDER_LEGACY_CHAIN_LIMIT, NetworkUpgrade::Canopy)
) {
let response = crate::service::legacy_chain_check(nu_activation_height, chain.into_iter().rev(), network)
let response = crate::service::check::legacy_chain(nu_activation_height, chain.into_iter().rev(), network)
.map_err(|error| error.to_string());
prop_assert_eq!(response, Ok(()));
@ -316,7 +316,7 @@ proptest! {
fn no_transaction_with_network_upgrade(
(network, nu_activation_height, chain) in partial_nu5_chain_strategy(4, true, OVER_LEGACY_CHAIN_LIMIT, NetworkUpgrade::Canopy)
) {
let response = crate::service::legacy_chain_check(nu_activation_height, chain.into_iter().rev(), network)
let response = crate::service::check::legacy_chain(nu_activation_height, chain.into_iter().rev(), network)
.map_err(|error| error.to_string());
prop_assert_eq!(
@ -350,7 +350,7 @@ proptest! {
.is_err()
);
let response = crate::service::legacy_chain_check(
let response = crate::service::check::legacy_chain(
nu_activation_height,
chain.clone().into_iter().rev(),
network
@ -370,7 +370,7 @@ proptest! {
fn at_least_one_transaction_with_valid_network_upgrade(
(network, nu_activation_height, chain) in partial_nu5_chain_strategy(5, true, UNDER_LEGACY_CHAIN_LIMIT, NetworkUpgrade::Canopy)
) {
let response = crate::service::legacy_chain_check(nu_activation_height, chain.into_iter().rev(), network)
let response = crate::service::check::legacy_chain(nu_activation_height, chain.into_iter().rev(), network)
.map_err(|error| error.to_string());
prop_assert_eq!(response, Ok(()));