Create initial transaction crawler for the mempool (#2646)

* Create initial `mempool::Crawler` type

The mempool crawler is responsible for periodically asking peers for
transactions to insert into the local mempool. This initial
implementation will periodically ask for transactions, but won't do
anything with them yet.

Also, the crawler is currently configured to be always enabled, but this
should be fixed to avoid crawling while Zebra is still syncing the
chain.

* Add a timeout to peer responses

Prevent the crawler from getting stuck if there's communication with a
peer that takes too long to respond.

* Run the mempool crawler in Zebra

Spawn a task for the crawler when Zebra starts.

* Test if the crawler is sending requests

Create a mock for the `PeerSet` service to intercept requests and verify
that the transaction requests are sent periodically.

* Use `full` Tokio features when testing

Make it simpler to select the features for test builds.

Co-authored-by: teor <teor@riseup.net>

* Link to the issue for crawler activation

Make it easy to navigate from the `TODO` comment to the current project
planning.

Co-authored-by: teor <teor@riseup.net>

* Link to the issue for downloading transactions

Make it easy to navigate from the `TODO` comment to the current project
planning.

Co-authored-by: teor <teor@riseup.net>

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-08-24 11:23:53 -03:00 committed by GitHub
parent bc4194fcb9
commit 069f7716db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 192 additions and 3 deletions

View File

@ -55,6 +55,7 @@ once_cell = "1.8"
regex = "1.4.6" regex = "1.4.6"
semver = "1.0.3" semver = "1.0.3"
tempdir = "0.3.7" tempdir = "0.3.7"
tokio = { version = "0.3.6", features = ["full", "test-util"] }
proptest = "0.10" proptest = "0.10"
proptest-derive = "0.3" proptest-derive = "0.3"

View File

@ -25,13 +25,14 @@
use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::{eyre, Report}; use color_eyre::eyre::{eyre, Report};
use futures::{select, FutureExt};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tower::builder::ServiceBuilder; use tower::builder::ServiceBuilder;
use crate::components::{tokio::RuntimeRun, Inbound}; use crate::components::{tokio::RuntimeRun, Inbound};
use crate::config::ZebradConfig; use crate::config::ZebradConfig;
use crate::{ use crate::{
components::{tokio::TokioComponent, ChainSync}, components::{mempool, tokio::TokioComponent, ChainSync},
prelude::*, prelude::*,
}; };
@ -79,9 +80,15 @@ impl StartCmd {
info!("initializing syncer"); info!("initializing syncer");
// TODO: use sync_length_receiver to activate the mempool (#2592) // TODO: use sync_length_receiver to activate the mempool (#2592)
let (syncer, _sync_length_receiver) = ChainSync::new(&config, peer_set, state, verifier); let (syncer, _sync_length_receiver) =
ChainSync::new(&config, peer_set.clone(), state, verifier);
syncer.sync().await select! {
result = syncer.sync().fuse() => result,
_ = mempool::Crawler::spawn(peer_set).fuse() => {
unreachable!("The mempool crawler only stops if it panics");
}
}
} }
} }

View File

@ -6,6 +6,7 @@
//! don't fit the async context well. //! don't fit the async context well.
mod inbound; mod inbound;
pub mod mempool;
pub mod metrics; pub mod metrics;
mod sync; mod sync;
pub mod tokio; pub mod tokio;

View File

@ -0,0 +1,5 @@
//! Zebra mempool.
mod crawler;
pub use self::crawler::Crawler;

View File

@ -0,0 +1,99 @@
//! Zebra Mempool crawler.
//!
//! The crawler periodically requests transactions from peers in order to populate the mempool.
use std::time::Duration;
use futures::{stream, StreamExt, TryStreamExt};
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
use zebra_network::{Request, Response};
#[cfg(test)]
mod tests;
/// The number of peers to request transactions from per crawl event.
const FANOUT: usize = 4;
/// The delay between crawl events.
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75);
/// The time to wait for a peer response.
///
/// # Correctness
///
/// If this timeout is removed or set too high, the crawler may hang waiting for a peer to respond.
///
/// If this timeout is set too low, the crawler may fail to populate the mempool.
const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
/// The mempool transaction crawler.
pub struct Crawler<S> {
peer_set: Mutex<Timeout<S>>,
}
impl<S> Crawler<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
{
/// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn(peer_set: S) -> JoinHandle<()> {
let crawler = Crawler {
peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
};
tokio::spawn(crawler.run())
}
/// Periodically crawl peers for transactions to include in the mempool.
pub async fn run(self) {
loop {
self.wait_until_enabled().await;
self.crawl_transactions().await;
sleep(RATE_LIMIT_DELAY).await;
}
}
/// Wait until the mempool is enabled.
async fn wait_until_enabled(&self) {
// TODO: Check if synchronizing up to chain tip has finished (#2603).
}
/// Crawl peers for transactions.
///
/// Concurrently request [`FANOUT`] peers for transactions to include in the mempool.
async fn crawl_transactions(&self) {
let requests = stream::repeat(Request::MempoolTransactionIds).take(FANOUT);
let peer_set = self.peer_set.lock().await.clone();
trace!("Crawling for mempool transactions");
peer_set
.call_all(requests)
.unordered()
.and_then(|response| self.handle_response(response))
// TODO: Reduce the log level of the errors (#2655).
.inspect_err(|error| info!("Failed to crawl peer for mempool transactions: {}", error))
.for_each(|_| async {})
.await;
}
/// Handle a peer's response to the crawler's request for transactions.
async fn handle_response(&self, response: Response) -> Result<(), BoxError> {
let transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
};
trace!(
"Mempool crawler received {} transaction IDs",
transaction_ids.len()
);
// TODO: Download transactions and send them to the mempool (#2650)
Ok(())
}
}

View File

@ -0,0 +1,76 @@
use std::time::Duration;
use tokio::{
sync::mpsc::{self, UnboundedReceiver},
time::{self, timeout},
};
use tower::{buffer::Buffer, util::BoxService, BoxError};
use zebra_network::{Request, Response};
use super::{Crawler, FANOUT, RATE_LIMIT_DELAY};
/// The number of iterations to crawl while testing.
///
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// See more information in [`MAX_REQUEST_DELAY`].
const CRAWL_ITERATIONS: usize = 4;
/// The maximum time to wait for a request to arrive before considering it won't arrive.
///
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
/// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`.
const MAX_REQUEST_DELAY: Duration = Duration::from_millis(250);
/// The amount of time to advance beyond the expected instant that the crawler wakes up.
const ERROR_MARGIN: Duration = Duration::from_millis(100);
#[tokio::test]
async fn crawler_requests_for_transaction_ids() {
let (peer_set, mut requests) = mock_peer_set();
Crawler::spawn(peer_set);
time::pause();
for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT {
let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds))));
}
let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
assert!(extra_request.is_err());
time::advance(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
}
}
/// Create a mock service to represent a [`PeerSet`][zebra_network::PeerSet] and intercept the
/// requests it receives.
///
/// The intercepted requests are sent through an unbounded channel to the receiver that's also
/// returned from this function.
fn mock_peer_set() -> (
Buffer<BoxService<Request, Response, BoxError>, Request>,
UnboundedReceiver<Request>,
) {
let (sender, receiver) = mpsc::unbounded_channel();
let proxy_service = tower::service_fn(move |request| {
let sender = sender.clone();
async move {
let _ = sender.send(request);
Ok(Response::TransactionIds(vec![]))
}
});
let service = Buffer::new(BoxService::new(proxy_service), 10);
(service, receiver)
}