From 486e55104a7880c64683fffa93801666b4c7dbd6 Mon Sep 17 00:00:00 2001
From: Alfredo Garcia
Date: Thu, 19 Nov 2020 14:55:06 -0300
Subject: [PATCH] create Downloads for Inbound

Add a `Downloads` helper that fetches and verifies blocks gossiped via
`AdvertiseBlock`, and wire it into the `Inbound` service.

`Inbound` keeps a single `Downloads` for its whole lifetime, created once
network setup delivers the outbound service:

- `poll_ready` drains finished tasks only when `Downloads` exists, so it
  does not unwrap a `None` before setup completes.
- `AdvertiseBlock` queues on that instance instead of swapping in a fresh
  one per request. Dropping the old instance dropped its cancel senders,
  which cancelled the task that had just been spawned, and a fresh
  instance never saw earlier hashes, so duplicates were not detected.
- `download_and_verify` is synchronous: the state lookup, the block
  request and the verification all run inside the spawned task.
- The number of pending tasks is capped, and verified gossiped blocks are
  counted under `gossip.*` instead of `sync.*`.
---
Notes (not part of the commit message):

Line breaks, context-line indentation and generic parameters were
restored from a copy of this patch that had lost them; check the context
lines against the tree before applying. The `index` lines still carry the
blob ids of the first revision of this patch.

 zebrad/src/commands/start.rs               |   2 +-
 zebrad/src/components/inbound.rs           |  54 ++++-
 zebrad/src/components/inbound/downloads.rs | 208 +++++++++++++++++++++
 3 files changed, 260 insertions(+), 4 deletions(-)
 create mode 100644 zebrad/src/components/inbound/downloads.rs

diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs
index f829f3552..ee8910737 100644
--- a/zebrad/src/commands/start.rs
+++ b/zebrad/src/commands/start.rs
@@ -67,7 +67,7 @@ impl StartCmd {
         let inbound = ServiceBuilder::new()
             .load_shed()
             .buffer(20)
-            .service(Inbound::new(setup_rx, state.clone()));
+            .service(Inbound::new(setup_rx, state.clone(), verifier.clone()));
 
         let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await;
         setup_tx
diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs
index ef94a301b..67fb41d87 100644
--- a/zebrad/src/components/inbound.rs
+++ b/zebrad/src/components/inbound.rs
@@ -12,12 +12,21 @@ use futures::{
 use tokio::sync::oneshot;
 use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
 
+use zebra_chain as zc;
+use zebra_consensus as zcon;
 use zebra_network as zn;
 use zebra_network::AddressBook;
 use zebra_state as zs;
 
+mod downloads;
+use downloads::Downloads;
+
 type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
 type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
+type Verifier = Buffer<
+    BoxService<Arc<zc::block::Block>, zc::block::Hash, zcon::chain::VerifyChainError>,
+    Arc<zc::block::Block>,
+>;
 
 pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
 
@@ -52,15 +61,23 @@ pub struct Inbound {
     outbound: Option<Outbound>,
     address_book: Option<Arc<Mutex<AddressBook>>>,
     state: State,
+    verifier: Verifier,
+    downloads: Option<Downloads<Outbound, Verifier, State>>,
 }
 
 impl Inbound {
-    pub fn new(network_setup: oneshot::Receiver<SetupData>, state: State) -> Self {
+    pub fn new(
+        network_setup: oneshot::Receiver<SetupData>,
+        state: State,
+        verifier: Verifier,
+    ) -> Self {
         Self {
             network_setup: Some(network_setup),
             outbound: None,
             address_book: None,
             state,
+            verifier,
+            downloads: None,
         }
     }
 }
@@ -85,6 +102,11 @@ impl Service<zn::Request> for Inbound {
                     self.outbound = Some(outbound);
                     self.address_book = Some(address_book);
                     self.network_setup = None;
+                    self.downloads = Some(Downloads::new(
+                        self.outbound.clone().unwrap(),
+                        self.verifier.clone(),
+                        self.state.clone(),
+                    ));
                 }
                 Err(TryRecvError::Empty) => {
                     self.network_setup = Some(rx);
@@ -98,6 +120,15 @@ impl Service<zn::Request> for Inbound {
                 }
             };
         }
+
+        // Clean up completed download tasks; their results are not needed here.
+        // `downloads` stays `None` until network setup has finished.
+        if let Some(downloads) = self.downloads.as_mut() {
+            while let Poll::Ready(Some(_)) =
+                futures::stream::Stream::poll_next(std::pin::Pin::new(&mut *downloads), cx)
+            {}
+        }
+
         // Now report readiness based on readiness of the inner services, if they're available.
         // XXX do we want to propagate backpressure from the network here?
         match (
@@ -171,9 +202,26 @@ impl Service<zn::Request> for Inbound {
                 debug!("ignoring unimplemented request");
                 async { Ok(zn::Response::Nil) }.boxed()
             }
-            zn::Request::AdvertiseBlock(_block) => {
-                debug!("ignoring unimplemented request");
+            zn::Request::AdvertiseBlock(hash) => {
+                // Queue the block on the long-lived `Downloads` owned by this
+                // service, so its cancel handles and duplicate tracking outlive
+                // this request. The download runs in its own spawned task, so
+                // the response does not wait for it.
+                match self.downloads.as_mut() {
+                    Some(downloads) => {
+                        if downloads.download_and_verify(hash) {
+                            tracing::info!(
+                                ?hash,
+                                "queued download and verification of gossiped block"
+                            );
+                        }
+                    }
+                    None => {
+                        tracing::debug!(?hash, "ignoring gossiped block before network setup");
+                    }
+                }
+
                 async { Ok(zn::Response::Nil) }.boxed()
             }
             zn::Request::MempoolTransactions => {
                 debug!("ignoring unimplemented request");
diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs
new file mode 100644
index 000000000..05ba567de
--- /dev/null
+++ b/zebrad/src/components/inbound/downloads.rs
@@ -0,0 +1,208 @@
+use std::{
+    collections::HashMap,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use futures::{
+    future::TryFutureExt,
+    ready,
+    stream::{FuturesUnordered, Stream},
+};
+use pin_project::pin_project;
+use tokio::{sync::oneshot, task::JoinHandle};
+use tower::{Service, ServiceExt};
+use tracing_futures::Instrument;
+
+use zebra_chain::block::{self, Block};
+use zebra_network as zn;
+use zebra_state as zs;
+
+type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
+
+/// The maximum number of gossiped blocks that are downloaded and verified at
+/// the same time.
+///
+/// Gossiped hashes come from untrusted peers and every accepted hash spawns a
+/// task, so the number of pending tasks has to be bounded.
+const MAX_INBOUND_CONCURRENCY: usize = 10;
+
+/// Manages download and verification of blocks gossiped to this peer.
+///
+/// Each queued block is handled by its own spawned task. Polling this type as
+/// a [`Stream`] yields the results of finished tasks and removes their cancel
+/// handles, so it has to be polled regularly.
+#[pin_project]
+#[derive(Debug)]
+pub struct Downloads<ZN, ZV, ZS>
+where
+    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
+    ZN::Future: Send,
+    ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError> + Send + Clone + 'static,
+    ZV::Future: Send,
+    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
+    ZS::Future: Send,
+{
+    /// Fetches gossiped blocks from peers.
+    network: ZN,
+    /// Verifies downloaded blocks.
+    verifier: ZV,
+    /// Answers whether a gossiped block is already known.
+    state: ZS,
+    /// Join handles of the spawned download and verify tasks.
+    #[pin]
+    pending: FuturesUnordered<JoinHandle<Result<block::Hash, (BoxError, block::Hash)>>>,
+    /// Cancellation senders for the pending tasks, keyed by block hash.
+    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
+}
+
+impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
+where
+    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
+    ZN::Future: Send,
+    ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError> + Send + Clone + 'static,
+    ZV::Future: Send,
+    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
+    ZS::Future: Send,
+{
+    type Item = Result<block::Hash, BoxError>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        // This would be cleaner with poll_map #63514, but that's nightly only.
+        if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
+            match join_result.expect("block download and verify tasks must not panic") {
+                Ok(hash) => {
+                    this.cancel_handles.remove(&hash);
+                    Poll::Ready(Some(Ok(hash)))
+                }
+                Err((e, hash)) => {
+                    this.cancel_handles.remove(&hash);
+                    Poll::Ready(Some(Err(e)))
+                }
+            }
+        } else {
+            Poll::Ready(None)
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.pending.size_hint()
+    }
+}
+
+impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
+where
+    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
+    ZN::Future: Send,
+    ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError> + Send + Clone + 'static,
+    ZV::Future: Send,
+    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
+    ZS::Future: Send,
+{
+    /// Initialize a new download stream with the provided `network`,
+    /// `verifier` and `state` services.
+    ///
+    /// The [`Downloads`] stream is agnostic to the network policy, so retry and
+    /// timeout limits should be applied to the `network` service passed into
+    /// this constructor.
+    pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
+        Self {
+            network,
+            verifier,
+            state,
+            pending: FuturesUnordered::new(),
+            cancel_handles: HashMap::new(),
+        }
+    }
+
+    /// Queue a block for download and verification.
+    ///
+    /// Returns `true` if a new task was spawned for `hash`. Returns `false`
+    /// without spawning anything if `hash` is already pending, or if
+    /// [`MAX_INBOUND_CONCURRENCY`] tasks are already pending.
+    ///
+    /// This method never waits: the state lookup, the block request and the
+    /// verification all run inside the spawned task. A block that is already
+    /// in the state makes its task finish with an error.
+    ///
+    /// # Panics
+    ///
+    /// Panics if called outside a Tokio runtime, because it spawns a task.
+    #[instrument(skip(self))]
+    pub fn download_and_verify(&mut self, hash: block::Hash) -> bool {
+        if self.cancel_handles.contains_key(&hash) {
+            tracing::debug!("gossiped block is already queued");
+            return false;
+        }
+
+        if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
+            tracing::debug!("too many gossiped blocks in flight, ignoring this one");
+            return false;
+        }
+
+        // This oneshot is used to signal cancellation to the download task.
+        let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
+
+        // The task owns clones of the services, so it does not borrow `self`.
+        let state = self.state.clone();
+        let network = self.network.clone();
+        let verifier = self.verifier.clone();
+
+        let download = async move {
+            // Check if the block is already in the state. A failed lookup is
+            // treated like a missing block, so the download still goes ahead.
+            let lookup = state
+                .oneshot(zs::Request::Block(zs::HashOrHeight::from(hash)))
+                .await;
+            if let Ok(zs::Response::Block(Some(_))) = lookup {
+                return Err(BoxError::from("gossiped block is already in the state"));
+            }
+
+            let rsp = network
+                .oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
+                .await?;
+            let block = if let zn::Response::Blocks(blocks) = rsp {
+                blocks
+                    .into_iter()
+                    .next()
+                    .expect("successful response has the block in it")
+            } else {
+                unreachable!("wrong response to block request");
+            };
+            metrics::counter!("gossip.downloaded.block.count", 1);
+
+            verifier.oneshot(block).await
+        }
+        .map_ok(|hash| {
+            metrics::counter!("gossip.verified.block.count", 1);
+            hash
+        })
+        // Tack the hash onto the error so we can remove the cancel handle
+        // on failure as well as on success.
+        .map_err(move |e| (e, hash))
+        .in_current_span();
+
+        let task = tokio::spawn(async move {
+            tokio::select! {
+                // Completes when the cancel sender is used or dropped.
+                _ = &mut cancel_rx => {
+                    tracing::trace!("task cancelled prior to completion");
+                    metrics::counter!("gossip.cancelled.count", 1);
+                    Err((BoxError::from("canceled block_fetch_verify"), hash))
+                }
+                verification = download => verification,
+            }
+        });
+
+        self.pending.push(task);
+        // XXX replace with expect_none when stable
+        assert!(
+            self.cancel_handles.insert(hash, cancel_tx).is_none(),
+            "blocks are only queued once"
+        );
+
+        true
+    }
+}