Only enable the mempool crawler after synchronization reaches the chain tip (#2667)

* Store a `SyncStatus` handle in the `Crawler`

The helper type will make it easier to determine if the crawler is
enabled or not.

* Pause crawler if mempool is disabled

Implement waiting until the mempool becomes enabled, so that the crawler
does not run while the mempool is disabled.

If the `SyncStatus` helper is unable to determine whether the mempool is
enabled, stop the crawler task entirely.

* Update test to consider when crawler is paused

Change the mempool crawler test so that it's a proptest that tests
different chain synchronization lengths. This leads to different
scenarios with the crawler pausing and resuming.

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-08-31 07:42:25 -03:00 committed by GitHub
parent 2dac0dda47
commit 8bff71e857
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 25 deletions

View File

@ -85,12 +85,12 @@ impl StartCmd {
info!("initializing syncer"); info!("initializing syncer");
// TODO: use sync_status to activate the mempool (#2592) // TODO: use sync_status to activate the mempool (#2592)
let (syncer, _sync_status) = let (syncer, sync_status) =
ChainSync::new(&config, peer_set.clone(), state, chain_verifier); ChainSync::new(&config, peer_set.clone(), state, chain_verifier);
select! { select! {
result = syncer.sync().fuse() => result, result = syncer.sync().fuse() => result,
_ = mempool::Crawler::spawn(peer_set).fuse() => { _ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => {
unreachable!("The mempool crawler only stops if it panics"); unreachable!("The mempool crawler only stops if it panics");
} }
} }

View File

@ -10,6 +10,8 @@ use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
use zebra_network::{Request, Response}; use zebra_network::{Request, Response};
use super::super::sync::SyncStatus;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -31,6 +33,7 @@ const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
/// The mempool transaction crawler. /// The mempool transaction crawler.
pub struct Crawler<S> { pub struct Crawler<S> {
peer_set: Mutex<Timeout<S>>, peer_set: Mutex<Timeout<S>>,
status: SyncStatus,
} }
impl<S> Crawler<S> impl<S> Crawler<S>
@ -39,26 +42,26 @@ where
S::Future: Send, S::Future: Send,
{ {
/// Spawn an asynchronous task to run the mempool crawler. /// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn(peer_set: S) -> JoinHandle<Result<(), BoxError>> { pub fn spawn(peer_set: S, status: SyncStatus) -> JoinHandle<Result<(), BoxError>> {
let crawler = Crawler { let crawler = Crawler {
peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)), peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
status,
}; };
tokio::spawn(crawler.run()) tokio::spawn(crawler.run())
} }
/// Periodically crawl peers for transactions to include in the mempool. /// Periodically crawl peers for transactions to include in the mempool.
pub async fn run(self) -> Result<(), BoxError> { ///
loop { /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when
self.wait_until_enabled().await; /// Zebra is shutting down.
pub async fn run(mut self) -> Result<(), BoxError> {
while self.status.wait_until_close_to_tip().await.is_ok() {
self.crawl_transactions().await?; self.crawl_transactions().await?;
sleep(RATE_LIMIT_DELAY).await; sleep(RATE_LIMIT_DELAY).await;
} }
}
/// Wait until the mempool is enabled. Ok(())
async fn wait_until_enabled(&self) {
// TODO: Check if synchronizing up to chain tip has finished (#2603).
} }
/// Crawl peers for transactions. /// Crawl peers for transactions.

View File

@ -1,5 +1,6 @@
use std::time::Duration; use std::time::Duration;
use proptest::prelude::*;
use tokio::{ use tokio::{
sync::mpsc::{self, UnboundedReceiver}, sync::mpsc::{self, UnboundedReceiver},
time::{self, timeout}, time::{self, timeout},
@ -8,7 +9,7 @@ use tower::{buffer::Buffer, util::BoxService, BoxError};
use zebra_network::{Request, Response}; use zebra_network::{Request, Response};
use super::{Crawler, FANOUT, RATE_LIMIT_DELAY}; use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY};
/// The number of iterations to crawl while testing. /// The number of iterations to crawl while testing.
/// ///
@ -21,31 +22,60 @@ const CRAWL_ITERATIONS: usize = 4;
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test. /// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for /// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
/// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`. /// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`.
const MAX_REQUEST_DELAY: Duration = Duration::from_millis(250); const MAX_REQUEST_DELAY: Duration = Duration::from_millis(25);
/// The amount of time to advance beyond the expected instant that the crawler wakes up. /// The amount of time to advance beyond the expected instant that the crawler wakes up.
const ERROR_MARGIN: Duration = Duration::from_millis(100); const ERROR_MARGIN: Duration = Duration::from_millis(100);
#[tokio::test] proptest! {
async fn crawler_requests_for_transaction_ids() { #[test]
let (peer_set, mut requests) = mock_peer_set(); fn crawler_requests_for_transaction_ids(mut sync_lengths in any::<Vec<usize>>()) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
Crawler::spawn(peer_set); // Add a dummy last element, so that all of the original values are used.
sync_lengths.push(0);
runtime.block_on(async move {
let (peer_set, mut requests) = mock_peer_set();
let (sync_status, mut recent_sync_lengths) = SyncStatus::new();
time::pause(); time::pause();
Crawler::spawn(peer_set, sync_status.clone());
for sync_length in sync_lengths {
let mempool_is_enabled = sync_status.is_close_to_tip();
for _ in 0..CRAWL_ITERATIONS { for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT { for _ in 0..FANOUT {
let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds)))); if mempool_is_enabled {
prop_assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds))));
} else {
prop_assert!(request.is_err());
}
} }
let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
assert!(extra_request.is_err()); prop_assert!(extra_request.is_err());
time::advance(RATE_LIMIT_DELAY + ERROR_MARGIN).await; time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
}
// Applying the update event at the end of the test iteration means that the first
// iteration runs with an empty recent sync. lengths vector. A dummy element is
// appended to the events so that all of the original values are applied.
recent_sync_lengths.push_extend_tips_length(sync_length);
}
Ok(())
})?;
} }
} }