Inbound `FindBlocks` and `FindHeaders` (#1347)

* implement inbound `FindBlocks`
* Handle inbound peer FindHeaders requests
* handle request before having any chain tip
* Split `find_chain_hashes` into smaller functions

Add a `max_len` argument to support `FindHeaders` requests.

Rewrite the hash collection code to use heights, so we can handle the
`stop` hash and "no intersection" cases correctly.

* Split state height functions into "any chain" and "best chain"
* Rename the best chain block method to `best_block`
* Move fmt utilities to zebra_chain::fmt
* Summarise Debug for some Message variants

Co-authored-by: teor <teor@riseup.net>
Co-authored-by: Jane Lusby <jlusby42@gmail.com>
This commit is contained in:
Alfredo Garcia 2020-11-30 18:30:37 -03:00 committed by GitHub
parent d007c76488
commit 4544463059
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 388 additions and 47 deletions

View File

@ -24,7 +24,7 @@ pub use root_hash::RootHash;
use serde::{Deserialize, Serialize};
use crate::{parameters::Network, transaction::Transaction, transparent};
use crate::{fmt::DisplayToDebug, parameters::Network, transaction::Transaction, transparent};
/// A Zcash block, containing a header and a list of transactions.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
@ -46,17 +46,6 @@ impl fmt::Display for Block {
}
}
struct DisplayToDebug<T>(T);
impl<T> fmt::Debug for DisplayToDebug<T>
where
T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl Block {
/// Return the block height reported in the coinbase transaction, if any.
pub fn coinbase_height(&self) -> Option<Height> {

28
zebra-chain/src/fmt.rs Normal file
View File

@ -0,0 +1,28 @@
//! Format wrappers for Zebra
use std::fmt;
pub struct DisplayToDebug<T>(pub T);
impl<T> fmt::Debug for DisplayToDebug<T>
where
T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
pub struct SummaryDebug<T>(pub T);
impl<T> fmt::Debug for SummaryDebug<Vec<T>> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}, len={}", std::any::type_name::<T>(), self.0.len())
}
}
impl<T> fmt::Debug for SummaryDebug<&Vec<T>> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}, len={}", std::any::type_name::<T>(), self.0.len())
}
}

View File

@ -16,6 +16,7 @@ extern crate serde;
pub mod amount;
pub mod block;
pub mod fmt;
pub mod parameters;
pub mod primitives;
pub mod sapling;

View File

@ -1,12 +1,13 @@
//! Definitions of network messages.
use std::error::Error;
use std::{net, sync::Arc};
use std::{fmt, net, sync::Arc};
use chrono::{DateTime, Utc};
use zebra_chain::{
block::{self, Block},
fmt::{DisplayToDebug, SummaryDebug},
transaction::Transaction,
};
@ -30,7 +31,7 @@ use crate::meta_addr::MetaAddr;
/// during serialization).
///
/// [btc_wiki_protocol]: https://en.bitcoin.it/wiki/Protocol_documentation
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Eq, PartialEq)]
pub enum Message {
/// A `version` message.
///
@ -307,3 +308,93 @@ pub enum RejectReason {
Checkpoint = 0x43,
Other = 0x50,
}
/// Summarise `Vec`s when debugging messages
impl fmt::Debug for Message {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Message::Version {
version,
services,
timestamp,
address_recv,
address_from,
nonce,
user_agent,
start_height,
relay,
} => f
.debug_struct("Version")
.field("version", version)
.field("services", services)
.field("timestamp", timestamp)
.field("address_recv", address_recv)
.field("address_from", address_from)
.field("nonce", nonce)
.field("user_agent", user_agent)
.field("start_height", start_height)
.field("relay", relay)
.finish(),
Message::Verack => f.debug_tuple("Verack").finish(),
Message::Ping(nonce) => f.debug_tuple("Ping").field(nonce).finish(),
Message::Pong(nonce) => f.debug_tuple("Pong").field(nonce).finish(),
Message::Reject {
message,
ccode,
reason,
data,
} => f
.debug_struct("Reject")
.field("message", message)
.field("ccode", ccode)
.field("reason", reason)
.field("data", data)
.finish(),
Message::GetAddr => f.debug_tuple("GetAddr").finish(),
Message::Addr(addr) => f.debug_tuple("Addr").field(&SummaryDebug(addr)).finish(),
Message::GetBlocks { known_blocks, stop } => f
.debug_struct("GetBlocks")
.field("known_blocks", &SummaryDebug(known_blocks))
.field("stop", stop)
.finish(),
Message::Inv(inv) => f.debug_tuple("Inv").field(&SummaryDebug(inv)).finish(),
Message::GetHeaders { known_blocks, stop } => f
.debug_struct("GetHeaders")
.field("known_blocks", &SummaryDebug(known_blocks))
.field("stop", stop)
.finish(),
Message::Headers(headers) => f
.debug_tuple("Headers")
.field(&SummaryDebug(headers))
.finish(),
Message::GetData(data) => f.debug_tuple("GetData").field(&SummaryDebug(data)).finish(),
Message::Block(block) => f
.debug_tuple("Block")
.field(&DisplayToDebug(block))
.finish(),
Message::Tx(tx) => f.debug_tuple("Tx").field(&tx).finish(),
Message::NotFound(not_found) => f
.debug_tuple("NotFound")
.field(&SummaryDebug(not_found))
.finish(),
Message::Mempool => f.debug_tuple("Mempool").finish(),
Message::FilterLoad {
filter,
hash_functions_count,
tweak,
flags,
} => f
.debug_struct("FilterLoad")
.field("filter", filter)
.field("hash_functions_count", hash_functions_count)
.field("tweak", tweak)
.field("flags", flags)
.finish(),
Message::FilterAdd { data } => f
.debug_struct("FilterAdd")
.field("data", &SummaryDebug(data))
.finish(),
Message::FilterClear => f.debug_tuple("FilterClear").finish(),
}
}
}

