Stop concurrently obtaining peer set readiness (#2672)
This avoids peer set contention when most peers are busy. Also exit the task if the peer service returns a readiness error, because that means it's permanently unusable. Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
This commit is contained in:
parent
0e60936ad3
commit
6a9a8dfc38
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::{stream, StreamExt, TryStreamExt};
|
use futures::{stream::FuturesUnordered, StreamExt};
|
||||||
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
|
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
|
||||||
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
|
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ where
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
{
|
{
|
||||||
/// Spawn an asynchronous task to run the mempool crawler.
|
/// Spawn an asynchronous task to run the mempool crawler.
|
||||||
pub fn spawn(peer_set: S) -> JoinHandle<()> {
|
pub fn spawn(peer_set: S) -> JoinHandle<Result<(), BoxError>> {
|
||||||
let crawler = Crawler {
|
let crawler = Crawler {
|
||||||
peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
|
peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
|
||||||
};
|
};
|
||||||
|
@ -48,10 +48,10 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Periodically crawl peers for transactions to include in the mempool.
|
/// Periodically crawl peers for transactions to include in the mempool.
|
||||||
pub async fn run(self) {
|
pub async fn run(self) -> Result<(), BoxError> {
|
||||||
loop {
|
loop {
|
||||||
self.wait_until_enabled().await;
|
self.wait_until_enabled().await;
|
||||||
self.crawl_transactions().await;
|
self.crawl_transactions().await?;
|
||||||
sleep(RATE_LIMIT_DELAY).await;
|
sleep(RATE_LIMIT_DELAY).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -64,24 +64,35 @@ where
|
||||||
/// Crawl peers for transactions.
|
/// Crawl peers for transactions.
|
||||||
///
|
///
|
||||||
/// Concurrently request [`FANOUT`] peers for transactions to include in the mempool.
|
/// Concurrently request [`FANOUT`] peers for transactions to include in the mempool.
|
||||||
async fn crawl_transactions(&self) {
|
async fn crawl_transactions(&self) -> Result<(), BoxError> {
|
||||||
let requests = stream::repeat(Request::MempoolTransactionIds).take(FANOUT);
|
|
||||||
let peer_set = self.peer_set.lock().await.clone();
|
let peer_set = self.peer_set.lock().await.clone();
|
||||||
|
|
||||||
trace!("Crawling for mempool transactions");
|
trace!("Crawling for mempool transactions");
|
||||||
|
|
||||||
peer_set
|
let mut requests = FuturesUnordered::new();
|
||||||
.call_all(requests)
|
// get readiness for one peer at a time, to avoid peer set contention
|
||||||
.unordered()
|
for _ in 0..FANOUT {
|
||||||
.and_then(|response| self.handle_response(response))
|
let mut peer_set = peer_set.clone();
|
||||||
|
// end the task on permanent peer set errors
|
||||||
|
let peer_set = peer_set.ready_and().await?;
|
||||||
|
|
||||||
|
requests.push(peer_set.call(Request::MempoolTransactionIds));
|
||||||
|
}
|
||||||
|
|
||||||
|
while let Some(result) = requests.next().await {
|
||||||
|
// log individual response errors
|
||||||
|
match result {
|
||||||
|
Ok(response) => self.handle_response(response).await,
|
||||||
// TODO: Reduce the log level of the errors (#2655).
|
// TODO: Reduce the log level of the errors (#2655).
|
||||||
.inspect_err(|error| info!("Failed to crawl peer for mempool transactions: {}", error))
|
Err(error) => info!("Failed to crawl peer for mempool transactions: {}", error),
|
||||||
.for_each(|_| async {})
|
}
|
||||||
.await;
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a peer's response to the crawler's request for transactions.
|
/// Handle a peer's response to the crawler's request for transactions.
|
||||||
async fn handle_response(&self, response: Response) -> Result<(), BoxError> {
|
async fn handle_response(&self, response: Response) {
|
||||||
let transaction_ids = match response {
|
let transaction_ids = match response {
|
||||||
Response::TransactionIds(ids) => ids,
|
Response::TransactionIds(ids) => ids,
|
||||||
_ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
|
_ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
|
||||||
|
@ -92,8 +103,6 @@ where
|
||||||
transaction_ids.len()
|
transaction_ids.len()
|
||||||
);
|
);
|
||||||
|
|
||||||
// TODO: Download transactions and send them to the mempool (#2650)
|
// TODO: Send transaction IDs to the download and verify stream (#2650)
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue