Reject connections from outdated peers (#2519)
* Simplify state service initialization in test Use the test helper function to remove redundant code. * Create `BestTipHeight` helper type This type abstracts away the calculation of the best tip height based on the finalized block height and the best non-finalized chain's tip. * Add `best_tip_height` field to `StateService` The receiver endpoint is currently ignored. * Return receiver endpoint from service constructor Make it available so that the best tip height can be watched. * Update finalized height after finalizing blocks After blocks from the queue are finalized and committed to disk, update the finalized block height. * Update best non-finalized height after validation Update the value of the best non-finalized chain tip block height after a new block is committed to the non-finalized state. * Update finalized height after loading from disk When `FinalizedState` is first created, it loads the state from persistent storage, and the finalized tip height is updated. Therefore, the `best_tip_height` must be notified of the initial value. * Update the finalized height on checkpoint commit When a checkpointed block is commited, it bypasses the non-finalized state, so there's an extra place where the finalized height has to be updated. * Add `best_tip_height` to `Handshake` service It can be configured using the `Builder::with_best_tip_height`. It's currently not used, but it will be used to determine if a connection to a remote peer should be rejected or not based on that peer's protocol version. * Require best tip height to init. `zebra_network` Without it the handshake service can't properly enforce the minimum network protocol version from peers. Zebrad obtains the best tip height endpoint from `zebra_state`, and the test vectors simply use a dummy endpoint that's fixed at the genesis height. * Pass `best_tip_height` to proto. ver. negotiation The protocol version negotiation code will reject connections to peers if they are using an old protocol version. An old version is determined based on the current known best chain tip height. * Handle an optional height in `Version` Fallback to the genesis height in `None` is specified. * Reject connections to peers on old proto. versions Avoid connecting to peers that are on protocol versions that don't recognize a network update. * Document why peers on old versions are rejected Describe why it's a security issue above the check. * Test if `BestTipHeight` starts with `None` Check if initially there is no best tip height. * Test if best tip height is max. of latest values After applying a list of random updates where each one either sets the finalized height or the non-finalized height, check that the best tip height is the maximum of the most recently set finalized height and the most recently set non-finalized height. * Add `queue_and_commit_finalized` method A small refactor to make testing easier. The handling of requests for committing non-finalized and finalized blocks is now more consistent. * Add `assert_block_can_be_validated` helper Refactor to move into a separate method some assertions that are done before a block is validated. This is to allow moving these assertions more easily to simplify testing. * Remove redundant PoW block assertion It's also checked in `zebra_state::service::check::block_is_contextually_valid`, and it was getting in the way of tests that received a gossiped block before finalizing enough blocks. * Create a test strategy for test vector chain Splits a chain loaded from the test vectors in two parts, containing the blocks to finalize and the blocks to keep in the non-finalized state. * Test committing blocks update best tip height Create a mock blockchain state, with a chain of finalized blocks and a chain of non-finalized blocks. Commit all the blocks appropriately, and verify that the best tip height is updated. Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
751185d4ec
commit
4c4dbfe7cd
|
@ -13,7 +13,12 @@ use futures::{
|
||||||
channel::{mpsc, oneshot},
|
channel::{mpsc, oneshot},
|
||||||
future, FutureExt, SinkExt, StreamExt,
|
future, FutureExt, SinkExt, StreamExt,
|
||||||
};
|
};
|
||||||
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
|
use tokio::{
|
||||||
|
net::TcpStream,
|
||||||
|
sync::{broadcast, watch},
|
||||||
|
task::JoinError,
|
||||||
|
time::timeout,
|
||||||
|
};
|
||||||
use tokio_util::codec::Framed;
|
use tokio_util::codec::Framed;
|
||||||
use tower::Service;
|
use tower::Service;
|
||||||
use tracing::{span, Level, Span};
|
use tracing::{span, Level, Span};
|
||||||
|
@ -53,6 +58,7 @@ pub struct Handshake<S> {
|
||||||
our_services: PeerServices,
|
our_services: PeerServices,
|
||||||
relay: bool,
|
relay: bool,
|
||||||
parent_span: Span,
|
parent_span: Span,
|
||||||
|
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The peer address that we are handshaking with.
|
/// The peer address that we are handshaking with.
|
||||||
|
@ -302,6 +308,7 @@ pub struct Builder<S> {
|
||||||
user_agent: Option<String>,
|
user_agent: Option<String>,
|
||||||
relay: Option<bool>,
|
relay: Option<bool>,
|
||||||
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
|
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
|
||||||
|
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Builder<S>
|
impl<S> Builder<S>
|
||||||
|
@ -361,6 +368,18 @@ where
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
|
||||||
|
///
|
||||||
|
/// If this is unset, the minimum accepted protocol version for peer connections is kept
|
||||||
|
/// constant over network upgrade activations.
|
||||||
|
pub fn with_best_tip_height(
|
||||||
|
mut self,
|
||||||
|
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
|
||||||
|
) -> Self {
|
||||||
|
self.best_tip_height = best_tip_height;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Whether to request that peers relay transactions to our node. Optional.
|
/// Whether to request that peers relay transactions to our node. Optional.
|
||||||
///
|
///
|
||||||
/// If this is unset, the node will not request transactions.
|
/// If this is unset, the node will not request transactions.
|
||||||
|
@ -402,6 +421,7 @@ where
|
||||||
our_services,
|
our_services,
|
||||||
relay,
|
relay,
|
||||||
parent_span: Span::current(),
|
parent_span: Span::current(),
|
||||||
|
best_tip_height: self.best_tip_height,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -424,6 +444,7 @@ where
|
||||||
our_services: None,
|
our_services: None,
|
||||||
relay: None,
|
relay: None,
|
||||||
inv_collector: None,
|
inv_collector: None,
|
||||||
|
best_tip_height: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -433,6 +454,7 @@ where
|
||||||
///
|
///
|
||||||
/// We split `Handshake` into its components before calling this function,
|
/// We split `Handshake` into its components before calling this function,
|
||||||
/// to avoid infectious `Sync` bounds on the returned future.
|
/// to avoid infectious `Sync` bounds on the returned future.
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn negotiate_version(
|
pub async fn negotiate_version(
|
||||||
peer_conn: &mut Framed<TcpStream, Codec>,
|
peer_conn: &mut Framed<TcpStream, Codec>,
|
||||||
connected_addr: &ConnectedAddr,
|
connected_addr: &ConnectedAddr,
|
||||||
|
@ -441,6 +463,7 @@ pub async fn negotiate_version(
|
||||||
user_agent: String,
|
user_agent: String,
|
||||||
our_services: PeerServices,
|
our_services: PeerServices,
|
||||||
relay: bool,
|
relay: bool,
|
||||||
|
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
|
||||||
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
|
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
|
||||||
// Create a random nonce for this connection
|
// Create a random nonce for this connection
|
||||||
let local_nonce = Nonce::default();
|
let local_nonce = Nonce::default();
|
||||||
|
@ -552,17 +575,11 @@ pub async fn negotiate_version(
|
||||||
Err(HandshakeError::NonceReuse)?;
|
Err(HandshakeError::NonceReuse)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Reject connections with nodes that don't know about the current network upgrade (#1334)
|
// SECURITY: Reject connections to peers on old versions, because they might not know about all
|
||||||
// Use the latest non-finalized block height, rather than the minimum
|
// network upgrades and could lead to chain forks or slower block propagation.
|
||||||
if remote_version
|
let height = best_tip_height.and_then(|height| *height.borrow());
|
||||||
< Version::min_remote_for_height(
|
let min_version = Version::min_remote_for_height(config.network, height);
|
||||||
config.network,
|
if remote_version < min_version {
|
||||||
// This code will be replaced in #1334
|
|
||||||
constants::INITIAL_MIN_NETWORK_PROTOCOL_VERSION
|
|
||||||
.activation_height(config.network)
|
|
||||||
.expect("minimum network protocol network upgrade has an activation height"),
|
|
||||||
)
|
|
||||||
{
|
|
||||||
// Disconnect if peer is using an obsolete version.
|
// Disconnect if peer is using an obsolete version.
|
||||||
Err(HandshakeError::ObsoleteVersion(remote_version))?;
|
Err(HandshakeError::ObsoleteVersion(remote_version))?;
|
||||||
}
|
}
|
||||||
|
@ -617,6 +634,7 @@ where
|
||||||
let user_agent = self.user_agent.clone();
|
let user_agent = self.user_agent.clone();
|
||||||
let our_services = self.our_services;
|
let our_services = self.our_services;
|
||||||
let relay = self.relay;
|
let relay = self.relay;
|
||||||
|
let best_tip_height = self.best_tip_height.clone();
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
debug!(
|
debug!(
|
||||||
|
@ -647,6 +665,7 @@ where
|
||||||
user_agent,
|
user_agent,
|
||||||
our_services,
|
our_services,
|
||||||
relay,
|
relay,
|
||||||
|
best_tip_height,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await??;
|
.await??;
|
||||||
|
|
|
@ -12,7 +12,11 @@ use futures::{
|
||||||
stream::{FuturesUnordered, StreamExt},
|
stream::{FuturesUnordered, StreamExt},
|
||||||
TryFutureExt,
|
TryFutureExt,
|
||||||
};
|
};
|
||||||
use tokio::{net::TcpListener, sync::broadcast, time::Instant};
|
use tokio::{
|
||||||
|
net::TcpListener,
|
||||||
|
sync::{broadcast, watch},
|
||||||
|
time::Instant,
|
||||||
|
};
|
||||||
use tower::{
|
use tower::{
|
||||||
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
|
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
|
||||||
util::BoxService, Service, ServiceExt,
|
util::BoxService, Service, ServiceExt,
|
||||||
|
@ -25,7 +29,7 @@ use crate::{
|
||||||
BoxError, Config, Request, Response,
|
BoxError, Config, Request, Response,
|
||||||
};
|
};
|
||||||
|
|
||||||
use zebra_chain::parameters::Network;
|
use zebra_chain::{block, parameters::Network};
|
||||||
|
|
||||||
use super::CandidateSet;
|
use super::CandidateSet;
|
||||||
use super::PeerSet;
|
use super::PeerSet;
|
||||||
|
@ -59,6 +63,7 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
|
||||||
pub async fn init<S>(
|
pub async fn init<S>(
|
||||||
config: Config,
|
config: Config,
|
||||||
inbound_service: S,
|
inbound_service: S,
|
||||||
|
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
|
||||||
) -> (
|
) -> (
|
||||||
Buffer<BoxService<Request, Response, BoxError>, Request>,
|
Buffer<BoxService<Request, Response, BoxError>, Request>,
|
||||||
Arc<std::sync::Mutex<AddressBook>>,
|
Arc<std::sync::Mutex<AddressBook>>,
|
||||||
|
@ -87,6 +92,7 @@ where
|
||||||
.with_timestamp_collector(timestamp_collector)
|
.with_timestamp_collector(timestamp_collector)
|
||||||
.with_advertised_services(PeerServices::NODE_NETWORK)
|
.with_advertised_services(PeerServices::NODE_NETWORK)
|
||||||
.with_user_agent(crate::constants::USER_AGENT.to_string())
|
.with_user_agent(crate::constants::USER_AGENT.to_string())
|
||||||
|
.with_best_tip_height(best_tip_height)
|
||||||
.want_transactions(true)
|
.want_transactions(true)
|
||||||
.finish()
|
.finish()
|
||||||
.expect("configured all required parameters");
|
.expect("configured all required parameters");
|
||||||
|
|
|
@ -110,7 +110,7 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
|
||||||
let inbound_service =
|
let inbound_service =
|
||||||
service_fn(|_| async { unreachable!("inbound service should never be called") });
|
service_fn(|_| async { unreachable!("inbound service should never be called") });
|
||||||
|
|
||||||
let (_peer_service, address_book) = init(config, inbound_service).await;
|
let (_peer_service, address_book) = init(config, inbound_service, None).await;
|
||||||
let local_listener = address_book.lock().unwrap().local_listener_meta_addr();
|
let local_listener = address_book.lock().unwrap().local_listener_meta_addr();
|
||||||
|
|
||||||
if listen_addr.port() == 0 {
|
if listen_addr.port() == 0 {
|
||||||
|
|
|
@ -48,7 +48,11 @@ impl Version {
|
||||||
/// # Panics
|
/// # Panics
|
||||||
///
|
///
|
||||||
/// If we are incompatible with our own minimum remote protocol version.
|
/// If we are incompatible with our own minimum remote protocol version.
|
||||||
pub fn min_remote_for_height(network: Network, height: block::Height) -> Version {
|
pub fn min_remote_for_height(
|
||||||
|
network: Network,
|
||||||
|
height: impl Into<Option<block::Height>>,
|
||||||
|
) -> Version {
|
||||||
|
let height = height.into().unwrap_or(block::Height(0));
|
||||||
let min_spec = Version::min_specified_for_height(network, height);
|
let min_spec = Version::min_specified_for_height(network, height);
|
||||||
|
|
||||||
// shut down if our own version is too old
|
// shut down if our own version is too old
|
||||||
|
|
|
@ -6,28 +6,28 @@ use std::{
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use check::difficulty::POW_MEDIAN_BLOCK_SPAN;
|
|
||||||
use futures::future::FutureExt;
|
use futures::future::FutureExt;
|
||||||
use non_finalized_state::{NonFinalizedState, QueuedBlocks};
|
use non_finalized_state::{NonFinalizedState, QueuedBlocks};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::{oneshot, watch};
|
||||||
#[cfg(any(test, feature = "proptest-impl"))]
|
#[cfg(any(test, feature = "proptest-impl"))]
|
||||||
use tower::buffer::Buffer;
|
use tower::buffer::Buffer;
|
||||||
use tower::{util::BoxService, Service};
|
use tower::{util::BoxService, Service};
|
||||||
use tracing::instrument;
|
use tracing::instrument;
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
block::{self, Block},
|
block::{self, Block},
|
||||||
parameters::POW_AVERAGING_WINDOW,
|
|
||||||
parameters::{Network, NetworkUpgrade},
|
parameters::{Network, NetworkUpgrade},
|
||||||
transaction,
|
transaction,
|
||||||
transaction::Transaction,
|
transaction::Transaction,
|
||||||
transparent,
|
transparent,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use self::best_tip_height::BestTipHeight;
|
||||||
use crate::{
|
use crate::{
|
||||||
constants, request::HashOrHeight, BoxError, CloneError, CommitBlockError, Config,
|
constants, request::HashOrHeight, BoxError, CloneError, CommitBlockError, Config,
|
||||||
FinalizedBlock, PreparedBlock, Request, Response, ValidateContextError,
|
FinalizedBlock, PreparedBlock, Request, Response, ValidateContextError,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
mod best_tip_height;
|
||||||
pub(crate) mod check;
|
pub(crate) mod check;
|
||||||
mod finalized_state;
|
mod finalized_state;
|
||||||
mod non_finalized_state;
|
mod non_finalized_state;
|
||||||
|
@ -63,14 +63,21 @@ pub(crate) struct StateService {
|
||||||
network: Network,
|
network: Network,
|
||||||
/// Instant tracking the last time `pending_utxos` was pruned
|
/// Instant tracking the last time `pending_utxos` was pruned
|
||||||
last_prune: Instant,
|
last_prune: Instant,
|
||||||
|
/// The current best chain tip height.
|
||||||
|
best_tip_height: BestTipHeight,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StateService {
|
impl StateService {
|
||||||
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
|
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
pub fn new(config: Config, network: Network) -> Self {
|
pub fn new(config: Config, network: Network) -> (Self, watch::Receiver<Option<block::Height>>) {
|
||||||
|
let (mut best_tip_height, best_tip_height_receiver) = BestTipHeight::new();
|
||||||
let disk = FinalizedState::new(&config, network);
|
let disk = FinalizedState::new(&config, network);
|
||||||
|
|
||||||
|
if let Some(finalized_height) = disk.finalized_tip_height() {
|
||||||
|
best_tip_height.set_finalized_height(finalized_height);
|
||||||
|
}
|
||||||
|
|
||||||
let mem = NonFinalizedState::new(network);
|
let mem = NonFinalizedState::new(network);
|
||||||
let queued_blocks = QueuedBlocks::default();
|
let queued_blocks = QueuedBlocks::default();
|
||||||
let pending_utxos = PendingUtxos::default();
|
let pending_utxos = PendingUtxos::default();
|
||||||
|
@ -82,6 +89,7 @@ impl StateService {
|
||||||
pending_utxos,
|
pending_utxos,
|
||||||
network,
|
network,
|
||||||
last_prune: Instant::now(),
|
last_prune: Instant::now(),
|
||||||
|
best_tip_height,
|
||||||
};
|
};
|
||||||
|
|
||||||
tracing::info!("starting legacy chain check");
|
tracing::info!("starting legacy chain check");
|
||||||
|
@ -108,7 +116,23 @@ impl StateService {
|
||||||
}
|
}
|
||||||
tracing::info!("no legacy chain found");
|
tracing::info!("no legacy chain found");
|
||||||
|
|
||||||
state
|
(state, best_tip_height_receiver)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Queue a finalized block for verification and storage in the finalized state.
|
||||||
|
fn queue_and_commit_finalized(
|
||||||
|
&mut self,
|
||||||
|
finalized: FinalizedBlock,
|
||||||
|
) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
|
||||||
|
let (rsp_tx, rsp_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
self.disk.queue_and_commit_finalized((finalized, rsp_tx));
|
||||||
|
|
||||||
|
if let Some(finalized_height) = self.disk.finalized_tip_height() {
|
||||||
|
self.best_tip_height.set_finalized_height(finalized_height);
|
||||||
|
}
|
||||||
|
|
||||||
|
rsp_rx
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Queue a non finalized block for verification and check if any queued
|
/// Queue a non finalized block for verification and check if any queued
|
||||||
|
@ -165,10 +189,17 @@ impl StateService {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.queued_blocks
|
let finalized_tip_height = self.disk.finalized_tip_height().expect(
|
||||||
.prune_by_height(self.disk.finalized_tip_height().expect(
|
|
||||||
"Finalized state must have at least one block before committing non-finalized state",
|
"Finalized state must have at least one block before committing non-finalized state",
|
||||||
));
|
);
|
||||||
|
let non_finalized_tip_height = self.mem.best_tip().map(|(height, _hash)| height);
|
||||||
|
|
||||||
|
self.queued_blocks.prune_by_height(finalized_tip_height);
|
||||||
|
|
||||||
|
self.best_tip_height
|
||||||
|
.set_finalized_height(finalized_tip_height);
|
||||||
|
self.best_tip_height
|
||||||
|
.set_best_non_finalized_height(non_finalized_tip_height);
|
||||||
|
|
||||||
tracing::trace!("finished processing queued block");
|
tracing::trace!("finished processing queued block");
|
||||||
rsp_rx
|
rsp_rx
|
||||||
|
@ -204,23 +235,6 @@ impl StateService {
|
||||||
let queued_children = self.queued_blocks.dequeue_children(parent_hash);
|
let queued_children = self.queued_blocks.dequeue_children(parent_hash);
|
||||||
|
|
||||||
for (child, rsp_tx) in queued_children {
|
for (child, rsp_tx) in queued_children {
|
||||||
// required by validate_and_commit, moved here to make testing easier
|
|
||||||
assert!(
|
|
||||||
child.height > self.network.mandatory_checkpoint_height(),
|
|
||||||
"invalid non-finalized block height: the canopy checkpoint is mandatory, \
|
|
||||||
pre-canopy blocks, and the canopy activation block, \
|
|
||||||
must be committed to the state as finalized blocks"
|
|
||||||
);
|
|
||||||
|
|
||||||
// required by check_contextual_validity, moved here to make testing easier
|
|
||||||
let relevant_chain =
|
|
||||||
self.any_ancestor_blocks(child.block.header.previous_block_hash);
|
|
||||||
assert!(
|
|
||||||
relevant_chain.len() >= POW_AVERAGING_WINDOW + POW_MEDIAN_BLOCK_SPAN,
|
|
||||||
"contextual validation requires at least \
|
|
||||||
28 (POW_AVERAGING_WINDOW + POW_MEDIAN_BLOCK_SPAN) blocks"
|
|
||||||
);
|
|
||||||
|
|
||||||
let child_hash = child.hash;
|
let child_hash = child.hash;
|
||||||
let result;
|
let result;
|
||||||
|
|
||||||
|
@ -504,6 +518,17 @@ impl StateService {
|
||||||
let intersection = self.find_best_chain_intersection(known_blocks);
|
let intersection = self.find_best_chain_intersection(known_blocks);
|
||||||
self.collect_best_chain_hashes(intersection, stop, max_len)
|
self.collect_best_chain_hashes(intersection, stop, max_len)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Assert some assumptions about the prepared `block` before it is validated.
|
||||||
|
fn assert_block_can_be_validated(&self, block: &PreparedBlock) {
|
||||||
|
// required by validate_and_commit, moved here to make testing easier
|
||||||
|
assert!(
|
||||||
|
block.height > self.network.mandatory_checkpoint_height(),
|
||||||
|
"invalid non-finalized block height: the canopy checkpoint is mandatory, pre-canopy \
|
||||||
|
blocks, and the canopy activation block, must be committed to the state as finalized \
|
||||||
|
blocks"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct Iter<'a> {
|
pub(crate) struct Iter<'a> {
|
||||||
|
@ -640,6 +665,8 @@ impl Service<Request> for StateService {
|
||||||
Request::CommitBlock(prepared) => {
|
Request::CommitBlock(prepared) => {
|
||||||
metrics::counter!("state.requests", 1, "type" => "commit_block");
|
metrics::counter!("state.requests", 1, "type" => "commit_block");
|
||||||
|
|
||||||
|
self.assert_block_can_be_validated(&prepared);
|
||||||
|
|
||||||
self.pending_utxos
|
self.pending_utxos
|
||||||
.check_against_ordered(&prepared.new_outputs);
|
.check_against_ordered(&prepared.new_outputs);
|
||||||
let rsp_rx = self.queue_and_commit_non_finalized(prepared);
|
let rsp_rx = self.queue_and_commit_non_finalized(prepared);
|
||||||
|
@ -656,10 +683,8 @@ impl Service<Request> for StateService {
|
||||||
Request::CommitFinalizedBlock(finalized) => {
|
Request::CommitFinalizedBlock(finalized) => {
|
||||||
metrics::counter!("state.requests", 1, "type" => "commit_finalized_block");
|
metrics::counter!("state.requests", 1, "type" => "commit_finalized_block");
|
||||||
|
|
||||||
let (rsp_tx, rsp_rx) = oneshot::channel();
|
|
||||||
|
|
||||||
self.pending_utxos.check_against(&finalized.new_outputs);
|
self.pending_utxos.check_against(&finalized.new_outputs);
|
||||||
self.disk.queue_and_commit_finalized((finalized, rsp_tx));
|
let rsp_rx = self.queue_and_commit_finalized(finalized);
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
rsp_rx
|
rsp_rx
|
||||||
|
@ -748,8 +773,16 @@ impl Service<Request> for StateService {
|
||||||
/// possible to construct multiple state services in the same application (as
|
/// possible to construct multiple state services in the same application (as
|
||||||
/// long as they, e.g., use different storage locations), but doing so is
|
/// long as they, e.g., use different storage locations), but doing so is
|
||||||
/// probably not what you want.
|
/// probably not what you want.
|
||||||
pub fn init(config: Config, network: Network) -> BoxService<Request, Response, BoxError> {
|
pub fn init(
|
||||||
BoxService::new(StateService::new(config, network))
|
config: Config,
|
||||||
|
network: Network,
|
||||||
|
) -> (
|
||||||
|
BoxService<Request, Response, BoxError>,
|
||||||
|
watch::Receiver<Option<block::Height>>,
|
||||||
|
) {
|
||||||
|
let (state_service, best_tip_height) = StateService::new(config, network);
|
||||||
|
|
||||||
|
(BoxService::new(state_service), best_tip_height)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Initialize a state service with an ephemeral [`Config`] and a buffer with a single slot.
|
/// Initialize a state service with an ephemeral [`Config`] and a buffer with a single slot.
|
||||||
|
@ -757,7 +790,7 @@ pub fn init(config: Config, network: Network) -> BoxService<Request, Response, B
|
||||||
/// This can be used to create a state service for testing. See also [`init`].
|
/// This can be used to create a state service for testing. See also [`init`].
|
||||||
#[cfg(any(test, feature = "proptest-impl"))]
|
#[cfg(any(test, feature = "proptest-impl"))]
|
||||||
pub fn init_test(network: Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
|
pub fn init_test(network: Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
|
||||||
let state_service = StateService::new(Config::ephemeral(), network);
|
let (state_service, _) = StateService::new(Config::ephemeral(), network);
|
||||||
|
|
||||||
Buffer::new(BoxService::new(state_service), 1)
|
Buffer::new(BoxService::new(state_service), 1)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use zebra_chain::block;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
|
/// A helper type to determine the best chain tip block height.
|
||||||
|
///
|
||||||
|
/// The block height is determined based on the current finalized block height and the current best
|
||||||
|
/// non-finalized chain's tip block height. The height is made available from a [`watch::Receiver`].
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct BestTipHeight {
|
||||||
|
finalized: Option<block::Height>,
|
||||||
|
non_finalized: Option<block::Height>,
|
||||||
|
sender: watch::Sender<Option<block::Height>>,
|
||||||
|
// TODO: Replace with calls to `watch::Sender::borrow` once Tokio is updated to 1.0.0 (#2573)
|
||||||
|
active_value: Option<block::Height>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BestTipHeight {
|
||||||
|
/// Create a new instance of [`BestTipHeight`] and the [`watch::Receiver`] endpoint for the
|
||||||
|
/// current best tip block height.
|
||||||
|
pub fn new() -> (Self, watch::Receiver<Option<block::Height>>) {
|
||||||
|
let (sender, receiver) = watch::channel(None);
|
||||||
|
|
||||||
|
(
|
||||||
|
BestTipHeight {
|
||||||
|
finalized: None,
|
||||||
|
non_finalized: None,
|
||||||
|
sender,
|
||||||
|
active_value: None,
|
||||||
|
},
|
||||||
|
receiver,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update the current finalized block height.
|
||||||
|
///
|
||||||
|
/// May trigger an update to best tip height.
|
||||||
|
pub fn set_finalized_height(&mut self, new_height: block::Height) {
|
||||||
|
if self.finalized != Some(new_height) {
|
||||||
|
self.finalized = Some(new_height);
|
||||||
|
self.update();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update the current non-finalized block height.
|
||||||
|
///
|
||||||
|
/// May trigger an update to the best tip height.
|
||||||
|
pub fn set_best_non_finalized_height(&mut self, new_height: Option<block::Height>) {
|
||||||
|
if self.non_finalized != new_height {
|
||||||
|
self.non_finalized = new_height;
|
||||||
|
self.update();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Possibly send an update to listeners.
|
||||||
|
///
|
||||||
|
/// An update is only sent if the current best tip height is different from the last best tip
|
||||||
|
/// height that was sent.
|
||||||
|
fn update(&mut self) {
|
||||||
|
let new_value = self.non_finalized.max(self.finalized);
|
||||||
|
|
||||||
|
if new_value != self.active_value {
|
||||||
|
let _ = self.sender.send(new_value);
|
||||||
|
self.active_value = new_value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,2 @@
|
||||||
|
mod prop;
|
||||||
|
mod vectors;
|
|
@ -0,0 +1,47 @@
|
||||||
|
use proptest::prelude::*;
|
||||||
|
use proptest_derive::Arbitrary;
|
||||||
|
|
||||||
|
use zebra_chain::block;
|
||||||
|
|
||||||
|
use super::super::BestTipHeight;
|
||||||
|
|
||||||
|
proptest! {
|
||||||
|
#[test]
|
||||||
|
fn best_tip_value_is_heighest_of_latest_finalized_and_non_finalized_heights(
|
||||||
|
height_updates in any::<Vec<HeightUpdate>>(),
|
||||||
|
) {
|
||||||
|
let (mut best_tip_height, receiver) = BestTipHeight::new();
|
||||||
|
|
||||||
|
let mut latest_finalized_height = None;
|
||||||
|
let mut latest_non_finalized_height = None;
|
||||||
|
|
||||||
|
for update in height_updates {
|
||||||
|
match update {
|
||||||
|
HeightUpdate::Finalized(height) => {
|
||||||
|
best_tip_height.set_finalized_height(height);
|
||||||
|
latest_finalized_height = Some(height);
|
||||||
|
}
|
||||||
|
HeightUpdate::NonFinalized(height) => {
|
||||||
|
best_tip_height.set_best_non_finalized_height(height);
|
||||||
|
latest_non_finalized_height = height;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let expected_height = match (latest_finalized_height, latest_non_finalized_height) {
|
||||||
|
(Some(finalized_height), Some(non_finalized_height)) => {
|
||||||
|
Some(finalized_height.max(non_finalized_height))
|
||||||
|
}
|
||||||
|
(finalized_height, None) => finalized_height,
|
||||||
|
(None, non_finalized_height) => non_finalized_height,
|
||||||
|
};
|
||||||
|
|
||||||
|
prop_assert_eq!(*receiver.borrow(), expected_height);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Arbitrary, Clone, Copy, Debug)]
|
||||||
|
enum HeightUpdate {
|
||||||
|
Finalized(block::Height),
|
||||||
|
NonFinalized(Option<block::Height>),
|
||||||
|
}
|
|
@ -0,0 +1,8 @@
|
||||||
|
use super::super::BestTipHeight;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn best_tip_value_is_initially_empty() {
|
||||||
|
let (_best_tip_height, receiver) = BestTipHeight::new();
|
||||||
|
|
||||||
|
assert_eq!(*receiver.borrow(), None);
|
||||||
|
}
|
|
@ -2,16 +2,21 @@ use std::{env, sync::Arc};
|
||||||
|
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
|
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
|
||||||
|
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
block::Block,
|
block::Block,
|
||||||
parameters::{Network, NetworkUpgrade},
|
parameters::{Network, NetworkUpgrade},
|
||||||
serialization::ZcashDeserializeInto,
|
serialization::{ZcashDeserialize, ZcashDeserializeInto},
|
||||||
transaction, transparent,
|
transaction, transparent,
|
||||||
};
|
};
|
||||||
use zebra_test::{prelude::*, transcript::Transcript};
|
use zebra_test::{prelude::*, transcript::Transcript};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
constants, init_test, tests::setup::partial_nu5_chain_strategy, BoxError, Request, Response,
|
arbitrary::Prepare,
|
||||||
|
constants, init_test,
|
||||||
|
service::StateService,
|
||||||
|
tests::setup::{partial_nu5_chain_strategy, transaction_v4_from_coinbase},
|
||||||
|
BoxError, Config, FinalizedBlock, PreparedBlock, Request, Response,
|
||||||
};
|
};
|
||||||
|
|
||||||
const LAST_BLOCK_HEIGHT: u32 = 10;
|
const LAST_BLOCK_HEIGHT: u32 = 10;
|
||||||
|
@ -274,4 +279,85 @@ proptest! {
|
||||||
|
|
||||||
prop_assert_eq!(response, Ok(()));
|
prop_assert_eq!(response, Ok(()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test that the best tip height is updated accordingly.
|
||||||
|
///
|
||||||
|
/// 1. Generate a finalized chain and some non-finalized blocks.
|
||||||
|
/// 2. Check that initially the best tip height is empty.
|
||||||
|
/// 3. Commit the finalized blocks and check that the best tip height is updated accordingly.
|
||||||
|
/// 4. Commit the non-finalized blocks and check that the best tip height is also updated
|
||||||
|
/// accordingly.
|
||||||
|
#[test]
|
||||||
|
fn best_tip_height_is_updated(
|
||||||
|
(network, finalized_blocks, non_finalized_blocks)
|
||||||
|
in continuous_empty_blocks_from_test_vectors(),
|
||||||
|
) {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
let (mut state_service, best_tip_height) = StateService::new(Config::ephemeral(), network);
|
||||||
|
|
||||||
|
prop_assert_eq!(*best_tip_height.borrow(), None);
|
||||||
|
|
||||||
|
for block in finalized_blocks {
|
||||||
|
let expected_height = block.height;
|
||||||
|
|
||||||
|
state_service.queue_and_commit_finalized(block);
|
||||||
|
|
||||||
|
prop_assert_eq!(*best_tip_height.borrow(), Some(expected_height));
|
||||||
|
}
|
||||||
|
|
||||||
|
for block in non_finalized_blocks {
|
||||||
|
let expected_height = block.height;
|
||||||
|
|
||||||
|
state_service.queue_and_commit_non_finalized(block);
|
||||||
|
|
||||||
|
prop_assert_eq!(*best_tip_height.borrow(), Some(expected_height));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test strategy to generate a chain split in two from the test vectors.
|
||||||
|
///
|
||||||
|
/// Selects either the mainnet or testnet chain test vector and randomly splits the chain in two
|
||||||
|
/// lists of blocks. The first containing the blocks to be finalized (which always includes at
|
||||||
|
/// least the genesis block) and the blocks to be stored in the non-finalized state.
|
||||||
|
fn continuous_empty_blocks_from_test_vectors(
|
||||||
|
) -> impl Strategy<Value = (Network, Vec<FinalizedBlock>, Vec<PreparedBlock>)> {
|
||||||
|
any::<Network>()
|
||||||
|
.prop_flat_map(|network| {
|
||||||
|
// Select the test vector based on the network
|
||||||
|
let raw_blocks = match network {
|
||||||
|
Network::Mainnet => &*zebra_test::vectors::CONTINUOUS_MAINNET_BLOCKS,
|
||||||
|
Network::Testnet => &*zebra_test::vectors::CONTINUOUS_TESTNET_BLOCKS,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Transform the test vector's block bytes into a vector of `PreparedBlock`s.
|
||||||
|
let blocks: Vec<_> = raw_blocks
|
||||||
|
.iter()
|
||||||
|
.map(|(_height, &block_bytes)| {
|
||||||
|
let mut block_reader: &[u8] = block_bytes;
|
||||||
|
let mut block = Block::zcash_deserialize(&mut block_reader)
|
||||||
|
.expect("Failed to deserialize block from test vector");
|
||||||
|
|
||||||
|
let coinbase = transaction_v4_from_coinbase(&block.transactions[0]);
|
||||||
|
block.transactions = vec![Arc::new(coinbase)];
|
||||||
|
|
||||||
|
Arc::new(block).prepare()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Always finalize the genesis block
|
||||||
|
let finalized_blocks_count = 1..=blocks.len();
|
||||||
|
|
||||||
|
(Just(network), Just(blocks), finalized_blocks_count)
|
||||||
|
})
|
||||||
|
.prop_map(|(network, mut blocks, finalized_blocks_count)| {
|
||||||
|
let non_finalized_blocks = blocks.split_off(finalized_blocks_count);
|
||||||
|
let finalized_blocks: Vec<_> = blocks
|
||||||
|
.into_iter()
|
||||||
|
.map(|prepared_block| FinalizedBlock::from(prepared_block.block))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
(network, finalized_blocks, non_finalized_blocks)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,7 @@ pub(crate) fn new_state_with_mainnet_genesis() -> (StateService, FinalizedBlock)
|
||||||
.zcash_deserialize_into::<Arc<Block>>()
|
.zcash_deserialize_into::<Arc<Block>>()
|
||||||
.expect("block should deserialize");
|
.expect("block should deserialize");
|
||||||
|
|
||||||
let mut state = StateService::new(Config::ephemeral(), Mainnet);
|
let (mut state, _) = StateService::new(Config::ephemeral(), Mainnet);
|
||||||
|
|
||||||
assert_eq!(None, state.best_tip());
|
assert_eq!(None, state.best_tip());
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,6 @@
|
||||||
use color_eyre::eyre::Report;
|
use color_eyre::eyre::Report;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tempdir::TempDir;
|
|
||||||
use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserialize};
|
use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserialize};
|
||||||
use zebra_test::transcript::{ExpectedTranscriptError, Transcript};
|
use zebra_test::transcript::{ExpectedTranscriptError, Transcript};
|
||||||
|
|
||||||
|
@ -76,20 +75,10 @@ async fn check_transcripts(network: Network) -> Result<(), Report> {
|
||||||
Network::Mainnet => mainnet_transcript,
|
Network::Mainnet => mainnet_transcript,
|
||||||
Network::Testnet => testnet_transcript,
|
Network::Testnet => testnet_transcript,
|
||||||
} {
|
} {
|
||||||
let storage_guard = TempDir::new("")?;
|
let service = zebra_state::init_test(network);
|
||||||
let cache_dir = storage_guard.path().to_owned();
|
|
||||||
let service = zebra_state::init(
|
|
||||||
Config {
|
|
||||||
cache_dir,
|
|
||||||
..Config::default()
|
|
||||||
},
|
|
||||||
network,
|
|
||||||
);
|
|
||||||
let transcript = Transcript::from(transcript_data.iter().cloned());
|
let transcript = Transcript::from(transcript_data.iter().cloned());
|
||||||
/// SPANDOC: check the on disk service against the transcript
|
/// SPANDOC: check the on disk service against the transcript
|
||||||
transcript.check(service).await?;
|
transcript.check(service).await?;
|
||||||
// Delete the contents of the temp directory before going to the next case.
|
|
||||||
std::mem::drop(storage_guard);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -49,10 +49,9 @@ impl StartCmd {
|
||||||
info!(?config);
|
info!(?config);
|
||||||
|
|
||||||
info!("initializing node state");
|
info!("initializing node state");
|
||||||
let state = ServiceBuilder::new().buffer(20).service(zebra_state::init(
|
let (state_service, best_tip_height) =
|
||||||
config.state.clone(),
|
zebra_state::init(config.state.clone(), config.network.network);
|
||||||
config.network.network,
|
let state = ServiceBuilder::new().buffer(20).service(state_service);
|
||||||
));
|
|
||||||
|
|
||||||
info!("initializing verifiers");
|
info!("initializing verifiers");
|
||||||
let verifier = zebra_consensus::chain::init(
|
let verifier = zebra_consensus::chain::init(
|
||||||
|
@ -72,7 +71,8 @@ impl StartCmd {
|
||||||
.buffer(20)
|
.buffer(20)
|
||||||
.service(Inbound::new(setup_rx, state.clone(), verifier.clone()));
|
.service(Inbound::new(setup_rx, state.clone(), verifier.clone()));
|
||||||
|
|
||||||
let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await;
|
let (peer_set, address_book) =
|
||||||
|
zebra_network::init(config.network.clone(), inbound, Some(best_tip_height)).await;
|
||||||
setup_tx
|
setup_tx
|
||||||
.send((peer_set.clone(), address_book))
|
.send((peer_set.clone(), address_book))
|
||||||
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
|
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
|
||||||
|
|
Loading…
Reference in New Issue