View File

@ -239,4 +239,50 @@ pub enum Request {
/// Code making this request should apply a timeout layer to the service to
/// handle missing UTXOs.
AwaitUtxo(transparent::OutPoint),
/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
/// Returns a list of hashes that follow that intersection, from the best chain.
///
/// If there is no matching hash in the best chain, starts from the genesis hash.
///
/// Stops the list of hashes after:
/// * adding the best tip,
/// * adding the `stop` hash to the list, if it is in the best chain, or
/// * adding 500 hashes to the list.
///
/// Returns an empty list if the state is empty.
///
/// Returns
///
/// [`Response::BlockHashes(Vec<block::Hash>)`](Response::BlockHashes).
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks
FindBlockHashes {
/// Hashes of known blocks, ordered from highest height to lowest height.
known_blocks: Vec<block::Hash>,
/// Optionally, the last block hash to request.
stop: Option<block::Hash>,
},
/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
/// Returns a list of headers that follow that intersection, from the best chain.
///
/// If there is no matching hash in the best chain, starts from the genesis header.
///
/// Stops the list of headers after:
/// * adding the best tip,
/// * adding the header matching the `stop` hash to the list, if it is in the best chain, or
/// * adding 160 headers to the list.
///
/// Returns an empty list if the state is empty.
///
/// Returns
///
/// [`Response::BlockHeaders(Vec<block::Header>)`](Response::BlockHeaders).
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getheaders
FindBlockHeaders {
/// Hashes of known blocks, ordered from highest height to lowest height.
known_blocks: Vec<block::Hash>,
/// Optionally, the hash of the last header to request.
stop: Option<block::Hash>,
},
}

View File

@ -33,6 +33,12 @@ pub enum Response {
/// Response to [`Request::Block`] with the specified block.
Block(Option<Arc<Block>>),
/// The response to a `AwaitUtxo` request
/// The response to a `AwaitUtxo` request.
Utxo(Utxo),
/// The response to a `FindBlockHashes` request.
BlockHashes(Vec<block::Hash>),
/// The response to a `FindBlockHeaders` request.
BlockHeaders(Vec<block::Header>),
}

View File

