Gossip recently verified block hashes to peers (#2729)

* Implement a task that gossips verified block hashes

* Log an info message for block broadcasts

* Simplify the gossip task

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>

* Re-use the old tip change if there is no new tip change

Also improve the comments.

* Add an assertion message

* Rename task join handles and futures in start method

* Add a dedicated BlockGossipError type

This type helps distinguish between syncer and state errors.

* Test that committed blocks are gossiped to peers

Also do a minor type cleanup on the existing test code,
replacing `Option<Vec<_>>` with `Vec<_>`.

* Formatting

* Remove excess newlines

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

* Clear the initial gossiped blocks during test setup

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
teor 2021-10-07 20:46:37 +10:00 committed by GitHub
parent 0b82298645
commit 04d2cfb3d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 250 additions and 33 deletions

View File

@ -540,6 +540,15 @@ impl TipAction {
} }
} }
/// Returns the block height of this tip action,
/// regardless of the underlying variant.
pub fn best_tip_height(&self) -> block::Height {
match self {
Grow { block } => block.height,
Reset { height, .. } => *height,
}
}
/// Returns a [`Grow`] based on `block`. /// Returns a [`Grow`] based on `block`.
pub(crate) fn grow_with(block: ChainTipBlock) -> Self { pub(crate) fn grow_with(block: ChainTipBlock) -> Self {
Grow { block } Grow { block }

View File

@ -33,6 +33,7 @@ use tower::util::BoxService;
use crate::{ use crate::{
components::{ components::{
mempool::{self, Mempool}, mempool::{self, Mempool},
sync,
tokio::{RuntimeRun, TokioComponent}, tokio::{RuntimeRun, TokioComponent},
ChainSync, Inbound, ChainSync, Inbound,
}, },
@ -105,11 +106,26 @@ impl StartCmd {
.send((peer_set.clone(), address_book, mempool.clone())) .send((peer_set.clone(), address_book, mempool.clone()))
.map_err(|_| eyre!("could not send setup data to inbound service"))?; .map_err(|_| eyre!("could not send setup data to inbound service"))?;
let syncer_error_future = syncer.sync();
let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
peer_set.clone(),
));
let mempool_crawler_task_handle = mempool::Crawler::spawn(peer_set, mempool, sync_status);
select! { select! {
result = syncer.sync().fuse() => result, sync_result = syncer_error_future.fuse() => sync_result,
_ = mempool::Crawler::spawn(peer_set, mempool, sync_status).fuse() => {
unreachable!("The mempool crawler only stops if it panics"); sync_gossip_result = sync_gossip_task_handle.fuse() => sync_gossip_result
} .expect("unexpected panic in the chain tip block gossip task")
.map_err(|e| eyre!(e)),
mempool_crawl_result = mempool_crawler_task_handle.fuse() => mempool_crawl_result
.expect("unexpected panic in the mempool crawler")
.map_err(|e| eyre!(e)),
} }
} }
} }

View File

@ -8,7 +8,7 @@
mod inbound; mod inbound;
pub mod mempool; pub mod mempool;
pub mod metrics; pub mod metrics;
mod sync; pub mod sync;
pub mod tokio; pub mod tokio;
pub mod tracing; pub mod tracing;

View File

