From 9c021025a7f64a97a117c31aeec967ab59991497 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Fri, 18 Sep 2020 22:16:59 -0700 Subject: [PATCH] network: fill in remaining request/response pairs --- zebra-network/src/peer/connection.rs | 127 +++++++++++------- .../src/protocol/internal/request.rs | 21 +++ .../src/protocol/internal/response.rs | 8 +- zebrad/src/components/inbound.rs | 8 ++ 4 files changed, 118 insertions(+), 46 deletions(-) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 9049c5226..a9ab7b8a4 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -50,6 +50,7 @@ pub(super) enum Handler { Ping(Nonce), Peers, FindBlocks, + FindHeaders, BlocksByHash { hashes: HashSet, blocks: Vec>, @@ -58,6 +59,7 @@ pub(super) enum Handler { hashes: HashSet, transactions: Vec>, }, + MempoolTransactions, } impl Handler { @@ -133,15 +135,27 @@ impl Handler { Finished(Err(PeerError::WrongBlock.into())) } } - (FindBlocks, Message::Inv(inv_hashes)) => Finished(Ok(Response::BlockHashes( - inv_hashes - .into_iter() - .filter_map(|inv| match inv { - InventoryHash::Block(hash) => Some(hash), - _ => None, - }) - .collect(), - ))), + (FindBlocks, Message::Inv(items)) + if items + .iter() + .all(|item| matches!(item, InventoryHash::Block(_))) => + { + Finished(Ok(Response::BlockHashes( + block_hashes(&items[..]).collect(), + ))) + } + (MempoolTransactions, Message::Inv(items)) + if items + .iter() + .all(|item| matches!(item, InventoryHash::Tx(_))) => + { + Finished(Ok(Response::TransactionHashes( + transaction_hashes(&items[..]).collect(), + ))) + } + (FindHeaders, Message::Headers(headers)) => { + Finished(Ok(Response::BlockHeaders(headers))) + } // By default, messages are not responses. (state, msg) => { trace!(?msg, "did not interpret message as response"); @@ -441,6 +455,26 @@ where tx, span, }), + (AwaitingRequest, FindHeaders { known_blocks, stop }) => self + .peer_tx + .send(Message::GetHeaders { known_blocks, stop }) + .await + .map_err(|e| e.into()) + .map(|()| AwaitingResponse { + handler: Handler::FindHeaders, + tx, + span, + }), + (AwaitingRequest, MempoolTransactions) => self + .peer_tx + .send(Message::Mempool) + .await + .map_err(|e| e.into()) + .map(|()| AwaitingResponse { + handler: Handler::MempoolTransactions, + tx, + span, + }), (AwaitingRequest, PushTransaction(transaction)) => { // Since we're not waiting for further messages, we need to // send a response before dropping tx. @@ -543,7 +577,7 @@ where [InventoryHash::Tx(_), rest @ ..] if rest.iter().all(|item| matches!(item, InventoryHash::Tx(_))) => { - Request::TransactionsByHash(transaction_hashes(&items)) + Request::TransactionsByHash(transaction_hashes(&items).collect()) } _ => { self.fail_with(PeerError::WrongMessage("inv with mixed item types")); @@ -556,12 +590,12 @@ where .iter() .all(|item| matches!(item, InventoryHash::Block(_))) => { - Request::BlocksByHash(block_hashes(&items)) + Request::BlocksByHash(block_hashes(&items).collect()) } [InventoryHash::Tx(_), rest @ ..] if rest.iter().all(|item| matches!(item, InventoryHash::Tx(_))) => { - Request::TransactionsByHash(transaction_hashes(&items)) + Request::TransactionsByHash(transaction_hashes(&items).collect()) } _ => { self.fail_with(PeerError::WrongMessage("getdata with mixed item types")); @@ -569,18 +603,11 @@ where } }, Message::GetAddr => Request::Peers, - Message::GetBlocks { .. } => { - debug!("ignoring unimplemented getblocks message"); - return; - } - Message::GetHeaders { .. } => { - debug!("ignoring unimplemented getheaders message"); - return; - } - Message::Mempool => { - debug!("ignoring unimplemented mempool message"); - return; + Message::GetBlocks { known_blocks, stop } => Request::FindBlocks { known_blocks, stop }, + Message::GetHeaders { known_blocks, stop } => { + Request::FindHeaders { known_blocks, stop } } + Message::Mempool => Request::MempoolTransactions, }; self.drive_peer_request(req).await @@ -645,32 +672,42 @@ where self.fail_with(e.into()) } } + Response::BlockHeaders(headers) => { + if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await { + self.fail_with(e.into()) + } + } + Response::TransactionHashes(hashes) => { + if let Err(e) = self + .peer_tx + .send(Message::Inv(hashes.into_iter().map(Into::into).collect())) + .await + { + self.fail_with(e.into()) + } + } } } } -fn transaction_hashes(items: &[InventoryHash]) -> HashSet { - items - .iter() - .filter_map(|item| { - if let InventoryHash::Tx(hash) = item { - Some(*hash) - } else { - None - } - }) - .collect() +fn transaction_hashes<'a>( + items: &'a [InventoryHash], +) -> impl Iterator + 'a { + items.iter().filter_map(|item| { + if let InventoryHash::Tx(hash) = item { + Some(*hash) + } else { + None + } + }) } -fn block_hashes(items: &[InventoryHash]) -> HashSet { - items - .iter() - .filter_map(|item| { - if let InventoryHash::Block(hash) = item { - Some(*hash) - } else { - None - } - }) - .collect() +fn block_hashes<'a>(items: &'a [InventoryHash]) -> impl Iterator + 'a { + items.iter().filter_map(|item| { + if let InventoryHash::Block(hash) = item { + Some(*hash) + } else { + None + } + }) } diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index deb4895fb..b70af56d2 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -86,6 +86,20 @@ pub enum Request { /// than a list of hashes of subsequent blocks. We believe that unsolicited /// `inv` messages will always have exactly one block hash. FindBlocks { + /// Hashes of known blocks, ordered from highest height to lowest height. + known_blocks: Vec, + /// Optionally, the last block hash to request. + stop: Option, + }, + + /// Request headers of subsequent blocks in the chain, giving hashes of + /// known blocks. + /// + /// # Returns + /// + /// Returns + /// [`Response::BlockHeaders`](super::Response::BlockHeaders). + FindHeaders { /// Hashes of known blocks, ordered from highest height to lowest height. known_blocks: Vec, /// Optionally, the last header to request. @@ -137,4 +151,11 @@ pub enum Request { /// /// Returns [`Response::Nil`](super::Response::Nil). AdvertiseBlock(block::Hash), + + /// Request the contents of this node's mempool. + /// + /// # Returns + /// + /// Returns [`Response::TransactionHashes`](super::Response::TransactionHashes). + MempoolTransactions, } diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index 80bef6651..9e0f638f6 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -1,6 +1,6 @@ use zebra_chain::{ block::{self, Block}, - transaction::Transaction, + transaction::{self, Transaction}, }; use crate::meta_addr::MetaAddr; @@ -21,6 +21,12 @@ pub enum Response { /// A list of block hashes. BlockHashes(Vec), + /// A list of block headers. + BlockHeaders(Vec), + /// A list of transactions. Transactions(Vec>), + + /// A list of transaction hashes. + TransactionHashes(Vec), } diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 299cf1768..c774c63fd 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -158,6 +158,10 @@ impl Service for Inbound { debug!("ignoring unimplemented request"); async { Ok(zn::Response::Nil) }.boxed() } + zn::Request::FindHeaders { .. } => { + debug!("ignoring unimplemented request"); + async { Ok(zn::Response::Nil) }.boxed() + } zn::Request::PushTransaction(_transaction) => { debug!("ignoring unimplemented request"); async { Ok(zn::Response::Nil) }.boxed() @@ -170,6 +174,10 @@ impl Service for Inbound { debug!("ignoring unimplemented request"); async { Ok(zn::Response::Nil) }.boxed() } + zn::Request::MempoolTransactions => { + debug!("ignoring unimplemented request"); + async { Ok(zn::Response::Nil) }.boxed() + } zn::Request::Ping(_) => { unreachable!("ping requests are handled internally"); }