//! The inbound service handles requests from Zebra's peers. //! //! It downloads and verifies gossiped blocks and mempool transactions, //! when Zebra is close to the chain tip. //! //! It also responds to peer requests for blocks, transactions, and peer addresses. use std::{ future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, }; use chrono::Utc; use futures::{ future::{FutureExt, TryFutureExt}, stream::Stream, }; use tokio::sync::oneshot::{self, error::TryRecvError}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt}; use zebra_network as zn; use zebra_state as zs; use zebra_chain::block::{self, Block}; use zebra_consensus::chain::VerifyChainError; use zebra_network::{constants::MAX_ADDRS_IN_MESSAGE, AddressBook}; // Re-use the syncer timeouts for consistency. use super::{ mempool, mempool as mp, sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}, }; mod downloads; #[cfg(test)] mod tests; use downloads::Downloads as BlockDownloads; /// A security parameter to return only 1/3 of available addresses as a /// response to a `Peers` request. const FRAC_OF_AVAILABLE_ADDRESS: f64 = 1. / 3.; type BlockDownloadPeerSet = Buffer, zn::Request>; type State = Buffer, zs::Request>; type Mempool = Buffer, mp::Request>; type BlockVerifier = Buffer, block::Hash, VerifyChainError>, Arc>; type GossipedBlockDownloads = BlockDownloads, Timeout, State>; /// The services used by the [`Inbound`] service. pub struct InboundSetupData { /// A shared list of peer addresses. pub address_book: Arc>, /// A service that can be used to download gossiped blocks. pub block_download_peer_set: BlockDownloadPeerSet, /// A service that verifies downloaded blocks. /// /// Given to `Inbound.block_downloads` after the required services are set up. pub block_verifier: BlockVerifier, /// A service that manages transactions in the memory pool. pub mempool: Mempool, /// A service that manages cached blockchain state. pub state: State, } /// Tracks the internal state of the [`Inbound`] service during setup. pub enum Setup { /// Waiting for service setup to complete. /// /// All requests are ignored. Pending { /// A oneshot channel used to receive required services, /// after they are set up. setup: oneshot::Receiver, }, /// Setup is complete. /// /// All requests are answered. Initialized { /// A shared list of peer addresses. address_book: Arc>, /// A `futures::Stream` that downloads and verifies gossiped blocks. block_downloads: Pin>, /// A service that manages transactions in the memory pool. mempool: Mempool, /// A service that manages cached blockchain state. state: State, }, /// Temporary state used in the inbound service's internal initialization code. /// /// If this state occurs outside the service initialization code, the service panics. FailedInit, /// Setup failed, because the setup channel permanently failed. /// The service keeps returning readiness errors for every request. FailedRecv { /// The original channel error. error: SharedRecvError, }, } /// A wrapper around `Arc` that implements `Error`. #[derive(thiserror::Error, Debug, Clone)] #[error(transparent)] pub struct SharedRecvError(Arc); impl From for SharedRecvError { fn from(source: TryRecvError) -> Self { Self(Arc::new(source)) } } /// Uses the node state to respond to inbound peer requests. /// /// This service, wrapped in appropriate middleware, is passed to /// `zebra_network::init` to respond to inbound peer requests. /// /// The `Inbound` service is responsible for: /// /// - supplying network data like peer addresses to other nodes; /// - supplying chain data like blocks to other nodes; /// - supplying mempool transactions to other nodes; /// - receiving gossiped transactions; and /// - receiving gossiped blocks. /// /// Because the `Inbound` service is responsible for participating in the gossip /// protocols used for transaction and block diffusion, there is a potential /// overlap with the `ChainSync` and `Mempool` components. /// /// The division of responsibility is that: /// /// The `ChainSync` and `Mempool` components are *internally driven*, /// periodically polling the network to check for new blocks or transactions. /// /// The `Inbound` service is *externally driven*, responding to block gossip /// by attempting to download and validate advertised blocks. /// /// Gossiped transactions are forwarded to the mempool downloader, /// which unifies polled and gossiped transactions into a single download list. pub struct Inbound { /// Provides service dependencies, if they are available. /// /// Some services are unavailable until Zebra has completed setup. setup: Setup, } impl Inbound { /// Create a new inbound service. /// /// Dependent services are sent via the `setup` channel after initialization. pub fn new(setup: oneshot::Receiver) -> Inbound { Inbound { setup: Setup::Pending { setup }, } } /// Remove `self.setup`, temporarily replacing it with an invalid state. fn take_setup(&mut self) -> Setup { let mut setup = Setup::FailedInit; std::mem::swap(&mut self.setup, &mut setup); setup } } impl Service for Inbound { type Response = zn::Response; type Error = zn::BoxError; type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Check whether the setup is finished, but don't wait for it to // become ready before reporting readiness. We expect to get it "soon", // and reporting unreadiness might cause unwanted load-shedding, since // the load-shed middleware is unable to distinguish being unready due // to load from being unready while waiting on setup. // Every setup variant handler must provide a result let result; self.setup = match self.take_setup() { Setup::Pending { mut setup } => match setup.try_recv() { Ok(setup_data) => { let InboundSetupData { address_book, block_download_peer_set, block_verifier, mempool, state, } = setup_data; let block_downloads = Box::pin(BlockDownloads::new( Timeout::new(block_download_peer_set.clone(), BLOCK_DOWNLOAD_TIMEOUT), Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT), state.clone(), )); result = Ok(()); Setup::Initialized { address_book, block_downloads, mempool, state, } } Err(TryRecvError::Empty) => { // There's no setup data yet, so keep waiting for it result = Ok(()); Setup::Pending { setup } } Err(error @ TryRecvError::Closed) => { // Mark the service as failed, because setup failed error!(?error, "inbound setup failed"); let error: SharedRecvError = error.into(); result = Err(error.clone().into()); Setup::FailedRecv { error } } }, // Make sure previous setups were left in a valid state Setup::FailedInit => unreachable!("incomplete previous Inbound initialization"), // If setup failed, report service failure Setup::FailedRecv { error } => { result = Err(error.clone().into()); Setup::FailedRecv { error } } // Clean up completed download tasks, ignoring their results Setup::Initialized { address_book, mut block_downloads, mempool, state, } => { while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {} result = Ok(()); Setup::Initialized { address_book, block_downloads, mempool, state, } } }; // Make sure we're leaving the setup in a valid state if matches!(self.setup, Setup::FailedInit) { unreachable!("incomplete Inbound initialization after poll_ready state handling"); } // TODO: // * do we want to propagate backpressure from the download queue or its outbound network? // currently, the download queue waits for the outbound network in the download future, // and drops new requests after it reaches a hard-coded limit. This is the // "load shed directly" pattern from #1618. // * currently, the state service is always ready, unless its buffer is full. // So we might also want to propagate backpressure from its buffer. // * poll_ready needs to be implemented carefully, to avoid hangs or deadlocks. // See #1593 for details. Poll::Ready(result) } /// Call the inbound service. /// /// Errors indicate that the peer has done something wrong or unexpected, /// and will cause callers to disconnect from the remote peer. #[instrument(name = "inbound", skip(self, req))] fn call(&mut self, req: zn::Request) -> Self::Future { let (address_book, block_downloads, mempool, state) = match &mut self.setup { Setup::Initialized { address_book, block_downloads, mempool, state, } => (address_book, block_downloads, mempool, state), _ => { debug!("ignoring request from remote peer during setup"); return async { Ok(zn::Response::Nil) }.boxed(); } }; match req { zn::Request::Peers => { // # Security // // We truncate the list to not reveal our entire peer set in one call. // But we don't monitor repeated requests and the results are shuffled, // a crawler could just send repeated queries and get the full list. // // # Correctness // // Briefly hold the address book threaded mutex while // cloning the address book. Then sanitize after releasing // the lock. let peers = address_book.lock().unwrap().clone(); // Correctness: get the current time after acquiring the address book lock. let now = Utc::now(); // Send a sanitized response let mut peers = peers.sanitized(now); // Truncate the list let truncate_at = MAX_ADDRS_IN_MESSAGE .min((peers.len() as f64 * FRAC_OF_AVAILABLE_ADDRESS).ceil() as usize); peers.truncate(truncate_at); if !peers.is_empty() { async { Ok(zn::Response::Peers(peers)) }.boxed() } else { info!("ignoring `Peers` request from remote peer because our address book is empty"); async { Ok(zn::Response::Nil) }.boxed() } } zn::Request::BlocksByHash(hashes) => { // Correctness: // // We can't use `call_all` here, because it can hold one buffer slot per concurrent // future, until the `CallAll` struct is dropped. We can't hold those slots in this // future because: // * we're not sure when the returned future will complete, and // * we don't limit how many returned futures can be concurrently running // https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112 use futures::stream::TryStreamExt; hashes .into_iter() .map(|hash| zs::Request::Block(hash.into())) .map(|request| state.clone().oneshot(request)) .collect::>() .try_filter_map(|response| async move { Ok(match response { zs::Response::Block(Some(block)) => Some(block), // `zcashd` ignores missing blocks in GetData responses, // rather than including them in a trailing `NotFound` // message zs::Response::Block(None) => None, _ => unreachable!("wrong response from state"), }) }) .try_collect::>() .map_ok(|blocks| { if blocks.is_empty() { zn::Response::Nil } else { zn::Response::Blocks(blocks) } }) .boxed() } zn::Request::TransactionsById(transactions) => { let request = mempool::Request::TransactionsById(transactions); mempool.clone().oneshot(request).map_ok(|resp| match resp { mempool::Response::Transactions(transactions) if transactions.is_empty() => zn::Response::Nil, mempool::Response::Transactions(transactions) => zn::Response::Transactions(transactions), _ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"), }) .boxed() } zn::Request::FindBlocks { known_blocks, stop } => { let request = zs::Request::FindBlockHashes { known_blocks, stop }; state.clone().oneshot(request).map_ok(|resp| match resp { zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil, zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes), _ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"), }) .boxed() } zn::Request::FindHeaders { known_blocks, stop } => { let request = zs::Request::FindBlockHeaders { known_blocks, stop }; state.clone().oneshot(request).map_ok(|resp| match resp { zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil, zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers), _ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"), }) .boxed() } zn::Request::PushTransaction(transaction) => { mempool .clone() .oneshot(mempool::Request::Queue(vec![transaction.into()])) // The response just indicates if processing was queued or not; ignore it .map_ok(|_resp| zn::Response::Nil) .boxed() } zn::Request::AdvertiseTransactionIds(transactions) => { let transactions = transactions.into_iter().map(Into::into).collect(); mempool .clone() .oneshot(mempool::Request::Queue(transactions)) // The response just indicates if processing was queued or not; ignore it .map_ok(|_resp| zn::Response::Nil) .boxed() } zn::Request::AdvertiseBlock(hash) => { block_downloads.download_and_verify(hash); async { Ok(zn::Response::Nil) }.boxed() } zn::Request::MempoolTransactionIds => { mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp { mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil, mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids.into_iter().collect()), _ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"), }) .boxed() } zn::Request::Ping(_) => { unreachable!("ping requests are handled internally"); } } } }