Refactor BestTipHeight into a generic ChainTip sender and receiver (#2676)

* Rename BestTipHeight so it can be generalised to ChainTipSender

`fastmod BestTipHeight ChainTipSender zebra*`

For senders:
`fastmod best_tip_height chain_tip_sender zebra*`

For receivers:
`fastmod best_tip_height chain_tip_receiver zebra*`

* Rename best_tip_height module to chain_tip

* Wrap the chain tip watch channel in a ChainTipReceiver type

* Create a ChainTip trait to avoid tricky crate dependencies

And add convenience impls for optional and empty chain tips.

* Use the ChainTip trait in zebra-network

* Replace `Option<ChainTip>` with `NoChainTip`

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
This commit is contained in:
teor 2021-08-27 11:34:33 +10:00 committed by GitHub
parent ac76d35cbb
commit d2e14b22f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 176 additions and 107 deletions

View File

@ -0,0 +1,23 @@
//! Chain tip interfaces.
use crate::block;
/// An interface for querying the chain tip.
///
/// This trait helps avoid dependencies between:
/// * zebra-chain and tokio
/// * zebra-network and zebra-state
pub trait ChainTip {
/// Return the height of the best chain tip.
fn best_tip_height(&self) -> Option<block::Height>;
}
/// A chain tip that is always empty.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct NoChainTip;
impl ChainTip for NoChainTip {
fn best_tip_height(&self) -> Option<block::Height> {
None
}
}

View File

@ -24,6 +24,7 @@ extern crate bitflags;
pub mod amount; pub mod amount;
pub mod block; pub mod block;
pub mod chain_tip;
pub mod fmt; pub mod fmt;
pub mod history_tree; pub mod history_tree;
pub mod orchard; pub mod orchard;

View File

@ -7,15 +7,18 @@ use std::{
}; };
use futures::future::{FutureExt, TryFutureExt}; use futures::future::{FutureExt, TryFutureExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tower::{ use tower::{
util::{BoxService, Oneshot}, util::{BoxService, Oneshot},
Service, Service,
}; };
use crate::{peer, BoxError, Config, Request, Response}; use zebra_chain::chain_tip::NoChainTip;
use peer::ConnectedAddr;
use crate::{
peer::{self, ConnectedAddr},
BoxError, Config, Request, Response,
};
/// Use the provided TCP connection to create a Zcash connection completely /// Use the provided TCP connection to create a Zcash connection completely
/// isolated from all other node state. /// isolated from all other node state.
@ -55,6 +58,7 @@ pub fn connect_isolated(
Ok::<Response, Box<dyn std::error::Error + Send + Sync + 'static>>(Response::Nil) Ok::<Response, Box<dyn std::error::Error + Send + Sync + 'static>>(Response::Nil)
})) }))
.with_user_agent(user_agent) .with_user_agent(user_agent)
.with_chain_tip_receiver(NoChainTip)
.finish() .finish()
.expect("provided mandatory builder parameters"); .expect("provided mandatory builder parameters");

View File

