use std::{cmp::min, mem, sync::Arc, time::Duration};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, sleep_until, timeout, Instant, Sleep};
use tower::{Service, ServiceExt};

use zebra_chain::serialization::DateTime32;

use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response};

#[cfg(test)]
mod tests;

/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts.
///
/// It divides the set of all possible candidate peers into disjoint subsets,
/// using the `PeerAddrState`:
///
/// 1. `Responded` peers, which we previously had inbound or outbound connections
///    to. If we have not received any messages from a `Responded` peer within a
///    cutoff time, we assume that it has disconnected or hung, and attempt
///    reconnection;
/// 2. `NeverAttempted` peers, which we learned about from other peers or a DNS
///    seeder, but have never connected to;
/// 3. `Failed` peers, to whom we attempted to connect but were unable to;
/// 4. `AttemptPending` peers, which we've recently queued for reconnection.
///
/// ```ascii,no_run
///                            ┌──────────────────┐
///                            │     PeerSet      │
///                            │GetPeers Responses│
///                            └──────────────────┘
///                                     │
///                                     │
///                                     │
///                                     │
///                                     ▼
///              filter by              Λ
///           !contains_addr           ╱ ╲
///    ┌────────────────────────────▶▕   ▏
///    │                              ╲ ╱
///    │                               V
///    │                               │
///    │                               │
///    │                               │
///    │ ┌──────────────────┐          │
///    │ │     Inbound      │          │
///    │ │ Peer Connections │          │
///    │ └──────────────────┘          │
///    │          │                    │
///    ├──────────┼────────────────────┼───────────────────────────────┐
///    │ PeerSet  ▼  AddressBook       ▼                               │
///    │ ┌─────────────┐       ┌────────────────┐      ┌─────────────┐ │
///    │ │  Possibly   │       │`NeverAttempted`│      │  `Failed`   │ │
///    │ │Disconnected │       │     Peers      │      │    Peers    │◀┼┐
///    │ │ `Responded` │       │                │      │             │ ││
///    │ │    Peers    │       │                │      │             │ ││
///    │ └─────────────┘       └────────────────┘      └─────────────┘ ││
///    │        │                      │                      │        ││
///    │ #1 oldest_first        #2 newest_first        #3 oldest_first ││
///    │        │                      │                      │        ││
///    │        ├──────────────────────┴──────────────────────┘        ││
///    │        │          disjoint `PeerAddrState`s                   ││
///    ├────────┼──────────────────────────────────────────────────────┘│
///    │        ▼                                                       │
///    │        Λ                                                       │
///    │       ╱ ╲   filter by                                          │
///    └─────▶▕   ▏!is_potentially_connected                            │
///            ╲ ╱   to remove live                                     │
///             V   `Responded` peers                                   │
///             │                                                       │
///             │  Try outbound connection                              │
///             ▼                                                       │
///     ┌────────────────┐                                              │
///     │`AttemptPending`│                                              │
///     │     Peers      │                                              │
///     │                │                                              │
///     └────────────────┘                                              │
///             │                                                       │
///             │                                                       │
///             ▼                                                       │
///             Λ                                                       │
///            ╱ ╲                                                      │
///           ▕   ▏─────────────────────────────────────────────────────┘
///            ╲ ╱  connection failed, update last_seen to now()
///             V
///             │
///             │
///             ▼
///       ┌────────────┐
///       │    send    │
///       │peer::Client│
///       │to Discover │
///       └────────────┘
///             │
///             │
///             ▼
///  ┌───────────────────────────────────────┐
///  │ every time we receive a peer message: │
///  │  * update state to `Responded`        │
///  │  * update last_seen to now()          │
///  └───────────────────────────────────────┘
///
/// ```
// TODO:
//   * draw arrow from the "peer message" box into the `Responded` state box
//   * make the "disjoint states" box include `AttemptPending`
pub(super) struct CandidateSet<S> {
    pub(super) address_book: Arc<std::sync::Mutex<AddressBook>>,
    pub(super) peer_service: S,
    next_peer_min_wait: Sleep,
}

impl<S> CandidateSet<S>
where
    S: Service<Request, Response = Response, Error = BoxError>,
    S::Future: Send + 'static,
{
    /// The minimum time between successive calls to `CandidateSet::next()`.
    ///
    /// ## Security
    ///
    /// Zebra resists distributed denial of service attacks by making sure that
    /// new peer connections are initiated at least
    /// `MIN_PEER_CONNECTION_INTERVAL` apart.
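    ///
    /// For example, with the 100 ms interval set below, a `CandidateSet`
    /// initiates at most ten new outbound connections per second.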
    const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);

    /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers.
    pub fn new(
        address_book: Arc<std::sync::Mutex<AddressBook>>,
        peer_service: S,
    ) -> CandidateSet<S> {
        CandidateSet {
            address_book,
            peer_service,
            next_peer_min_wait: sleep(Duration::from_secs(0)),
        }
    }

    /// Update the peer set from the network, using the default fanout limit.
    ///
    /// See [`update_initial`][Self::update_initial] for details.
    pub async fn update(&mut self) -> Result<(), BoxError> {
        self.update_timeout(None).await
    }

    /// Update the peer set from the network, limiting the fanout to
    /// `fanout_limit`.
    ///
    /// - Ask a few live [`Responded`] peers to send us more peers.
    /// - Process all completed peer responses, adding new peers in the
    ///   [`NeverAttemptedGossiped`] state.
    ///
    /// ## Correctness
    ///
    /// Pass the initial peer set size as `fanout_limit` during initialization,
    /// so that Zebra does not send duplicate requests to the same peer.
    ///
    /// The crawler exits when update returns an error, so it must only return
    /// errors on permanent failures.
    ///
    /// The handshaker sets up the peer message receiver so it also sends a
    /// [`Responded`] peer address update.
    ///
    /// [`report_failed`][Self::report_failed] puts peers into the [`Failed`] state.
    ///
    /// [`next`][Self::next] puts peers into the [`AttemptPending`] state.
    ///
    /// [`Responded`]: crate::PeerAddrState::Responded
    /// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
    /// [`Failed`]: crate::PeerAddrState::Failed
    /// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
    pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> {
        self.update_timeout(Some(fanout_limit)).await
    }

    /// Update the peer set from the network, limiting the fanout to
    /// `fanout_limit`, and imposing a timeout on the entire fanout.
    ///
    /// See [`update_initial`][Self::update_initial] for details.
    async fn update_timeout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
        // CORRECTNESS
        //
        // Use a timeout to avoid deadlocks when there are no connected
        // peers, and:
        // - we're waiting on a handshake to complete so there are peers, or
        // - another task that handles or adds peers is waiting on this task
        //   to complete.
        if let Ok(fanout_result) =
            timeout(constants::REQUEST_TIMEOUT, self.update_fanout(fanout_limit)).await
        {
            fanout_result?;
        } else {
            // update must only return an error for permanent failures
            info!("timeout waiting for the peer service to become ready");
        }

        Ok(())
    }

    /// Update the peer set from the network, limiting the fanout to
    /// `fanout_limit`.
    ///
    /// See [`update_initial`][Self::update_initial] for details.
    ///
    /// # Correctness
    ///
    /// This function does not have a timeout.
    /// Use [`update_timeout`][Self::update_timeout] instead.
    async fn update_fanout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
        // Opportunistically crawl the network on every update call to ensure
        // we're actively fetching peers. Continue independently of whether we
        // actually receive any peers, but always ask the network for more.
        //
        // Because requests are load-balanced across existing peers, we can make
        // multiple requests concurrently, which will be randomly assigned to
        // existing peers, but we don't make too many, because update may be
        // called while the peer set is already loaded.
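        //
        // As a sketch of the flow below: wait for the peer service to become
        // ready once per request, queue up to `GET_ADDR_FANOUT` concurrent
        // `Peers` requests in a `FuturesUnordered`, then handle the responses
        // as they complete, so a single slow peer can't block the others.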
        let mut responses = FuturesUnordered::new();

        let fanout_limit = fanout_limit
            .map(|fanout_limit| min(fanout_limit, constants::GET_ADDR_FANOUT))
            .unwrap_or(constants::GET_ADDR_FANOUT);
        debug!(?fanout_limit, "sending GetPeers requests");
        // TODO: launch each fanout in its own task (might require tokio 1.6)
        for _ in 0..fanout_limit {
            let peer_service = self.peer_service.ready_and().await?;
            responses.push(peer_service.call(Request::Peers));
        }

        while let Some(rsp) = responses.next().await {
            match rsp {
                Ok(Response::Peers(addrs)) => {
                    trace!(
                        addr_count = ?addrs.len(),
                        ?addrs,
                        "got response to GetPeers"
                    );
                    let addrs = validate_addrs(addrs, DateTime32::now());
                    self.send_addrs(addrs);
                }
                Err(e) => {
                    // since we do a fanout, and new updates are triggered by
                    // each demand, we can ignore errors in individual responses
                    trace!(?e, "got error in GetPeers request");
                }
                Ok(_) => unreachable!("Peers requests always return Peers responses"),
            }
        }

        Ok(())
    }

    /// Add new `addrs` to the address book.
    fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
        // # Correctness
        //
        // Briefly hold the address book threaded mutex, to extend
        // the address list.
        //
        // Extend handles duplicate addresses internally.
        self.address_book.lock().unwrap().extend(addrs);
    }

    /// Returns the next candidate for a connection attempt, if any are available.
    ///
    /// Returns peers in this order:
    /// - oldest `Responded` that are not live
    /// - newest `NeverAttempted`
    /// - oldest `Failed`
    ///
    /// Skips `AttemptPending` peers and live `Responded` peers.
    ///
    /// ## Correctness
    ///
    /// `AttemptPending` peers will become `Responded` if they respond, or
    /// become `Failed` if they time out or provide a bad response.
    ///
    /// Live `Responded` peers will stay live if they keep responding, or
    /// become a reconnection candidate if they stop responding.
    ///
    /// ## Security
    ///
    /// Zebra resists distributed denial of service attacks by making sure that
    /// new peer connections are initiated at least
    /// `MIN_PEER_CONNECTION_INTERVAL` apart.
    pub async fn next(&mut self) -> Option<MetaAddr> {
        let current_deadline = self.next_peer_min_wait.deadline().max(Instant::now());
        let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL);
        mem::swap(&mut self.next_peer_min_wait, &mut sleep);

        // # Correctness
        //
        // In this critical section, we hold the address mutex, blocking the
        // current thread, and all async tasks scheduled on that thread.
        //
        // To avoid deadlocks, the critical section:
        // - must not acquire any other locks
        // - must not await any futures
        //
        // To avoid hangs, any computation in the critical section should
        // be kept to a minimum.
        let reconnect = {
            let mut guard = self.address_book.lock().unwrap();
            // It's okay to return without sleeping here, because we're returning
            // `None`. We only need to sleep before yielding an address.
            let reconnect = guard.reconnection_peers().next()?;

            let reconnect = MetaAddr::new_reconnect(&reconnect.addr, &reconnect.services);
            guard.update(reconnect);
            reconnect
        };

        // SECURITY: rate-limit new candidate connections
        sleep.await;

        Some(reconnect)
    }

    /// Mark `addr` as a failed peer.
    pub fn report_failed(&mut self, addr: &MetaAddr) {
        let addr = MetaAddr::new_errored(&addr.addr, &addr.services);

        // # Correctness
        //
        // Briefly hold the address book threaded mutex, to update the state for
        // a single address.
        self.address_book.lock().unwrap().update(addr);
    }
}

/// Check new `addrs` before adding them to the address book.
///
/// `last_seen_limit` is the maximum permitted last seen time, typically
/// [`Utc::now`].
///
/// If the data in an address is invalid, this function can:
/// - modify the address data, or
/// - delete the address.
///
/// # Security
///
/// Adjusts untrusted last seen times so they are not in the future. This stops
/// malicious peers keeping all their addresses at the front of the connection
/// queue. Honest peers with future clock skew also get adjusted.
///
/// Rejects all addresses if any calculated times overflow or underflow.
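/// # Example
///
/// A sketch of the clock skew adjustment, using illustrative times in seconds:
///
/// ```text
/// last_seen_limit:        1000
/// reported `last_seen`:    900,  980, 1030   (newest is 30 after the limit)
/// adjusted `last_seen`:    870,  950, 1000   (all times offset back by 30)
/// ```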
fn validate_addrs(
    addrs: impl IntoIterator<Item = MetaAddr>,
    last_seen_limit: DateTime32,
) -> impl Iterator<Item = MetaAddr> {
    // Note: The address book handles duplicate addresses internally,
    // so we don't need to de-duplicate addresses here.

    // TODO:
    // We should eventually implement these checks in this function:
    // - Zebra should ignore peers that are older than 3 weeks (part of #1865)
    //   - Zebra should count back 3 weeks from the newest peer timestamp sent
    //     by the other peer, to compensate for clock skew
    // - Zebra should limit the number of addresses it uses from a single Addrs
    //   response (#1869)

    let mut addrs: Vec<_> = addrs.into_iter().collect();

    limit_last_seen_times(&mut addrs, last_seen_limit);

    addrs.into_iter()
}

/// Ensure all reported `last_seen` times are less than or equal to `last_seen_limit`.
///
/// This will consider all addresses as invalid if trying to offset their
/// `last_seen` times to be before the limit causes an underflow.
fn limit_last_seen_times(addrs: &mut Vec<MetaAddr>, last_seen_limit: DateTime32) {
    let (oldest_reported_seen_timestamp, newest_reported_seen_timestamp) = addrs
        .iter()
        .fold((u32::MAX, u32::MIN), |(oldest, newest), addr| {
            let last_seen = addr.get_last_seen().timestamp();
            (oldest.min(last_seen), newest.max(last_seen))
        });

    // If any time is in the future, adjust all times, to compensate for clock skew on honest peers
    if newest_reported_seen_timestamp > last_seen_limit.timestamp() {
        let offset = newest_reported_seen_timestamp - last_seen_limit.timestamp();

        // Apply the offset to the oldest timestamp, to check for underflow
        let oldest_resulting_timestamp = oldest_reported_seen_timestamp as i64 - offset as i64;
        if oldest_resulting_timestamp >= 0 {
            // No underflow is possible, so apply the offset to all addresses
            for addr in addrs {
                let old_last_seen = addr.get_last_seen().timestamp();
                let new_last_seen = old_last_seen - offset;

                addr.set_last_seen(new_last_seen.into());
            }
        } else {
            // An underflow will occur, so reject all gossiped peers
            addrs.clear();
        }
    }
}
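// A minimal usage sketch, assuming a `peer_service` that satisfies the
// `Service` bounds above. The crawler drives a `CandidateSet` along these
// lines:
//
//     let mut candidates = CandidateSet::new(address_book, peer_service);
//     candidates.update().await?;
//     while let Some(candidate) = candidates.next().await {
//         // try an outbound connection to `candidate.addr`, calling
//         // `candidates.report_failed(&candidate)` if the attempt fails
//     }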