fix(network): Avoid initiating outbound handshakes with IPs for which Zebra already has an active peer. (#7029)
* Adds most_recent_by_ip field to address book * adds test * Apply suggestions from code review * fixes lint * Updates most_recent_by_ip in .take() Updates should_update_most_recent_by_ip() and has_active_peer_with_ip to check last_attempt and last_failure times Renames has_active_peer_with_ip * Documents that Zebra will not initiate more than 1 outbound connections per IP * Fixes is_ready_for_connection_attempt_with_ip Adds test coverage for AttemptPending and Failed Fixes new_outbound_peer_connections_are_rate_limited proptest * Applies suggestions from code review. * Applies suggestions from code review * Always return true from `is_ready_for_connection_attempt_with_ip` if max_connections_per_ip != 0 * Update max_connections_per_ip config docs * Warn about invalid config fields and use default values * Ignores last_attempt and last_failure in is_ready_for_connection_attempt_with_ip updates test * Only update most_recent_by_ip if update.last_conn_state is responded. * Apply suggestions from code review Co-authored-by: teor <teor@riseup.net> * fixes lint * Update zebra-network/src/address_book.rs Co-authored-by: teor <teor@riseup.net> * Apply suggestions from code review Co-authored-by: teor <teor@riseup.net> * Fix Rust syntax * Fix whitespace --------- Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
83c459d8ab
commit
77ad91ced4
|
@ -3,8 +3,9 @@
|
|||
|
||||
use std::{
|
||||
cmp::Reverse,
|
||||
collections::HashMap,
|
||||
iter::Extend,
|
||||
net::SocketAddr,
|
||||
net::{IpAddr, SocketAddr},
|
||||
sync::{Arc, Mutex},
|
||||
time::Instant,
|
||||
};
|
||||
|
@ -72,6 +73,14 @@ pub struct AddressBook {
|
|||
/// [`OrderedMap`] sorts in descending order.
|
||||
by_addr: OrderedMap<PeerSocketAddr, MetaAddr, Reverse<MetaAddr>>,
|
||||
|
||||
/// The address with a last_connection_state of [`PeerAddrState::Responded`] and
|
||||
/// the most recent `last_response` time by IP.
|
||||
///
|
||||
/// This is used to avoid initiating outbound connections past [`Config::max_connections_per_ip`](crate::config::Config), and
|
||||
/// currently only supports a `max_connections_per_ip` of 1, and must be `None` when used with a greater `max_connections_per_ip`.
|
||||
// TODO: Replace with `by_ip: HashMap<IpAddr, BTreeMap<DateTime32, MetaAddr>>` to support configured `max_connections_per_ip` greater than 1
|
||||
most_recent_by_ip: Option<HashMap<IpAddr, MetaAddr>>,
|
||||
|
||||
/// The local listener address.
|
||||
local_listener: SocketAddr,
|
||||
|
||||
|
@ -130,7 +139,12 @@ impl AddressBook {
|
|||
/// Construct an [`AddressBook`] with the given `local_listener` on `network`.
|
||||
///
|
||||
/// Uses the supplied [`tracing::Span`] for address book operations.
|
||||
pub fn new(local_listener: SocketAddr, network: Network, span: Span) -> AddressBook {
|
||||
pub fn new(
|
||||
local_listener: SocketAddr,
|
||||
network: Network,
|
||||
max_connections_per_ip: usize,
|
||||
span: Span,
|
||||
) -> AddressBook {
|
||||
let constructor_span = span.clone();
|
||||
let _guard = constructor_span.enter();
|
||||
|
||||
|
@ -141,6 +155,8 @@ impl AddressBook {
|
|||
// and it gets replaced by `update_metrics` anyway.
|
||||
let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default());
|
||||
|
||||
// Avoid initiating outbound handshakes when max_connections_per_ip is 1.
|
||||
let should_limit_outbound_conns_per_ip = max_connections_per_ip == 1;
|
||||
let mut new_book = AddressBook {
|
||||
by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
|
||||
local_listener: canonical_socket_addr(local_listener),
|
||||
|
@ -149,6 +165,7 @@ impl AddressBook {
|
|||
span,
|
||||
address_metrics_tx,
|
||||
last_address_log: None,
|
||||
most_recent_by_ip: should_limit_outbound_conns_per_ip.then(HashMap::new),
|
||||
};
|
||||
|
||||
new_book.update_metrics(instant_now, chrono_now);
|
||||
|
@ -170,6 +187,7 @@ impl AddressBook {
|
|||
pub fn new_with_addrs(
|
||||
local_listener: SocketAddr,
|
||||
network: Network,
|
||||
max_connections_per_ip: usize,
|
||||
addr_limit: usize,
|
||||
span: Span,
|
||||
addrs: impl IntoIterator<Item = MetaAddr>,
|
||||
|
@ -183,7 +201,7 @@ impl AddressBook {
|
|||
// The maximum number of addresses should be always greater than 0
|
||||
assert!(addr_limit > 0);
|
||||
|
||||
let mut new_book = AddressBook::new(local_listener, network, span);
|
||||
let mut new_book = AddressBook::new(local_listener, network, max_connections_per_ip, span);
|
||||
new_book.addr_limit = addr_limit;
|
||||
|
||||
let addrs = addrs
|
||||
|
@ -198,6 +216,14 @@ impl AddressBook {
|
|||
for (socket_addr, meta_addr) in addrs {
|
||||
// overwrite any duplicate addresses
|
||||
new_book.by_addr.insert(socket_addr, meta_addr);
|
||||
// Add the address to `most_recent_by_ip` if it has responded
|
||||
if new_book.should_update_most_recent_by_ip(meta_addr) {
|
||||
new_book
|
||||
.most_recent_by_ip
|
||||
.as_mut()
|
||||
.expect("should be some when should_update_most_recent_by_ip is true")
|
||||
.insert(socket_addr.ip(), meta_addr);
|
||||
}
|
||||
// exit as soon as we get enough addresses
|
||||
if new_book.by_addr.len() >= addr_limit {
|
||||
break;
|
||||
|
@ -314,6 +340,45 @@ impl AddressBook {
|
|||
meta_addr
|
||||
}
|
||||
|
||||
/// Returns true if `updated` needs to be applied to the recent outbound peer connection IP cache.
|
||||
///
|
||||
/// Checks if there are no existing entries in the address book with this IP,
|
||||
/// or if `updated` has a more recent `last_response` requiring the outbound connector to wait
|
||||
/// longer before initiating handshakes with peers at this IP.
|
||||
///
|
||||
/// This code only needs to check a single cache entry, rather than the entire address book,
|
||||
/// because other code maintains these invariants:
|
||||
/// - `last_response` times for an entry can only increase.
|
||||
/// - this is the only field checked by `has_connection_recently_responded()`
|
||||
///
|
||||
/// See [`AddressBook::is_ready_for_connection_attempt_with_ip`] for more details.
|
||||
fn should_update_most_recent_by_ip(&self, updated: MetaAddr) -> bool {
|
||||
let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
|
||||
return false
|
||||
};
|
||||
|
||||
if let Some(previous) = most_recent_by_ip.get(&updated.addr.ip()) {
|
||||
updated.last_connection_state == PeerAddrState::Responded
|
||||
&& updated.last_response() > previous.last_response()
|
||||
} else {
|
||||
updated.last_connection_state == PeerAddrState::Responded
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if `addr` is the latest entry for its IP, which is stored in `most_recent_by_ip`.
|
||||
/// The entry is checked for an exact match to the IP and port of `addr`.
|
||||
fn should_remove_most_recent_by_ip(&self, addr: PeerSocketAddr) -> bool {
|
||||
let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
|
||||
return false
|
||||
};
|
||||
|
||||
if let Some(previous) = most_recent_by_ip.get(&addr.ip()) {
|
||||
previous.addr == addr
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply `change` to the address book, returning the updated `MetaAddr`,
|
||||
/// if the change was valid.
|
||||
///
|
||||
|
@ -373,6 +438,15 @@ impl AddressBook {
|
|||
|
||||
self.by_addr.insert(updated.addr, updated);
|
||||
|
||||
// Add the address to `most_recent_by_ip` if it sent the most recent
|
||||
// response Zebra has received from this IP.
|
||||
if self.should_update_most_recent_by_ip(updated) {
|
||||
self.most_recent_by_ip
|
||||
.as_mut()
|
||||
.expect("should be some when should_update_most_recent_by_ip is true")
|
||||
.insert(updated.addr.ip(), updated);
|
||||
}
|
||||
|
||||
debug!(
|
||||
?change,
|
||||
?updated,
|
||||
|
@ -397,6 +471,15 @@ impl AddressBook {
|
|||
|
||||
self.by_addr.remove(&surplus_peer.addr);
|
||||
|
||||
// Check if this surplus peer's addr matches that in `most_recent_by_ip`
|
||||
// for this the surplus peer's ip to remove it there as well.
|
||||
if self.should_remove_most_recent_by_ip(surplus_peer.addr) {
|
||||
self.most_recent_by_ip
|
||||
.as_mut()
|
||||
.expect("should be some when should_remove_most_recent_by_ip is true")
|
||||
.remove(&surplus_peer.addr.ip());
|
||||
}
|
||||
|
||||
debug!(
|
||||
surplus = ?surplus_peer,
|
||||
?updated,
|
||||
|
@ -435,6 +518,14 @@ impl AddressBook {
|
|||
);
|
||||
|
||||
if let Some(entry) = self.by_addr.remove(&removed_addr) {
|
||||
// Check if this surplus peer's addr matches that in `most_recent_by_ip`
|
||||
// for this the surplus peer's ip to remove it there as well.
|
||||
if self.should_remove_most_recent_by_ip(entry.addr) {
|
||||
if let Some(most_recent_by_ip) = self.most_recent_by_ip.as_mut() {
|
||||
most_recent_by_ip.remove(&entry.addr.ip());
|
||||
}
|
||||
}
|
||||
|
||||
std::mem::drop(_guard);
|
||||
self.update_metrics(instant_now, chrono_now);
|
||||
Some(entry)
|
||||
|
@ -463,6 +554,26 @@ impl AddressBook {
|
|||
self.by_addr.descending_values().cloned()
|
||||
}
|
||||
|
||||
/// Is this IP ready for a new outbound connection attempt?
|
||||
/// Checks if the outbound connection with the most recent response at this IP has recently responded.
|
||||
///
|
||||
/// Note: last_response times may remain live for a long time if the local clock is changed to an earlier time.
|
||||
fn is_ready_for_connection_attempt_with_ip(
|
||||
&self,
|
||||
ip: &IpAddr,
|
||||
chrono_now: chrono::DateTime<Utc>,
|
||||
) -> bool {
|
||||
let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
|
||||
// if we're not checking IPs, any connection is allowed
|
||||
return true;
|
||||
};
|
||||
let Some(same_ip_peer) = most_recent_by_ip.get(ip) else {
|
||||
// If there's no entry for this IP, any connection is allowed
|
||||
return true;
|
||||
};
|
||||
!same_ip_peer.has_connection_recently_responded(chrono_now)
|
||||
}
|
||||
|
||||
/// Return an iterator over peers that are due for a reconnection attempt,
|
||||
/// in reconnection attempt order.
|
||||
pub fn reconnection_peers(
|
||||
|
@ -478,6 +589,7 @@ impl AddressBook {
|
|||
.descending_values()
|
||||
.filter(move |peer| {
|
||||
peer.is_ready_for_connection_attempt(instant_now, chrono_now, self.network)
|
||||
&& self.is_ready_for_connection_attempt_with_ip(&peer.addr.ip(), chrono_now)
|
||||
})
|
||||
.cloned()
|
||||
}
|
||||
|
@ -699,6 +811,7 @@ impl Clone for AddressBook {
|
|||
span: self.span.clone(),
|
||||
address_metrics_tx,
|
||||
last_address_log: None,
|
||||
most_recent_by_ip: self.most_recent_by_ip.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ use tracing::Span;
|
|||
use zebra_chain::{parameters::Network::*, serialization::Duration32};
|
||||
|
||||
use crate::{
|
||||
constants::{MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP},
|
||||
constants::{DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP},
|
||||
meta_addr::{arbitrary::MAX_META_ADDR, MetaAddr, MetaAddrChange},
|
||||
AddressBook,
|
||||
};
|
||||
|
@ -30,6 +30,7 @@ proptest! {
|
|||
let address_book = AddressBook::new_with_addrs(
|
||||
local_listener,
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::none(),
|
||||
addresses
|
||||
|
@ -59,6 +60,7 @@ proptest! {
|
|||
let address_book = AddressBook::new_with_addrs(
|
||||
local_listener,
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::none(),
|
||||
addresses
|
||||
|
@ -97,6 +99,7 @@ proptest! {
|
|||
let mut address_book = AddressBook::new_with_addrs(
|
||||
local_listener,
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
addr_limit,
|
||||
Span::none(),
|
||||
initial_addrs.clone(),
|
||||
|
@ -119,6 +122,7 @@ proptest! {
|
|||
let mut address_book = AddressBook::new_with_addrs(
|
||||
local_listener,
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
addr_limit,
|
||||
Span::none(),
|
||||
initial_addrs,
|
||||
|
|
|
@ -11,14 +11,21 @@ use zebra_chain::{
|
|||
};
|
||||
|
||||
use crate::{
|
||||
constants::MAX_ADDRS_IN_ADDRESS_BOOK, meta_addr::MetaAddr,
|
||||
protocol::external::types::PeerServices, AddressBook,
|
||||
constants::{DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK},
|
||||
meta_addr::MetaAddr,
|
||||
protocol::external::types::PeerServices,
|
||||
AddressBook,
|
||||
};
|
||||
|
||||
/// Make sure an empty address book is actually empty.
|
||||
#[test]
|
||||
fn address_book_empty() {
|
||||
let address_book = AddressBook::new("0.0.0.0:0".parse().unwrap(), Mainnet, Span::current());
|
||||
let address_book = AddressBook::new(
|
||||
"0.0.0.0:0".parse().unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
Span::current(),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
address_book
|
||||
|
@ -48,6 +55,7 @@ fn address_book_peer_order() {
|
|||
let address_book = AddressBook::new_with_addrs(
|
||||
"0.0.0.0:0".parse().unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::current(),
|
||||
addrs,
|
||||
|
@ -64,6 +72,7 @@ fn address_book_peer_order() {
|
|||
let address_book = AddressBook::new_with_addrs(
|
||||
"0.0.0.0:0".parse().unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::current(),
|
||||
addrs,
|
||||
|
@ -83,6 +92,7 @@ fn address_book_peer_order() {
|
|||
let address_book = AddressBook::new_with_addrs(
|
||||
"0.0.0.0:0".parse().unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::current(),
|
||||
addrs,
|
||||
|
@ -99,6 +109,7 @@ fn address_book_peer_order() {
|
|||
let address_book = AddressBook::new_with_addrs(
|
||||
"0.0.0.0:0".parse().unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::current(),
|
||||
addrs,
|
||||
|
@ -110,3 +121,64 @@ fn address_book_peer_order() {
|
|||
Some(meta_addr2),
|
||||
);
|
||||
}
|
||||
|
||||
/// Check that `reconnection_peers` skips addresses with IPs for which
|
||||
/// Zebra already has recently updated outbound peers.
|
||||
#[test]
|
||||
fn reconnection_peers_skips_recently_updated_ip() {
|
||||
// tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent:
|
||||
// - `last_response`
|
||||
test_reconnection_peers_skips_recently_updated_ip(true, |addr| {
|
||||
MetaAddr::new_responded(addr, &PeerServices::NODE_NETWORK)
|
||||
});
|
||||
|
||||
// tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent:
|
||||
// - `last_attempt`
|
||||
test_reconnection_peers_skips_recently_updated_ip(false, MetaAddr::new_reconnect);
|
||||
// - `last_failure`
|
||||
test_reconnection_peers_skips_recently_updated_ip(false, |addr| {
|
||||
MetaAddr::new_errored(addr, PeerServices::NODE_NETWORK)
|
||||
});
|
||||
}
|
||||
|
||||
fn test_reconnection_peers_skips_recently_updated_ip<
|
||||
M: Fn(crate::PeerSocketAddr) -> crate::meta_addr::MetaAddrChange,
|
||||
>(
|
||||
should_skip_ip: bool,
|
||||
make_meta_addr_change: M,
|
||||
) {
|
||||
let addr1 = "127.0.0.1:1".parse().unwrap();
|
||||
let addr2 = "127.0.0.1:2".parse().unwrap();
|
||||
|
||||
let meta_addr1 = make_meta_addr_change(addr1).into_new_meta_addr(
|
||||
Instant::now(),
|
||||
Utc::now().try_into().expect("will succeed until 2038"),
|
||||
);
|
||||
let meta_addr2 = MetaAddr::new_gossiped_meta_addr(
|
||||
addr2,
|
||||
PeerServices::NODE_NETWORK,
|
||||
DateTime32::MIN.saturating_add(Duration32::from_seconds(1)),
|
||||
);
|
||||
|
||||
// The second address should be skipped because the first address has a
|
||||
// recent `last_response` time and the two addresses have the same IP.
|
||||
let addrs = vec![meta_addr1, meta_addr2];
|
||||
let address_book = AddressBook::new_with_addrs(
|
||||
"0.0.0.0:0".parse().unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::current(),
|
||||
addrs,
|
||||
);
|
||||
|
||||
let next_reconnection_peer = address_book
|
||||
.reconnection_peers(Instant::now(), Utc::now())
|
||||
.next();
|
||||
|
||||
if should_skip_ip {
|
||||
assert_eq!(next_reconnection_peer, None,);
|
||||
} else {
|
||||
assert_ne!(next_reconnection_peer, None,);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ impl AddressBookUpdater {
|
|||
let address_book = AddressBook::new(
|
||||
local_listener,
|
||||
config.network,
|
||||
config.max_connections_per_ip,
|
||||
span!(Level::TRACE, "address book"),
|
||||
);
|
||||
let address_metrics = address_book.address_metrics_watcher();
|
||||
|
|
|
@ -19,8 +19,9 @@ use zebra_chain::parameters::Network;
|
|||
|
||||
use crate::{
|
||||
constants::{
|
||||
DEFAULT_CRAWL_NEW_PEER_INTERVAL, DEFAULT_MAX_CONNS_PER_IP, DNS_LOOKUP_TIMEOUT,
|
||||
INBOUND_PEER_LIMIT_MULTIPLIER, MAX_PEER_DISK_CACHE_SIZE, OUTBOUND_PEER_LIMIT_MULTIPLIER,
|
||||
DEFAULT_CRAWL_NEW_PEER_INTERVAL, DEFAULT_MAX_CONNS_PER_IP,
|
||||
DEFAULT_PEERSET_INITIAL_TARGET_SIZE, DNS_LOOKUP_TIMEOUT, INBOUND_PEER_LIMIT_MULTIPLIER,
|
||||
MAX_PEER_DISK_CACHE_SIZE, OUTBOUND_PEER_LIMIT_MULTIPLIER,
|
||||
},
|
||||
protocol::external::{canonical_peer_addr, canonical_socket_addr},
|
||||
BoxError, PeerSocketAddr,
|
||||
|
@ -158,6 +159,20 @@ pub struct Config {
|
|||
/// before it drops any additional peer connections with that IP.
|
||||
///
|
||||
/// The default and minimum value are 1.
|
||||
///
|
||||
/// # Security
|
||||
///
|
||||
/// Increasing this config above 1 reduces Zebra's network security.
|
||||
///
|
||||
/// If this config is greater than 1, Zebra can initiate multiple outbound handshakes to the same
|
||||
/// IP address.
|
||||
///
|
||||
/// This config does not currently limit the number of inbound connections that Zebra will accept
|
||||
/// from the same IP address.
|
||||
///
|
||||
/// If Zebra makes multiple inbound or outbound connections to the same IP, they will be dropped
|
||||
/// after the handshake, but before adding them to the peer set. The total numbers of inbound and
|
||||
/// outbound connections are also limited to a multiple of `peerset_initial_target_size`.
|
||||
pub max_connections_per_ip: usize,
|
||||
}
|
||||
|
||||
|
@ -593,7 +608,7 @@ impl Default for Config {
|
|||
//
|
||||
// But Zebra should only make a small number of initial outbound connections,
|
||||
// so that idle peers don't use too many connection slots.
|
||||
peerset_initial_target_size: 25,
|
||||
peerset_initial_target_size: DEFAULT_PEERSET_INITIAL_TARGET_SIZE,
|
||||
max_connections_per_ip: DEFAULT_MAX_CONNS_PER_IP,
|
||||
}
|
||||
}
|
||||
|
@ -629,7 +644,7 @@ impl<'de> Deserialize<'de> for Config {
|
|||
cache_dir: config.cache_dir,
|
||||
peerset_initial_target_size: config.peerset_initial_target_size,
|
||||
crawl_new_peer_interval: config.crawl_new_peer_interval,
|
||||
max_connections_per_ip: Some(DEFAULT_MAX_CONNS_PER_IP),
|
||||
max_connections_per_ip: Some(config.max_connections_per_ip),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -655,6 +670,23 @@ impl<'de> Deserialize<'de> for Config {
|
|||
},
|
||||
}?;
|
||||
|
||||
let [max_connections_per_ip, peerset_initial_target_size] = [
|
||||
("max_connections_per_ip", max_connections_per_ip, DEFAULT_MAX_CONNS_PER_IP),
|
||||
// If we want Zebra to operate with no network,
|
||||
// we should implement a `zebrad` command that doesn't use `zebra-network`.
|
||||
("peerset_initial_target_size", Some(peerset_initial_target_size), DEFAULT_PEERSET_INITIAL_TARGET_SIZE)
|
||||
].map(|(field_name, non_zero_config_field, default_config_value)| {
|
||||
if non_zero_config_field == Some(0) {
|
||||
warn!(
|
||||
?field_name,
|
||||
?non_zero_config_field,
|
||||
"{field_name} should be greater than 0, using default value of {default_config_value} instead"
|
||||
);
|
||||
}
|
||||
|
||||
non_zero_config_field.filter(|config_value| config_value > &0).unwrap_or(default_config_value)
|
||||
});
|
||||
|
||||
Ok(Config {
|
||||
listen_addr: canonical_socket_addr(listen_addr),
|
||||
network,
|
||||
|
@ -663,7 +695,7 @@ impl<'de> Deserialize<'de> for Config {
|
|||
cache_dir,
|
||||
peerset_initial_target_size,
|
||||
crawl_new_peer_interval,
|
||||
max_connections_per_ip: max_connections_per_ip.unwrap_or(DEFAULT_MAX_CONNS_PER_IP),
|
||||
max_connections_per_ip,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,9 +70,20 @@ pub const OUTBOUND_PEER_LIMIT_MULTIPLIER: usize = 3;
|
|||
/// The default maximum number of peer connections Zebra will keep for a given IP address
|
||||
/// before it drops any additional peer connections with that IP.
|
||||
///
|
||||
/// This will be used as Config.max_connections_per_ip if no value is provided.
|
||||
/// This will be used as `Config.max_connections_per_ip` if no valid value is provided.
|
||||
///
|
||||
/// Note: Zebra will currently avoid initiating outbound connections where it
|
||||
/// has recently had a successful handshake with any address
|
||||
/// on that IP. Zebra will not initiate more than 1 outbound connection
|
||||
/// to an IP based on the default configuration, but it will accept more inbound
|
||||
/// connections to an IP.
|
||||
pub const DEFAULT_MAX_CONNS_PER_IP: usize = 1;
|
||||
|
||||
/// The default peerset target size.
|
||||
///
|
||||
/// This will be used as `Config.peerset_initial_target_size` if no valid value is provided.
|
||||
pub const DEFAULT_PEERSET_INITIAL_TARGET_SIZE: usize = 25;
|
||||
|
||||
/// The buffer size for the peer set.
|
||||
///
|
||||
/// This should be greater than 1 to avoid sender contention, but also reasonably
|
||||
|
|
|
@ -561,6 +561,17 @@ impl MetaAddr {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns true if any messages were recently sent to or received from this address.
|
||||
pub fn was_recently_updated(
|
||||
&self,
|
||||
instant_now: Instant,
|
||||
chrono_now: chrono::DateTime<Utc>,
|
||||
) -> bool {
|
||||
self.has_connection_recently_responded(chrono_now)
|
||||
|| self.was_connection_recently_attempted(instant_now)
|
||||
|| self.has_connection_recently_failed(instant_now)
|
||||
}
|
||||
|
||||
/// Is this address ready for a new outbound connection attempt?
|
||||
pub fn is_ready_for_connection_attempt(
|
||||
&self,
|
||||
|
@ -569,9 +580,7 @@ impl MetaAddr {
|
|||
network: Network,
|
||||
) -> bool {
|
||||
self.last_known_info_is_valid_for_outbound(network)
|
||||
&& !self.has_connection_recently_responded(chrono_now)
|
||||
&& !self.was_connection_recently_attempted(instant_now)
|
||||
&& !self.has_connection_recently_failed(instant_now)
|
||||
&& !self.was_recently_updated(instant_now, chrono_now)
|
||||
&& self.is_probably_reachable(chrono_now)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Randomised test data generation for MetaAddr.
|
||||
|
||||
use std::time::Instant;
|
||||
use std::{net::IpAddr, time::Instant};
|
||||
|
||||
use proptest::{arbitrary::any, collection::vec, prelude::*};
|
||||
|
||||
|
@ -99,7 +99,17 @@ impl MetaAddrChange {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
/// Create a strategy that generates [`MetaAddrChange`]s which are ready for
|
||||
/// Create a strategy that generates [`IpAddr`]s for [`MetaAddrChange`]s which are ready for
|
||||
/// outbound connections.
|
||||
pub fn ready_outbound_strategy_ip() -> BoxedStrategy<IpAddr> {
|
||||
any::<IpAddr>()
|
||||
.prop_filter("failed MetaAddr::is_valid_for_outbound", |ip| {
|
||||
!ip.is_unspecified()
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/// Create a strategy that generates port numbers for [`MetaAddrChange`]s which are ready for
|
||||
/// outbound connections.
|
||||
///
|
||||
/// Currently, all generated changes are the [`NewAlternate`][1] variant.
|
||||
|
@ -107,7 +117,7 @@ impl MetaAddrChange {
|
|||
/// fields. (After PR #2276 merges.)
|
||||
///
|
||||
/// [1]: super::NewAlternate
|
||||
pub fn ready_outbound_strategy() -> BoxedStrategy<Self> {
|
||||
pub fn ready_outbound_strategy_port() -> BoxedStrategy<u16> {
|
||||
(
|
||||
canonical_peer_addr_strategy(),
|
||||
any::<Instant>(),
|
||||
|
@ -125,7 +135,7 @@ impl MetaAddrChange {
|
|||
.into_new_meta_addr(instant_now, local_now)
|
||||
.last_known_info_is_valid_for_outbound(Mainnet)
|
||||
{
|
||||
Some(change)
|
||||
Some(addr.port())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
|
|
@ -10,7 +10,10 @@ use tracing::Span;
|
|||
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
|
||||
|
||||
use crate::{
|
||||
constants::{MAX_ADDRS_IN_ADDRESS_BOOK, MAX_RECENT_PEER_AGE, MIN_PEER_RECONNECTION_DELAY},
|
||||
constants::{
|
||||
DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, MAX_RECENT_PEER_AGE,
|
||||
MIN_PEER_RECONNECTION_DELAY,
|
||||
},
|
||||
meta_addr::{
|
||||
arbitrary::{MAX_ADDR_CHANGE, MAX_META_ADDR},
|
||||
MetaAddr, MetaAddrChange,
|
||||
|
@ -156,6 +159,7 @@ proptest! {
|
|||
let address_book = AddressBook::new_with_addrs(
|
||||
local_listener,
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::none(),
|
||||
address_book_addrs
|
||||
|
@ -214,6 +218,7 @@ proptest! {
|
|||
let mut address_book = AddressBook::new_with_addrs(
|
||||
local_listener,
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
1,
|
||||
Span::none(),
|
||||
Vec::new(),
|
||||
|
@ -327,6 +332,7 @@ proptest! {
|
|||
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new_with_addrs(
|
||||
SocketAddr::from_str("0.0.0.0:0").unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
MAX_ADDRS_IN_ADDRESS_BOOK,
|
||||
Span::none(),
|
||||
addrs,
|
||||
|
|
|
@ -8,15 +8,20 @@ use std::{
|
|||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use proptest::{collection::vec, prelude::*};
|
||||
use proptest::{
|
||||
collection::{hash_map, vec},
|
||||
prelude::*,
|
||||
};
|
||||
use tokio::time::{sleep, timeout};
|
||||
use tracing::Span;
|
||||
|
||||
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
|
||||
|
||||
use crate::{
|
||||
constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL,
|
||||
canonical_peer_addr,
|
||||
constants::{DEFAULT_MAX_CONNS_PER_IP, MIN_OUTBOUND_PEER_CONNECTION_INTERVAL},
|
||||
meta_addr::{MetaAddr, MetaAddrChange},
|
||||
protocol::types::PeerServices,
|
||||
AddressBook, BoxError, Request, Response,
|
||||
};
|
||||
|
||||
|
@ -67,7 +72,7 @@ proptest! {
|
|||
});
|
||||
|
||||
// Since the address book is empty, there won't be any available peers
|
||||
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Mainnet, Span::none());
|
||||
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Mainnet, DEFAULT_MAX_CONNS_PER_IP, Span::none());
|
||||
|
||||
let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
|
||||
|
||||
|
@ -94,18 +99,22 @@ proptest! {
|
|||
/// Test that new outbound peer connections are rate-limited.
|
||||
#[test]
|
||||
fn new_outbound_peer_connections_are_rate_limited(
|
||||
peers in vec(MetaAddrChange::ready_outbound_strategy(), TEST_ADDRESSES),
|
||||
peers in hash_map(MetaAddrChange::ready_outbound_strategy_ip(), MetaAddrChange::ready_outbound_strategy_port(), TEST_ADDRESSES),
|
||||
initial_candidates in 0..MAX_TEST_CANDIDATES,
|
||||
extra_candidates in 0..MAX_TEST_CANDIDATES,
|
||||
) {
|
||||
let (runtime, _init_guard) = zebra_test::init_async();
|
||||
let _guard = runtime.enter();
|
||||
|
||||
let peers = peers.into_iter().map(|(ip, port)| {
|
||||
MetaAddr::new_alternate(canonical_peer_addr(SocketAddr::new(ip, port)), &PeerServices::NODE_NETWORK)
|
||||
}).collect::<Vec<_>>();
|
||||
|
||||
let peer_service = tower::service_fn(|_| async {
|
||||
unreachable!("Mock peer service is never used");
|
||||
});
|
||||
|
||||
let mut address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Mainnet, Span::none());
|
||||
let mut address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Mainnet, DEFAULT_MAX_CONNS_PER_IP, Span::none());
|
||||
address_book.extend(peers);
|
||||
|
||||
let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
|
||||
|
|
|
@ -15,7 +15,7 @@ use zebra_chain::{parameters::Network::*, serialization::DateTime32};
|
|||
use zebra_test::mock_service::{MockService, PanicAssertion};
|
||||
|
||||
use crate::{
|
||||
constants::{GET_ADDR_FANOUT, MIN_PEER_GET_ADDR_INTERVAL},
|
||||
constants::{DEFAULT_MAX_CONNS_PER_IP, GET_ADDR_FANOUT, MIN_PEER_GET_ADDR_INTERVAL},
|
||||
types::{MetaAddr, PeerServices},
|
||||
AddressBook, Request, Response,
|
||||
};
|
||||
|
@ -141,6 +141,7 @@ fn candidate_set_updates_are_rate_limited() {
|
|||
let address_book = AddressBook::new(
|
||||
SocketAddr::from_str("0.0.0.0:0").unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
Span::none(),
|
||||
);
|
||||
let mut peer_service = MockService::build().for_unit_tests();
|
||||
|
@ -186,6 +187,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
|
|||
let address_book = AddressBook::new(
|
||||
SocketAddr::from_str("0.0.0.0:0").unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
Span::none(),
|
||||
);
|
||||
let mut peer_service = MockService::build().for_unit_tests();
|
||||
|
|
|
@ -107,13 +107,6 @@ where
|
|||
S::Future: Send + 'static,
|
||||
C: ChainTip + Clone + Send + Sync + 'static,
|
||||
{
|
||||
// If we want Zebra to operate with no network,
|
||||
// we should implement a `zebrad` command that doesn't use `zebra-network`.
|
||||
assert!(
|
||||
config.peerset_initial_target_size > 0,
|
||||
"Zebra must be allowed to connect to at least one peer"
|
||||
);
|
||||
|
||||
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
|
||||
|
||||
let (address_book, address_book_updater, address_metrics, address_book_updater_guard) =
|
||||
|
|
|
@ -1509,7 +1509,12 @@ where
|
|||
}
|
||||
|
||||
// Manually initialize an address book without a timestamp tracker.
|
||||
let mut address_book = AddressBook::new(config.listen_addr, config.network, Span::current());
|
||||
let mut address_book = AddressBook::new(
|
||||
config.listen_addr,
|
||||
config.network,
|
||||
config.max_connections_per_ip,
|
||||
Span::current(),
|
||||
);
|
||||
|
||||
// Add enough fake peers to go over the limit, even if the limit is zero.
|
||||
let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1;
|
||||
|
|
|
@ -23,6 +23,7 @@ use zebra_chain::{
|
|||
|
||||
use crate::{
|
||||
address_book::AddressMetrics,
|
||||
constants::DEFAULT_MAX_CONNS_PER_IP,
|
||||
peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion},
|
||||
peer_set::{set::MorePeers, InventoryChange, PeerSet},
|
||||
protocol::external::types::Version,
|
||||
|
@ -333,7 +334,12 @@ impl PeerSetGuard {
|
|||
let local_listener = "127.0.0.1:1000"
|
||||
.parse()
|
||||
.expect("Invalid local listener address");
|
||||
let address_book = AddressBook::new(local_listener, Network::Mainnet, Span::none());
|
||||
let address_book = AddressBook::new(
|
||||
local_listener,
|
||||
Network::Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
Arc::new(std::sync::Mutex::new(address_book))
|
||||
}
|
||||
|
|
|
@ -23,7 +23,9 @@ use zebra_chain::{
|
|||
transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx},
|
||||
};
|
||||
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
|
||||
use zebra_network::{AddressBook, InventoryResponse, Request, Response};
|
||||
use zebra_network::{
|
||||
constants::DEFAULT_MAX_CONNS_PER_IP, AddressBook, InventoryResponse, Request, Response,
|
||||
};
|
||||
use zebra_node_services::mempool;
|
||||
use zebra_state::{ChainTipChange, Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT};
|
||||
use zebra_test::mock_service::{MockService, PanicAssertion};
|
||||
|
@ -771,6 +773,7 @@ async fn setup(
|
|||
let address_book = AddressBook::new(
|
||||
SocketAddr::from_str("0.0.0.0:0").unwrap(),
|
||||
Mainnet,
|
||||
DEFAULT_MAX_CONNS_PER_IP,
|
||||
Span::none(),
|
||||
);
|
||||
let address_book = Arc::new(std::sync::Mutex::new(address_book));
|
||||
|
|
Loading…
Reference in New Issue