@ -1,38 +1,42 @@
use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromStr, sync::Arc}; use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromStr, sync::Arc};
use super::mempool::{unmined_transactions_in_blocks, Mempool};
use crate::components::sync::SyncStatus;
use futures::FutureExt; use futures::FutureExt;
use tokio::sync::oneshot; use tokio::{sync::oneshot, task::JoinHandle};
use tower::{ use tower::{
buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, Service, buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, Service,
ServiceExt, ServiceExt,
}; };
use tracing::Span; use tracing::Span;
use zebra_chain::{ use zebra_chain::{
block::Block, block::Block,
parameters::Network, parameters::Network,
serialization::ZcashDeserializeInto, serialization::ZcashDeserializeInto,
transaction::{UnminedTx, UnminedTxId}, transaction::{UnminedTx, UnminedTxId},
}; };
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
use zebra_network::{AddressBook, Request, Response}; use zebra_network::{AddressBook, Request, Response};
use zebra_state::Config as StateConfig; use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion}; use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::components::{
mempool::{unmined_transactions_in_blocks, Mempool},
sync::{self, BlockGossipError, SyncStatus},
};
#[tokio::test] #[tokio::test]
async fn mempool_requests_for_transactions() { async fn mempool_requests_for_transactions() {
let (inbound_service, added_transactions, _, mut peer_set, _) = setup(true).await; let (
inbound_service,
_committed_blocks,
added_transactions,
_mock_tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
) = setup(true).await;
let added_transaction_ids: Vec<UnminedTxId> = added_transactions let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
.clone()
.unwrap()
.iter()
.map(|t| t.id)
.collect();
// Test `Request::MempoolTransactionIds` // Test `Request::MempoolTransactionIds`
let response = inbound_service let response = inbound_service
@ -58,11 +62,19 @@ async fn mempool_requests_for_transactions() {
.await; .await;
match response { match response {
Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions.unwrap()), Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions),
_ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec<UnminedTx>)`"), _ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec<UnminedTx>)`"),
}; };
// check that nothing unexpected happened
peer_set.expect_no_requests().await; peer_set.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
} }
#[tokio::test] #[tokio::test]
@ -74,7 +86,15 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
// use the first transaction that is not coinbase // use the first transaction that is not coinbase
let tx = block.transactions[1].clone(); let tx = block.transactions[1].clone();
let (inbound_service, _, mut tx_verifier, mut peer_set, _) = setup(false).await; let (
inbound_service,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
) = setup(false).await;
// Test `Request::PushTransaction` // Test `Request::PushTransaction`
let request = inbound_service let request = inbound_service
@ -104,8 +124,16 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
), ),
}; };
// check that nothing unexpected happened
peer_set.expect_no_requests().await; peer_set.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
Ok(()) Ok(())
} }
@ -123,7 +151,15 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
let test_transaction_id = test_transaction.unmined_id(); let test_transaction_id = test_transaction.unmined_id();
let txs = HashSet::from_iter([test_transaction_id]); let txs = HashSet::from_iter([test_transaction_id]);
let (inbound_service, _, mut tx_verifier, mut peer_set, _) = setup(false).await; let (
inbound_service,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
) = setup(false).await;
// Test `Request::AdvertiseTransactionIds` // Test `Request::AdvertiseTransactionIds`
let request = inbound_service let request = inbound_service
@ -164,8 +200,16 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
), ),
}; };
// check that nothing unexpected happened
peer_set.expect_no_requests().await; peer_set.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
Ok(()) Ok(())
} }
@ -187,7 +231,15 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
let mut tx2_id = tx2.unmined_id(); let mut tx2_id = tx2.unmined_id();
// Get services // Get services
let (inbound_service, _, mut tx_verifier, _peer_set, state_service) = setup(false).await; let (
inbound_service,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
state_service,
sync_gossip_task_handle,
) = setup(false).await;
// Push test transaction // Push test transaction
let request = inbound_service let request = inbound_service
@ -220,17 +272,22 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
}; };
// Add a new block to the state (make the chain tip advance) // Add a new block to the state (make the chain tip advance)
let block_one: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES let block_two: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into() .zcash_deserialize_into()
.unwrap(); .unwrap();
state_service state_service
.clone() .clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock( .oneshot(zebra_state::Request::CommitFinalizedBlock(
block_one.clone().into(), block_two.clone().into(),
)) ))
.await .await
.unwrap(); .unwrap();
peer_set
.expect_request(Request::AdvertiseBlock(block_two.hash()))
.await
.respond(Response::Nil);
// Make sure tx1 is still in the mempool as it is not expired yet. // Make sure tx1 is still in the mempool as it is not expired yet.
let request = inbound_service let request = inbound_service
.clone() .clone()
@ -247,17 +304,22 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
}; };
// As our test transaction will expire at a block height greater or equal to 3 we need to push block 3. // As our test transaction will expire at a block height greater or equal to 3 we need to push block 3.
let block_two: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES let block_three: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES
.zcash_deserialize_into() .zcash_deserialize_into()
.unwrap(); .unwrap();
state_service state_service
.clone() .clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock( .oneshot(zebra_state::Request::CommitFinalizedBlock(
block_two.clone().into(), block_three.clone().into(),
)) ))
.await .await
.unwrap(); .unwrap();
peer_set
.expect_request(Request::AdvertiseBlock(block_three.hash()))
.await
.respond(Response::Nil);
// Push a second transaction to trigger `remove_expired_transactions()` // Push a second transaction to trigger `remove_expired_transactions()`
let request = inbound_service let request = inbound_service
.clone() .clone()
@ -322,6 +384,11 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
.await .await
.unwrap(); .unwrap();
peer_set
.expect_request(Request::AdvertiseBlock(block.hash()))
.await
.respond(Response::Nil);
let request = inbound_service let request = inbound_service
.clone() .clone()
.oneshot(Request::MempoolTransactionIds) .oneshot(Request::MempoolTransactionIds)
@ -338,6 +405,16 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
}; };
} }
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
Ok(()) Ok(())
} }
@ -345,7 +422,8 @@ async fn setup(
add_transactions: bool, add_transactions: bool,
) -> ( ) -> (
LoadShed<tower::buffer::Buffer<super::Inbound, zebra_network::Request>>, LoadShed<tower::buffer::Buffer<super::Inbound, zebra_network::Request>>,
Option<Vec<UnminedTx>>, Vec<Arc<Block>>,
Vec<UnminedTx>,
MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>, MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>,
MockService<Request, Response, PanicAssertion>, MockService<Request, Response, PanicAssertion>,
Buffer< Buffer<
@ -356,6 +434,7 @@ async fn setup(
>, >,
zebra_state::Request, zebra_state::Request,
>, >,
JoinHandle<Result<(), BlockGossipError>>,
) { ) {
let network = Network::Mainnet; let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default(); let consensus_config = ConsensusConfig::default();
@ -372,12 +451,14 @@ async fn setup(
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await; .await;
let peer_set = MockService::build().for_unit_tests(); let mut peer_set = MockService::build().for_unit_tests();
let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10); let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10);
let mock_tx_verifier = MockService::build().for_unit_tests(); let mock_tx_verifier = MockService::build().for_unit_tests();
let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10); let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10);
let mut committed_blocks = Vec::new();
// Push the genesis block to the state. // Push the genesis block to the state.
// This must be done before creating the mempool to avoid `chain_tip_change` // This must be done before creating the mempool to avoid `chain_tip_change`
// returning "reset" which would clear the mempool. // returning "reset" which would clear the mempool.
@ -393,6 +474,7 @@ async fn setup(
)) ))
.await .await
.unwrap(); .unwrap();
committed_blocks.push(genesis_block);
// Also push block 1. // Also push block 1.
// Block one is a network upgrade and the mempool will be cleared at it, // Block one is a network upgrade and the mempool will be cleared at it,
@ -407,23 +489,24 @@ async fn setup(
)) ))
.await .await
.unwrap(); .unwrap();
committed_blocks.push(block_one);
let mut mempool_service = Mempool::new( let mut mempool_service = Mempool::new(
network, network,
buffered_peer_set.clone(), buffered_peer_set.clone(),
state_service.clone(), state_service.clone(),
buffered_tx_verifier.clone(), buffered_tx_verifier.clone(),
sync_status, sync_status.clone(),
latest_chain_tip, latest_chain_tip,
chain_tip_change, chain_tip_change.clone(),
); );
// Enable the mempool // Enable the mempool
let _ = mempool_service.enable(&mut recent_syncs).await; let _ = mempool_service.enable(&mut recent_syncs).await;
let mut added_transactions = None; let mut added_transactions = Vec::new();
if add_transactions { if add_transactions {
added_transactions = Some(add_some_stuff_to_mempool(&mut mempool_service, network)); added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network));
} }
let mempool_service = BoxService::new(mempool_service); let mempool_service = BoxService::new(mempool_service);
@ -442,14 +525,33 @@ async fn setup(
let r = setup_tx.send((buffered_peer_set, address_book, mempool)); let r = setup_tx.send((buffered_peer_set, address_book, mempool));
// We can't expect or unwrap because the returned Result does not implement Debug // We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok()); assert!(r.is_ok(), "unexpected setup channel send failure");
let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
peer_set.clone(),
));
// Make sure there is an additional request broadcasting the
// committed blocks to peers.
//
// (The genesis block gets skipped, because block 1 is committed before the task is spawned.)
for block in committed_blocks.iter().skip(1) {
peer_set
.expect_request(Request::AdvertiseBlock(block.hash()))
.await
.respond(Response::Nil);
}
( (
inbound_service, inbound_service,
committed_blocks,
added_transactions, added_transactions,
mock_tx_verifier, mock_tx_verifier,
peer_set, peer_set,
state_service, state_service,
sync_gossip_task_handle,
) )
} }

