zebrad: serve blocks from Inbound service

The original version of this commit ran into

https://github.com/rust-lang/rust/issues/64552

again.  Thanks to @yaahc for suggesting a workaround (using futures combinators
to avoid writing an async block).
This commit is contained in:
Henry de Valence 2020-09-18 13:47:31 -07:00
parent 170f588ffb
commit 55f46967b2
2 changed files with 53 additions and 5 deletions

View File

@ -64,7 +64,7 @@ impl StartCmd {
let inbound = ServiceBuilder::new()
.load_shed()
.buffer(20)
.service(Inbound::new(setup_rx));
.service(Inbound::new(setup_rx, state.clone()));
let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await;
setup_tx

View File

@ -5,14 +5,19 @@ use std::{
task::{Context, Poll},
};
use futures::future::FutureExt;
use futures::{
future::{FutureExt, TryFutureExt},
stream::TryStreamExt,
};
use tokio::sync::oneshot;
use tower::{buffer::Buffer, util::BoxService, Service};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use zebra_network as zn;
use zebra_network::AddressBook;
use zebra_state as zs;
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
@ -25,14 +30,16 @@ pub struct Inbound {
network_setup: Option<oneshot::Receiver<SetupData>>,
outbound: Option<Outbound>,
address_book: Option<Arc<Mutex<zn::AddressBook>>>,
state: State,
}
impl Inbound {
pub fn new(network_setup: oneshot::Receiver<SetupData>) -> Self {
pub fn new(network_setup: oneshot::Receiver<SetupData>, state: State) -> Self {
Self {
network_setup: Some(network_setup),
outbound: None,
address_book: None,
state,
}
}
}
@ -89,10 +96,51 @@ impl Service<zn::Request> for Inbound {
peers.truncate(MAX_ADDR);
async { Ok(zn::Response::Peers(peers)) }.boxed()
}
_ => {
zn::Request::BlocksByHash(hashes) => {
let state = self.state.clone();
let requests = futures::stream::iter(
hashes
.into_iter()
.map(|hash| zs::Request::Block(hash.into())),
);
state
.call_all(requests)
.try_filter_map(|rsp| {
futures::future::ready(match rsp {
zs::Response::Block(Some(block)) => Ok(Some(block)),
// XXX: check how zcashd handles missing blocks?
zs::Response::Block(None) => Err("missing block".into()),
_ => unreachable!("wrong response from state"),
})
})
.try_collect::<Vec<_>>()
.map_ok(zn::Response::Blocks)
.boxed()
}
zn::Request::TransactionsByHash(_transactions) => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::FindBlocks { .. } => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseTransactions(_transactions) => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseBlock(_block) => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::Ping(_) => {
unreachable!("ping requests are handled internally");
}
}
}
}