diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 8a998445f..145a15299 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -85,12 +85,12 @@ impl StartCmd { info!("initializing syncer"); // TODO: use sync_status to activate the mempool (#2592) - let (syncer, _sync_status) = + let (syncer, sync_status) = ChainSync::new(&config, peer_set.clone(), state, chain_verifier); select! { result = syncer.sync().fuse() => result, - _ = mempool::Crawler::spawn(peer_set).fuse() => { + _ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => { unreachable!("The mempool crawler only stops if it panics"); } } diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index 22b509a65..bb53da4c6 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -10,6 +10,8 @@ use tower::{timeout::Timeout, BoxError, Service, ServiceExt}; use zebra_network::{Request, Response}; +use super::super::sync::SyncStatus; + #[cfg(test)] mod tests; @@ -31,6 +33,7 @@ const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); /// The mempool transaction crawler. pub struct Crawler { peer_set: Mutex>, + status: SyncStatus, } impl Crawler @@ -39,26 +42,26 @@ where S::Future: Send, { /// Spawn an asynchronous task to run the mempool crawler. - pub fn spawn(peer_set: S) -> JoinHandle> { + pub fn spawn(peer_set: S, status: SyncStatus) -> JoinHandle> { let crawler = Crawler { peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)), + status, }; tokio::spawn(crawler.run()) } /// Periodically crawl peers for transactions to include in the mempool. - pub async fn run(self) -> Result<(), BoxError> { - loop { - self.wait_until_enabled().await; + /// + /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when + /// Zebra is shutting down. 
+ pub async fn run(mut self) -> Result<(), BoxError> { + while self.status.wait_until_close_to_tip().await.is_ok() { self.crawl_transactions().await?; sleep(RATE_LIMIT_DELAY).await; } - } - /// Wait until the mempool is enabled. - async fn wait_until_enabled(&self) { - // TODO: Check if synchronizing up to chain tip has finished (#2603). + Ok(()) } /// Crawl peers for transactions. diff --git a/zebrad/src/components/mempool/crawler/tests.rs b/zebrad/src/components/mempool/crawler/tests.rs index 35b71d042..e9ea6dc19 100644 --- a/zebrad/src/components/mempool/crawler/tests.rs +++ b/zebrad/src/components/mempool/crawler/tests.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use proptest::prelude::*; use tokio::{ sync::mpsc::{self, UnboundedReceiver}, time::{self, timeout}, @@ -8,7 +9,7 @@ use tower::{buffer::Buffer, util::BoxService, BoxError}; use zebra_network::{Request, Response}; -use super::{Crawler, FANOUT, RATE_LIMIT_DELAY}; +use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY}; /// The number of iterations to crawl while testing. /// @@ -21,31 +22,60 @@ const CRAWL_ITERATIONS: usize = 4; /// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test. /// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for /// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`. -const MAX_REQUEST_DELAY: Duration = Duration::from_millis(250); +const MAX_REQUEST_DELAY: Duration = Duration::from_millis(25); /// The amount of time to advance beyond the expected instant that the crawler wakes up. const ERROR_MARGIN: Duration = Duration::from_millis(100); -#[tokio::test] -async fn crawler_requests_for_transaction_ids() { - let (peer_set, mut requests) = mock_peer_set(); +proptest! 
{ + #[test] + fn crawler_requests_for_transaction_ids(mut sync_lengths in any::>()) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); - Crawler::spawn(peer_set); + // Add a dummy last element, so that all of the original values are used. + sync_lengths.push(0); - time::pause(); + runtime.block_on(async move { + let (peer_set, mut requests) = mock_peer_set(); + let (sync_status, mut recent_sync_lengths) = SyncStatus::new(); - for _ in 0..CRAWL_ITERATIONS { - for _ in 0..FANOUT { - let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + time::pause(); - assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds)))); - } + Crawler::spawn(peer_set, sync_status.clone()); - let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + for sync_length in sync_lengths { + let mempool_is_enabled = sync_status.is_close_to_tip(); - assert!(extra_request.is_err()); + for _ in 0..CRAWL_ITERATIONS { + for _ in 0..FANOUT { + let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; - time::advance(RATE_LIMIT_DELAY + ERROR_MARGIN).await; + if mempool_is_enabled { + prop_assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds)))); + } else { + prop_assert!(request.is_err()); + } + } + + let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + + prop_assert!(extra_request.is_err()); + + time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await; + } + + // Applying the update event at the end of the test iteration means that the first + // iteration runs with an empty recent sync lengths vector. A dummy element is + // appended to the events so that all of the original values are applied. + recent_sync_lengths.push_extend_tips_length(sync_length); + } + + Ok(()) + })?; } }