@ -10,6 +10,8 @@ use tokio::net::TcpStream;
use tower::{discover::Change, Service, ServiceExt}; use tower::{discover::Change, Service, ServiceExt};
use tracing_futures::Instrument; use tracing_futures::Instrument;
use zebra_chain::chain_tip::{ChainTip, NoChainTip};
use crate::{BoxError, Request, Response}; use crate::{BoxError, Request, Response};
use super::{Client, ConnectedAddr, Handshake}; use super::{Client, ConnectedAddr, Handshake};
@ -17,11 +19,11 @@ use super::{Client, ConnectedAddr, Handshake};
/// A wrapper around [`peer::Handshake`] that opens a TCP connection before /// A wrapper around [`peer::Handshake`] that opens a TCP connection before
/// forwarding to the inner handshake service. Writing this as its own /// forwarding to the inner handshake service. Writing this as its own
/// [`tower::Service`] lets us apply unified timeout policies, etc. /// [`tower::Service`] lets us apply unified timeout policies, etc.
pub struct Connector<S> { pub struct Connector<S, C = NoChainTip> {
handshaker: Handshake<S>, handshaker: Handshake<S, C>,
} }
impl<S: Clone> Clone for Connector<S> { impl<S: Clone, C: Clone> Clone for Connector<S, C> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Connector { Connector {
handshaker: self.handshaker.clone(), handshaker: self.handshaker.clone(),
@ -29,16 +31,17 @@ impl<S: Clone> Clone for Connector<S> {
} }
} }
impl<S> Connector<S> { impl<S, C> Connector<S, C> {
pub fn new(handshaker: Handshake<S>) -> Self { pub fn new(handshaker: Handshake<S, C>) -> Self {
Connector { handshaker } Connector { handshaker }
} }
} }
impl<S> Service<SocketAddr> for Connector<S> impl<S, C> Service<SocketAddr> for Connector<S, C>
where where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{ {
type Response = Change<SocketAddr, Client>; type Response = Change<SocketAddr, Client>;
type Error = BoxError; type Error = BoxError;

View File

@ -13,18 +13,17 @@ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
future, FutureExt, SinkExt, StreamExt, future, FutureExt, SinkExt, StreamExt,
}; };
use tokio::{ use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
net::TcpStream,
sync::{broadcast, watch},
task::JoinError,
time::timeout,
};
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
use tower::Service; use tower::Service;
use tracing::{span, Level, Span}; use tracing::{span, Level, Span};
use tracing_futures::Instrument; use tracing_futures::Instrument;
use zebra_chain::{block, parameters::Network}; use zebra_chain::{
block,
chain_tip::{ChainTip, NoChainTip},
parameters::Network,
};
use crate::{ use crate::{
constants, constants,
@ -48,7 +47,7 @@ use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerEr
/// - launched in a separate task, and /// - launched in a separate task, and
/// - wrapped in a timeout. /// - wrapped in a timeout.
#[derive(Clone)] #[derive(Clone)]
pub struct Handshake<S> { pub struct Handshake<S, C = NoChainTip> {
config: Config, config: Config,
inbound_service: S, inbound_service: S,
timestamp_collector: mpsc::Sender<MetaAddrChange>, timestamp_collector: mpsc::Sender<MetaAddrChange>,
@ -58,7 +57,7 @@ pub struct Handshake<S> {
our_services: PeerServices, our_services: PeerServices,
relay: bool, relay: bool,
parent_span: Span, parent_span: Span,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>, chain_tip_receiver: C,
} }
/// The peer address that we are handshaking with. /// The peer address that we are handshaking with.
@ -300,7 +299,7 @@ impl fmt::Debug for ConnectedAddr {
} }
/// A builder for `Handshake`. /// A builder for `Handshake`.
pub struct Builder<S> { pub struct Builder<S, C = NoChainTip> {
config: Option<Config>, config: Option<Config>,
inbound_service: Option<S>, inbound_service: Option<S>,
timestamp_collector: Option<mpsc::Sender<MetaAddrChange>>, timestamp_collector: Option<mpsc::Sender<MetaAddrChange>>,
@ -308,13 +307,14 @@ pub struct Builder<S> {
user_agent: Option<String>, user_agent: Option<String>,
relay: Option<bool>, relay: Option<bool>,
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>, inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>, chain_tip_receiver: C,
} }
impl<S> Builder<S> impl<S, C> Builder<S, C>
where where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,
C: ChainTip,
{ {
/// Provide a config. Mandatory. /// Provide a config. Mandatory.
pub fn with_config(mut self, config: Config) -> Self { pub fn with_config(mut self, config: Config) -> Self {
@ -372,12 +372,20 @@ where
/// ///
/// If this is unset, the minimum accepted protocol version for peer connections is kept /// If this is unset, the minimum accepted protocol version for peer connections is kept
/// constant over network upgrade activations. /// constant over network upgrade activations.
pub fn with_best_tip_height( ///
mut self, /// Use [`NoChainTip`] to explicitly provide no chain tip.
best_tip_height: Option<watch::Receiver<Option<block::Height>>>, pub fn with_chain_tip_receiver<NewC>(self, chain_tip_receiver: NewC) -> Builder<S, NewC> {
) -> Self { Builder {
self.best_tip_height = best_tip_height; chain_tip_receiver,
self // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
config: self.config,
inbound_service: self.inbound_service,
timestamp_collector: self.timestamp_collector,
our_services: self.our_services,
user_agent: self.user_agent,
relay: self.relay,
inv_collector: self.inv_collector,
}
} }
/// Whether to request that peers relay transactions to our node. Optional. /// Whether to request that peers relay transactions to our node. Optional.
@ -391,7 +399,7 @@ where
/// Consume this builder and produce a [`Handshake`]. /// Consume this builder and produce a [`Handshake`].
/// ///
/// Returns an error only if any mandatory field was unset. /// Returns an error only if any mandatory field was unset.
pub fn finish(self) -> Result<Handshake<S>, &'static str> { pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
let config = self.config.ok_or("did not specify config")?; let config = self.config.ok_or("did not specify config")?;
let inbound_service = self let inbound_service = self
.inbound_service .inbound_service
@ -421,18 +429,18 @@ where
our_services, our_services,
relay, relay,
parent_span: Span::current(), parent_span: Span::current(),
best_tip_height: self.best_tip_height, chain_tip_receiver: self.chain_tip_receiver,
}) })
} }
} }
impl<S> Handshake<S> impl<S> Handshake<S, NoChainTip>
where where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,
{ {
/// Create a builder that configures a [`Handshake`] service. /// Create a builder that configures a [`Handshake`] service.
pub fn builder() -> Builder<S> { pub fn builder() -> Builder<S, NoChainTip> {
// We don't derive `Default` because the derive inserts a `where S: // We don't derive `Default` because the derive inserts a `where S:
// Default` bound even though `Option<S>` implements `Default` even if // Default` bound even though `Option<S>` implements `Default` even if
// `S` does not. // `S` does not.
@ -444,7 +452,7 @@ where
our_services: None, our_services: None,
relay: None, relay: None,
inv_collector: None, inv_collector: None,
best_tip_height: None, chain_tip_receiver: NoChainTip,
} }
} }
} }
@ -463,7 +471,7 @@ pub async fn negotiate_version(
user_agent: String, user_agent: String,
our_services: PeerServices, our_services: PeerServices,
relay: bool, relay: bool,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>, chain_tip_receiver: impl ChainTip,
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> { ) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
// Create a random nonce for this connection // Create a random nonce for this connection
let local_nonce = Nonce::default(); let local_nonce = Nonce::default();
@ -577,7 +585,7 @@ pub async fn negotiate_version(
// SECURITY: Reject connections to peers on old versions, because they might not know about all // SECURITY: Reject connections to peers on old versions, because they might not know about all
// network upgrades and could lead to chain forks or slower block propagation. // network upgrades and could lead to chain forks or slower block propagation.
let height = best_tip_height.and_then(|height| *height.borrow()); let height = chain_tip_receiver.best_tip_height();
let min_version = Version::min_remote_for_height(config.network, height); let min_version = Version::min_remote_for_height(config.network, height);
if remote_version < min_version { if remote_version < min_version {
// Disconnect if peer is using an obsolete version. // Disconnect if peer is using an obsolete version.
@ -601,10 +609,11 @@ pub async fn negotiate_version(
pub type HandshakeRequest = (TcpStream, ConnectedAddr); pub type HandshakeRequest = (TcpStream, ConnectedAddr);
impl<S> Service<HandshakeRequest> for Handshake<S> impl<S, C> Service<HandshakeRequest> for Handshake<S, C>
where where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{ {
type Response = Client; type Response = Client;
type Error = BoxError; type Error = BoxError;
@ -634,7 +643,7 @@ where
let user_agent = self.user_agent.clone(); let user_agent = self.user_agent.clone();
let our_services = self.our_services; let our_services = self.our_services;
let relay = self.relay; let relay = self.relay;
let best_tip_height = self.best_tip_height.clone(); let chain_tip_receiver = self.chain_tip_receiver.clone();
let fut = async move { let fut = async move {
debug!( debug!(
@ -665,7 +674,7 @@ where
user_agent, user_agent,
our_services, our_services,
relay, relay,
best_tip_height, chain_tip_receiver,
), ),
) )
.await??; .await??;

View File

@ -12,11 +12,7 @@ use futures::{
stream::{FuturesUnordered, StreamExt}, stream::{FuturesUnordered, StreamExt},
TryFutureExt, TryFutureExt,
}; };
use tokio::{ use tokio::{net::TcpListener, sync::broadcast, time::Instant};
net::TcpListener,
sync::{broadcast, watch},
time::Instant,
};
use tower::{ use tower::{
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover, buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
util::BoxService, Service, ServiceExt, util::BoxService, Service, ServiceExt,
@ -29,10 +25,10 @@ use crate::{
BoxError, Config, Request, Response, BoxError, Config, Request, Response,
}; };
use zebra_chain::{block, parameters::Network}; use zebra_chain::{chain_tip::ChainTip, parameters::Network};
use super::{CandidateSet, PeerSet};
use super::CandidateSet;
use super::PeerSet;
use peer::Client; use peer::Client;
#[cfg(test)] #[cfg(test)]
@ -40,7 +36,8 @@ mod tests;
type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>; type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
/// Initialize a peer set. /// Initialize a peer set, using a network `config`, `inbound_service`,
/// and `chain_tip_receiver`.
/// ///
/// The peer set abstracts away peer management to provide a /// The peer set abstracts away peer management to provide a
/// [`tower::Service`] representing "the network" that load-balances requests /// [`tower::Service`] representing "the network" that load-balances requests
@ -57,13 +54,15 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
/// cause the peer set to shrink when the inbound service is unable to keep up /// cause the peer set to shrink when the inbound service is unable to keep up
/// with the volume of inbound requests. /// with the volume of inbound requests.
/// ///
/// Use [`NoChainTip`] to explicitly provide no chain tip receiver.
///
/// In addition to returning a service for outbound requests, this method /// In addition to returning a service for outbound requests, this method
/// returns a shared [`AddressBook`] updated with last-seen timestamps for /// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers. /// connected peers.
pub async fn init<S>( pub async fn init<S, C>(
config: Config, config: Config,
inbound_service: S, inbound_service: S,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>, chain_tip_receiver: C,
) -> ( ) -> (
Buffer<BoxService<Request, Response, BoxError>, Request>, Buffer<BoxService<Request, Response, BoxError>, Request>,
Arc<std::sync::Mutex<AddressBook>>, Arc<std::sync::Mutex<AddressBook>>,
@ -71,6 +70,7 @@ pub async fn init<S>(
where where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
C: ChainTip + Clone + Send + 'static,
{ {
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await; let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
@ -92,7 +92,7 @@ where
.with_timestamp_collector(timestamp_collector) .with_timestamp_collector(timestamp_collector)
.with_advertised_services(PeerServices::NODE_NETWORK) .with_advertised_services(PeerServices::NODE_NETWORK)
.with_user_agent(crate::constants::USER_AGENT.to_string()) .with_user_agent(crate::constants::USER_AGENT.to_string())
.with_best_tip_height(best_tip_height) .with_chain_tip_receiver(chain_tip_receiver)
.want_transactions(true) .want_transactions(true)
.finish() .finish()
.expect("configured all required parameters"); .expect("configured all required parameters");

View File

@ -17,7 +17,7 @@ use std::{collections::HashSet, net::SocketAddr};
use tower::service_fn; use tower::service_fn;
use zebra_chain::parameters::Network; use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
use zebra_test::net::random_known_port; use zebra_test::net::random_known_port;
use crate::Config; use crate::Config;
@ -110,7 +110,7 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
let inbound_service = let inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") }); service_fn(|_| async { unreachable!("inbound service should never be called") });
let (_peer_service, address_book) = init(config, inbound_service, None).await; let (_peer_service, address_book) = init(config, inbound_service, NoChainTip).await;
let local_listener = address_book.lock().unwrap().local_listener_meta_addr(); let local_listener = address_book.lock().unwrap().local_listener_meta_addr();
if listen_addr.port() == 0 { if listen_addr.port() == 0 {

View File

@ -35,7 +35,8 @@ pub use constants::MAX_BLOCK_REORG_HEIGHT;
pub use error::{BoxError, CloneError, CommitBlockError, ValidateContextError}; pub use error::{BoxError, CloneError, CommitBlockError, ValidateContextError};
pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, Request}; pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, Request};
pub use response::Response; pub use response::Response;
pub use service::init; pub use service::{chain_tip::ChainTipReceiver, init};
#[cfg(any(test, feature = "proptest-impl"))] #[cfg(any(test, feature = "proptest-impl"))]
pub use service::init_test; pub use service::init_test;

View File

@ -7,12 +7,13 @@ use std::{
}; };
use futures::future::FutureExt; use futures::future::FutureExt;
use non_finalized_state::{NonFinalizedState, QueuedBlocks}; use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};
#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;
use tower::{util::BoxService, Service}; use tower::{util::BoxService, Service};
use tracing::instrument; use tracing::instrument;
#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;
use zebra_chain::{ use zebra_chain::{
block::{self, Block}, block::{self, Block},
parameters::{Network, NetworkUpgrade}, parameters::{Network, NetworkUpgrade},
@ -21,13 +22,17 @@ use zebra_chain::{
transparent, transparent,
}; };
use self::best_tip_height::BestTipHeight;
use crate::{ use crate::{
constants, request::HashOrHeight, BoxError, CloneError, CommitBlockError, Config, constants, request::HashOrHeight, BoxError, CloneError, CommitBlockError, Config,
FinalizedBlock, PreparedBlock, Request, Response, ValidateContextError, FinalizedBlock, PreparedBlock, Request, Response, ValidateContextError,
}; };
mod best_tip_height; use self::{
chain_tip::{ChainTipReceiver, ChainTipSender},
non_finalized_state::{NonFinalizedState, QueuedBlocks},
};
pub mod chain_tip;
pub(crate) mod check; pub(crate) mod check;
mod finalized_state; mod finalized_state;
mod non_finalized_state; mod non_finalized_state;
@ -64,18 +69,18 @@ pub(crate) struct StateService {
/// Instant tracking the last time `pending_utxos` was pruned /// Instant tracking the last time `pending_utxos` was pruned
last_prune: Instant, last_prune: Instant,
/// The current best chain tip height. /// The current best chain tip height.
best_tip_height: BestTipHeight, chain_tip_sender: ChainTipSender,
} }
impl StateService { impl StateService {
const PRUNE_INTERVAL: Duration = Duration::from_secs(30); const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
pub fn new(config: Config, network: Network) -> (Self, watch::Receiver<Option<block::Height>>) { pub fn new(config: Config, network: Network) -> (Self, ChainTipReceiver) {
let (mut best_tip_height, best_tip_height_receiver) = BestTipHeight::new(); let (mut chain_tip_sender, chain_tip_receiver) = ChainTipSender::new();
let disk = FinalizedState::new(&config, network); let disk = FinalizedState::new(&config, network);
if let Some(finalized_height) = disk.finalized_tip_height() { if let Some(finalized_height) = disk.finalized_tip_height() {
best_tip_height.set_finalized_height(finalized_height); chain_tip_sender.set_finalized_height(finalized_height);
} }
let mem = NonFinalizedState::new(network); let mem = NonFinalizedState::new(network);
@ -89,7 +94,7 @@ impl StateService {
pending_utxos, pending_utxos,
network, network,
last_prune: Instant::now(), last_prune: Instant::now(),
best_tip_height, chain_tip_sender,
}; };
tracing::info!("starting legacy chain check"); tracing::info!("starting legacy chain check");
@ -116,7 +121,7 @@ impl StateService {
} }
tracing::info!("no legacy chain found"); tracing::info!("no legacy chain found");
(state, best_tip_height_receiver) (state, chain_tip_receiver)
} }
/// Queue a finalized block for verification and storage in the finalized state. /// Queue a finalized block for verification and storage in the finalized state.
@ -129,7 +134,7 @@ impl StateService {
self.disk.queue_and_commit_finalized((finalized, rsp_tx)); self.disk.queue_and_commit_finalized((finalized, rsp_tx));
if let Some(finalized_height) = self.disk.finalized_tip_height() { if let Some(finalized_height) = self.disk.finalized_tip_height() {
self.best_tip_height.set_finalized_height(finalized_height); self.chain_tip_sender.set_finalized_height(finalized_height);
} }
rsp_rx rsp_rx
@ -196,9 +201,9 @@ impl StateService {
self.queued_blocks.prune_by_height(finalized_tip_height); self.queued_blocks.prune_by_height(finalized_tip_height);
self.best_tip_height self.chain_tip_sender
.set_finalized_height(finalized_tip_height); .set_finalized_height(finalized_tip_height);
self.best_tip_height self.chain_tip_sender
.set_best_non_finalized_height(non_finalized_tip_height); .set_best_non_finalized_height(non_finalized_tip_height);
tracing::trace!("finished processing queued block"); tracing::trace!("finished processing queued block");
@ -766,6 +771,7 @@ impl Service<Request> for StateService {
} }
/// Initialize a state service from the provided [`Config`]. /// Initialize a state service from the provided [`Config`].
/// Returns a boxed state service, and a receiver for state chain tip updates.
/// ///
/// Each `network` has its own separate on-disk database. /// Each `network` has its own separate on-disk database.
/// ///
@ -776,13 +782,10 @@ impl Service<Request> for StateService {
pub fn init( pub fn init(
config: Config, config: Config,
network: Network, network: Network,
) -> ( ) -> (BoxService<Request, Response, BoxError>, ChainTipReceiver) {
BoxService<Request, Response, BoxError>, let (state_service, chain_tip_receiver) = StateService::new(config, network);
watch::Receiver<Option<block::Height>>,
) {
let (state_service, best_tip_height) = StateService::new(config, network);
(BoxService::new(state_service), best_tip_height) (BoxService::new(state_service), chain_tip_receiver)
} }
/// Initialize a state service with an ephemeral [`Config`] and a buffer with a single slot. /// Initialize a state service with an ephemeral [`Config`] and a buffer with a single slot.

View File

@ -1,8 +0,0 @@
use super::super::BestTipHeight;
#[test]
fn best_tip_value_is_initially_empty() {
let (_best_tip_height, receiver) = BestTipHeight::new();
assert_eq!(*receiver.borrow(), None);
}

View File

@ -1,16 +1,13 @@
use tokio::sync::watch; use tokio::sync::watch;
use zebra_chain::block; use zebra_chain::{block, chain_tip::ChainTip};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// A helper type to determine the best chain tip block height. /// A sender for recent changes to the non-finalized and finalized chain tips.
///
/// The block height is determined based on the current finalized block height and the current best
/// non-finalized chain's tip block height. The height is made available from a [`watch::Receiver`].
#[derive(Debug)] #[derive(Debug)]
pub struct BestTipHeight { pub struct ChainTipSender {
finalized: Option<block::Height>, finalized: Option<block::Height>,
non_finalized: Option<block::Height>, non_finalized: Option<block::Height>,
sender: watch::Sender<Option<block::Height>>, sender: watch::Sender<Option<block::Height>>,
@ -18,20 +15,19 @@ pub struct BestTipHeight {
active_value: Option<block::Height>, active_value: Option<block::Height>,
} }
impl BestTipHeight { impl ChainTipSender {
/// Create a new instance of [`BestTipHeight`] and the [`watch::Receiver`] endpoint for the /// Create new linked instances of [`ChainTipSender`] and [`ChainTipReceiver`].
/// current best tip block height. pub fn new() -> (Self, ChainTipReceiver) {
pub fn new() -> (Self, watch::Receiver<Option<block::Height>>) {
let (sender, receiver) = watch::channel(None); let (sender, receiver) = watch::channel(None);
( (
BestTipHeight { ChainTipSender {
finalized: None, finalized: None,
non_finalized: None, non_finalized: None,
sender, sender,
active_value: None, active_value: None,
}, },
receiver, ChainTipReceiver::new(receiver),
) )
} }
@ -68,3 +64,29 @@ impl BestTipHeight {
} }
} }
} }
/// A receiver for recent changes to the non-finalized and finalized chain tips.
///
/// The latest changes are available from all cloned instances of this type.
#[derive(Clone, Debug)]
pub struct ChainTipReceiver {
receiver: watch::Receiver<Option<block::Height>>,
}
impl ChainTipReceiver {
/// Create a new chain tip receiver from a watch channel receiver.
fn new(receiver: watch::Receiver<Option<block::Height>>) -> Self {
Self { receiver }
}
}
impl ChainTip for ChainTipReceiver {
/// Return the height of the best chain tip.
///
/// The returned block height comes from:
/// * the best non-finalized chain tip, if available, or
/// * the finalized tip.
fn best_tip_height(&self) -> Option<block::Height> {
*self.receiver.borrow()
}
}

View File

@ -1,16 +1,16 @@
use proptest::prelude::*; use proptest::prelude::*;
use proptest_derive::Arbitrary; use proptest_derive::Arbitrary;
use zebra_chain::block; use zebra_chain::{block, chain_tip::ChainTip};
use super::super::BestTipHeight; use super::super::ChainTipSender;
proptest! { proptest! {
#[test] #[test]
fn best_tip_value_is_heighest_of_latest_finalized_and_non_finalized_heights( fn best_tip_is_highest_of_latest_finalized_and_non_finalized_heights(
height_updates in any::<Vec<HeightUpdate>>(), height_updates in any::<Vec<HeightUpdate>>(),
) { ) {
let (mut best_tip_height, receiver) = BestTipHeight::new(); let (mut chain_tip_sender, chain_tip_receiver) = ChainTipSender::new();
let mut latest_finalized_height = None; let mut latest_finalized_height = None;
let mut latest_non_finalized_height = None; let mut latest_non_finalized_height = None;
@ -18,11 +18,11 @@ proptest! {
for update in height_updates { for update in height_updates {
match update { match update {
HeightUpdate::Finalized(height) => { HeightUpdate::Finalized(height) => {
best_tip_height.set_finalized_height(height); chain_tip_sender.set_finalized_height(height);
latest_finalized_height = Some(height); latest_finalized_height = Some(height);
} }
HeightUpdate::NonFinalized(height) => { HeightUpdate::NonFinalized(height) => {
best_tip_height.set_best_non_finalized_height(height); chain_tip_sender.set_best_non_finalized_height(height);
latest_non_finalized_height = height; latest_non_finalized_height = height;
} }
} }
@ -36,7 +36,7 @@ proptest! {
(None, non_finalized_height) => non_finalized_height, (None, non_finalized_height) => non_finalized_height,
}; };
prop_assert_eq!(*receiver.borrow(), expected_height); prop_assert_eq!(chain_tip_receiver.best_tip_height(), expected_height);
} }
} }

View File

@ -0,0 +1,10 @@
use zebra_chain::chain_tip::ChainTip;
use super::super::ChainTipSender;
#[test]
fn best_tip_height_is_initially_empty() {
let (_chain_tip_sender, chain_tip_receiver) = ChainTipSender::new();
assert_eq!(chain_tip_receiver.best_tip_height(), None);
}

View File

@ -5,6 +5,7 @@ use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use zebra_chain::{ use zebra_chain::{
block::{self, Block}, block::{self, Block},
chain_tip::ChainTip,
fmt::SummaryDebug, fmt::SummaryDebug,
parameters::{Network, NetworkUpgrade}, parameters::{Network, NetworkUpgrade},
serialization::{ZcashDeserialize, ZcashDeserializeInto}, serialization::{ZcashDeserialize, ZcashDeserializeInto},
@ -290,22 +291,22 @@ proptest! {
/// 4. Commit the non-finalized blocks and check that the best tip height is also updated /// 4. Commit the non-finalized blocks and check that the best tip height is also updated
/// accordingly. /// accordingly.
#[test] #[test]
fn best_tip_height_is_updated( fn chain_tip_sender_is_updated(
(network, finalized_blocks, non_finalized_blocks) (network, finalized_blocks, non_finalized_blocks)
in continuous_empty_blocks_from_test_vectors(), in continuous_empty_blocks_from_test_vectors(),
) { ) {
zebra_test::init(); zebra_test::init();
let (mut state_service, best_tip_height) = StateService::new(Config::ephemeral(), network); let (mut state_service, chain_tip_receiver) = StateService::new(Config::ephemeral(), network);
prop_assert_eq!(*best_tip_height.borrow(), None); prop_assert_eq!(chain_tip_receiver.best_tip_height(), None);
for block in finalized_blocks { for block in finalized_blocks {
let expected_height = block.height; let expected_height = block.height;
state_service.queue_and_commit_finalized(block); state_service.queue_and_commit_finalized(block);
prop_assert_eq!(*best_tip_height.borrow(), Some(expected_height)); prop_assert_eq!(chain_tip_receiver.best_tip_height(), Some(expected_height));
} }
for block in non_finalized_blocks { for block in non_finalized_blocks {
@ -313,7 +314,7 @@ proptest! {
state_service.queue_and_commit_non_finalized(block); state_service.queue_and_commit_non_finalized(block);
prop_assert_eq!(*best_tip_height.borrow(), Some(expected_height)); prop_assert_eq!(chain_tip_receiver.best_tip_height(), Some(expected_height));
} }
} }

View File

@ -50,7 +50,7 @@ impl StartCmd {
info!(?config); info!(?config);
info!("initializing node state"); info!("initializing node state");
let (state_service, best_tip_height) = let (state_service, chain_tip_receiver) =
zebra_state::init(config.state.clone(), config.network.network); zebra_state::init(config.state.clone(), config.network.network);
let state = ServiceBuilder::new().buffer(20).service(state_service); let state = ServiceBuilder::new().buffer(20).service(state_service);
@ -78,7 +78,7 @@ impl StartCmd {
)); ));
let (peer_set, address_book) = let (peer_set, address_book) =
zebra_network::init(config.network.clone(), inbound, Some(best_tip_height)).await; zebra_network::init(config.network.clone(), inbound, chain_tip_receiver).await;
setup_tx setup_tx
.send((peer_set.clone(), address_book)) .send((peer_set.clone(), address_book))
.map_err(|_| eyre!("could not send setup data to inbound service"))?; .map_err(|_| eyre!("could not send setup data to inbound service"))?;