diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs
index db90e4c55..c372949b1 100644
--- a/zebra-network/src/constants.rs
+++ b/zebra-network/src/constants.rs
@@ -84,6 +84,10 @@ pub const DEFAULT_MAX_CONNS_PER_IP: usize = 1;
 /// This will be used as `Config.peerset_initial_target_size` if no valid value is provided.
 pub const DEFAULT_PEERSET_INITIAL_TARGET_SIZE: usize = 25;
 
+/// The maximum number of peers we will add to the address book after each `getaddr` request.
+pub const PEER_ADDR_RESPONSE_LIMIT: usize =
+    DEFAULT_PEERSET_INITIAL_TARGET_SIZE * OUTBOUND_PEER_LIMIT_MULTIPLIER / 2;
+
 /// The buffer size for the peer set.
 ///
 /// This should be greater than 1 to avoid sender contention, but also reasonably
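
A quick sanity check on the new constant's value. The `OUTBOUND_PEER_LIMIT_MULTIPLIER = 3` value below is an assumption based on the rest of `constants.rs`; check that file for the current value:

    // Illustrative only: with the assumed values, the limit works out to
    // 25 * 3 / 2 = 37 peer addresses, using truncating integer division.
    const DEFAULT_PEERSET_INITIAL_TARGET_SIZE: usize = 25;
    const OUTBOUND_PEER_LIMIT_MULTIPLIER: usize = 3; // assumed value
    const PEER_ADDR_RESPONSE_LIMIT: usize =
        DEFAULT_PEERSET_INITIAL_TARGET_SIZE * OUTBOUND_PEER_LIMIT_MULTIPLIER / 2;

    fn main() {
        assert_eq!(PEER_ADDR_RESPONSE_LIMIT, 37);
    }
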
diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs
index f52ef565d..83f1c502a 100644
--- a/zebra-network/src/peer/connection.rs
+++ b/zebra-network/src/peer/connection.rs
@@ -14,7 +14,7 @@ use futures::{
     prelude::*,
     stream::Stream,
 };
-use rand::{thread_rng, Rng};
+use rand::{seq::SliceRandom, thread_rng, Rng};
 use tokio::time::{sleep, Sleep};
 use tower::{Service, ServiceExt};
 use tracing_futures::Instrument;
@@ -27,8 +27,8 @@ use zebra_chain::{
 
 use crate::{
     constants::{
-        self, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
-        OVERLOAD_PROTECTION_INTERVAL,
+        self, MAX_ADDRS_IN_MESSAGE, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
+        OVERLOAD_PROTECTION_INTERVAL, PEER_ADDR_RESPONSE_LIMIT,
     },
     meta_addr::MetaAddr,
     peer::{
@@ -137,7 +137,14 @@ impl Handler {
     /// interpretable as a response, we return ownership to the caller.
     ///
     /// Unexpected messages are left unprocessed, and may be rejected later.
-    fn process_message(&mut self, msg: Message) -> Option<Message> {
+    ///
+    /// `addr` responses are limited to avoid peer set takeover. Any excess
+    /// addresses are stored in `cached_addrs`.
+    fn process_message(
+        &mut self,
+        msg: Message,
+        cached_addrs: &mut Vec<MetaAddr>,
+    ) -> Option<Message> {
         let mut ignored_msg = None;
         // TODO: can this be avoided?
         let tmp_state = std::mem::replace(self, Handler::Finished(Ok(Response::Nil)));
@@ -152,7 +159,24 @@
                     Handler::Ping(req_nonce)
                 }
             }
-            (Handler::Peers, Message::Addr(addrs)) => Handler::Finished(Ok(Response::Peers(addrs))),
+
+            (Handler::Peers, Message::Addr(new_addrs)) => {
+                // Security: This method performs security-sensitive operations, see its comments
+                // for details.
+                let response_addrs =
+                    Handler::update_addr_cache(cached_addrs, &new_addrs, PEER_ADDR_RESPONSE_LIMIT);
+
+                debug!(
+                    new_addrs = new_addrs.len(),
+                    response_addrs = response_addrs.len(),
+                    remaining_addrs = cached_addrs.len(),
+                    PEER_ADDR_RESPONSE_LIMIT,
+                    "responding to Peers request using new and cached addresses",
+                );
+
+                Handler::Finished(Ok(Response::Peers(response_addrs)))
+            }
+
             // `zcashd` returns requested transactions in a single batch of messages.
             // Other transaction or non-transaction messages can come before or after the batch.
             // After the transaction batch, `zcashd` sends `notfound` if any transactions are missing:
@@ -251,6 +275,7 @@ impl Handler {
                     )))
                 }
             }
+
             // `zcashd` returns requested blocks in a single batch of messages.
             // Other blocks or non-blocks messages can come before or after the batch.
             // `zcashd` silently skips missing blocks, rather than sending a final `notfound` message.
@@ -365,6 +390,10 @@ impl Handler {
                     block_hashes(&items[..]).collect(),
                 )))
             }
+            (Handler::FindHeaders, Message::Headers(headers)) => {
+                Handler::Finished(Ok(Response::BlockHeaders(headers)))
+            }
+
             (Handler::MempoolTransactionIds, Message::Inv(items))
                 if items.iter().all(|item| item.unmined_tx_id().is_some()) =>
             {
@@ -372,9 +401,7 @@
                     transaction_ids(&items).collect(),
                 )))
             }
-            (Handler::FindHeaders, Message::Headers(headers)) => {
-                Handler::Finished(Ok(Response::BlockHeaders(headers)))
-            }
+
             // By default, messages are not responses.
             (state, msg) => {
                 trace!(?msg, "did not interpret message as response");
@@ -385,6 +412,52 @@ impl Handler {
 
         ignored_msg
     }
+
+    /// Adds `new_addrs` to the `cached_addrs` cache, then takes and returns `response_size`
+    /// addresses from that cache.
+    ///
+    /// `cached_addrs` can be empty if the cache is empty. `new_addrs` can be empty or `None` if
+    /// there are no new addresses. `response_size` can be zero or `None` if there is no response
+    /// needed.
+    fn update_addr_cache<'new>(
+        cached_addrs: &mut Vec<MetaAddr>,
+        new_addrs: impl IntoIterator<Item = &'new MetaAddr>,
+        response_size: impl Into<Option<usize>>,
+    ) -> Vec<MetaAddr> {
+        // # Peer Set Reliability
+        //
+        // Newly received peers are added to the cache, so that we can use them if the connection
+        // doesn't respond to our getaddr requests.
+        //
+        // Add the new addresses to the end of the cache.
+        cached_addrs.extend(new_addrs);
+
+        // # Security
+        //
+        // We limit how many peer addresses we take from each peer, so that our address book
+        // and outbound connections aren't controlled by a single peer (#1869). We randomly select
+        // peers, so the remote peer can't control which addresses we choose by changing the order
+        // in the messages they send.
+        let response_size = response_size.into().unwrap_or_default();
+
+        let mut temp_cache = Vec::new();
+        std::mem::swap(cached_addrs, &mut temp_cache);
+
+        // The response is fully shuffled, remaining is partially shuffled.
+        let (response, remaining) = temp_cache.partial_shuffle(&mut thread_rng(), response_size);
+
+        // # Security
+        //
+        // The cache size is limited to avoid memory denial of service.
+        //
+        // It's ok to just partially shuffle the cache, because it doesn't actually matter which
+        // peers we drop. Having excess peers is rare, because most peers only send one large
+        // unsolicited peer message when they first connect.
+        *cached_addrs = remaining.to_vec();
+        cached_addrs.truncate(MAX_ADDRS_IN_MESSAGE);
+
+        response.to_vec()
+    }
 }
 
 #[derive(Debug)]
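
To make the cache/response split above concrete, here is a standalone sketch of the same `extend`, `partial_shuffle`, and `truncate` sequence, using `u32` in place of `MetaAddr` so it runs without the rest of `zebra-network`. The helper name `update_cache` and the limit values are hypothetical:

    use rand::{seq::SliceRandom, thread_rng};

    // Hypothetical stand-in for `Handler::update_addr_cache`.
    fn update_cache(
        cache: &mut Vec<u32>,
        new: &[u32],
        response_size: usize,
        max_cache: usize,
    ) -> Vec<u32> {
        // Add the new addresses to the end of the cache.
        cache.extend_from_slice(new);
        let mut temp = std::mem::take(cache);
        // `partial_shuffle` moves `response_size` randomly chosen items to the
        // front, and returns the (chosen, remaining) slices.
        let (response, remaining) = temp.partial_shuffle(&mut thread_rng(), response_size);
        *cache = remaining.to_vec();
        // Bound the cache size to avoid unbounded memory use.
        cache.truncate(max_cache);
        response.to_vec()
    }

    fn main() {
        let mut cache = vec![1, 2, 3];
        let response = update_cache(&mut cache, &[4, 5, 6, 7], 2, 1000);
        assert_eq!(response.len(), 2); // randomly chosen, limited size
        assert_eq!(cache.len(), 5); // 7 addresses total: 2 returned, 5 cached
    }
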
@@ -780,7 +853,7 @@ where
             let request_msg = match self.state {
                 State::AwaitingResponse {
                     ref mut handler, ..
-                } => span.in_scope(|| handler.process_message(peer_msg)),
+                } => span.in_scope(|| handler.process_message(peer_msg, &mut self.cached_addrs)),
                 _ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
                     self.state,
                     peer_msg,
@@ -929,16 +1002,21 @@ where
                     self.client_rx
                 ),
 
-                // Consume the cached addresses from the peer,
-                // to work-around a `zcashd` response rate-limit.
+                // Take some cached addresses from the peer connection. This address cache helps
+                // work-around a `zcashd` addr response rate-limit.
                 (AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
-                    let cached_addrs = std::mem::take(&mut self.cached_addrs);
+                    // Security: This method performs security-sensitive operations, see its comments
+                    // for details.
+                    let response_addrs = Handler::update_addr_cache(&mut self.cached_addrs, None, PEER_ADDR_RESPONSE_LIMIT);
+
                     debug!(
-                        addrs = cached_addrs.len(),
-                        "responding to Peers request using cached addresses",
+                        response_addrs = response_addrs.len(),
+                        remaining_addrs = self.cached_addrs.len(),
+                        PEER_ADDR_RESPONSE_LIMIT,
+                        "responding to Peers request using some cached addresses",
                     );
-                    Ok(Handler::Finished(Ok(Response::Peers(cached_addrs))))
+                    Ok(Handler::Finished(Ok(Response::Peers(response_addrs))))
                 }
 
                 (AwaitingRequest, Peers) => self
                     .peer_tx
@@ -1145,28 +1223,32 @@ where
                 // Ignored, but consumed because it is technically a protocol error.
                 Consumed
             }
-            // Zebra crawls the network proactively, to prevent
-            // peers from inserting data into our address book.
-            Message::Addr(ref addrs) => {
-                // Workaround `zcashd`'s `getaddr` response rate-limit
-                if addrs.len() > 1 {
-                    // Always refresh the cache with multi-addr messages.
-                    debug!(%msg, "caching unsolicited multi-addr message");
-                    self.cached_addrs = addrs.clone();
-                    Consumed
-                } else if addrs.len() == 1 && self.cached_addrs.len() <= 1 {
-                    // Only refresh a cached single addr message with another single addr.
-                    // (`zcashd` regularly advertises its own address.)
-                    debug!(%msg, "caching unsolicited single addr message");
-                    self.cached_addrs = addrs.clone();
-                    Consumed
-                } else {
-                    debug!(
-                        %msg,
-                        "ignoring unsolicited single addr message: already cached a multi-addr message"
-                    );
-                    Consumed
-                }
+
+            // # Security
+            //
+            // Zebra crawls the network proactively, and that's the only way peers get into our
+            // address book. This prevents peers from filling our address book with malicious peer
+            // addresses.
+            Message::Addr(ref new_addrs) => {
+                // # Peer Set Reliability
+                //
+                // We keep a list of the unused peer addresses sent by each connection, to work
+                // around `zcashd`'s `getaddr` response rate-limit.
+                let no_response =
+                    Handler::update_addr_cache(&mut self.cached_addrs, new_addrs, None);
+                assert_eq!(
+                    no_response,
+                    Vec::new(),
+                    "peers unexpectedly taken from cache"
+                );
+
+                debug!(
+                    new_addrs = new_addrs.len(),
+                    cached_addrs = self.cached_addrs.len(),
+                    "adding unsolicited addresses to cached addresses",
+                );
+
+                Consumed
             }
             Message::Tx(ref transaction) => Request::PushTransaction(transaction.clone()).into(),
             Message::Inv(ref items) => match &items[..] {
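
The two call sites above split the same helper's behavior: unsolicited `addr` messages pass `None` as the response size (cache only, empty response), while `Peers` requests drain a limited batch from the cache. In terms of the hypothetical `update_cache` sketch earlier, with `0` standing in for `None`:

    fn main() {
        let mut cache = Vec::new();
        // Unsolicited addr message: everything is cached, nothing is returned.
        let response = update_cache(&mut cache, &[10, 11, 12], 0, 1000);
        assert!(response.is_empty());
        assert_eq!(cache.len(), 3);
        // A later Peers request takes a limited, randomly chosen batch.
        let response = update_cache(&mut cache, &[], 2, 1000);
        assert_eq!(response.len(), 2);
        assert_eq!(cache.len(), 1);
    }
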
diff --git a/zebra-network/src/peer/connection/tests/prop.rs b/zebra-network/src/peer/connection/tests/prop.rs
index 3c4b2d51c..f633aea58 100644
--- a/zebra-network/src/peer/connection/tests/prop.rs
+++ b/zebra-network/src/peer/connection/tests/prop.rs
@@ -7,7 +7,7 @@ use futures::{
     sink::SinkMapErr, SinkExt, StreamExt,
 };
 
-use proptest::prelude::*;
+use proptest::{collection, prelude::*};
 use tracing::Span;
 
 use zebra_chain::{
@@ -18,7 +18,12 @@ use zebra_chain::{
 use zebra_test::mock_service::{MockService, PropTestAssertion};
 
 use crate::{
-    peer::{connection::Connection, ClientRequest, ErrorSlot},
+    constants::{MAX_ADDRS_IN_MESSAGE, PEER_ADDR_RESPONSE_LIMIT},
+    meta_addr::MetaAddr,
+    peer::{
+        connection::{Connection, Handler},
+        ClientRequest, ErrorSlot,
+    },
     protocol::external::Message,
     protocol::internal::InventoryResponse,
     Request, Response, SharedPeerError,
@@ -129,6 +134,41 @@ proptest! {
             Ok(())
         })?;
     }
+
+    /// This test makes sure that Zebra's per-connection peer cache is updated correctly.
+    #[test]
+    fn cache_is_updated_correctly(
+        mut cached_addrs in collection::vec(MetaAddr::gossiped_strategy(), 0..=MAX_ADDRS_IN_MESSAGE),
+        new_addrs in collection::vec(MetaAddr::gossiped_strategy(), 0..=MAX_ADDRS_IN_MESSAGE),
+        response_size in 0..=PEER_ADDR_RESPONSE_LIMIT,
+    ) {
+        let _init_guard = zebra_test::init();
+
+        let old_cached_addrs = cached_addrs.clone();
+
+        let response = Handler::update_addr_cache(&mut cached_addrs, &new_addrs, response_size);
+
+        prop_assert!(cached_addrs.len() <= MAX_ADDRS_IN_MESSAGE, "cache has a limited size");
+        prop_assert!(response.len() <= response_size, "response has a limited size");
+
+        prop_assert!(response.len() <= old_cached_addrs.len() + new_addrs.len(), "no duplicate or made up addresses in response");
+        prop_assert!(cached_addrs.len() <= old_cached_addrs.len() + new_addrs.len(), "no duplicate or made up addresses in cache");
+
+        if old_cached_addrs.len() + new_addrs.len() >= response_size {
+            // If we deduplicate addresses, this check should fail and must be changed
+            prop_assert_eq!(response.len(), response_size, "response gets addresses before cache does");
+        } else {
+            prop_assert!(response.len() < response_size, "response gets all addresses if there is no excess");
+        }
+
+        if old_cached_addrs.len() + new_addrs.len() <= response_size {
+            prop_assert_eq!(cached_addrs.len(), 0, "cache is empty if there are no excess addresses");
+        } else {
+            // If we deduplicate addresses, this check should fail and must be changed
+            prop_assert_ne!(cached_addrs.len(), 0, "cache gets excess addresses");
+        }
+    }
 }
 
 /// Creates a new [`Connection`] instance for property tests.
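
The same invariants can also be checked against the hypothetical `update_cache` sketch from the connection.rs notes above; a minimal harness, with illustrative bounds, might look like:

    use proptest::{collection, prelude::*};

    proptest! {
        #[test]
        fn sketch_cache_invariants(
            mut cache in collection::vec(any::<u32>(), 0..=1000),
            new in collection::vec(any::<u32>(), 0..=1000),
            response_size in 0usize..=37,
        ) {
            let total = cache.len() + new.len();
            let response = update_cache(&mut cache, &new, response_size, 1000);
            // The response is limited, and gets addresses before the cache does.
            prop_assert_eq!(response.len(), response_size.min(total));
            // The cache holds the excess, up to its size limit.
            prop_assert_eq!(cache.len(), (total - response.len()).min(1000));
        }
    }
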
diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs
index b60dc7473..eb02c4645 100644
--- a/zebra-network/src/peer_set/candidate_set.rs
+++ b/zebra-network/src/peer_set/candidate_set.rs
@@ -77,8 +77,8 @@ mod tests;
 /// ││         ▼                                               ││
 /// ││         Λ                                               ││
 /// ││        ╱ ╲         filter by                            ││
-/// ││       ▕   ▏        is_ready_for_connection_attempt      ││
-/// ││        ╲ ╱         to remove recent `Responded`,        ││
+/// ││       ▕   ▏        is_ready_for_connection_attempt      ││
+/// ││        ╲ ╱         to remove recent `Responded`,        ││
 /// ││         V          `AttemptPending`, and `Failed` peers ││
 /// ││         │                                               ││
 /// ││         │ try outbound connection,                      ││
@@ -105,7 +105,8 @@ mod tests;
 /// │                                                           │
 /// │                                                           ▼
 /// │┌───────────────────────────────────────┐
-/// ││  every time we receive a peer message: │
+/// ││ when connection succeeds, and every   │
+/// ││ time we receive a peer heartbeat:     │
 /// └│  * update state to `Responded`        │
 ///  │  * update last_response to now()      │
 ///  └───────────────────────────────────────┘
@@ -120,11 +121,6 @@ mod tests;
 // TODO:
 //   * show all possible transitions between Attempt/Responded/Failed,
 //     except Failed -> Responded is invalid, must go through Attempt
-//   * for now, seed peers go straight to handshaking and responded,
-//     but we'll fix that once we add the Seed state
-//   When we add the Seed state:
-//   * show that seed peers that transition to other never attempted
-//     states are already in the address book
 pub(crate) struct CandidateSet<S>
 where
     S: Service<Request, Response = Response, Error = BoxError> + Send,
@@ -447,10 +443,8 @@ fn validate_addrs(
     // TODO:
     // We should eventually implement these checks in this function:
     // - Zebra should ignore peers that are older than 3 weeks (part of #1865)
-    // - Zebra should count back 3 weeks from the newest peer timestamp sent
-    //   by the other peer, to compensate for clock skew
-    // - Zebra should limit the number of addresses it uses from a single Addrs
-    //   response (#1869)
+    // - Zebra should count back 3 weeks from the newest peer timestamp sent
+    //   by the other peer, to compensate for clock skew
 
     let mut addrs: Vec<_> = addrs.into_iter().collect();
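
For the remaining TODO, counting back three weeks from the newest peer timestamp (rather than from the local clock) might look like this sketch, with types simplified to `u64` seconds and the function name hypothetical:

    const THREE_WEEKS_SECS: u64 = 3 * 7 * 24 * 60 * 60;

    // Hypothetical: compute the oldest acceptable timestamp from the newest
    // timestamp the peer sent, to compensate for clock skew between nodes.
    fn skew_tolerant_cutoff(peer_timestamps: &[u64]) -> Option<u64> {
        let newest = peer_timestamps.iter().copied().max()?;
        Some(newest.saturating_sub(THREE_WEEKS_SECS))
    }

    fn main() {
        let stamps = [1_000_000, 3_000_000, 2_000_000];
        assert_eq!(skew_tolerant_cutoff(&stamps), Some(3_000_000 - THREE_WEEKS_SECS));
    }
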