Security: When there are no new peers, stop crawler using CPU and writing logs (#3177)
* Stop useless crawler attempts when there are no peers and no crawl responses
* Disable GitHub bug report URLs when the disk is full
* Add help text for the `zebrad start` tracing filter option
This commit is contained in:
parent
7dd2ac267c
commit
37808eaadb
|
@ -7,7 +7,9 @@ use tower::{Service, ServiceExt};
|
|||
|
||||
use zebra_chain::serialization::DateTime32;
|
||||
|
||||
use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response};
|
||||
use crate::{
|
||||
constants, peer_set::set::MorePeers, types::MetaAddr, AddressBook, BoxError, Request, Response,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
@ -140,7 +142,7 @@ where
|
|||
/// Update the peer set from the network, using the default fanout limit.
|
||||
///
|
||||
/// See [`update_initial`][Self::update_initial] for details.
|
||||
pub async fn update(&mut self) -> Result<(), BoxError> {
|
||||
pub async fn update(&mut self) -> Result<Option<MorePeers>, BoxError> {
|
||||
self.update_timeout(None).await
|
||||
}
|
||||
|
||||
|
@ -151,6 +153,9 @@ where
|
|||
/// - Process all completed peer responses, adding new peers in the
|
||||
/// [`NeverAttemptedGossiped`] state.
|
||||
///
|
||||
/// Returns `Some(MorePeers)` if the crawl was successful and the crawler
|
||||
/// should ask for more peers. Returns `None` if there are no new peers.
|
||||
///
|
||||
/// ## Correctness
|
||||
///
|
||||
/// Pass the initial peer set size as `fanout_limit` during initialization,
|
||||
|
@ -177,7 +182,10 @@ where
|
|||
/// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
|
||||
/// [`Failed`]: crate::PeerAddrState::Failed
|
||||
/// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
|
||||
pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> {
|
||||
pub async fn update_initial(
|
||||
&mut self,
|
||||
fanout_limit: usize,
|
||||
) -> Result<Option<MorePeers>, BoxError> {
|
||||
self.update_timeout(Some(fanout_limit)).await
|
||||
}
|
||||
|
||||
|
@ -185,7 +193,12 @@ where
|
|||
/// `fanout_limit`, and imposing a timeout on the entire fanout.
|
||||
///
|
||||
/// See [`update_initial`][Self::update_initial] for details.
|
||||
async fn update_timeout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
|
||||
async fn update_timeout(
|
||||
&mut self,
|
||||
fanout_limit: Option<usize>,
|
||||
) -> Result<Option<MorePeers>, BoxError> {
|
||||
let mut more_peers = None;
|
||||
|
||||
// SECURITY
|
||||
//
|
||||
// Rate limit sending `GetAddr` messages to peers.
|
||||
|
@ -203,7 +216,7 @@ where
|
|||
)
|
||||
.await
|
||||
{
|
||||
fanout_result?;
|
||||
more_peers = fanout_result?;
|
||||
} else {
|
||||
// update must only return an error for permanent failures
|
||||
info!("timeout waiting for peer service readiness or peer responses");
|
||||
|
@ -212,37 +225,48 @@ where
|
|||
self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(more_peers)
|
||||
}
|
||||
|
||||
/// Update the peer set from the network, limiting the fanout to
|
||||
/// `fanout_limit`.
|
||||
///
|
||||
/// See [`update_initial`][Self::update_initial] for details.
|
||||
/// Opportunistically crawl the network on every update call to ensure
|
||||
/// we're actively fetching peers. Continue independently of whether we
|
||||
/// actually receive any peers, but always ask the network for more.
|
||||
///
|
||||
/// Because requests are load-balanced across existing peers, we can make
|
||||
/// multiple requests concurrently, which will be randomly assigned to
|
||||
/// existing peers, but we don't make too many because update may be
|
||||
/// called while the peer set is already loaded.
|
||||
///
|
||||
/// See [`update_initial`][Self::update_initial] for more details.
|
||||
///
|
||||
/// # Correctness
|
||||
///
|
||||
/// This function does not have a timeout.
|
||||
/// Use [`update_timeout`][Self::update_timeout] instead.
|
||||
async fn update_fanout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
|
||||
// Opportunistically crawl the network on every update call to ensure
|
||||
// we're actively fetching peers. Continue independently of whether we
|
||||
// actually receive any peers, but always ask the network for more.
|
||||
//
|
||||
// Because requests are load-balanced across existing peers, we can make
|
||||
// multiple requests concurrently, which will be randomly assigned to
|
||||
// existing peers, but we don't make too many because update may be
|
||||
// called while the peer set is already loaded.
|
||||
let mut responses = FuturesUnordered::new();
|
||||
async fn update_fanout(
|
||||
&mut self,
|
||||
fanout_limit: Option<usize>,
|
||||
) -> Result<Option<MorePeers>, BoxError> {
|
||||
let fanout_limit = fanout_limit
|
||||
.map(|fanout_limit| min(fanout_limit, constants::GET_ADDR_FANOUT))
|
||||
.unwrap_or(constants::GET_ADDR_FANOUT);
|
||||
debug!(?fanout_limit, "sending GetPeers requests");
|
||||
|
||||
let mut responses = FuturesUnordered::new();
|
||||
let mut more_peers = None;
|
||||
|
||||
// Launch requests
|
||||
//
|
||||
// TODO: launch each fanout in its own task (might require tokio 1.6)
|
||||
for _ in 0..fanout_limit {
|
||||
let peer_service = self.peer_service.ready().await?;
|
||||
responses.push(peer_service.call(Request::Peers));
|
||||
}
|
||||
|
||||
// Process responses
|
||||
while let Some(rsp) = responses.next().await {
|
||||
match rsp {
|
||||
Ok(Response::Peers(addrs)) => {
|
||||
|
@ -253,6 +277,7 @@ where
|
|||
);
|
||||
let addrs = validate_addrs(addrs, DateTime32::now());
|
||||
self.send_addrs(addrs);
|
||||
more_peers = Some(MorePeers);
|
||||
}
|
||||
Err(e) => {
|
||||
// since we do a fanout, and new updates are triggered by
|
||||
|
@ -263,7 +288,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(more_peers)
|
||||
}
|
||||
|
||||
/// Add new `addrs` to the address book.
|
||||
|
|
|
@ -698,9 +698,22 @@ where
|
|||
//
|
||||
// TODO: refactor candidates into a buffered service, so we can
|
||||
// spawn independent tasks to avoid deadlocks
|
||||
candidates.update().await?;
|
||||
// Try to connect to a new peer.
|
||||
let _ = demand_tx.try_send(MorePeers);
|
||||
let more_peers = candidates.update().await?;
|
||||
|
||||
// If we got more peers, try to connect to a new peer.
|
||||
//
|
||||
// # Security
|
||||
//
|
||||
// Update attempts are rate-limited by the candidate set.
|
||||
//
|
||||
// We only try peers if there was actually an update.
|
||||
// So if all peers have had a recent attempt,
|
||||
// and there was a recent update with no peers,
|
||||
// the channel will drain.
|
||||
// This prevents useless update attempt loops.
|
||||
if let Some(more_peers) = more_peers {
|
||||
let _ = demand_tx.try_send(more_peers);
|
||||
}
|
||||
}
|
||||
TimerCrawl { tick } => {
|
||||
debug!(
|
||||
|
@ -726,6 +739,8 @@ where
|
|||
// The demand signal that was taken out of the queue
|
||||
// to attempt to connect to the failed candidate never
|
||||
// turned into a connection, so add it back:
|
||||
//
|
||||
// Security: handshake failures are rate-limited by peer attempt timeouts.
|
||||
let _ = demand_tx.try_send(MorePeers);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ impl fmt::Debug for ActiveConnectionCounter {
|
|||
impl ActiveConnectionCounter {
|
||||
/// Create and return a new active connection counter.
|
||||
pub fn new_counter() -> Self {
|
||||
// TODO: This channel will be bounded by the connection limit (#1850, #1851, #2902).
|
||||
// The number of items in this channel is bounded by the connection limit.
|
||||
let (close_notification_tx, close_notification_rx) = mpsc::unbounded_channel();
|
||||
|
||||
Self {
|
||||
|
@ -73,7 +73,7 @@ impl ActiveConnectionCounter {
|
|||
);
|
||||
}
|
||||
|
||||
debug!(
|
||||
trace!(
|
||||
open_connections = ?self.count,
|
||||
?previous_connections,
|
||||
"updated active connection count"
|
||||
|
|
|
@ -299,7 +299,9 @@ impl Application for ZebradApp {
|
|||
}
|
||||
|
||||
let error_str = error.to_string();
|
||||
!error_str.contains("timed out") && !error_str.contains("duplicate hash")
|
||||
!error_str.contains("timed out")
|
||||
&& !error_str.contains("duplicate hash")
|
||||
&& !error_str.contains("No space left on device")
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -72,8 +72,8 @@ use crate::{
|
|||
/// `start` subcommand
|
||||
#[derive(Command, Debug, Options)]
|
||||
pub struct StartCmd {
|
||||
/// Filter strings
|
||||
#[options(free)]
|
||||
/// Filter strings which override the config file and defaults
|
||||
#[options(free, help = "tracing filters which override the zebrad.toml config")]
|
||||
filters: Vec<String>,
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue