diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 786f10b17..d4233df83 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -123,7 +123,8 @@ impl Service for Inbound { // Clean up completed download tasks if let Some(downloads) = self.downloads.as_mut() { - while let Poll::Ready(Some(_)) = Pin::new(downloads).poll_next(cx) {} + let downloads = Pin::new(downloads); + while let Poll::Ready(Some(_)) = downloads.poll_next(cx) {} } // Now report readiness based on readiness of the inner services, if they're available. @@ -200,24 +201,10 @@ impl Service for Inbound { async { Ok(zn::Response::Nil) }.boxed() } zn::Request::AdvertiseBlock(hash) => { - // this sucks - let mut downloads = self.downloads.take().unwrap(); - self.downloads = Some(Downloads::new( - self.outbound.as_ref().unwrap().clone(), - self.verifier.clone(), - self.state.clone(), - )); - - async move { - if downloads.download_and_verify(hash).await? { - tracing::info!(?hash, "queued download and verification of gossiped block"); - } else { - tracing::debug!(?hash, "gossiped block already queued or verified"); - } - - Ok(zn::Response::Nil) + if let Some(downloads) = self.downloads.as_mut() { + downloads.download_and_verify(hash); } - .boxed() + async { Ok(zn::Response::Nil) }.boxed() } zn::Request::MempoolTransactions => { debug!("ignoring unimplemented request"); diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index be312865f..136c2959e 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -108,21 +108,23 @@ where #[instrument(skip(self))] pub fn download_and_verify(&mut self, hash: block::Hash) -> bool { if self.cancel_handles.contains_key(&hash) { + tracing::debug!("hash already queued for download"); return false; } // This oneshot is used to signal cancellation to the download task. let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>(); - let mut state = self.state.clone(); - let mut network = self.network.clone(); - let mut verifier = self.verifier.clone(); + let state = self.state.clone(); + let network = self.network.clone(); + let verifier = self.verifier.clone(); let fut = async move { // Check if the block is already in the state. match state.oneshot(zs::Request::Depth(hash.into())).await { Ok(zs::Response::Depth(None)) => Ok(()), Ok(zs::Response::Depth(Some(_))) => Err("already present".into()), + Ok(_) => unreachable!("wrong response"), Err(e) => Err(e), }?; @@ -142,8 +144,8 @@ where verifier.oneshot(block).await } .map_ok(|hash| { - tracing::info!(?hash, "verified advertised block"); metrics::counter!("gossip.verified.block.count", 1); + hash }) // Tack the hash onto the error so we can remove the cancel handle // on failure as well as on success. @@ -155,6 +157,7 @@ where _ = &mut cancel_rx => { tracing::trace!("task cancelled prior to completion"); metrics::counter!("gossip.cancelled.count", 1); + Err(("canceled".into(), hash)) } verification = fut => verification, } @@ -167,6 +170,7 @@ where "blocks are only queued once" ); + tracing::debug!("queued hash for download"); true } }