diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs
index 6203e14ac..22b509a65 100644
--- a/zebrad/src/components/mempool/crawler.rs
+++ b/zebrad/src/components/mempool/crawler.rs
@@ -4,7 +4,7 @@
 
 use std::time::Duration;
 
-use futures::{stream, StreamExt, TryStreamExt};
+use futures::{stream::FuturesUnordered, StreamExt};
 use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
 use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
 
@@ -39,7 +39,7 @@ where
     S::Future: Send,
 {
     /// Spawn an asynchronous task to run the mempool crawler.
-    pub fn spawn(peer_set: S) -> JoinHandle<()> {
+    pub fn spawn(peer_set: S) -> JoinHandle<Result<(), BoxError>> {
         let crawler = Crawler {
             peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
         };
@@ -48,10 +48,10 @@
     }
 
     /// Periodically crawl peers for transactions to include in the mempool.
-    pub async fn run(self) {
+    pub async fn run(self) -> Result<(), BoxError> {
         loop {
             self.wait_until_enabled().await;
-            self.crawl_transactions().await;
+            self.crawl_transactions().await?;
             sleep(RATE_LIMIT_DELAY).await;
         }
     }
@@ -64,24 +64,35 @@ where
     /// Crawl peers for transactions.
     ///
     /// Concurrently request [`FANOUT`] peers for transactions to include in the mempool.
-    async fn crawl_transactions(&self) {
-        let requests = stream::repeat(Request::MempoolTransactionIds).take(FANOUT);
+    async fn crawl_transactions(&self) -> Result<(), BoxError> {
         let peer_set = self.peer_set.lock().await.clone();
 
         trace!("Crawling for mempool transactions");
 
-        peer_set
-            .call_all(requests)
-            .unordered()
-            .and_then(|response| self.handle_response(response))
-            // TODO: Reduce the log level of the errors (#2655).
-            .inspect_err(|error| info!("Failed to crawl peer for mempool transactions: {}", error))
-            .for_each(|_| async {})
-            .await;
+        let mut requests = FuturesUnordered::new();
+        // get readiness for one peer at a time, to avoid peer set contention
+        for _ in 0..FANOUT {
+            let mut peer_set = peer_set.clone();
+            // end the task on permanent peer set errors
+            let peer_set = peer_set.ready_and().await?;
+
+            requests.push(peer_set.call(Request::MempoolTransactionIds));
+        }
+
+        while let Some(result) = requests.next().await {
+            // log individual response errors
+            match result {
+                Ok(response) => self.handle_response(response).await,
+                // TODO: Reduce the log level of the errors (#2655).
+                Err(error) => info!("Failed to crawl peer for mempool transactions: {}", error),
+            }
+        }
+
+        Ok(())
     }
 
     /// Handle a peer's response to the crawler's request for transactions.
-    async fn handle_response(&self, response: Response) -> Result<(), BoxError> {
+    async fn handle_response(&self, response: Response) {
         let transaction_ids = match response {
             Response::TransactionIds(ids) => ids,
             _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
@@ -92,8 +103,6 @@ where
             transaction_ids.len()
         );
 
-        // TODO: Download transactions and send them to the mempool (#2650)
-
-        Ok(())
+        // TODO: Send transaction IDs to the download and verify stream (#2650)
    }
 }
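As context for reviewers, the rewritten `crawl_transactions` follows a common tower fan-out pattern: clone the cloneable peer set service, wait for readiness on one clone at a time, queue each `call` into a `FuturesUnordered`, and drain the responses as they complete. The sketch below is a minimal, standalone illustration of that pattern, not Zebra code: it substitutes a `tower::service_fn` stand-in for the peer set, uses placeholder request and response types, assumes `tokio`, `tower` (with the `util` feature), and `futures` as dependencies, and uses `ServiceExt::ready()`, the current name for the readiness call that the patch spells `ready_and()`.

```rust
// Minimal sketch only; assumed dependencies (not Zebra's actual Cargo.toml):
// tokio = { version = "1", features = ["full"] }
// tower = { version = "0.4", features = ["util"] }
// futures = "0.3"

use futures::stream::{FuturesUnordered, StreamExt};
use tower::{Service, ServiceExt};

const FANOUT: usize = 4;

#[tokio::main]
async fn main() -> Result<(), tower::BoxError> {
    // Stand-in "peer set": a cloneable service that answers a request with
    // some fake transaction IDs. Zebra's real peer set is a different service.
    let peer_set = tower::service_fn(|_request: &'static str| async {
        Ok::<_, tower::BoxError>(vec!["txid-1", "txid-2"])
    });

    let mut requests = FuturesUnordered::new();

    // Acquire readiness for one clone at a time, then queue the call,
    // mirroring the fan-out loop in the new `crawl_transactions`.
    for _ in 0..FANOUT {
        let mut peer = peer_set.clone();
        // A readiness error is treated as permanent and ends the crawl.
        peer.ready().await?;
        requests.push(peer.call("mempool_transaction_ids"));
    }

    // Drain responses as they complete; individual request errors are only
    // logged, so one failed peer does not abort the remaining requests.
    while let Some(result) = requests.next().await {
        match result {
            Ok(ids) => println!("peer returned {} transaction IDs", ids.len()),
            Err(error) => eprintln!("failed to crawl peer: {}", error),
        }
    }

    Ok(())
}
```

Acquiring readiness one clone at a time, rather than polling all clones up front, matches the patch's own comment about avoiding peer set contention: a readiness error propagates out of the crawl via `?` and ends the task, while an error from an individual in-flight request is only logged.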