@ -208,7 +208,7 @@ impl StateService {
let mut hashes = Vec::with_capacity(heights.len());
for height in heights {
if let Some(hash) = self.hash(height) {
if let Some(hash) = self.best_hash(height) {
hashes.push(hash);
}
}
@ -224,16 +224,19 @@ impl StateService {
/// Return the depth of block `hash` in the current best chain.
pub fn depth(&self, hash: block::Hash) -> Option<u32> {
let tip = self.tip()?.0;
let height = self.mem.height(hash).or_else(|| self.disk.height(hash))?;
let height = self
.mem
.best_height_by_hash(hash)
.or_else(|| self.disk.height(hash))?;
Some(tip.0 - height.0)
}
/// Return the block identified by either its `height` or `hash` if it exists
/// in the current best chain.
pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
pub fn best_block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
self.mem
.block(hash_or_height)
.best_block(hash_or_height)
.or_else(|| self.disk.block(hash_or_height))
}
@ -246,14 +249,29 @@ impl StateService {
}
/// Return the hash for the block at `height` in the current best chain.
pub fn hash(&self, height: block::Height) -> Option<block::Hash> {
self.mem.hash(height).or_else(|| self.disk.hash(height))
pub fn best_hash(&self, height: block::Height) -> Option<block::Hash> {
self.mem
.best_hash(height)
.or_else(|| self.disk.hash(height))
}
/// Return true if `hash` is in the current best chain.
pub fn best_chain_contains(&self, hash: block::Hash) -> bool {
self.best_height_by_hash(hash).is_some()
}
/// Return the height for the block at `hash`, if `hash` is in the best chain.
pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
self.mem
.best_height_by_hash(hash)
.or_else(|| self.disk.height(hash))
}
/// Return the height for the block at `hash` in any chain.
pub fn height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
#[allow(dead_code)]
pub fn any_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
self.mem
.height_by_hash(hash)
.any_height_by_hash(hash)
.or_else(|| self.disk.height(hash))
}
@ -276,6 +294,143 @@ impl StateService {
state: IterState::NonFinalized(hash),
}
}
/// Find the first hash that's in the peer's `known_blocks` and the local best chain.
///
/// Returns `None` if:
/// * there is no matching hash in the best chain, or
/// * the state is empty.
fn find_chain_intersection(&self, known_blocks: Vec<block::Hash>) -> Option<block::Hash> {
// We can get a block locator request before we have downloaded the genesis block
self.tip()?;
known_blocks
.iter()
.find(|&&hash| self.best_chain_contains(hash))
.cloned()
}
/// Returns a list of block hashes in the best chain, following the `intersection` with the best
/// chain. If there is no intersection with the best chain, starts from the genesis hash.
///
/// Includes finalized and non-finalized blocks.
///
/// Stops the list of hashes after:
/// * adding the best tip,
/// * adding the `stop` hash to the list, if it is in the best chain, or
/// * adding `max_len` hashes to the list.
///
/// Returns an empty list if the state is empty.
pub fn collect_chain_hashes(
&self,
intersection: Option<block::Hash>,
stop: Option<block::Hash>,
max_len: usize,
) -> Vec<block::Hash> {
assert!(max_len > 0, "max_len must be at least 1");
// We can get a block locator request before we have downloaded the genesis block
let chain_tip_height = if let Some((height, _)) = self.tip() {
height
} else {
return Vec::new();
};
let intersection_height = intersection.map(|hash| {
self.best_height_by_hash(hash)
.expect("the intersection hash must be in the best chain")
});
let max_len_height = if let Some(intersection_height) = intersection_height {
// start after the intersection_height, and return max_len hashes
(intersection_height + (max_len as i32))
.expect("the Find response height does not exceed Height::MAX")
} else {
// start at genesis, and return max_len hashes
block::Height((max_len - 1) as _)
};
let stop_height = stop.map(|hash| self.best_height_by_hash(hash)).flatten();
// Compute the final height, making sure it is:
// * at or below our chain tip, and
// * at or below the height of the stop hash.
let final_height = std::cmp::min(max_len_height, chain_tip_height);
let final_height = stop_height
.map(|stop_height| std::cmp::min(final_height, stop_height))
.unwrap_or(final_height);
let final_hash = self
.best_hash(final_height)
.expect("final height must have a hash");
// We can use an "any chain" method here, because `final_hash` is in the best chain
let mut res: Vec<_> = self
.chain(final_hash)
.map(|block| block.hash())
.take_while(|&hash| Some(hash) != intersection)
.inspect(|hash| {
tracing::trace!(
?hash,
height = ?self.best_height_by_hash(*hash)
.expect("if hash is in the state then it should have an associated height"),
"adding hash to peer Find response",
)
})
.collect();
res.reverse();
tracing::info!(
?final_height,
response_len = ?res.len(),
?chain_tip_height,
?stop_height,
?intersection_height,
"responding to peer GetBlocks or GetHeaders",
);
// Check the function implements the Find protocol
assert!(
res.len() <= max_len,
"a Find response must not exceed the maximum response length"
);
assert!(
intersection
.map(|hash| !res.contains(&hash))
.unwrap_or(true),
"the list must not contain the intersection hash"
);
assert!(
stop.map(|hash| !res[..(res.len() - 1)].contains(&hash))
.unwrap_or(true),
"if the stop hash is in the list, it must be the final hash"
);
res
}
/// Finds the first hash that's in the peer's `known_blocks` and the local best chain.
/// Returns a list of hashes that follow that intersection, from the best chain.
///
/// Starts from the first matching hash in the best chain, ignoring all other hashes in
/// `known_blocks`. If there is no matching hash in the best chain, starts from the genesis
/// hash.
///
/// Includes finalized and non-finalized blocks.
///
/// Stops the list of hashes after:
/// * adding the best tip,
/// * adding the `stop` hash to the list, if it is in the best chain, or
/// * adding 500 hashes to the list.
///
/// Returns an empty list if the state is empty.
pub fn find_chain_hashes(
&self,
known_blocks: Vec<block::Hash>,
stop: Option<block::Hash>,
max_len: usize,
) -> Vec<block::Hash> {
let intersection = self.find_chain_intersection(known_blocks);
self.collect_chain_hashes(intersection, stop, max_len)
}
}
struct Iter<'a> {
@ -361,7 +516,7 @@ impl ExactSizeIterator for Iter<'_> {
match self.state {
IterState::NonFinalized(hash) => self
.service
.height_by_hash(hash)
.best_height_by_hash(hash)
.map(|height| (height.0 + 1) as _)
.unwrap_or(0),
IterState::Finalized(height) => (height.0 + 1) as _,
@ -463,7 +618,7 @@ impl Service<Request> for StateService {
}
Request::Block(hash_or_height) => {
metrics::counter!("state.requests", 1, "type" => "block");
let rsp = Ok(self.block(hash_or_height)).map(Response::Block);
let rsp = Ok(self.best_block(hash_or_height)).map(Response::Block);
async move { rsp }.boxed()
}
Request::AwaitUtxo(outpoint) => {
@ -477,6 +632,25 @@ impl Service<Request> for StateService {
fut.boxed()
}
Request::FindBlockHashes { known_blocks, stop } => {
const MAX_FIND_BLOCK_HASHES_RESULTS: usize = 500;
let res = self.find_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS);
async move { Ok(Response::BlockHashes(res)) }.boxed()
}
Request::FindBlockHeaders { known_blocks, stop } => {
const MAX_FIND_BLOCK_HEADERS_RESULTS: usize = 160;
let res =
self.find_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HEADERS_RESULTS);
let res: Vec<_> = res
.iter()
.map(|&hash| {
self.best_block(hash.into())
.expect("block for found hash is in the best chain")
.header
})
.collect();
async move { Ok(Response::BlockHeaders(res)) }.boxed()
}
}
}
}

