security: Rate limit GetAddr responses (#7955)

* Updates ADDR_RESPONSE_LIMIT_DENOMINATOR to 4

* Moves logic getting a fraction of Zebra's peers to a method in the address book

* Adds and uses CachedPeerAddrs struct in inbound service

* moves and documents constant

* fixes test

* Apply suggestions from code review

Co-authored-by: teor <teor@riseup.net>

* updates docs

* renames sanitized_window method

* renames CachedPeerAddrs to CachedPeerAddrResponse

* updates test

* moves try_refresh to per request

* Make unused sanitization method pub(crate)

* Apply suggestions from code review

Co-authored-by: teor <teor@riseup.net>

* moves CachedPeerAddrResponse to a module

* updates unit test

* fixes unit test

* removes unnecessary condition

* clears cached getaddr response if it can't refresh for over a minute after the refresh time

* tests that inbound service gives out the same addresses for every Peers request before the refresh interval

* Applies suggestion from code review

* fixes doc link

* renames constant

* Fix docs on new constant

* applies suggestion from code review

* uses longer cache expiry time

* Adds code comments

---------

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Arya 2023-11-20 22:32:23 -05:00 committed by GitHub
parent 3be22b2cda
commit 5e4c0f973f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 276 additions and 76 deletions

View File

@ -18,7 +18,7 @@ use tracing::Span;
use zebra_chain::{parameters::Network, serialization::DateTime32};
use crate::{
constants,
constants::{self, ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
meta_addr::MetaAddrChange,
protocol::external::{canonical_peer_addr, canonical_socket_addr},
types::MetaAddr,
@ -268,7 +268,20 @@ impl AddressBook {
/// Get the active addresses in `self` in random order with sanitized timestamps,
/// including our local listener address.
pub fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
///
/// Limited to a the number of peer addresses Zebra should give out per `GetAddr` request.
pub fn fresh_get_addr_response(&self) -> Vec<MetaAddr> {
let now = Utc::now();
let mut peers = self.sanitized(now);
let address_limit = peers.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR);
peers.truncate(MAX_ADDRS_IN_MESSAGE.min(address_limit));
peers
}
/// Get the active addresses in `self` in random order with sanitized timestamps,
/// including our local listener address.
pub(crate) fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
use rand::seq::SliceRandom;
let _guard = self.span.enter();

View File

@ -9,7 +9,10 @@ use tracing::Span;
use zebra_chain::{parameters::Network::*, serialization::Duration32};
use crate::{
constants::{DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP},
constants::{
ADDR_RESPONSE_LIMIT_DENOMINATOR, DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK,
MAX_ADDRS_IN_MESSAGE, MAX_PEER_ACTIVE_FOR_GOSSIP,
},
meta_addr::{arbitrary::MAX_META_ADDR, MetaAddr, MetaAddrChange},
AddressBook,
};
@ -36,8 +39,17 @@ proptest! {
addresses
);
for gossiped_address in address_book.sanitized(chrono_now) {
let duration_since_last_seen = gossiped_address
// Only recently reachable are sanitized
let sanitized = address_book.sanitized(chrono_now);
let gossiped = address_book.fresh_get_addr_response();
let expected_num_gossiped = sanitized.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR).min(MAX_ADDRS_IN_MESSAGE);
let num_gossiped = gossiped.len();
prop_assert_eq!(expected_num_gossiped, num_gossiped);
for sanitized_address in sanitized {
let duration_since_last_seen = sanitized_address
.last_seen()
.expect("Peer that was never seen before is being gossiped")
.saturating_elapsed(chrono_now)

View File

@ -309,7 +309,7 @@ pub const MAX_ADDRS_IN_MESSAGE: usize = 1000;
///
/// This limit makes sure that Zebra does not reveal its entire address book
/// in a single `Peers` response.
pub const ADDR_RESPONSE_LIMIT_DENOMINATOR: usize = 3;
pub const ADDR_RESPONSE_LIMIT_DENOMINATOR: usize = 4;
/// The maximum number of addresses Zebra will keep in its address book.
///
@ -499,8 +499,9 @@ mod tests {
#[test]
#[allow(clippy::assertions_on_constants)]
fn ensure_address_limits_consistent() {
// Zebra 1.0.0-beta.2 address book metrics in December 2021.
const TYPICAL_MAINNET_ADDRESS_BOOK_SIZE: usize = 4_500;
// Estimated network address book size in November 2023, after the address book limit was increased.
// Zebra 1.0.0-beta.2 address book metrics in December 2021 showed 4500 peers.
const TYPICAL_MAINNET_ADDRESS_BOOK_SIZE: usize = 5_500;
let _init_guard = zebra_test::init();
@ -515,7 +516,7 @@ mod tests {
);
assert!(
MAX_ADDRS_IN_ADDRESS_BOOK < TYPICAL_MAINNET_ADDRESS_BOOK_SIZE,
MAX_ADDRS_IN_ADDRESS_BOOK <= TYPICAL_MAINNET_ADDRESS_BOOK_SIZE,
"the address book limit should actually be used"
);
}

View File

@ -142,4 +142,10 @@ impl Response {
pub fn is_inventory_download(&self) -> bool {
matches!(self, Response::Blocks(_) | Response::Transactions(_))
}
/// Returns true if self is the [`Response::Nil`] variant.
#[allow(dead_code)]
pub fn is_nil(&self) -> bool {
matches!(self, Self::Nil)
}
}

View File

@ -9,17 +9,15 @@ use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::{Arc, TryLockError},
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use chrono::Utc;
use futures::{
future::{FutureExt, TryFutureExt},
stream::Stream,
};
use num_integer::div_ceil;
use tokio::sync::oneshot::{self, error::TryRecvError};
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
@ -32,10 +30,7 @@ use zebra_chain::{
transaction::UnminedTxId,
};
use zebra_consensus::router::RouterError;
use zebra_network::{
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
AddressBook, InventoryResponse,
};
use zebra_network::{AddressBook, InventoryResponse};
use zebra_node_services::mempool;
use crate::BoxError;
@ -45,8 +40,11 @@ use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use InventoryResponse::*;
mod cached_peer_addr_response;
pub(crate) mod downloads;
use cached_peer_addr_response::CachedPeerAddrResponse;
#[cfg(test)]
mod tests;
@ -135,8 +133,12 @@ pub enum Setup {
Initialized {
// Services
//
/// A shared list of peer addresses.
address_book: Arc<std::sync::Mutex<zn::AddressBook>>,
/// An owned partial list of peer addresses used as a `GetAddr` response, and
/// a shared list of peer addresses used to periodically refresh the partial list.
///
/// Refreshed from the address book in `poll_ready` method
/// after [`CACHED_ADDRS_REFRESH_INTERVAL`](cached_peer_addr_response::CACHED_ADDRS_REFRESH_INTERVAL).
cached_peer_addr_response: CachedPeerAddrResponse,
/// A `futures::Stream` that downloads and verifies gossiped blocks.
block_downloads: Pin<Box<GossipedBlockDownloads>>,
@ -261,6 +263,8 @@ impl Service<zn::Request> for Inbound {
latest_chain_tip,
} = setup_data;
let cached_peer_addr_response = CachedPeerAddrResponse::new(address_book);
let block_downloads = Box::pin(BlockDownloads::new(
full_verify_concurrency_limit,
Timeout::new(block_download_peer_set, BLOCK_DOWNLOAD_TIMEOUT),
@ -271,7 +275,7 @@ impl Service<zn::Request> for Inbound {
result = Ok(());
Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
@ -306,7 +310,7 @@ impl Service<zn::Request> for Inbound {
}
// Clean up completed download tasks, ignoring their results
Setup::Initialized {
address_book,
cached_peer_addr_response,
mut block_downloads,
mempool,
state,
@ -321,7 +325,7 @@ impl Service<zn::Request> for Inbound {
result = Ok(());
Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
@ -352,13 +356,13 @@ impl Service<zn::Request> for Inbound {
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "inbound", skip(self, req))]
fn call(&mut self, req: zn::Request) -> Self::Future {
let (address_book, block_downloads, mempool, state) = match &mut self.setup {
let (cached_peer_addr_response, block_downloads, mempool, state) = match &mut self.setup {
Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
} => (address_book, block_downloads, mempool, state),
} => (cached_peer_addr_response, block_downloads, mempool, state),
_ => {
debug!("ignoring request from remote peer during setup");
return async { Ok(zn::Response::Nil) }.boxed();
@ -377,58 +381,11 @@ impl Service<zn::Request> for Inbound {
//
// If the address book is busy, try again inside the future. If it can't be locked
// twice, ignore the request.
let address_book = address_book.clone();
let get_peers = move || match address_book.try_lock() {
Ok(address_book) => Some(address_book.clone()),
Err(TryLockError::WouldBlock) => None,
Err(TryLockError::Poisoned(_)) => panic!("previous thread panicked while holding the address book lock"),
};
let peers = get_peers();
cached_peer_addr_response.try_refresh();
let response = cached_peer_addr_response.value();
async move {
// Correctness: get the current time inside the future.
//
// This time is used to filter outdated peers, so it doesn't matter much
// if we get it when the future is created, or when it starts running.
let now = Utc::now();
// If we didn't get the peers when the future was created, wait for other tasks
// to run, then try again when the future first runs.
if peers.is_none() {
tokio::task::yield_now().await;
}
let peers = peers.or_else(get_peers);
let is_busy = peers.is_none();
// Send a sanitized response
let mut peers = peers.map_or_else(Vec::new, |peers| peers.sanitized(now));
// Truncate the list
let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR);
let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit);
peers.truncate(address_limit);
if peers.is_empty() {
// Sometimes we don't know if the peer response will be empty until we've
// sanitized them.
if is_busy {
info!(
"ignoring `Peers` request from remote peer because our address \
book is busy"
);
} else {
debug!(
"ignoring `Peers` request from remote peer because our address \
book has no available peers"
);
}
Ok(zn::Response::Nil)
} else {
Ok(zn::Response::Peers(peers))
}
Ok(response)
}.boxed()
}
zn::Request::BlocksByHash(hashes) => {

View File

@ -0,0 +1,98 @@
//! Periodically-refreshed GetAddr response for the inbound service.
//!
//! Used to avoid giving out Zebra's entire address book over a short duration.
use std::{
sync::{Mutex, TryLockError},
time::Instant,
};
use super::*;
/// The minimum duration that a `CachedPeerAddrResponse` is considered fresh before the inbound service
/// should get new peer addresses from the address book to send as a `GetAddr` response.
///
/// Cached responses are considered stale and should be cleared after twice this duration.
pub const CACHED_ADDRS_REFRESH_INTERVAL: Duration = Duration::from_secs(10 * 60);
/// Caches and refreshes a partial list of peer addresses to be returned as a `GetAddr` response.
pub struct CachedPeerAddrResponse {
/// A shared list of peer addresses.
address_book: Arc<Mutex<zn::AddressBook>>,
/// An owned list of peer addresses used as a `GetAddr` response.
value: zn::Response,
/// Instant after which `cached_addrs` should be refreshed.
refresh_time: Instant,
}
impl CachedPeerAddrResponse {
/// Creates a new empty [`CachedPeerAddrResponse`].
pub(super) fn new(address_book: Arc<Mutex<AddressBook>>) -> Self {
Self {
address_book,
value: zn::Response::Nil,
refresh_time: Instant::now(),
}
}
pub(super) fn value(&self) -> zn::Response {
self.value.clone()
}
/// Refreshes the `cached_addrs` if the time has past `refresh_time` or the cache is empty
pub(super) fn try_refresh(&mut self) {
let now = Instant::now();
// return early if there are some cached addresses, and they are still fresh
if now < self.refresh_time {
return;
}
let cache_expiry = self.refresh_time + CACHED_ADDRS_REFRESH_INTERVAL;
// try getting a lock on the address book if it's time to refresh the cached addresses
match self
.address_book
.try_lock()
.map(|book| book.fresh_get_addr_response())
{
// Update cached value and refresh_time if there are some gossipable peers in the address book.
//
// Security: this avoids outdated gossiped peers. Outdated Zebra binaries will gradually lose all their peers,
// because those peers refuse to connect to outdated versions. So we don't want those outdated Zebra
// versions to keep gossiping old peer information either.
Ok(peers) if !peers.is_empty() => {
self.refresh_time = now + CACHED_ADDRS_REFRESH_INTERVAL;
self.value = zn::Response::Peers(peers);
}
// Clear the cached response if the time has past the cache expiry time.
Ok(_) if now > cache_expiry => {
self.value = zn::Response::Nil;
}
Err(TryLockError::WouldBlock) if now > cache_expiry => {
warn!("getaddrs response hasn't been refreshed in some time");
self.value = zn::Response::Nil;
}
// Don't update the cached response or refresh time if unable to get new peer addresses
// from the address book and `now` is before the cache expiry.
Ok(_) => {
debug!(
"could not refresh cached response because our address \
book has no available peers"
);
}
Err(TryLockError::WouldBlock) => {}
// Panic if the address book lock is poisoned
Err(TryLockError::Poisoned(_)) => {
panic!("previous thread panicked while holding the address book lock")
}
};
}
}

View File

@ -19,12 +19,16 @@ use zebra_chain::{
block::{Block, Height},
fmt::humantime_seconds,
parameters::Network::{self, *},
serialization::ZcashDeserializeInto,
serialization::{DateTime32, ZcashDeserializeInto},
transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx},
};
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
use zebra_network::{
constants::DEFAULT_MAX_CONNS_PER_IP, AddressBook, InventoryResponse, Request, Response,
constants::{
ADDR_RESPONSE_LIMIT_DENOMINATOR, DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK,
},
types::{MetaAddr, PeerServices},
AddressBook, InventoryResponse, Request, Response,
};
use zebra_node_services::mempool;
use zebra_state::{ChainTipChange, Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT};
@ -742,6 +746,112 @@ async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
/// Checks that Zebra won't give out its entire address book over a short duration.
async fn caches_getaddr_response() {
const NUM_ADDRESSES: usize = 20;
const NUM_REQUESTS: usize = 10;
const EXPECTED_NUM_RESULTS: usize = NUM_ADDRESSES / ADDR_RESPONSE_LIMIT_DENOMINATOR;
let _init_guard = zebra_test::init();
let addrs = (0..NUM_ADDRESSES)
.map(|idx| format!("127.0.0.{idx}:{idx}"))
.map(|addr| {
MetaAddr::new_gossiped_meta_addr(
addr.parse().unwrap(),
PeerServices::NODE_NETWORK,
DateTime32::now(),
)
});
let inbound = {
let network = Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let address_book = AddressBook::new_with_addrs(
SocketAddr::from_str("0.0.0.0:0").unwrap(),
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::none(),
addrs,
);
let address_book = Arc::new(std::sync::Mutex::new(address_book));
// UTXO verification doesn't matter for these tests.
let (state, _read_only_state_service, latest_chain_tip, _chain_tip_change) =
zebra_state::init(state_config.clone(), network, Height::MAX, 0);
let state_service = ServiceBuilder::new().buffer(1).service(state);
// Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
let (
block_verifier,
_transaction_verifier,
_groth16_download_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(consensus_config.clone(), network, state_service.clone())
.await;
let peer_set = MockService::build()
.with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY)
.for_unit_tests();
let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10);
let buffered_mempool_service =
Buffer::new(BoxService::new(MockService::build().for_unit_tests()), 10);
let (setup_tx, setup_rx) = oneshot::channel();
let inbound_service = ServiceBuilder::new()
.load_shed()
.service(Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx));
let inbound_service = BoxService::new(inbound_service);
let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);
let setup_data = InboundSetupData {
address_book: address_book.clone(),
block_download_peer_set: buffered_peer_set,
block_verifier,
mempool: buffered_mempool_service.clone(),
state: state_service.clone(),
latest_chain_tip,
};
let r = setup_tx.send(setup_data);
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok(), "unexpected setup channel send failure");
inbound_service
};
let Ok(zebra_network::Response::Peers(first_result)) =
inbound.clone().oneshot(zebra_network::Request::Peers).await
else {
panic!("result should match Ok(Peers(_))")
};
assert_eq!(
first_result.len(),
EXPECTED_NUM_RESULTS,
"inbound service should respond with expected number of peer addresses",
);
for _ in 0..NUM_REQUESTS {
let Ok(zebra_network::Response::Peers(peers)) =
inbound.clone().oneshot(zebra_network::Request::Peers).await
else {
panic!("result should match Ok(Peers(_))")
};
assert_eq!(
peers,
first_result,
"inbound service should return the same result for every Peers request until the refresh time",
);
}
}
/// Setup a fake Zebra network stack, with fake peer set.
///
/// Adds some initial state blocks, and mempool transactions if `add_transactions` is true.

View File

@ -59,6 +59,9 @@ async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> {
listen_addr,
) = setup(None).await;
// yield and sleep until the address book lock is released.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Send a request to inbound directly
let request = inbound_service.clone().oneshot(Request::Peers);
let response = request.await;