Create a `SyncStatus` helper type (#2685)

* Create a `SyncStatus` helper type

Keeps track if the synchronizer is close to the chain tip or not.

* Refactor `ChainSync` ctor. to return `SyncStatus`

Change the constructor API so that it returns a higher level construct.

* Test if `SyncStatus` waits for the chain tip

Test if waiting for the chain tip to be reached correctly finishes when
the chain tip is reached. This is done by sending recent sync lengths to
the `SyncStatus` instance, and checking that every time a separate
`SyncStatus` instance determines it has reached the tip the original
instance wakes up.

* Add a temporary attribute to allow dead code

The code added isn't used yet, so we'll add a temporary waiver until
another PR is merged to use them.
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-08-29 21:01:33 -03:00 committed by GitHub
parent 968f20d423
commit 83a2e30e33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 198 additions and 12 deletions

View File

@ -84,8 +84,8 @@ impl StartCmd {
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
info!("initializing syncer");
// TODO: use sync_length_receiver to activate the mempool (#2592)
let (syncer, _sync_length_receiver) =
// TODO: use sync_status to activate the mempool (#2592)
let (syncer, _sync_status) =
ChainSync::new(&config, peer_set.clone(), state, chain_verifier);
select! {

View File

@ -5,7 +5,7 @@ use futures::{
future::FutureExt,
stream::{FuturesUnordered, StreamExt},
};
use tokio::{sync::watch, time::sleep};
use tokio::time::sleep;
use tower::{
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
Service, ServiceExt,
@ -22,12 +22,14 @@ use crate::{config::ZebradConfig, BoxError};
mod downloads;
mod recent_sync_lengths;
mod status;
#[cfg(test)]
mod tests;
use downloads::{AlwaysHedge, Downloads};
use recent_sync_lengths::RecentSyncLengths;
pub use status::SyncStatus;
/// Controls the number of peers used for each ObtainTips and ExtendTips request.
const FANOUT: usize = 4;
@ -222,13 +224,8 @@ where
/// - state: the zebra-state that stores the chain
/// - verifier: the zebra-consensus verifier that checks the chain
///
/// Also returns a [`watch::Receiver`] endpoint for receiving recent sync lengths.
pub fn new(
config: &ZebradConfig,
peers: ZN,
state: ZS,
verifier: ZV,
) -> (Self, watch::Receiver<Vec<usize>>) {
/// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip.
pub fn new(config: &ZebradConfig, peers: ZN, state: ZS, verifier: ZV) -> (Self, SyncStatus) {
let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
// The Hedge middleware is the outermost layer, hedging requests
// between two retry-wrapped networks. The innermost timeout
@ -264,7 +261,7 @@ where
MIN_LOOKAHEAD_LIMIT
);
let (recent_syncs, sync_length_receiver) = RecentSyncLengths::new();
let (sync_status, recent_syncs) = SyncStatus::new();
let new_syncer = Self {
genesis_hash: genesis_hash(config.network.network),
@ -276,7 +273,7 @@ where
recent_syncs,
};
(new_syncer, sync_length_receiver)
(new_syncer, sync_status)
}
#[instrument(skip(self))]

View File

@ -0,0 +1,49 @@
// TODO: Remove this attribute once this type is used (#2603).
#![allow(dead_code)]
use tokio::sync::watch;
use super::RecentSyncLengths;
#[cfg(test)]
mod tests;
/// A helper type to determine if the synchronizer has likely reached the chain tip.
///
/// This type can be used as a handle, so cloning it is cheap.
#[derive(Clone, Debug)]
pub struct SyncStatus {
latest_sync_length: watch::Receiver<Vec<usize>>,
}
impl SyncStatus {
/// Create an instance of [`SyncStatus`].
///
/// The status is determined based on the latest counts of synchronized blocks, observed
/// through `latest_sync_length`.
pub fn new() -> (Self, RecentSyncLengths) {
let (recent_sync_lengths, latest_sync_length) = RecentSyncLengths::new();
let status = SyncStatus { latest_sync_length };
(status, recent_sync_lengths)
}
/// Wait until the synchronization is likely close to the tip.
///
/// Returns an error if communication with the synchronizer is lost.
pub async fn wait_until_close_to_tip(&mut self) -> Result<(), watch::error::RecvError> {
while !self.is_close_to_tip() {
self.latest_sync_length.changed().await?;
}
Ok(())
}
/// Check if the synchronization is likely close to the chain tip.
pub fn is_close_to_tip(&self) -> bool {
let _sync_lengths = self.latest_sync_length.borrow();
// TODO: Determine if the synchronization is actually close to the tip (#2592).
true
}
}

View File

@ -0,0 +1,140 @@
use std::{env, sync::Arc, time::Duration};
use futures::{select, FutureExt};
use proptest::prelude::*;
use tokio::{sync::Semaphore, time::timeout};
use super::{super::RecentSyncLengths, SyncStatus};
/// The default number of test cases to run.
const DEFAULT_ASYNC_SYNCHRONIZED_TASKS_PROPTEST_CASES: u32 = 32;
/// The maximum time one test instance should run.
///
/// If the test exceeds this time it is considered to have failed.
const MAX_TEST_EXECUTION: Duration = Duration::from_secs(1);
/// The maximum time to wait for an event to be received.
///
/// If an event is not received in this time, it is considered that it will never be received.
const EVENT_TIMEOUT: Duration = Duration::from_millis(5);
proptest! {
#![proptest_config(
proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_ASYNC_SYNCHRONIZED_TASKS_PROPTEST_CASES))
)]
/// Test if the [`SyncStatus`] correctly waits until the chain tip is reached.
///
/// This is an asynchronous test with two concurrent tasks. The main task mocks chain sync
/// length updates and verifies if the other task was awakened by the update.
#[test]
fn waits_until_close_to_tip(sync_lengths in any::<Vec<usize>>()) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
runtime.block_on(timeout(MAX_TEST_EXECUTION, root_task(sync_lengths)))??;
/// The root task that the runtime executes.
///
/// Spawns the two concurrent tasks, and sets up the synchronization channels between them.
async fn root_task(sync_lengths: Vec<usize>) -> Result<(), TestCaseError> {
let update_events = Arc::new(Semaphore::new(0));
let wake_events = Arc::new(Semaphore::new(0));
let (status, recent_sync_lengths) = SyncStatus::new();
let mut wait_task_handle = tokio::spawn(wait_task(
status.clone(),
update_events.clone(),
wake_events.clone(),
))
.fuse();
let mut main_task_handle = tokio::spawn(main_task(
sync_lengths,
status,
recent_sync_lengths,
update_events,
wake_events,
))
.fuse();
select! {
result = main_task_handle => result.expect("Failed to wait for main test task"),
result = wait_task_handle => result.expect("Failed to wait for wait test task"),
}
}
/// The main task.
///
/// 1. Applies each chain sync length update from the `sync_lengths` parameter.
/// 2. If necessary, notify the other task that an update was applied. This is to avoid
/// having the other task enter an infinite loop while it thinks it has reached the
/// chain tip.
/// 3. Waits to see if the other task sends a wake event, meaning that it awoke because it
/// was notified that it has reached the chain tip.
/// 4. Compares to see if the there was an awake event and if it was expected or not based
/// on whether the [`SyncStatus`] says that it's close to the tip.
async fn main_task(
sync_lengths: Vec<usize>,
status: SyncStatus,
mut recent_sync_lengths: RecentSyncLengths,
update_events: Arc<Semaphore>,
wake_events: Arc<Semaphore>,
) -> Result<(), TestCaseError> {
let mut needs_update_event = true;
for length in sync_lengths {
recent_sync_lengths.push_extend_tips_length(length);
if needs_update_event {
update_events.add_permits(1);
}
let awoke = match timeout(EVENT_TIMEOUT, wake_events.acquire()).await {
Ok(permit) => {
permit.forget();
true
}
Err(_) => false,
};
needs_update_event = awoke;
assert_eq!(status.is_close_to_tip(), awoke);
}
Ok(())
}
/// The helper task that repeatedly waits until the chain tip is close.
///
/// 1. Waits for an update event granting permission to run an iteration. This avoids
/// looping repeatedly while [`SyncStatus`] reports that it is close to the chain tip.
/// 2. Waits until [`SyncStatus`] reports that it is close to the chain tip.
/// 3. Notifies the main task that it awoke, i.e., that the [`SyncStatus`] has finished
/// wating until it was close to the chain tip.
async fn wait_task(
mut status: SyncStatus,
update_events: Arc<Semaphore>,
wake_events: Arc<Semaphore>,
) -> Result<(), TestCaseError> {
loop {
update_events.acquire().await.forget();
if status.wait_until_close_to_tip().await.is_err() {
return Ok(());
}
wake_events.add_permits(1);
}
}
}
}