View File

@ -69,6 +69,8 @@ where
/// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when
/// Zebra is shutting down. /// Zebra is shutting down.
pub async fn run(mut self) -> Result<(), BoxError> { pub async fn run(mut self) -> Result<(), BoxError> {
info!("initializing mempool crawler task");
while self.status.wait_until_close_to_tip().await.is_ok() { while self.status.wait_until_close_to_tip().await.is_ok() {
self.crawl_transactions().await?; self.crawl_transactions().await?;
sleep(RATE_LIMIT_DELAY).await; sleep(RATE_LIMIT_DELAY).await;

View File

@ -21,6 +21,7 @@ use zebra_state as zs;
use crate::{config::ZebradConfig, BoxError}; use crate::{config::ZebradConfig, BoxError};
mod downloads; mod downloads;
mod gossip;
mod recent_sync_lengths; mod recent_sync_lengths;
mod status; mod status;
@ -28,6 +29,8 @@ mod status;
mod tests; mod tests;
use downloads::{AlwaysHedge, Downloads}; use downloads::{AlwaysHedge, Downloads};
pub use gossip::{gossip_best_tip_block_hashes, BlockGossipError};
pub use recent_sync_lengths::RecentSyncLengths; pub use recent_sync_lengths::RecentSyncLengths;
pub use status::SyncStatus; pub use status::SyncStatus;

View File

@ -0,0 +1,85 @@
//! A task that gossips newly verified [`block::Hash`]es to peers.
use thiserror::Error;
use tokio::sync::watch;
use tower::{timeout::Timeout, Service, ServiceExt};
use zebra_network as zn;
use zebra_state::ChainTipChange;
use crate::BoxError;
use super::{SyncStatus, TIPS_RESPONSE_TIMEOUT};
use BlockGossipError::*;
/// Errors that can occur when gossiping committed blocks
#[derive(Error, Debug)]
pub enum BlockGossipError {
#[error("chain tip sender was dropped")]
TipChange(watch::error::RecvError),
#[error("sync status sender was dropped")]
SyncStatus(watch::error::RecvError),
#[error("permanent peer set failure")]
PeerSetReadiness(zn::BoxError),
}
/// Run continuously, gossiping newly verified [`block::Hash`]es to peers.
///
/// Once the state has reached the chain tip, broadcast the [`block::Hash`]es
/// of newly verified blocks to all ready peers.
///
/// Blocks are only gossiped if they are:
/// - on the best chain, and
/// - the most recent block verified since the last gossip.
///
/// In particular, if a lot of blocks are committed at the same time,
/// gossips will be disabled or skipped until the state reaches the latest tip.
pub async fn gossip_best_tip_block_hashes<ZN>(
mut sync_status: SyncStatus,
mut chain_state: ChainTipChange,
broadcast_network: ZN,
) -> Result<(), BlockGossipError>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
{
info!("initializing block gossip task");
// use the same timeout as tips requests,
// so broadcasts don't delay the syncer too long
let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
loop {
// wait for at least one tip change, to make sure we have a new block hash to broadcast
let tip_action = chain_state.wait_for_tip_change().await.map_err(TipChange)?;
// wait until we're close to the tip, because broadcasts are only useful for nodes near the tip
// (if they're a long way from the tip, they use the syncer and block locators)
sync_status
.wait_until_close_to_tip()
.await
.map_err(SyncStatus)?;
// get the latest tip change - it might be different to the change we awaited,
// because the syncer might take a long time to reach the tip
let tip_action = chain_state.last_tip_change().unwrap_or(tip_action);
// block broadcasts inform other nodes about new blocks,
// so our internal Grow or Reset state doesn't matter to them
let request = zn::Request::AdvertiseBlock(tip_action.best_tip_hash());
let height = tip_action.best_tip_height();
info!(?height, ?request, "sending committed block broadcast");
// broadcast requests don't return errors, and we'd just want to ignore them anyway
let _ = broadcast_network
.ready_and()
.await
.map_err(PeerSetReadiness)?
.call(request)
.await;
}
}