2019-10-17 15:42:19 -07:00
|
|
|
//! The timestamp collector collects liveness information from peers.
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2023-11-05 14:28:58 -08:00
|
|
|
use std::{cmp::max, net::SocketAddr, sync::Arc};
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2021-11-18 04:34:51 -08:00
|
|
|
use thiserror::Error;
|
2021-12-19 16:44:43 -08:00
|
|
|
use tokio::{
|
|
|
|
sync::{mpsc, watch},
|
|
|
|
task::JoinHandle,
|
|
|
|
};
|
2023-04-13 01:42:17 -07:00
|
|
|
use tracing::{Level, Span};
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2021-12-19 16:44:43 -08:00
|
|
|
use crate::{
|
|
|
|
address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
|
|
|
|
};
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2023-11-05 14:28:58 -08:00
|
|
|
/// The minimum size of the address book updater channel.
|
|
|
|
pub const MIN_CHANNEL_SIZE: usize = 10;
|
|
|
|
|
2021-11-04 04:34:00 -07:00
|
|
|
/// The `AddressBookUpdater` hooks into incoming message streams for each peer
|
|
|
|
/// and lets the owner of the sender handle update the address book. For
|
|
|
|
/// example, it can be used to record per-connection last-seen timestamps, or
|
|
|
|
/// add new initial peers to the address book.
|
2021-11-18 04:34:51 -08:00
|
|
|
#[derive(Debug, Eq, PartialEq)]
|
|
|
|
pub struct AddressBookUpdater;
|
|
|
|
|
|
|
|
#[derive(Copy, Clone, Debug, Error, Eq, PartialEq, Hash)]
|
|
|
|
#[error("all address book updater senders are closed")]
|
|
|
|
pub struct AllAddressBookUpdaterSendersClosed;
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2021-11-04 04:34:00 -07:00
|
|
|
impl AddressBookUpdater {
|
|
|
|
/// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`]
|
2021-11-18 04:34:51 -08:00
|
|
|
/// configured with Zebra's actual `local_listener` address.
|
2021-06-22 14:59:06 -07:00
|
|
|
///
|
2021-11-18 04:34:51 -08:00
|
|
|
/// Returns handles for:
|
2021-12-19 16:44:43 -08:00
|
|
|
/// - the address book,
|
2021-11-18 04:34:51 -08:00
|
|
|
/// - the transmission channel for address book update events,
|
2021-12-19 16:44:43 -08:00
|
|
|
/// - a watch channel for address book metrics, and
|
|
|
|
/// - the address book updater task join handle.
|
|
|
|
///
|
|
|
|
/// The task exits with an error when the returned [`mpsc::Sender`] is closed.
|
2021-06-14 20:31:16 -07:00
|
|
|
pub fn spawn(
|
2021-11-18 04:34:51 -08:00
|
|
|
config: &Config,
|
2021-06-22 14:59:06 -07:00
|
|
|
local_listener: SocketAddr,
|
2021-06-14 20:31:16 -07:00
|
|
|
) -> (
|
|
|
|
Arc<std::sync::Mutex<AddressBook>>,
|
|
|
|
mpsc::Sender<MetaAddrChange>,
|
2021-12-19 16:44:43 -08:00
|
|
|
watch::Receiver<AddressMetrics>,
|
2021-11-18 04:34:51 -08:00
|
|
|
JoinHandle<Result<(), BoxError>>,
|
2021-06-14 20:31:16 -07:00
|
|
|
) {
|
2021-11-18 04:34:51 -08:00
|
|
|
// Create an mpsc channel for peerset address book updates,
|
|
|
|
// based on the maximum number of inbound and outbound peers.
|
2023-11-05 14:28:58 -08:00
|
|
|
let (worker_tx, mut worker_rx) = mpsc::channel(max(
|
|
|
|
config.peerset_total_connection_limit(),
|
|
|
|
MIN_CHANNEL_SIZE,
|
|
|
|
));
|
2021-11-18 04:34:51 -08:00
|
|
|
|
2022-06-13 21:58:37 -07:00
|
|
|
let address_book = AddressBook::new(
|
|
|
|
local_listener,
|
2024-03-19 13:45:27 -07:00
|
|
|
&config.network,
|
2023-07-05 22:54:10 -07:00
|
|
|
config.max_connections_per_ip,
|
2022-06-13 21:58:37 -07:00
|
|
|
span!(Level::TRACE, "address book"),
|
|
|
|
);
|
2021-12-19 16:44:43 -08:00
|
|
|
let address_metrics = address_book.address_metrics_watcher();
|
|
|
|
let address_book = Arc::new(std::sync::Mutex::new(address_book));
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2023-04-13 01:42:17 -07:00
|
|
|
#[cfg(feature = "progress-bar")]
|
|
|
|
let (mut address_info, address_bar, never_bar, failed_bar) = {
|
2023-07-05 00:08:59 -07:00
|
|
|
let address_bar = howudoin::new_root().label("Known Peers");
|
|
|
|
let never_bar =
|
|
|
|
howudoin::new_with_parent(address_bar.id()).label("Never Attempted Peers");
|
|
|
|
let failed_bar = howudoin::new_with_parent(never_bar.id()).label("Failed Peers");
|
2023-04-13 01:42:17 -07:00
|
|
|
|
2023-07-05 00:08:59 -07:00
|
|
|
(address_metrics.clone(), address_bar, never_bar, failed_bar)
|
2023-04-13 01:42:17 -07:00
|
|
|
};
|
|
|
|
|
2021-12-19 16:44:43 -08:00
|
|
|
let worker_address_book = address_book.clone();
|
|
|
|
let worker = move || {
|
2022-06-23 00:46:02 -07:00
|
|
|
info!("starting the address book updater");
|
2022-01-04 15:43:30 -08:00
|
|
|
|
2022-06-23 00:46:02 -07:00
|
|
|
while let Some(event) = worker_rx.blocking_recv() {
|
|
|
|
trace!(?event, "got address book change");
|
2022-01-04 15:43:30 -08:00
|
|
|
|
2022-06-23 00:46:02 -07:00
|
|
|
// # Correctness
|
|
|
|
//
|
|
|
|
// Briefly hold the address book threaded mutex, to update the
|
|
|
|
// state for a single address.
|
|
|
|
worker_address_book
|
|
|
|
.lock()
|
|
|
|
.expect("mutex should be unpoisoned")
|
|
|
|
.update(event);
|
2023-04-13 01:42:17 -07:00
|
|
|
|
|
|
|
#[cfg(feature = "progress-bar")]
|
|
|
|
if matches!(howudoin::cancelled(), Some(true)) {
|
|
|
|
address_bar.close();
|
|
|
|
never_bar.close();
|
|
|
|
failed_bar.close();
|
|
|
|
} else if address_info.has_changed()? {
|
|
|
|
// We don't track:
|
|
|
|
// - attempt pending because it's always small
|
|
|
|
// - responded because it's the remaining attempted-but-not-failed peers
|
|
|
|
// - recently live because it's similar to the connected peer counts
|
|
|
|
|
|
|
|
let address_info = *address_info.borrow_and_update();
|
|
|
|
|
|
|
|
address_bar
|
2023-06-14 12:07:02 -07:00
|
|
|
.set_pos(u64::try_from(address_info.num_addresses).expect("fits in u64"));
|
|
|
|
// .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
|
2023-04-13 01:42:17 -07:00
|
|
|
|
2023-11-27 16:30:13 -08:00
|
|
|
never_bar.set_pos(
|
|
|
|
u64::try_from(address_info.never_attempted_gossiped).expect("fits in u64"),
|
|
|
|
);
|
2023-06-14 12:07:02 -07:00
|
|
|
// .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
|
2023-04-13 01:42:17 -07:00
|
|
|
|
2023-06-14 12:07:02 -07:00
|
|
|
failed_bar.set_pos(u64::try_from(address_info.failed).expect("fits in u64"));
|
|
|
|
// .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
|
2023-04-13 01:42:17 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(feature = "progress-bar")]
|
|
|
|
{
|
|
|
|
address_bar.close();
|
|
|
|
never_bar.close();
|
|
|
|
failed_bar.close();
|
2022-06-23 00:46:02 -07:00
|
|
|
}
|
2021-11-18 04:34:51 -08:00
|
|
|
|
2022-06-23 00:46:02 -07:00
|
|
|
let error = Err(AllAddressBookUpdaterSendersClosed.into());
|
|
|
|
info!(?error, "stopping address book updater");
|
|
|
|
error
|
2019-10-07 15:36:16 -07:00
|
|
|
};
|
|
|
|
|
2021-12-19 16:44:43 -08:00
|
|
|
// Correctness: spawn address book accesses on a blocking thread,
|
|
|
|
// to avoid deadlocks (see #1976)
|
2022-06-23 00:46:02 -07:00
|
|
|
let span = Span::current();
|
|
|
|
let address_book_updater_task_handle =
|
|
|
|
tokio::task::spawn_blocking(move || span.in_scope(worker));
|
2021-11-18 04:34:51 -08:00
|
|
|
|
2021-12-19 16:44:43 -08:00
|
|
|
(
|
|
|
|
address_book,
|
|
|
|
worker_tx,
|
|
|
|
address_metrics,
|
|
|
|
address_book_updater_task_handle,
|
|
|
|
)
|
2019-10-07 15:36:16 -07:00
|
|
|
}
|
|
|
|
}
|