From 55f46967b27bfb22c4596fee7100f803ac69f229 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Fri, 18 Sep 2020 13:47:31 -0700 Subject: [PATCH] zebrad: serve blocks from Inbound service The original version of this commit ran into https://github.com/rust-lang/rust/issues/64552 again. Thanks to @yaahc for suggesting a workaround (using futures combinators to avoid writing an async block). --- zebrad/src/commands/start.rs | 2 +- zebrad/src/components/inbound.rs | 56 +++++++++++++++++++++++++++++--- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index d64432989..24b5e95ac 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -64,7 +64,7 @@ impl StartCmd { let inbound = ServiceBuilder::new() .load_shed() .buffer(20) - .service(Inbound::new(setup_rx)); + .service(Inbound::new(setup_rx, state.clone())); let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await; setup_tx diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 04b366536..40503a503 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -5,14 +5,19 @@ use std::{ task::{Context, Poll}, }; -use futures::future::FutureExt; +use futures::{ + future::{FutureExt, TryFutureExt}, + stream::TryStreamExt, +}; use tokio::sync::oneshot; -use tower::{buffer::Buffer, util::BoxService, Service}; +use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use zebra_network as zn; use zebra_network::AddressBook; +use zebra_state as zs; type Outbound = Buffer, zn::Request>; +type State = Buffer, zs::Request>; pub type SetupData = (Outbound, Arc>); @@ -25,14 +30,16 @@ pub struct Inbound { network_setup: Option>, outbound: Option, address_book: Option>>, + state: State, } impl Inbound { - pub fn new(network_setup: oneshot::Receiver) -> Self { + pub fn new(network_setup: oneshot::Receiver, state: State) -> Self { Self { network_setup: Some(network_setup), outbound: None, address_book: None, + state, } } } @@ -89,10 +96,51 @@ impl Service for Inbound { peers.truncate(MAX_ADDR); async { Ok(zn::Response::Peers(peers)) }.boxed() } - _ => { + zn::Request::BlocksByHash(hashes) => { + let state = self.state.clone(); + let requests = futures::stream::iter( + hashes + .into_iter() + .map(|hash| zs::Request::Block(hash.into())), + ); + + state + .call_all(requests) + .try_filter_map(|rsp| { + futures::future::ready(match rsp { + zs::Response::Block(Some(block)) => Ok(Some(block)), + // XXX: check how zcashd handles missing blocks? + zs::Response::Block(None) => Err("missing block".into()), + _ => unreachable!("wrong response from state"), + }) + }) + .try_collect::>() + .map_ok(zn::Response::Blocks) + .boxed() + } + zn::Request::TransactionsByHash(_transactions) => { debug!("ignoring unimplemented request"); async { Ok(zn::Response::Nil) }.boxed() } + zn::Request::FindBlocks { .. } => { + debug!("ignoring unimplemented request"); + async { Ok(zn::Response::Nil) }.boxed() + } + zn::Request::PushTransaction(_transaction) => { + debug!("ignoring unimplemented request"); + async { Ok(zn::Response::Nil) }.boxed() + } + zn::Request::AdvertiseTransactions(_transactions) => { + debug!("ignoring unimplemented request"); + async { Ok(zn::Response::Nil) }.boxed() + } + zn::Request::AdvertiseBlock(_block) => { + debug!("ignoring unimplemented request"); + async { Ok(zn::Response::Nil) }.boxed() + } + zn::Request::Ping(_) => { + unreachable!("ping requests are handled internally"); + } } } }