View File

@ -169,7 +169,7 @@ impl NonFinalizedState {
}
/// Returns the `block` at a given height or hash in the best chain.
pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
pub fn best_block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
let best_chain = self.best_chain()?;
let height =
hash_or_height.height_or_else(|hash| best_chain.height_by_hash.get(&hash).cloned())?;
@ -181,8 +181,11 @@ impl NonFinalizedState {
}
/// Returns the hash for a given `block::Height` if it is present in the best chain.
pub fn hash(&self, height: block::Height) -> Option<block::Hash> {
self.block(height.into()).map(|block| block.hash())
pub fn best_hash(&self, height: block::Height) -> Option<block::Hash> {
self.best_chain()?
.blocks
.get(&height)
.map(|prepared| prepared.hash)
}
/// Returns the tip of the best chain.
@ -194,15 +197,15 @@ impl NonFinalizedState {
Some((height, hash))
}
/// Returns the depth of `hash` in the best chain.
pub fn height(&self, hash: block::Hash) -> Option<block::Height> {
/// Returns the height of `hash` in the best chain.
pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
let best_chain = self.best_chain()?;
let height = *best_chain.height_by_hash.get(&hash)?;
Some(height)
}
/// Returns the height of `hash` in any chain.
pub fn height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
pub fn any_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
for chain in self.chain_set.iter().rev() {
if let Some(height) = chain.height_by_hash.get(&hash) {
return Some(*height);

View File

@ -385,12 +385,13 @@ impl Ord for Chain {
#[cfg(test)]
mod tests {
use std::{env, fmt, sync::Arc};
use std::{env, sync::Arc};
use zebra_chain::serialization::ZcashDeserializeInto;
use zebra_chain::{
block::Block,
fmt::SummaryDebug,
parameters::{Network, NetworkUpgrade},
serialization::ZcashDeserializeInto,
LedgerState,
};
use zebra_test::prelude::*;
@ -400,14 +401,6 @@ mod tests {
use self::assert_eq;
use super::*;
struct SummaryDebug<T>(T);
impl<T> fmt::Debug for SummaryDebug<Vec<T>> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}, len={}", std::any::type_name::<T>(), self.0.len())
}
}
#[test]
fn construct_empty() {
zebra_test::init();

View File

@ -180,13 +180,23 @@ impl Service<zn::Request> for Inbound {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::FindBlocks { .. } => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
zn::Request::FindBlocks { known_blocks, stop } => {
let request = zs::Request::FindBlockHashes { known_blocks, stop };
self.state.call(request).map_ok(|resp| match resp {
zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil,
zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes),
_ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"),
})
.boxed()
}
zn::Request::FindHeaders { .. } => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
zn::Request::FindHeaders { known_blocks, stop } => {
let request = zs::Request::FindBlockHeaders { known_blocks, stop };
self.state.call(request).map_ok(|resp| match resp {
zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil,
zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers),
_ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"),
})
.boxed()
}
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");