Reliability: send local listener address to peers
When peers ask for peer addresses, add our local listener address to the set of addresses, sanitize, then truncate. Sanitize shuffles addresses, so if there are lots of addresses in the address book, our address will only be sent to some peers.
This commit is contained in:
parent
d2a8985dbc
commit
92828bbb29
|
@ -11,7 +11,7 @@ use std::{
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
|
|
||||||
use crate::{constants, types::MetaAddr, PeerAddrState};
|
use crate::{constants, types::MetaAddr, Config, PeerAddrState};
|
||||||
|
|
||||||
/// A database of peer listener addresses, their advertised services, and
|
/// A database of peer listener addresses, their advertised services, and
|
||||||
/// information on when they were last seen.
|
/// information on when they were last seen.
|
||||||
|
@ -48,13 +48,16 @@ use crate::{constants, types::MetaAddr, PeerAddrState};
|
||||||
/// - the canonical address of any connection.
|
/// - the canonical address of any connection.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct AddressBook {
|
pub struct AddressBook {
|
||||||
/// Each known peer address has a matching `MetaAddr`
|
/// Each known peer address has a matching `MetaAddr`.
|
||||||
by_addr: HashMap<SocketAddr, MetaAddr>,
|
by_addr: HashMap<SocketAddr, MetaAddr>,
|
||||||
|
|
||||||
|
/// The local listener address.
|
||||||
|
local_listener: SocketAddr,
|
||||||
|
|
||||||
/// The span for operations on this address book.
|
/// The span for operations on this address book.
|
||||||
span: Span,
|
span: Span,
|
||||||
|
|
||||||
/// The last time we logged a message about the address metrics
|
/// The last time we logged a message about the address metrics.
|
||||||
last_address_log: Option<Instant>,
|
last_address_log: Option<Instant>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,13 +88,14 @@ pub struct AddressMetrics {
|
||||||
|
|
||||||
#[allow(clippy::len_without_is_empty)]
|
#[allow(clippy::len_without_is_empty)]
|
||||||
impl AddressBook {
|
impl AddressBook {
|
||||||
/// Construct an `AddressBook` with the given [`tracing::Span`].
|
/// Construct an `AddressBook` with the given `config` and [`tracing::Span`].
|
||||||
pub fn new(span: Span) -> AddressBook {
|
pub fn new(config: &Config, span: Span) -> AddressBook {
|
||||||
let constructor_span = span.clone();
|
let constructor_span = span.clone();
|
||||||
let _guard = constructor_span.enter();
|
let _guard = constructor_span.enter();
|
||||||
|
|
||||||
let mut new_book = AddressBook {
|
let mut new_book = AddressBook {
|
||||||
by_addr: HashMap::default(),
|
by_addr: HashMap::default(),
|
||||||
|
local_listener: config.listen_addr,
|
||||||
span,
|
span,
|
||||||
last_address_log: None,
|
last_address_log: None,
|
||||||
};
|
};
|
||||||
|
@ -100,6 +104,11 @@ impl AddressBook {
|
||||||
new_book
|
new_book
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the local listener address.
|
||||||
|
pub fn get_local_listener(&self) -> MetaAddr {
|
||||||
|
MetaAddr::new_local_listener(&self.local_listener)
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the contents of `self` in random order with sanitized timestamps.
|
/// Get the contents of `self` in random order with sanitized timestamps.
|
||||||
pub fn sanitized(&self) -> Vec<MetaAddr> {
|
pub fn sanitized(&self) -> Vec<MetaAddr> {
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
|
|
|
@ -194,6 +194,17 @@ impl MetaAddr {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a new `MetaAddr` for our own listener address.
|
||||||
|
pub fn new_local_listener(addr: &SocketAddr) -> MetaAddr {
|
||||||
|
MetaAddr {
|
||||||
|
addr: *addr,
|
||||||
|
// TODO: create a "local services" constant
|
||||||
|
services: PeerServices::NODE_NETWORK,
|
||||||
|
last_seen: Utc::now(),
|
||||||
|
last_connection_state: Responded,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a new `MetaAddr` for a peer that has just had an error.
|
/// Create a new `MetaAddr` for a peer that has just had an error.
|
||||||
pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr {
|
pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr {
|
||||||
MetaAddr {
|
MetaAddr {
|
||||||
|
@ -251,8 +262,8 @@ impl MetaAddr {
|
||||||
let last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0);
|
let last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0);
|
||||||
MetaAddr {
|
MetaAddr {
|
||||||
addr: self.addr,
|
addr: self.addr,
|
||||||
// services are sanitized during parsing, so we don't need to make
|
// services are sanitized during parsing, or set to a fixed valued by
|
||||||
// any changes here
|
// new_local_listener, so we don't need to sanitize here
|
||||||
services: self.services,
|
services: self.services,
|
||||||
last_seen,
|
last_seen,
|
||||||
// the state isn't sent to the remote peer, but sanitize it anyway
|
// the state isn't sent to the remote peer, but sanitize it anyway
|
||||||
|
|
|
@ -64,7 +64,7 @@ where
|
||||||
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
||||||
S::Future: Send + 'static,
|
S::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
let (address_book, timestamp_collector) = TimestampCollector::spawn();
|
let (address_book, timestamp_collector) = TimestampCollector::spawn(&config);
|
||||||
let (inv_sender, inv_receiver) = broadcast::channel(100);
|
let (inv_sender, inv_receiver) = broadcast::channel(100);
|
||||||
|
|
||||||
// Construct services that handle inbound handshakes and perform outbound
|
// Construct services that handle inbound handshakes and perform outbound
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use futures::{channel::mpsc, prelude::*};
|
use futures::{channel::mpsc, prelude::*};
|
||||||
|
|
||||||
use crate::{types::MetaAddr, AddressBook};
|
use crate::{types::MetaAddr, AddressBook, Config};
|
||||||
|
|
||||||
/// The timestamp collector hooks into incoming message streams for each peer and
|
/// The timestamp collector hooks into incoming message streams for each peer and
|
||||||
/// records per-connection last-seen timestamps into an [`AddressBook`].
|
/// records per-connection last-seen timestamps into an [`AddressBook`].
|
||||||
|
@ -14,14 +14,14 @@ impl TimestampCollector {
|
||||||
/// Spawn a new [`TimestampCollector`] task, and return handles for the
|
/// Spawn a new [`TimestampCollector`] task, and return handles for the
|
||||||
/// transmission channel for timestamp events and for the [`AddressBook`] it
|
/// transmission channel for timestamp events and for the [`AddressBook`] it
|
||||||
/// updates.
|
/// updates.
|
||||||
pub fn spawn() -> (Arc<std::sync::Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
|
pub fn spawn(config: &Config) -> (Arc<std::sync::Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
|
||||||
use tracing::Level;
|
use tracing::Level;
|
||||||
const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
|
const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
|
||||||
let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
|
let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
|
||||||
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(span!(
|
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(
|
||||||
Level::TRACE,
|
config,
|
||||||
"timestamp collector"
|
span!(Level::TRACE, "timestamp collector"),
|
||||||
))));
|
)));
|
||||||
let worker_address_book = address_book.clone();
|
let worker_address_book = address_book.clone();
|
||||||
|
|
||||||
let worker = async move {
|
let worker = async move {
|
||||||
|
|
|
@ -240,7 +240,13 @@ impl Service<zn::Request> for Inbound {
|
||||||
// Briefly hold the address book threaded mutex while
|
// Briefly hold the address book threaded mutex while
|
||||||
// cloning the address book. Then sanitize after releasing
|
// cloning the address book. Then sanitize after releasing
|
||||||
// the lock.
|
// the lock.
|
||||||
let peers = address_book.lock().unwrap().clone();
|
let mut peers = address_book.lock().unwrap().clone();
|
||||||
|
|
||||||
|
// Add our local listener address to the advertised peers
|
||||||
|
let local_listener = address_book.lock().unwrap().get_local_listener();
|
||||||
|
peers.update(local_listener);
|
||||||
|
|
||||||
|
// Send a sanitized response
|
||||||
let mut peers = peers.sanitized();
|
let mut peers = peers.sanitized();
|
||||||
const MAX_ADDR: usize = 1000; // bitcoin protocol constant
|
const MAX_ADDR: usize = 1000; // bitcoin protocol constant
|
||||||
peers.truncate(MAX_ADDR);
|
peers.truncate(MAX_ADDR);
|
||||||
|
|
Loading…
Reference in New Issue