network: fill in remaining request/response pairs

This commit is contained in:
Henry de Valence 2020-09-18 22:16:59 -07:00
parent b289cb9164
commit 9c021025a7
4 changed files with 118 additions and 46 deletions

View File

@ -50,6 +50,7 @@ pub(super) enum Handler {
Ping(Nonce), Ping(Nonce),
Peers, Peers,
FindBlocks, FindBlocks,
FindHeaders,
BlocksByHash { BlocksByHash {
hashes: HashSet<block::Hash>, hashes: HashSet<block::Hash>,
blocks: Vec<Arc<Block>>, blocks: Vec<Arc<Block>>,
@ -58,6 +59,7 @@ pub(super) enum Handler {
hashes: HashSet<transaction::Hash>, hashes: HashSet<transaction::Hash>,
transactions: Vec<Arc<Transaction>>, transactions: Vec<Arc<Transaction>>,
}, },
MempoolTransactions,
} }
impl Handler { impl Handler {
@ -133,15 +135,27 @@ impl Handler {
Finished(Err(PeerError::WrongBlock.into())) Finished(Err(PeerError::WrongBlock.into()))
} }
} }
(FindBlocks, Message::Inv(inv_hashes)) => Finished(Ok(Response::BlockHashes( (FindBlocks, Message::Inv(items))
inv_hashes if items
.into_iter() .iter()
.filter_map(|inv| match inv { .all(|item| matches!(item, InventoryHash::Block(_))) =>
InventoryHash::Block(hash) => Some(hash), {
_ => None, Finished(Ok(Response::BlockHashes(
}) block_hashes(&items[..]).collect(),
.collect(), )))
))), }
(MempoolTransactions, Message::Inv(items))
if items
.iter()
.all(|item| matches!(item, InventoryHash::Tx(_))) =>
{
Finished(Ok(Response::TransactionHashes(
transaction_hashes(&items[..]).collect(),
)))
}
(FindHeaders, Message::Headers(headers)) => {
Finished(Ok(Response::BlockHeaders(headers)))
}
// By default, messages are not responses. // By default, messages are not responses.
(state, msg) => { (state, msg) => {
trace!(?msg, "did not interpret message as response"); trace!(?msg, "did not interpret message as response");
@ -441,6 +455,26 @@ where
tx, tx,
span, span,
}), }),
(AwaitingRequest, FindHeaders { known_blocks, stop }) => self
.peer_tx
.send(Message::GetHeaders { known_blocks, stop })
.await
.map_err(|e| e.into())
.map(|()| AwaitingResponse {
handler: Handler::FindHeaders,
tx,
span,
}),
(AwaitingRequest, MempoolTransactions) => self
.peer_tx
.send(Message::Mempool)
.await
.map_err(|e| e.into())
.map(|()| AwaitingResponse {
handler: Handler::MempoolTransactions,
tx,
span,
}),
(AwaitingRequest, PushTransaction(transaction)) => { (AwaitingRequest, PushTransaction(transaction)) => {
// Since we're not waiting for further messages, we need to // Since we're not waiting for further messages, we need to
// send a response before dropping tx. // send a response before dropping tx.
@ -543,7 +577,7 @@ where
[InventoryHash::Tx(_), rest @ ..] [InventoryHash::Tx(_), rest @ ..]
if rest.iter().all(|item| matches!(item, InventoryHash::Tx(_))) => if rest.iter().all(|item| matches!(item, InventoryHash::Tx(_))) =>
{ {
Request::TransactionsByHash(transaction_hashes(&items)) Request::TransactionsByHash(transaction_hashes(&items).collect())
} }
_ => { _ => {
self.fail_with(PeerError::WrongMessage("inv with mixed item types")); self.fail_with(PeerError::WrongMessage("inv with mixed item types"));
@ -556,12 +590,12 @@ where
.iter() .iter()
.all(|item| matches!(item, InventoryHash::Block(_))) => .all(|item| matches!(item, InventoryHash::Block(_))) =>
{ {
Request::BlocksByHash(block_hashes(&items)) Request::BlocksByHash(block_hashes(&items).collect())
} }
[InventoryHash::Tx(_), rest @ ..] [InventoryHash::Tx(_), rest @ ..]
if rest.iter().all(|item| matches!(item, InventoryHash::Tx(_))) => if rest.iter().all(|item| matches!(item, InventoryHash::Tx(_))) =>
{ {
Request::TransactionsByHash(transaction_hashes(&items)) Request::TransactionsByHash(transaction_hashes(&items).collect())
} }
_ => { _ => {
self.fail_with(PeerError::WrongMessage("getdata with mixed item types")); self.fail_with(PeerError::WrongMessage("getdata with mixed item types"));
@ -569,18 +603,11 @@ where
} }
}, },
Message::GetAddr => Request::Peers, Message::GetAddr => Request::Peers,
Message::GetBlocks { .. } => { Message::GetBlocks { known_blocks, stop } => Request::FindBlocks { known_blocks, stop },
debug!("ignoring unimplemented getblocks message"); Message::GetHeaders { known_blocks, stop } => {
return; Request::FindHeaders { known_blocks, stop }
}
Message::GetHeaders { .. } => {
debug!("ignoring unimplemented getheaders message");
return;
}
Message::Mempool => {
debug!("ignoring unimplemented mempool message");
return;
} }
Message::Mempool => Request::MempoolTransactions,
}; };
self.drive_peer_request(req).await self.drive_peer_request(req).await
@ -645,32 +672,42 @@ where
self.fail_with(e.into()) self.fail_with(e.into())
} }
} }
Response::BlockHeaders(headers) => {
if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
self.fail_with(e.into())
}
}
Response::TransactionHashes(hashes) => {
if let Err(e) = self
.peer_tx
.send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
.await
{
self.fail_with(e.into())
}
}
} }
} }
} }
fn transaction_hashes(items: &[InventoryHash]) -> HashSet<transaction::Hash> { fn transaction_hashes<'a>(
items items: &'a [InventoryHash],
.iter() ) -> impl Iterator<Item = transaction::Hash> + 'a {
.filter_map(|item| { items.iter().filter_map(|item| {
if let InventoryHash::Tx(hash) = item { if let InventoryHash::Tx(hash) = item {
Some(*hash) Some(*hash)
} else { } else {
None None
} }
}) })
.collect()
} }
fn block_hashes(items: &[InventoryHash]) -> HashSet<block::Hash> { fn block_hashes<'a>(items: &'a [InventoryHash]) -> impl Iterator<Item = block::Hash> + 'a {
items items.iter().filter_map(|item| {
.iter() if let InventoryHash::Block(hash) = item {
.filter_map(|item| { Some(*hash)
if let InventoryHash::Block(hash) = item { } else {
Some(*hash) None
} else { }
None })
}
})
.collect()
} }

