Simplify TimestampCollector.

Previously, the TimestampCollector was intended to own the address book
data, so it was designed to be cloneable and to hold state shared among
all of its handles.  That shared state is now modeled more directly by
an `Arc<Mutex<AddressBook>>`, so the only functionality left in the
`TimestampCollector` is spawning the initial worker task, which is
better called `spawn` than `new`.

This also fixes a problem introduced in the previous commit where the
`TimestampCollector` was dropped, causing the worker task to shut down
early.
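
The resulting API shape is easiest to see in a standalone sketch:
`spawn` hands back the shared `Arc<Mutex<AddressBook>>` together with
the `mpsc::Sender<MetaAddr>` handle, and the worker simply runs until
every sender has been dropped.  The `AddressBook` and `MetaAddr`
definitions below are simplified stand-ins for illustration, not the
real zebra-network types:

    use std::sync::{Arc, Mutex};

    use futures::{channel::mpsc, SinkExt, StreamExt};

    // Simplified stand-ins for the real zebra-network types.
    #[derive(Debug, Default)]
    struct AddressBook(Vec<MetaAddr>);

    impl AddressBook {
        fn update(&mut self, event: MetaAddr) {
            self.0.push(event);
        }
    }

    #[derive(Debug)]
    struct MetaAddr {
        addr: std::net::SocketAddr,
    }

    /// Spawn the worker and return the shared state and event sender.
    fn spawn() -> (Arc<Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
        let (worker_tx, mut worker_rx) = mpsc::channel(100);
        let address_book = Arc::new(Mutex::new(AddressBook::default()));
        let worker_address_book = address_book.clone();

        // The worker owns the receiver, so it keeps running until every
        // sender handle has been dropped; nothing shuts it down early.
        tokio::spawn(async move {
            while let Some(event) = worker_rx.next().await {
                worker_address_book
                    .lock()
                    .expect("mutex should be unpoisoned")
                    .update(event);
            }
        });

        (address_book, worker_tx)
    }

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let (address_book, mut tx) = spawn();
        let _ = tx
            .send(MetaAddr {
                addr: "127.0.0.1:8233".parse().unwrap(),
            })
            .await;
        // Let the single-threaded runtime poll the worker task once.
        tokio::task::yield_now().await;
        println!("{:?}", address_book.lock().unwrap());
    }

Because the receiving end lives inside the worker task, dropping any
one handle no longer tears the worker down; it only exits once every
sender is gone.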
Henry de Valence 2019-10-17 16:38:44 -07:00 committed by Deirdre Connolly
parent 53be838d51
commit b03a83fa86
4 changed files with 30 additions and 86 deletions


@@ -29,7 +29,7 @@ use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, P
 pub struct PeerConnector<S> {
     config: Config,
     internal_service: S,
-    sender: mpsc::Sender<MetaAddr>,
+    timestamp_collector: mpsc::Sender<MetaAddr>,
     nonces: Arc<Mutex<HashSet<Nonce>>>,
 }
@@ -39,17 +39,20 @@ where
     S::Future: Send,
 {
     /// Construct a new `PeerConnector`.
-    pub fn new(config: Config, internal_service: S, collector: &TimestampCollector) -> Self {
+    pub fn new(
+        config: Config,
+        internal_service: S,
+        timestamp_collector: mpsc::Sender<MetaAddr>,
+    ) -> Self {
         // XXX this function has too many parameters, but it's not clear how to
         // do a nice builder as all fields are mandatory. Could have Builder1,
         // Builder2, ..., with Builder1::with_config() -> Builder2;
         // Builder2::with_internal_service() -> ... or use Options in a single
         // Builder type or use the derive_builder crate.
-        let sender = collector.sender_handle();
         PeerConnector {
             config,
             internal_service,
-            sender,
+            timestamp_collector,
             nonces: Arc::new(Mutex::new(HashSet::new())),
         }
     }
@@ -78,7 +81,7 @@ where
         // Clone these upfront, so they can be moved into the future.
         let nonces = self.nonces.clone();
         let internal_service = self.internal_service.clone();
-        let sender = self.sender.clone();
+        let timestamp_collector = self.timestamp_collector.clone();
         let user_agent = self.config.user_agent.clone();
         let network = self.config.network.clone();
@@ -181,11 +184,11 @@ where
             let hooked_peer_rx = peer_rx
                 .then(move |msg| {
-                    let mut sender = sender.clone();
+                    let mut timestamp_collector = timestamp_collector.clone();
                     async move {
                         if let Ok(_) = msg {
                             use futures::sink::SinkExt;
-                            let _ = sender
+                            let _ = timestamp_collector
                                 .send(MetaAddr {
                                     addr,
                                     services: remote_services,


@@ -57,9 +57,9 @@ where
     S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
     S::Future: Send + 'static,
 {
-    let timestamp_collector = TimestampCollector::new();
+    let (address_book, timestamp_collector) = TimestampCollector::spawn();
     let peer_connector = Buffer::new(
-        PeerConnector::new(config.clone(), inbound_service, &timestamp_collector),
+        PeerConnector::new(config.clone(), inbound_service, timestamp_collector),
         1,
     );
@@ -98,7 +98,7 @@ where
     // 3. Outgoing peers we connect to in response to load.
-    (Box::new(peer_set), timestamp_collector.address_book())
+    (Box::new(peer_set), address_book)
 }

 /// Use the provided `peer_connector` to connect to `initial_peers`, then send


@@ -1,95 +1,36 @@
 //! The timestamp collector collects liveness information from peers.

-use std::{
-    collections::{BTreeMap, HashMap},
-    net::SocketAddr,
-    sync::{Arc, Mutex},
-};
+use std::sync::{Arc, Mutex};

-use chrono::{DateTime, Utc};
 use futures::channel::mpsc;
 use tokio::prelude::*;

-use crate::{
-    constants,
-    types::{MetaAddr, PeerServices},
-    AddressBook,
-};
+use crate::{types::MetaAddr, AddressBook};

 /// The timestamp collector hooks into incoming message streams for each peer and
 /// records per-connection last-seen timestamps into an [`AddressBook`].
-///
-/// On creation, the `TimestampCollector` spawns a worker task to process new
-/// timestamp events. The resulting `TimestampCollector` can be cloned, and the
-/// worker task and state are shared among all of the clones.
-///
-/// XXX add functionality for querying the timestamp data
-#[derive(Clone, Debug)]
-pub struct TimestampCollector {
-    // We do not expect mutex contention to be a problem, because
-    // the dominant accessor is the collector worker, and it has a long
-    // event buffer to hide latency if other tasks block it temporarily.
-    data: Arc<Mutex<AddressBook>>,
-    shutdown: Arc<ShutdownSignal>,
-    worker_tx: mpsc::Sender<MetaAddr>,
-}
+pub struct TimestampCollector {}

 impl TimestampCollector {
-    /// Constructs a new `TimestampCollector`, spawning a worker task to process updates.
-    pub fn new() -> TimestampCollector {
-        let data = Arc::new(Mutex::new(AddressBook::default()));
-        // We need to make a copy now so we can move data into the async block.
-        let data2 = data.clone();
+    /// Spawn a new [`TimestampCollector`] task, and return handles for the
+    /// transmission channel for timestamp events and for the [`AddressBook`] it
+    /// updates.
+    pub fn spawn() -> (Arc<Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
         const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
         let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
-        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(0);
+        let address_book = Arc::new(Mutex::new(AddressBook::default()));
+        let worker_address_book = address_book.clone();

         // Construct and then spawn a worker.
         let worker = async move {
-            use futures::future::{self, Either};
-            loop {
-                match future::select(shutdown_rx.next(), worker_rx.next()).await {
-                    Either::Left((_, _)) => return,     // shutdown signal
-                    Either::Right((None, _)) => return, // all workers are gone
-                    Either::Right((Some(event), _)) => data2
-                        .lock()
-                        .expect("mutex should be unpoisoned")
-                        .update(event),
-                }
+            while let Some(event) = worker_rx.next().await {
+                worker_address_book
+                    .lock()
+                    .expect("mutex should be unpoisoned")
+                    .update(event);
             }
         };
         tokio::spawn(worker.boxed());

-        TimestampCollector {
-            data,
-            worker_tx,
-            shutdown: Arc::new(ShutdownSignal { tx: shutdown_tx }),
-        }
-    }
-
-    /// Return a shared reference to the [`AddressBook`] this collector updates.
-    pub fn address_book(&self) -> Arc<Mutex<AddressBook>> {
-        self.data.clone()
-    }
-
-    pub(crate) fn sender_handle(&self) -> mpsc::Sender<MetaAddr> {
-        self.worker_tx.clone()
-    }
-}
-
-/// Sends a signal when dropped.
-#[derive(Debug)]
-struct ShutdownSignal {
-    /// Sending () signals that the task holding the rx end should terminate.
-    ///
-    /// This should really be a oneshot but calling a oneshot consumes it,
-    /// and we can't move out of self in Drop.
-    tx: mpsc::Sender<()>,
-}
-
-impl Drop for ShutdownSignal {
-    fn drop(&mut self) {
-        self.tx.try_send(()).expect("tx is only used in drop");
+        (address_book, worker_tx)
     }
 }


@@ -79,9 +79,9 @@ impl ConnectCmd {
         use tokio::net::TcpStream;
         use zebra_network::should_be_private::{PeerConnector, TimestampCollector};

-        let collector = TimestampCollector::new();
+        let (_, collector) = TimestampCollector::spawn();
         let mut pc = Buffer::new(
-            PeerConnector::new(config.clone(), node.clone(), &collector),
+            PeerConnector::new(config.clone(), node.clone(), collector),
             1,
         );