Fix syncer download order and add sync tests (#3168)

* Refactor so that RetryLimit::Future is std::marker::Sync

* Make the syncer future std::marker::Send by spawning tips futures

* Download synced blocks in chain order, not HashSet order

* Improve MockService failure messages

* Add closure-based responses to the MockService API

* Move MockChainTip to zebra-chain

* Add a MockChainTipSender type alias

* Support MockChainTip in ChainSync and its downloader

* Add syncer tests for obtain tips, extend tips, and wrong block hashes

* Add block too high tests for obtain tips and extend tips

* Add syncer tests for duplicate FindBlocks response hashes

* Allow longer request delays for mocked services in syncer tests
This commit is contained in:
teor 2022-01-12 03:11:35 +10:00 committed by GitHub
parent fc1a1cdac1
commit d076b999f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1293 additions and 129 deletions

2
Cargo.lock generated
View File

@ -4376,6 +4376,7 @@ dependencies = [
"static_assertions",
"subtle",
"thiserror",
"tokio",
"tracing",
"uint",
"x25519-dalek",
@ -4564,6 +4565,7 @@ dependencies = [
"futures",
"gumdrop",
"hyper",
"indexmap",
"inferno",
"lazy_static",
"metrics",

View File

@ -9,7 +9,7 @@ edition = "2018"
[features]
default = []
proptest-impl = ["proptest", "proptest-derive", "zebra-test", "rand", "rand_chacha"]
proptest-impl = ["proptest", "proptest-derive", "zebra-test", "rand", "rand_chacha", "tokio"]
bench = ["zebra-test"]
[dependencies]
@ -60,6 +60,7 @@ proptest-derive = { version = "0.3.0", optional = true }
rand = { version = "0.8", optional = true }
rand_chacha = { version = "0.3", optional = true }
tokio = { version = "1.15.0", optional = true }
# ZF deps
ed25519-zebra = "3.0.0"

View File

@ -4,6 +4,9 @@ use std::sync::Arc;
use crate::{block, transaction};
#[cfg(any(test, feature = "proptest-impl"))]
pub mod mock;
/// An interface for querying the chain tip.
///
/// This trait helps avoid dependencies between:

View File

@ -0,0 +1,48 @@
//! Mock [`ChainTip`]s for use in tests.
use std::sync::Arc;
use tokio::sync::watch;
use crate::{block, chain_tip::ChainTip, transaction};
/// A sender that sets the `best_tip_height` of a [`MockChainTip`].]
pub type MockChainTipSender = watch::Sender<Option<block::Height>>;
/// A mock [`ChainTip`] implementation that allows setting the `best_tip_height` externally.
#[derive(Clone, Debug)]
pub struct MockChainTip {
best_tip_height: watch::Receiver<Option<block::Height>>,
}
impl MockChainTip {
/// Create a new [`MockChainTip`].
///
/// Returns the [`MockChainTip`] instance and the endpoint to modiy the current best tip
/// height.
///
/// Initially, the best tip height is [`None`].
pub fn new() -> (Self, MockChainTipSender) {
let (sender, receiver) = watch::channel(None);
let mock_chain_tip = MockChainTip {
best_tip_height: receiver,
};
(mock_chain_tip, sender)
}
}
impl ChainTip for MockChainTip {
fn best_tip_height(&self) -> Option<block::Height> {
*self.best_tip_height.borrow()
}
fn best_tip_hash(&self) -> Option<block::Hash> {
unreachable!("Method not used in tests");
}
fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
unreachable!("Method not used in tests");
}
}

View File

@ -1,54 +1,17 @@
use std::sync::Arc;
//! Test utilities and tests for minimum network peer version requirements.
use tokio::sync::watch;
use zebra_chain::{block, chain_tip::ChainTip, parameters::Network, transaction};
use zebra_chain::{
chain_tip::mock::{MockChainTip, MockChainTipSender},
parameters::Network,
};
use super::MinimumPeerVersion;
#[cfg(test)]
mod prop;
/// A mock [`ChainTip`] implementation that allows setting the `best_tip_height` externally.
#[derive(Clone)]
pub struct MockChainTip {
best_tip_height: watch::Receiver<Option<block::Height>>,
}
impl MockChainTip {
/// Create a new [`MockChainTip`].
///
/// Returns the [`MockChainTip`] instance and the endpoint to modiy the current best tip
/// height.
///
/// Initially, the best tip height is [`None`].
pub fn new() -> (Self, watch::Sender<Option<block::Height>>) {
let (sender, receiver) = watch::channel(None);
let mock_chain_tip = MockChainTip {
best_tip_height: receiver,
};
(mock_chain_tip, sender)
}
}
impl ChainTip for MockChainTip {
fn best_tip_height(&self) -> Option<block::Height> {
*self.best_tip_height.borrow()
}
fn best_tip_hash(&self) -> Option<block::Hash> {
unreachable!("Method not used in `MinimumPeerVersion` tests");
}
fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
unreachable!("Method not used in `MinimumPeerVersion` tests");
}
}
impl MinimumPeerVersion<MockChainTip> {
pub fn with_mock_chain_tip(network: Network) -> (Self, watch::Sender<Option<block::Height>>) {
pub fn with_mock_chain_tip(network: Network) -> (Self, MockChainTipSender) {
let (chain_tip, best_tip_height) = MockChainTip::new();
let minimum_peer_version = MinimumPeerVersion::new(chain_tip, network);

View File

@ -6,7 +6,7 @@ use tower::retry::Policy;
/// A very basic retry policy with a limited number of retry attempts.
///
/// XXX Remove this when https://github.com/tower-rs/tower/pull/414 lands.
#[derive(Clone, Debug)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct RetryLimit {
remaining_tries: usize,
}
@ -21,25 +21,23 @@ impl RetryLimit {
}
impl<Req: Clone + std::fmt::Debug, Res, E: std::fmt::Debug> Policy<Req, Res, E> for RetryLimit {
type Future = Pin<Box<dyn Future<Output = Self> + Send + 'static>>;
type Future = Pin<Box<dyn Future<Output = Self> + Send + Sync + 'static>>;
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if let Err(e) = result {
if self.remaining_tries > 0 {
tracing::debug!(?req, ?e, remaining_tries = self.remaining_tries, "retrying");
let remaining_tries = self.remaining_tries - 1;
let retry_outcome = RetryLimit { remaining_tries };
Some(
async move {
// Let other tasks run, so we're more likely to choose a different peer,
// and so that any notfound inv entries win the race to the PeerSet.
//
// TODO: move syncer retries into the PeerSet,
// so we always choose different peers (#3235)
tokio::task::yield_now().await;
RetryLimit { remaining_tries }
}
.boxed(),
// Let other tasks run, so we're more likely to choose a different peer,
// and so that any notfound inv entries win the race to the PeerSet.
//
// TODO: move syncer retries into the PeerSet,
// so we always choose different peers (#3235)
Box::pin(tokio::task::yield_now().map(move |()| retry_outcome)),
)
} else {
None

View File

@ -306,12 +306,13 @@ impl<Request, Response, Error> MockService<Request, Response, PanicAssertion, Er
/// # let mut service = mock_service.clone();
/// #
/// let call = tokio::spawn(mock_service.clone().oneshot("request"));
///
///
/// mock_service.expect_request("request").await.respond("response");
///
/// assert!(matches!(call.await, Ok(Ok("response"))));
/// # });
/// ```
#[track_caller]
pub async fn expect_request(
&mut self,
expected: Request,
@ -321,7 +322,13 @@ impl<Request, Response, Error> MockService<Request, Response, PanicAssertion, Er
{
let response_sender = self.next_request().await;
assert_eq!(response_sender.request, expected);
assert_eq!(
response_sender.request,
expected,
"received an unexpected request\n \
in {}",
std::any::type_name::<Self>(),
);
response_sender
}
@ -362,13 +369,23 @@ impl<Request, Response, Error> MockService<Request, Response, PanicAssertion, Er
/// assert!(matches!(call.await, Ok(Ok("response"))));
/// # });
/// ```
#[track_caller]
pub async fn expect_request_that(
&mut self,
condition: impl FnOnce(&Request) -> bool,
) -> ResponseSender<Request, Response, Error> {
) -> ResponseSender<Request, Response, Error>
where
Request: Debug,
{
let response_sender = self.next_request().await;
assert!(condition(&response_sender.request));
assert!(
condition(&response_sender.request),
"condition was false for request: {:?},\n \
in {}",
response_sender.request,
std::any::type_name::<Self>(),
);
response_sender
}
@ -400,14 +417,17 @@ impl<Request, Response, Error> MockService<Request, Response, PanicAssertion, Er
/// mock_service.expect_no_requests().await;
/// # });
/// ```
#[track_caller]
pub async fn expect_no_requests(&mut self)
where
Request: Debug,
{
if let Some(response_sender) = self.try_next_request().await {
panic!(
"Received an unexpected request: {:?}",
response_sender.request
"received an unexpected request: {:?},\n \
in {}",
response_sender.request,
std::any::type_name::<Self>(),
);
}
}
@ -422,10 +442,15 @@ impl<Request, Response, Error> MockService<Request, Response, PanicAssertion, Er
///
/// If the queue is empty and a request is not received before the max request delay timeout
/// expires.
#[track_caller]
async fn next_request(&mut self) -> ResponseSender<Request, Response, Error> {
match self.try_next_request().await {
Some(request) => request,
None => panic!("Timeout while waiting for a request"),
None => panic!(
"timeout while waiting for a request\n \
in {}",
std::any::type_name::<Self>(),
),
}
}
}
@ -478,6 +503,7 @@ impl<Request, Response, Error> MockService<Request, Response, PropTestAssertion,
/// # test_code().await
/// # }).unwrap();
/// ```
#[track_caller]
pub async fn expect_request(
&mut self,
expected: Request,
@ -487,7 +513,13 @@ impl<Request, Response, Error> MockService<Request, Response, PropTestAssertion,
{
let response_sender = self.next_request().await?;
prop_assert_eq!(&response_sender.request, &expected);
prop_assert_eq!(
&response_sender.request,
&expected,
"received an unexpected request\n \
in {}",
std::any::type_name::<Self>(),
);
Ok(response_sender)
}
@ -538,13 +570,23 @@ impl<Request, Response, Error> MockService<Request, Response, PropTestAssertion,
/// # test_code().await
/// # }).unwrap();
/// ```
#[track_caller]
pub async fn expect_request_that(
&mut self,
condition: impl FnOnce(&Request) -> bool,
) -> Result<ResponseSender<Request, Response, Error>, TestCaseError> {
) -> Result<ResponseSender<Request, Response, Error>, TestCaseError>
where
Request: Debug,
{
let response_sender = self.next_request().await?;
prop_assert!(condition(&response_sender.request));
prop_assert!(
condition(&response_sender.request),
"condition was false for request: {:?},\n \
in {}",
&response_sender.request,
std::any::type_name::<Self>(),
);
Ok(response_sender)
}
@ -583,6 +625,7 @@ impl<Request, Response, Error> MockService<Request, Response, PropTestAssertion,
/// # test_code().await
/// # }).unwrap();
/// ```
#[track_caller]
pub async fn expect_no_requests(&mut self) -> Result<(), TestCaseError>
where
Request: Debug,
@ -591,8 +634,10 @@ impl<Request, Response, Error> MockService<Request, Response, PropTestAssertion,
Some(response_sender) => {
prop_assert!(
false,
"Received an unexpected request: {:?}",
response_sender.request
"received an unexpected request: {:?},\n \
in {}",
response_sender.request,
std::any::type_name::<Self>(),
);
unreachable!("prop_assert!(false) returns an early error");
}
@ -608,13 +653,19 @@ impl<Request, Response, Error> MockService<Request, Response, PropTestAssertion,
///
/// If the queue is empty and a request is not received before the max request delay timeout
/// expires, an error generated by a [`proptest`] assertion is returned.
#[track_caller]
async fn next_request(
&mut self,
) -> Result<ResponseSender<Request, Response, Error>, TestCaseError> {
match self.try_next_request().await {
Some(request) => Ok(request),
None => {
prop_assert!(false, "Timeout while waiting for a request");
prop_assert!(
false,
"timeout while waiting for a request\n \
in {}",
std::any::type_name::<Self>(),
);
unreachable!("prop_assert!(false) returns an early error");
}
}
@ -643,7 +694,7 @@ impl<Request, Response, Assertion, Error> MockService<Request, Response, Asserti
}
}
Ok(Err(RecvError::Lagged(_))) => continue,
Ok(Err(RecvError::Closed)) => unreachable!("Sender is never closed"),
Ok(Err(RecvError::Closed)) => unreachable!("sender is never closed"),
Err(_timeout) => return None,
}
}
@ -685,7 +736,7 @@ impl<Request, Response, Error> ResponseSender<Request, Response, Error> {
&self.request
}
/// Respond to the request.
/// Respond to the request using a fixed response value.
///
/// The `response` can be of the `Response` type or a [`Result`]. This allows sending an error
/// representing an error while processing the request.
@ -693,7 +744,7 @@ impl<Request, Response, Error> ResponseSender<Request, Response, Error> {
/// This method takes ownership of the [`ResponseSender`] so that only one response can be
/// sent.
///
/// If this method is not called, the caller will panic.
/// If `respond` or `respond_with` are not called, the caller will panic.
///
/// # Example
///
@ -733,6 +784,60 @@ impl<Request, Response, Error> ResponseSender<Request, Response, Error> {
pub fn respond(self, response: impl ResponseResult<Response, Error>) {
let _ = self.response_sender.send(response.into_result());
}
/// Respond to the request by calculating a value from the request.
///
/// The response can be of the `Response` type or a [`Result`]. This allows sending an error
/// representing an error while processing the request.
///
/// This method takes ownership of the [`ResponseSender`] so that only one response can be
/// sent.
///
/// If `respond` or `respond_with` are not called, the caller will panic.
///
/// # Example
///
/// ```
/// # use zebra_test::mock_service::MockService;
/// # use tower::{Service, ServiceExt};
/// #
/// # let reactor = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .expect("Failed to build Tokio runtime");
/// #
/// # reactor.block_on(async {
/// // Mock a service with a `String` as the service `Error` type.
/// let mut mock_service: MockService<_, _, _, String> =
/// MockService::build().for_unit_tests();
///
/// # let mut service = mock_service.clone();
/// # let task = tokio::spawn(async move {
/// # let first_call_result = (&mut service).oneshot(1).await;
/// # let second_call_result = service.oneshot(1).await;
/// #
/// # (first_call_result, second_call_result)
/// # });
/// #
/// mock_service
/// .expect_request(1)
/// .await
/// .respond_with(|req| format!("Received: {}", req));
///
/// mock_service
/// .expect_request(1)
/// .await
/// .respond_with(|req| Err(format!("Duplicate request: {}", req)));
/// # });
/// ```
pub fn respond_with<F, R>(self, response_fn: F)
where
F: FnOnce(&Request) -> R,
R: ResponseResult<Response, Error>,
{
let response_result = response_fn(self.request()).into_result();
let _ = self.response_sender.send(response_result);
}
}
/// A representation of an assertion type.
@ -757,7 +862,7 @@ impl AssertionType for PropTestAssertion {}
/// A helper trait to improve ergonomics when sending a response.
///
/// This allows the [`ResponseSender::respond`] method to receive either a [`Result`] or just the
/// response type that is wrapped in an `Ok` variant.
/// response type, which it automatically wraps in an `Ok` variant.
pub trait ResponseResult<Response, Error> {
/// Converts the type into a [`Result`] that can be sent as a response.
fn into_result(self) -> Result<Response, Error>;

View File

@ -18,6 +18,7 @@ zebra-state = { path = "../zebra-state" }
abscissa_core = "0.5"
chrono = "0.4"
gumdrop = "0.7"
indexmap = "1.7.0"
lazy_static = "1.4.0"
serde = { version = "1", features = ["serde_derive"] }
toml = "0.5"

View File

@ -144,9 +144,9 @@ impl StartCmd {
.send(setup_data)
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
let syncer_error_future = syncer.sync();
let syncer_task_handle = tokio::spawn(syncer.sync());
let mut sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
let mut block_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change.clone(),
peer_set.clone(),
@ -169,11 +169,10 @@ impl StartCmd {
info!("spawned initial Zebra tasks");
// TODO: spawn the syncer task, after making the PeerSet marker::Sync and marker::Send
// turn these tasks into a FuturesUnordered?
// TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered?
// ongoing futures & tasks
pin!(syncer_error_future);
// ongoing tasks
pin!(syncer_task_handle);
pin!(mempool_crawler_task_handle);
pin!(mempool_queue_checker_task_handle);
pin!(tx_gossip_task_handle);
@ -187,12 +186,11 @@ impl StartCmd {
let mut exit_when_task_finishes = true;
let result = select! {
// We don't spawn the syncer future into a separate task yet.
// So syncer panics automatically propagate to the main zebrad task.
sync_result = &mut syncer_error_future => sync_result
sync_result = &mut syncer_task_handle => sync_result
.expect("unexpected panic in the syncer task")
.map(|_| info!("syncer task exited")),
sync_gossip_result = &mut sync_gossip_task_handle => sync_gossip_result
block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
.expect("unexpected panic in the chain tip block gossip task")
.map(|_| info!("chain tip block gossip task exited"))
.map_err(|e| eyre!(e)),
@ -238,11 +236,9 @@ impl StartCmd {
info!("exiting Zebra because an ongoing task exited: stopping other tasks");
// futures
std::mem::drop(syncer_error_future);
// ongoing tasks
sync_gossip_task_handle.abort();
syncer_task_handle.abort();
block_gossip_task_handle.abort();
mempool_crawler_task_handle.abort();
mempool_queue_checker_task_handle.abort();
tx_gossip_task_handle.abort();

View File

@ -6,6 +6,7 @@ use std::{collections::HashSet, pin::Pin, sync::Arc, task::Poll, time::Duration}
use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet;
use tokio::time::sleep;
use tower::{
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
@ -14,6 +15,7 @@ use tower::{
use zebra_chain::{
block::{self, Block},
chain_tip::ChainTip,
parameters::genesis_hash,
};
use zebra_consensus::{
@ -21,7 +23,6 @@ use zebra_consensus::{
};
use zebra_network as zn;
use zebra_state as zs;
use zs::LatestChainTip;
use crate::{
components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError,
@ -184,14 +185,27 @@ struct CheckedTip {
expected_next: block::Hash,
}
pub struct ChainSync<ZN, ZS, ZV>
pub struct ChainSync<ZN, ZS, ZV, ZSTip>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZN::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZS::Future: Send,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError> + Send + Clone + 'static,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZV::Future: Send,
ZSTip: ChainTip + Clone + Send + 'static,
{
// Configuration
/// The genesis hash for the configured network
@ -214,6 +228,7 @@ where
Downloads<
Hedge<ConcurrencyLimit<Retry<zn::RetryLimit, Timeout<ZN>>>, AlwaysHedge>,
Timeout<ZV>,
ZSTip,
>,
>,
>,
@ -235,14 +250,27 @@ where
/// This component is used for initial block sync, but the `Inbound` service is
/// responsible for participating in the gossip protocols used for block
/// diffusion.
impl<ZN, ZS, ZV> ChainSync<ZN, ZS, ZV>
impl<ZN, ZS, ZV, ZSTip> ChainSync<ZN, ZS, ZV, ZSTip>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZN::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZS::Future: Send,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError> + Send + Clone + 'static,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZV::Future: Send,
ZSTip: ChainTip + Clone + Send + 'static,
{
/// Returns a new syncer instance, using:
/// - chain: the zebra-chain `Network` to download (Mainnet or Testnet)
@ -257,7 +285,7 @@ where
peers: ZN,
verifier: ZV,
state: ZS,
latest_chain_tip: LatestChainTip,
latest_chain_tip: ZSTip,
) -> (Self, SyncStatus) {
let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
// The Hedge middleware is the outermost layer, hedging requests
@ -442,17 +470,21 @@ where
tokio::task::yield_now().await;
}
requests.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call(
let ready_tip_network = self.tip_network.ready().await;
requests.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
zn::Request::FindBlocks {
known_blocks: block_locator.clone(),
stop: None,
},
));
)));
}
let mut download_set = HashSet::new();
let mut download_set = IndexSet::new();
while let Some(res) = requests.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
match res
.expect("panic in spawned obtain tips request")
.map_err::<Report, _>(|e| eyre!(e))
{
Ok(zn::Response::BlockHashes(hashes)) => {
tracing::trace!(?hashes);
@ -515,6 +547,9 @@ where
);
}
// security: the first response determines our download order
//
// TODO: can we make the download order independent of response order?
let prev_download_len = download_set.len();
download_set.extend(unknown_hashes);
let new_download_len = download_set.len();
@ -543,7 +578,7 @@ where
metrics::gauge!("sync.obtain.queued.hash.count", new_downloads as f64);
// security: use the actual number of new downloads from all peers,
// so a single trailing peer can't toggle our mempool
// so the last peer to respond can't toggle our mempool
self.recent_syncs.push_obtain_tips_length(new_downloads);
self.request_blocks(download_set).await?;
@ -555,7 +590,7 @@ where
async fn extend_tips(&mut self) -> Result<(), Report> {
let tips = std::mem::take(&mut self.prospective_tips);
let mut download_set = HashSet::new();
let mut download_set = IndexSet::new();
tracing::info!(tips = ?tips.len(), "trying to extend chain tips");
for tip in tips {
tracing::debug!(?tip, "asking peers to extend chain tip");
@ -568,15 +603,19 @@ where
tokio::task::yield_now().await;
}
responses.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call(
let ready_tip_network = self.tip_network.ready().await;
responses.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
zn::Request::FindBlocks {
known_blocks: vec![tip.tip],
stop: None,
},
));
)));
}
while let Some(res) = responses.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
match res
.expect("panic in spawned extend tips request")
.map_err::<Report, _>(|e| eyre!(e))
{
Ok(zn::Response::BlockHashes(hashes)) => {
tracing::debug!(first = ?hashes.first(), len = ?hashes.len());
tracing::trace!(?hashes);
@ -654,6 +693,9 @@ where
);
}
// security: the first response determines our download order
//
// TODO: can we make the download order independent of response order?
let prev_download_len = download_set.len();
download_set.extend(unknown_hashes);
let new_download_len = download_set.len();
@ -673,7 +715,7 @@ where
metrics::gauge!("sync.extend.queued.hash.count", new_downloads as f64);
// security: use the actual number of new downloads from all peers,
// so a single trailing peer can't toggle our mempool
// so the last peer to respond can't toggle our mempool
self.recent_syncs.push_extend_tips_length(new_downloads);
self.request_blocks(download_set).await?;
@ -710,7 +752,7 @@ where
}
/// Queue download and verify tasks for each block that isn't currently known to our node
async fn request_blocks(&mut self, hashes: HashSet<block::Hash>) -> Result<(), Report> {
async fn request_blocks(&mut self, hashes: IndexSet<block::Hash>) -> Result<(), Report> {
tracing::debug!(hashes.len = hashes.len(), "requesting blocks");
for hash in hashes.into_iter() {
self.downloads.download_and_verify(hash).await?;
@ -740,7 +782,7 @@ where
}
}
fn update_metrics(&self) {
fn update_metrics(&mut self) {
metrics::gauge!(
"sync.prospective_tips.len",
self.prospective_tips.len() as f64
@ -775,6 +817,14 @@ where
tracing::debug!(error = ?e, "block verification was cancelled, continuing");
false
}
BlockDownloadVerifyError::BehindTipHeightLimit => {
tracing::debug!(
error = ?e,
"block height is behind the current state tip, \
assuming the syncer will eventually catch up to the state, continuing"
);
false
}
// String matches
BlockDownloadVerifyError::Invalid(VerifyChainError::Block(
@ -806,6 +856,7 @@ where
if err_str.contains("AlreadyVerified")
|| err_str.contains("AlreadyInChain")
|| err_str.contains("Cancelled")
|| err_str.contains("BehindTipHeight")
|| err_str.contains("block is already committed to the state")
|| err_str.contains("NotFound")
{

View File

@ -36,17 +36,20 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// `lookahead_limit / VERIFICATION_PIPELINE_SCALING_DIVISOR`.
///
/// For the default lookahead limit, the extra number of blocks is
/// `2 * MAX_TIPS_RESPONSE_HASH_COUNT`.
/// `4 * MAX_TIPS_RESPONSE_HASH_COUNT`.
///
/// This allows the verifier and state queues to hold an extra two tips responses worth of blocks,
/// This allows the verifier and state queues to hold a few extra tips responses worth of blocks,
/// even if the syncer queue is full. Any unused capacity is shared between both queues.
///
/// If this capacity is exceeded, the downloader will start failing download blocks with
/// [`BlockDownloadVerifyError::AboveLookaheadHeightLimit`], and the syncer will reset.
///
/// Since the syncer queue is limited to the `lookahead_limit`,
/// the rest of the capacity is reserved for the other queues.
/// There is no reserved capacity for the syncer queue:
/// if the other queues stay full, the syncer will eventually time out and reset.
const VERIFICATION_PIPELINE_SCALING_DIVISOR: usize =
DEFAULT_LOOKAHEAD_LIMIT / (2 * MAX_TIPS_RESPONSE_HASH_COUNT);
DEFAULT_LOOKAHEAD_LIMIT / (4 * MAX_TIPS_RESPONSE_HASH_COUNT);
#[derive(Copy, Clone, Debug)]
pub(super) struct AlwaysHedge;
@ -92,12 +95,17 @@ pub enum BlockDownloadVerifyError {
/// Represents a [`Stream`] of download and verification tasks during chain sync.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV>
pub struct Downloads<ZN, ZV, ZSTip>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
ZN::Future: Send,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError> + Send + Clone + 'static,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZV::Future: Send,
ZSTip: ChainTip + Clone + Send + 'static,
{
// Services
/// A service that forwards requests to connected peers, and returns their
@ -108,7 +116,7 @@ where
verifier: ZV,
/// Allows efficient access to the best tip of the blockchain.
latest_chain_tip: zs::LatestChainTip,
latest_chain_tip: ZSTip,
// Configuration
/// The configured lookahead limit, after applying the minimum limit.
@ -125,12 +133,17 @@ where
cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
}
impl<ZN, ZV> Stream for Downloads<ZN, ZV>
impl<ZN, ZV, ZSTip> Stream for Downloads<ZN, ZV, ZSTip>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
ZN::Future: Send,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError> + Send + Clone + 'static,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZV::Future: Send,
ZSTip: ChainTip + Clone + Send + 'static,
{
type Item = Result<block::Hash, BlockDownloadVerifyError>;
@ -166,12 +179,17 @@ where
}
}
impl<ZN, ZV> Downloads<ZN, ZV>
impl<ZN, ZV, ZSTip> Downloads<ZN, ZV, ZSTip>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
ZN::Future: Send,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError> + Send + Clone + 'static,
ZV: Service<Arc<Block>, Response = block::Hash, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZV::Future: Send,
ZSTip: ChainTip + Clone + Send + 'static,
{
/// Initialize a new download stream with the provided `network` and
/// `verifier` services. Uses the `latest_chain_tip` and `lookahead_limit`
@ -180,12 +198,7 @@ where
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
/// timeout limits should be applied to the `network` service passed into
/// this constructor.
pub fn new(
network: ZN,
verifier: ZV,
latest_chain_tip: zs::LatestChainTip,
lookahead_limit: usize,
) -> Self {
pub fn new(network: ZN, verifier: ZV, latest_chain_tip: ZSTip, lookahead_limit: usize) -> Self {
Self {
network,
verifier,
@ -395,7 +408,7 @@ where
}
/// Get the number of currently in-flight download tasks.
pub fn in_flight(&self) -> usize {
pub fn in_flight(&mut self) -> usize {
self.pending.len()
}
}

View File

@ -1 +1,4 @@
//! Syncer tests
mod timing;
mod vectors;

View File

@ -0,0 +1,980 @@
//! Fixed test vectors for the syncer.
use std::{collections::HashMap, iter, sync::Arc, time::Duration};
use color_eyre::Report;
use futures::{Future, FutureExt};
use zebra_chain::{
block::{self, Block},
chain_tip::mock::{MockChainTip, MockChainTipSender},
serialization::ZcashDeserializeInto,
};
use zebra_consensus::Config as ConsensusConfig;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
use zebra_network as zn;
use zebra_state as zs;
use crate::{
components::{
sync::{self, SyncStatus},
ChainSync,
},
config::ZebradConfig,
};
/// Maximum time to wait for a request to any test service.
///
/// The default [`MockService`] value can be too short for some of these tests that take a little
/// longer than expected to actually send the request.
///
/// Increasing this value causes the tests to take longer to complete, so it can't be too large.
const MAX_SERVICE_REQUEST_DELAY: Duration = Duration::from_millis(500);
/// Test that the syncer downloads genesis, blocks 1-2 using obtain_tips, and blocks 3-4 using extend_tips.
///
/// This test also makes sure that the syncer downloads blocks in order.
#[tokio::test(flavor = "multi_thread")]
async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
// Get services
let (
chain_sync_future,
_sync_status,
mut chain_verifier,
mut peer_set,
mut state_service,
_mock_chain_tip_sender,
) = setup();
// Get blocks
let block0: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?;
let block0_hash = block0.hash();
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into()?;
let block1_hash = block1.hash();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?;
let block2_hash = block2.hash();
let block3: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES.zcash_deserialize_into()?;
let block3_hash = block3.hash();
let block4: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_4_BYTES.zcash_deserialize_into()?;
let block4_hash = block4.hash();
let block5: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_5_BYTES.zcash_deserialize_into()?;
let block5_hash = block5.hash();
// Start the syncer
let chain_sync_task_handle = tokio::spawn(chain_sync_future);
// ChainSync::request_genesis
// State is checked for genesis
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(None));
// Block 0 is fetched and committed to the state
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block0.clone()]));
chain_verifier
.expect_request(block0)
.await
.respond(block0_hash);
// Check that nothing unexpected happened.
// We expect more requests to the state service, because the syncer keeps on running.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// State is checked for genesis again
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(Some(0)));
// ChainSync::obtain_tips
// State is asked for a block locator.
state_service
.expect_request(zs::Request::BlockLocator)
.await
.respond(zs::Response::BlockLocator(vec![block0_hash]));
// Network is sent the block locator
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block0_hash],
stop: None,
})
.await
.respond(zn::Response::BlockHashes(vec![
block1_hash, // tip
block2_hash, // expected_next
block3_hash, // (discarded - last hash, possibly incorrect)
]));
// State is checked for the first unknown block (block 1)
state_service
.expect_request(zs::Request::Depth(block1_hash))
.await
.respond(zs::Response::Depth(None));
// Clear remaining block locator requests
for _ in 0..(sync::FANOUT - 1) {
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block0_hash],
stop: None,
})
.await
.respond(Err(zn::BoxError::from("synthetic test obtain tips error")));
}
// Check that nothing unexpected happened.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// State is checked for all non-tip blocks (blocks 1 & 2) in response order
state_service
.expect_request(zs::Request::Depth(block1_hash))
.await
.respond(zs::Response::Depth(None));
state_service
.expect_request(zs::Request::Depth(block2_hash))
.await
.respond(zs::Response::Depth(None));
// Blocks 1 & 2 are fetched in order, then verified concurrently
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block1.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block2.clone()]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
[(block1_hash, block1), (block2_hash, block2)]
.iter()
.cloned()
.collect();
for _ in 1..=2 {
chain_verifier
.expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some())
.await
.respond_with(|req| req.hash());
}
assert_eq!(
remaining_blocks,
HashMap::new(),
"expected all non-tip blocks to be verified by obtain tips"
);
// Check that nothing unexpected happened.
chain_verifier.expect_no_requests().await;
state_service.expect_no_requests().await;
// ChainSync::extend_tips
// Network is sent a block locator based on the tip
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block1_hash],
stop: None,
})
.await
.respond(zn::Response::BlockHashes(vec![
block2_hash, // tip (discarded - already fetched)
block3_hash, // expected_next
block4_hash,
block5_hash, // (discarded - last hash, possibly incorrect)
]));
// Clear remaining block locator requests
for _ in 0..(sync::FANOUT - 1) {
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block1_hash],
stop: None,
})
.await
.respond(Err(zn::BoxError::from("synthetic test extend tips error")));
}
// Check that nothing unexpected happened.
chain_verifier.expect_no_requests().await;
state_service.expect_no_requests().await;
// Blocks 3 & 4 are fetched in order, then verified concurrently
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block3.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block4.clone()]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
[(block3_hash, block3), (block4_hash, block4)]
.iter()
.cloned()
.collect();
for _ in 3..=4 {
chain_verifier
.expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some())
.await
.respond_with(|req| req.hash());
}
assert_eq!(
remaining_blocks,
HashMap::new(),
"expected all non-tip blocks to be verified by extend tips"
);
// Check that nothing unexpected happened.
chain_verifier.expect_no_requests().await;
state_service.expect_no_requests().await;
let chain_sync_result = chain_sync_task_handle.now_or_never();
assert!(
matches!(chain_sync_result, None),
"unexpected error or panic in chain sync task: {:?}",
chain_sync_result,
);
Ok(())
}
/// Test that the syncer downloads genesis, blocks 1-2 using obtain_tips, and blocks 3-4 using extend_tips,
/// with duplicate block hashes.
///
/// This test also makes sure that the syncer downloads blocks in order.
#[tokio::test(flavor = "multi_thread")]
async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
// Get services
let (
chain_sync_future,
_sync_status,
mut chain_verifier,
mut peer_set,
mut state_service,
_mock_chain_tip_sender,
) = setup();
// Get blocks
let block0: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?;
let block0_hash = block0.hash();
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into()?;
let block1_hash = block1.hash();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?;
let block2_hash = block2.hash();
let block3: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES.zcash_deserialize_into()?;
let block3_hash = block3.hash();
let block4: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_4_BYTES.zcash_deserialize_into()?;
let block4_hash = block4.hash();
let block5: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_5_BYTES.zcash_deserialize_into()?;
let block5_hash = block5.hash();
// Start the syncer
let chain_sync_task_handle = tokio::spawn(chain_sync_future);
// ChainSync::request_genesis
// State is checked for genesis
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(None));
// Block 0 is fetched and committed to the state
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block0.clone()]));
chain_verifier
.expect_request(block0)
.await
.respond(block0_hash);
// Check that nothing unexpected happened.
// We expect more requests to the state service, because the syncer keeps on running.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// State is checked for genesis again
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(Some(0)));
// ChainSync::obtain_tips
// State is asked for a block locator.
state_service
.expect_request(zs::Request::BlockLocator)
.await
.respond(zs::Response::BlockLocator(vec![block0_hash]));
// Network is sent the block locator
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block0_hash],
stop: None,
})
.await
.respond(zn::Response::BlockHashes(vec![
block1_hash,
block1_hash,
block1_hash, // tip
block2_hash, // expected_next
block3_hash, // (discarded - last hash, possibly incorrect)
]));
// State is checked for the first unknown block (block 1)
state_service
.expect_request(zs::Request::Depth(block1_hash))
.await
.respond(zs::Response::Depth(None));
// Clear remaining block locator requests
for _ in 0..(sync::FANOUT - 1) {
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block0_hash],
stop: None,
})
.await
.respond(Err(zn::BoxError::from("synthetic test obtain tips error")));
}
// Check that nothing unexpected happened.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// State is checked for all non-tip blocks (blocks 1 & 2) in response order
state_service
.expect_request(zs::Request::Depth(block1_hash))
.await
.respond(zs::Response::Depth(None));
state_service
.expect_request(zs::Request::Depth(block2_hash))
.await
.respond(zs::Response::Depth(None));
// Blocks 1 & 2 are fetched in order, then verified concurrently
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block1.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block2.clone()]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
[(block1_hash, block1), (block2_hash, block2)]
.iter()
.cloned()
.collect();
for _ in 1..=2 {
chain_verifier
.expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some())
.await
.respond_with(|req| req.hash());
}
assert_eq!(
remaining_blocks,
HashMap::new(),
"expected all non-tip blocks to be verified by obtain tips"
);
// Check that nothing unexpected happened.
chain_verifier.expect_no_requests().await;
state_service.expect_no_requests().await;
// ChainSync::extend_tips
// Network is sent a block locator based on the tip
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block1_hash],
stop: None,
})
.await
.respond(zn::Response::BlockHashes(vec![
block2_hash, // tip (discarded - already fetched)
block3_hash, // expected_next
block4_hash,
block3_hash,
block4_hash,
block5_hash, // (discarded - last hash, possibly incorrect)
]));
// Clear remaining block locator requests
for _ in 0..(sync::FANOUT - 1) {
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block1_hash],
stop: None,
})
.await
.respond(Err(zn::BoxError::from("synthetic test extend tips error")));
}
// Check that nothing unexpected happened.
chain_verifier.expect_no_requests().await;
state_service.expect_no_requests().await;
// Blocks 3 & 4 are fetched in order, then verified concurrently
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block3.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block4.clone()]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
[(block3_hash, block3), (block4_hash, block4)]
.iter()
.cloned()
.collect();
for _ in 3..=4 {
chain_verifier
.expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some())
.await
.respond_with(|req| req.hash());
}
assert_eq!(
remaining_blocks,
HashMap::new(),
"expected all non-tip blocks to be verified by extend tips"
);
// Check that nothing unexpected happened.
chain_verifier.expect_no_requests().await;
state_service.expect_no_requests().await;
let chain_sync_result = chain_sync_task_handle.now_or_never();
assert!(
matches!(chain_sync_result, None),
"unexpected error or panic in chain sync task: {:?}",
chain_sync_result,
);
Ok(())
}
/// Test that zebra-network rejects blocks with the wrong hash.
#[tokio::test]
async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> {
// Get services
let (
chain_sync_future,
_sync_status,
mut chain_verifier,
mut peer_set,
mut state_service,
_mock_chain_tip_sender,
) = setup();
// Get blocks
let block0: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?;
let block0_hash = block0.hash();
// Get a block that is a long way away from genesis
let block982k: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
// Start the syncer
let chain_sync_task_handle = tokio::spawn(chain_sync_future);
// State is checked for genesis
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(None));
// Block 0 is fetched, but the peer returns a much higher block
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block982k.clone()]));
// Block is dropped because it has the wrong hash.
// We expect more requests to the state service, because the syncer keeps on running.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
let chain_sync_result = chain_sync_task_handle.now_or_never();
assert!(
matches!(chain_sync_result, None),
"unexpected error or panic in chain sync task: {:?}",
chain_sync_result,
);
Ok(())
}
/// Test that the sync downloader rejects blocks that are too high in obtain_tips.
///
/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.)
#[tokio::test(flavor = "multi_thread")]
async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> {
// Get services
let (
chain_sync_future,
_sync_status,
mut chain_verifier,
mut peer_set,
mut state_service,
_mock_chain_tip_sender,
) = setup();
// Get blocks
let block0: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?;
let block0_hash = block0.hash();
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into()?;
let block1_hash = block1.hash();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?;
let block2_hash = block2.hash();
let block3: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES.zcash_deserialize_into()?;
let block3_hash = block3.hash();
// Also get a block that is a long way away from genesis
let block982k: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
let block982k_hash = block982k.hash();
// Start the syncer
let chain_sync_task_handle = tokio::spawn(chain_sync_future);
// ChainSync::request_genesis
// State is checked for genesis
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(None));
// Block 0 is fetched and committed to the state
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block0.clone()]));
chain_verifier
.expect_request(block0)
.await
.respond(block0_hash);
// Check that nothing unexpected happened.
// We expect more requests to the state service, because the syncer keeps on running.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// State is checked for genesis again
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(Some(0)));
// ChainSync::obtain_tips
// State is asked for a block locator.
state_service
.expect_request(zs::Request::BlockLocator)
.await
.respond(zs::Response::BlockLocator(vec![block0_hash]));
// Network is sent the block locator
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block0_hash],
stop: None,
})
.await
.respond(zn::Response::BlockHashes(vec![
block982k_hash,
block1_hash, // tip
block2_hash, // expected_next
block3_hash, // (discarded - last hash, possibly incorrect)
]));
// State is checked for the first unknown block (block 982k)
state_service
.expect_request(zs::Request::Depth(block982k_hash))
.await
.respond(zs::Response::Depth(None));
// Clear remaining block locator requests
for _ in 0..(sync::FANOUT - 1) {
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block0_hash],
stop: None,
})
.await
.respond(Err(zn::BoxError::from("synthetic test obtain tips error")));
}
// Check that nothing unexpected happened.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// State is checked for all non-tip blocks (blocks 982k, 1, 2) in response order
state_service
.expect_request(zs::Request::Depth(block982k_hash))
.await
.respond(zs::Response::Depth(None));
state_service
.expect_request(zs::Request::Depth(block1_hash))
.await
.respond(zs::Response::Depth(None));
state_service
.expect_request(zs::Request::Depth(block2_hash))
.await
.respond(zs::Response::Depth(None));
// Blocks 982k, 1, 2 are fetched in order, then verified concurrently,
// but block 982k verification is skipped because it is too high.
peer_set
.expect_request(zn::Request::BlocksByHash(
iter::once(block982k_hash).collect(),
))
.await
.respond(zn::Response::Blocks(vec![block982k.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block1.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block2.clone()]));
// At this point, the following tasks race:
// - The valid chain verifier requests
// - The block too high error, which causes a syncer reset and ChainSync::obtain_tips
// - ChainSync::extend_tips for the next tip
let chain_sync_result = chain_sync_task_handle.now_or_never();
assert!(
matches!(chain_sync_result, None),
"unexpected error or panic in chain sync task: {:?}",
chain_sync_result,
);
Ok(())
}
/// Test that the sync downloader rejects blocks that are too high in extend_tips.
///
/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.)
#[tokio::test(flavor = "multi_thread")]
async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> {
// Get services
let (
chain_sync_future,
_sync_status,
mut chain_verifier,
mut peer_set,
mut state_service,
_mock_chain_tip_sender,
) = setup();
// Get blocks
let block0: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?;
let block0_hash = block0.hash();
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into()?;
let block1_hash = block1.hash();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?;
let block2_hash = block2.hash();
let block3: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES.zcash_deserialize_into()?;
let block3_hash = block3.hash();
let block4: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_4_BYTES.zcash_deserialize_into()?;
let block4_hash = block4.hash();
let block5: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_5_BYTES.zcash_deserialize_into()?;
let block5_hash = block5.hash();
// Also get a block that is a long way away from genesis
let block982k: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
let block982k_hash = block982k.hash();
// Start the syncer
let chain_sync_task_handle = tokio::spawn(chain_sync_future);
// ChainSync::request_genesis
// State is checked for genesis
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(None));
// Block 0 is fetched and committed to the state
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block0.clone()]));
chain_verifier
.expect_request(block0)
.await
.respond(block0_hash);
// Check that nothing unexpected happened.
// We expect more requests to the state service, because the syncer keeps on running.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// State is checked for genesis again
state_service
.expect_request(zs::Request::Depth(block0_hash))
.await
.respond(zs::Response::Depth(Some(0)));
// ChainSync::obtain_tips
// State is asked for a block locator.
state_service
.expect_request(zs::Request::BlockLocator)
.await
.respond(zs::Response::BlockLocator(vec![block0_hash]));
// Network is sent the block locator
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block0_hash],
stop: None,
})
.await
.respond(zn::Response::BlockHashes(vec![
block1_hash, // tip
block2_hash, // expected_next
block3_hash, // (discarded - last hash, possibly incorrect)
]));
// State is checked for the first unknown block (block 1)
state_service
.expect_request(zs::Request::Depth(block1_hash))
.await
.respond(zs::Response::Depth(None));
// Clear remaining block locator requests
for _ in 0..(sync::FANOUT - 1) {
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block0_hash],
stop: None,
})
.await
.respond(Err(zn::BoxError::from("synthetic test obtain tips error")));
}
// Check that nothing unexpected happened.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// State is checked for all non-tip blocks (blocks 1 & 2) in response order
state_service
.expect_request(zs::Request::Depth(block1_hash))
.await
.respond(zs::Response::Depth(None));
state_service
.expect_request(zs::Request::Depth(block2_hash))
.await
.respond(zs::Response::Depth(None));
// Blocks 1 & 2 are fetched in order, then verified concurrently
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block1.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block2.clone()]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
[(block1_hash, block1), (block2_hash, block2)]
.iter()
.cloned()
.collect();
for _ in 1..=2 {
chain_verifier
.expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some())
.await
.respond_with(|req| req.hash());
}
assert_eq!(
remaining_blocks,
HashMap::new(),
"expected all non-tip blocks to be verified by obtain tips"
);
// Check that nothing unexpected happened.
chain_verifier.expect_no_requests().await;
state_service.expect_no_requests().await;
// ChainSync::extend_tips
// Network is sent a block locator based on the tip
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block1_hash],
stop: None,
})
.await
.respond(zn::Response::BlockHashes(vec![
block2_hash, // tip (discarded - already fetched)
block3_hash, // expected_next
block4_hash,
block982k_hash,
block5_hash, // (discarded - last hash, possibly incorrect)
]));
// Clear remaining block locator requests
for _ in 0..(sync::FANOUT - 1) {
peer_set
.expect_request(zn::Request::FindBlocks {
known_blocks: vec![block1_hash],
stop: None,
})
.await
.respond(Err(zn::BoxError::from("synthetic test extend tips error")));
}
// Check that nothing unexpected happened.
chain_verifier.expect_no_requests().await;
state_service.expect_no_requests().await;
// Blocks 3, 4, 982k are fetched in order, then verified concurrently,
// but block 982k verification is skipped because it is too high.
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block3.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block4.clone()]));
peer_set
.expect_request(zn::Request::BlocksByHash(
iter::once(block982k_hash).collect(),
))
.await
.respond(zn::Response::Blocks(vec![block982k.clone()]));
// At this point, the following tasks race:
// - The valid chain verifier requests
// - The block too high error, which causes a syncer reset and ChainSync::obtain_tips
// - ChainSync::extend_tips for the next tip
let chain_sync_result = chain_sync_task_handle.now_or_never();
assert!(
matches!(chain_sync_result, None),
"unexpected error or panic in chain sync task: {:?}",
chain_sync_result,
);
Ok(())
}
fn setup() -> (
// ChainSync
impl Future<Output = Result<(), Report>> + Send,
SyncStatus,
// ChainVerifier
MockService<Arc<Block>, block::Hash, PanicAssertion>,
// PeerSet
MockService<zebra_network::Request, zebra_network::Response, PanicAssertion>,
// StateService
MockService<zebra_state::Request, zebra_state::Response, PanicAssertion>,
MockChainTipSender,
) {
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let config = ZebradConfig {
consensus: consensus_config,
state: state_config,
..Default::default()
};
// These tests run multiple tasks in parallel.
// So machines under heavy load need a longer delay.
// (For example, CI machines with limited cores.)
let peer_set = MockService::build()
.with_max_request_delay(MAX_SERVICE_REQUEST_DELAY)
.for_unit_tests();
let chain_verifier = MockService::build()
.with_max_request_delay(MAX_SERVICE_REQUEST_DELAY)
.for_unit_tests();
let state_service = MockService::build()
.with_max_request_delay(MAX_SERVICE_REQUEST_DELAY)
.for_unit_tests();
let (mock_chain_tip, mock_chain_tip_sender) = MockChainTip::new();
let (chain_sync, sync_status) = ChainSync::new(
&config,
peer_set.clone(),
chain_verifier.clone(),
state_service.clone(),
mock_chain_tip,
);
let chain_sync_future = chain_sync.sync();
(
chain_sync_future,
sync_status,
chain_verifier,
peer_set,
state_service,
mock_chain_tip_sender,
)
}