View File

@ -86,6 +86,20 @@ pub enum Request {
/// than a list of hashes of subsequent blocks. We believe that unsolicited /// than a list of hashes of subsequent blocks. We believe that unsolicited
/// `inv` messages will always have exactly one block hash. /// `inv` messages will always have exactly one block hash.
FindBlocks { FindBlocks {
/// Hashes of known blocks, ordered from highest height to lowest height.
known_blocks: Vec<block::Hash>,
/// Optionally, the last block hash to request.
stop: Option<block::Hash>,
},
/// Request headers of subsequent blocks in the chain, giving hashes of
/// known blocks.
///
/// # Returns
///
/// Returns
/// [`Response::BlockHeaders`](super::Response::BlockHeaders).
FindHeaders {
/// Hashes of known blocks, ordered from highest height to lowest height. /// Hashes of known blocks, ordered from highest height to lowest height.
known_blocks: Vec<block::Hash>, known_blocks: Vec<block::Hash>,
/// Optionally, the last header to request. /// Optionally, the last header to request.
@ -137,4 +151,11 @@ pub enum Request {
/// ///
/// Returns [`Response::Nil`](super::Response::Nil). /// Returns [`Response::Nil`](super::Response::Nil).
AdvertiseBlock(block::Hash), AdvertiseBlock(block::Hash),
/// Request the contents of this node's mempool.
///
/// # Returns
///
/// Returns [`Response::TransactionHashes`](super::Response::TransactionHashes).
MempoolTransactions,
} }

View File

@ -1,6 +1,6 @@
use zebra_chain::{ use zebra_chain::{
block::{self, Block}, block::{self, Block},
transaction::Transaction, transaction::{self, Transaction},
}; };
use crate::meta_addr::MetaAddr; use crate::meta_addr::MetaAddr;
@ -21,6 +21,12 @@ pub enum Response {
/// A list of block hashes. /// A list of block hashes.
BlockHashes(Vec<block::Hash>), BlockHashes(Vec<block::Hash>),
/// A list of block headers.
BlockHeaders(Vec<block::Header>),
/// A list of transactions. /// A list of transactions.
Transactions(Vec<Arc<Transaction>>), Transactions(Vec<Arc<Transaction>>),
/// A list of transaction hashes.
TransactionHashes(Vec<transaction::Hash>),
} }

View File

@ -158,6 +158,10 @@ impl Service<zn::Request> for Inbound {
debug!("ignoring unimplemented request"); debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed() async { Ok(zn::Response::Nil) }.boxed()
} }
zn::Request::FindHeaders { .. } => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::PushTransaction(_transaction) => { zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request"); debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed() async { Ok(zn::Response::Nil) }.boxed()
@ -170,6 +174,10 @@ impl Service<zn::Request> for Inbound {
debug!("ignoring unimplemented request"); debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed() async { Ok(zn::Response::Nil) }.boxed()
} }
zn::Request::MempoolTransactions => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::Ping(_) => { zn::Request::Ping(_) => {
unreachable!("ping requests are handled internally"); unreachable!("ping requests are handled internally");
} }