From b03a83fa86daf9e087cbdacb209ce3a521fa79b5 Mon Sep 17 00:00:00 2001
From: Henry de Valence
Date: Thu, 17 Oct 2019 16:38:44 -0700
Subject: [PATCH] Simplify TimestampCollector.

Previously, the TimestampCollector was intended to own the address book
data, so it was cloneable and held shared state among all of its handles.
This is now modeled more directly by an `Arc<Mutex<AddressBook>>`, so the
only functionality left in the `TimestampCollector` is setting up the
initial worker, which is better called `spawn` than `new`.

This also fixes a problem introduced in the previous commit, where the
`TimestampCollector` was dropped, causing the worker task to shut down
early.
---
 zebra-network/src/peer/connector.rs      | 17 +++--
 zebra-network/src/peer_set.rs            |  6 +-
 zebra-network/src/timestamp_collector.rs | 89 ++++-------------------
 zebrad/src/commands/connect.rs           |  4 +-
 4 files changed, 30 insertions(+), 86 deletions(-)

diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs
index 104d94b0b..fd0f2e6da 100644
--- a/zebra-network/src/peer/connector.rs
+++ b/zebra-network/src/peer/connector.rs
@@ -29,7 +29,7 @@ use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, P
 pub struct PeerConnector<S> {
     config: Config,
     internal_service: S,
-    sender: mpsc::Sender<MetaAddr>,
+    timestamp_collector: mpsc::Sender<MetaAddr>,
     nonces: Arc<Mutex<HashSet<Nonce>>>,
 }
 
@@ -39,17 +39,20 @@ where
     S::Future: Send,
 {
     /// Construct a new `PeerConnector`.
-    pub fn new(config: Config, internal_service: S, collector: &TimestampCollector) -> Self {
+    pub fn new(
+        config: Config,
+        internal_service: S,
+        timestamp_collector: mpsc::Sender<MetaAddr>,
+    ) -> Self {
         // XXX this function has too many parameters, but it's not clear how to
         // do a nice builder as all fields are mandatory. Could have Builder1,
         // Builder2, ..., with Builder1::with_config() -> Builder2;
         // Builder2::with_internal_service() -> ... or use Options in a single
         // Builder type or use the derive_builder crate.
-        let sender = collector.sender_handle();
         PeerConnector {
             config,
             internal_service,
-            sender,
+            timestamp_collector,
             nonces: Arc::new(Mutex::new(HashSet::new())),
         }
     }
@@ -78,7 +81,7 @@ where
         // Clone these upfront, so they can be moved into the future.
         let nonces = self.nonces.clone();
         let internal_service = self.internal_service.clone();
-        let sender = self.sender.clone();
+        let timestamp_collector = self.timestamp_collector.clone();
         let user_agent = self.config.user_agent.clone();
         let network = self.config.network.clone();
 
@@ -181,11 +184,11 @@ where
         let hooked_peer_rx = peer_rx
             .then(move |msg| {
-                let mut sender = sender.clone();
+                let mut timestamp_collector = timestamp_collector.clone();
                 async move {
                     if let Ok(_) = msg {
                         use futures::sink::SinkExt;
-                        let _ = sender
+                        let _ = timestamp_collector
                             .send(MetaAddr {
                                 addr,
                                 services: remote_services,
diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs
index 9cc38023e..81c1652dc 100644
--- a/zebra-network/src/peer_set.rs
+++ b/zebra-network/src/peer_set.rs
@@ -57,9 +57,9 @@ where
     S: Service + Clone + Send + 'static,
     S::Future: Send + 'static,
 {
-    let timestamp_collector = TimestampCollector::new();
+    let (address_book, timestamp_collector) = TimestampCollector::spawn();
     let peer_connector = Buffer::new(
-        PeerConnector::new(config.clone(), inbound_service, &timestamp_collector),
+        PeerConnector::new(config.clone(), inbound_service, timestamp_collector),
         1,
     );
 
@@ -98,7 +98,7 @@ where
     // 3. Outgoing peers we connect to in response to load.
-    (Box::new(peer_set), timestamp_collector.address_book())
+    (Box::new(peer_set), address_book)
 }
 
 /// Use the provided `peer_connector` to connect to `initial_peers`, then send
diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs
index 6cc8da346..bf0c459cf 100644
--- a/zebra-network/src/timestamp_collector.rs
+++ b/zebra-network/src/timestamp_collector.rs
@@ -1,95 +1,36 @@
 //! The timestamp collector collects liveness information from peers.
 
-use std::{
-    collections::{BTreeMap, HashMap},
-    net::SocketAddr,
-    sync::{Arc, Mutex},
-};
+use std::sync::{Arc, Mutex};
 
-use chrono::{DateTime, Utc};
 use futures::channel::mpsc;
 use tokio::prelude::*;
 
-use crate::{
-    constants,
-    types::{MetaAddr, PeerServices},
-    AddressBook,
-};
+use crate::{types::MetaAddr, AddressBook};
 
 /// The timestamp collector hooks into incoming message streams for each peer and
 /// records per-connection last-seen timestamps into an [`AddressBook`].
-///
-/// On creation, the `TimestampCollector` spawns a worker task to process new
-/// timestamp events. The resulting `TimestampCollector` can be cloned, and the
-/// worker task and state are shared among all of the clones.
-///
-/// XXX add functionality for querying the timestamp data
-#[derive(Clone, Debug)]
-pub struct TimestampCollector {
-    // We do not expect mutex contention to be a problem, because
-    // the dominant accessor is the collector worker, and it has a long
-    // event buffer to hide latency if other tasks block it temporarily.
-    data: Arc<Mutex<AddressBook>>,
-    shutdown: Arc<ShutdownSignal>,
-    worker_tx: mpsc::Sender<MetaAddr>,
-}
+pub struct TimestampCollector {}
 
 impl TimestampCollector {
-    /// Constructs a new `TimestampCollector`, spawning a worker task to process updates.
-    pub fn new() -> TimestampCollector {
-        let data = Arc::new(Mutex::new(AddressBook::default()));
-        // We need to make a copy now so we can move data into the async block.
-        let data2 = data.clone();
-
+    /// Spawn a new [`TimestampCollector`] task, and return handles for the
+    /// transmission channel for timestamp events and for the [`AddressBook`] it
+    /// updates.
+    pub fn spawn() -> (Arc<Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
         const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
         let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
-        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(0);
+        let address_book = Arc::new(Mutex::new(AddressBook::default()));
+        let worker_address_book = address_book.clone();
 
-        // Construct and then spawn a worker.
         let worker = async move {
-            use futures::future::{self, Either};
-            loop {
-                match future::select(shutdown_rx.next(), worker_rx.next()).await {
-                    Either::Left((_, _)) => return,     // shutdown signal
-                    Either::Right((None, _)) => return, // all workers are gone
-                    Either::Right((Some(event), _)) => data2
-                        .lock()
-                        .expect("mutex should be unpoisoned")
-                        .update(event),
-                }
+            while let Some(event) = worker_rx.next().await {
+                worker_address_book
+                    .lock()
+                    .expect("mutex should be unpoisoned")
+                    .update(event);
             }
         };
         tokio::spawn(worker.boxed());
 
-        TimestampCollector {
-            data,
-            worker_tx,
-            shutdown: Arc::new(ShutdownSignal { tx: shutdown_tx }),
-        }
-    }
-
-    /// Return a shared reference to the [`AddressBook`] this collector updates.
-    pub fn address_book(&self) -> Arc<Mutex<AddressBook>> {
-        self.data.clone()
-    }
-
-    pub(crate) fn sender_handle(&self) -> mpsc::Sender<MetaAddr> {
-        self.worker_tx.clone()
-    }
-}
-
-/// Sends a signal when dropped.
-#[derive(Debug)]
-struct ShutdownSignal {
-    /// Sending () signals that the task holding the rx end should terminate.
-    ///
-    /// This should really be a oneshot but calling a oneshot consumes it,
-    /// and we can't move out of self in Drop.
-    tx: mpsc::Sender<()>,
-}
-
-impl Drop for ShutdownSignal {
-    fn drop(&mut self) {
-        self.tx.try_send(()).expect("tx is only used in drop");
+        (address_book, worker_tx)
     }
 }
diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs
index 6957711e2..ea14ad74f 100644
--- a/zebrad/src/commands/connect.rs
+++ b/zebrad/src/commands/connect.rs
@@ -79,9 +79,9 @@ impl ConnectCmd {
         use tokio::net::TcpStream;
         use zebra_network::should_be_private::{PeerConnector, TimestampCollector};
 
-        let collector = TimestampCollector::new();
+        let (_, collector) = TimestampCollector::spawn();
         let mut pc = Buffer::new(
-            PeerConnector::new(config.clone(), node.clone(), &collector),
+            PeerConnector::new(config.clone(), node.clone(), collector),
             1,
         );
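
Note (not part of the patch): the sketch below shows the resulting pattern in
isolation, for readers who want to try it standalone. A spawn function hands
back an Arc<Mutex<_>> plus an mpsc::Sender handle, and the worker task it
spawns lives until every sender is dropped. The `AddressBook` and `MetaAddr`
below are simplified stand-ins, not the zebra-network definitions, and the
dependency choices (futures 0.3 channels on a current tokio runtime) are
assumptions made only to keep the example self-contained and runnable.

// Assumed Cargo.toml dependencies for this sketch:
//   tokio = { version = "1", features = ["full"] }
//   futures = "0.3"
use std::sync::{Arc, Mutex};

use futures::{channel::mpsc, SinkExt, StreamExt};

/// Stand-in for zebra-network's `AddressBook`; the real type records
/// per-peer last-seen timestamps rather than a counter.
#[derive(Debug, Default)]
struct AddressBook {
    events_seen: usize,
}

impl AddressBook {
    fn update(&mut self, _event: MetaAddr) {
        self.events_seen += 1;
    }
}

/// Stand-in for zebra-network's `MetaAddr` (address, services, last-seen).
#[derive(Debug)]
struct MetaAddr;

/// Spawn a worker that owns nothing: the caller receives the shared state
/// and a channel handle, and the worker applies incoming events to the state.
fn spawn_collector() -> (Arc<Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
    const BUFFER_SIZE: usize = 100;
    let (tx, mut rx) = mpsc::channel(BUFFER_SIZE);
    let address_book = Arc::new(Mutex::new(AddressBook::default()));
    let worker_book = address_book.clone();

    // The worker runs until every `Sender` is dropped; because the caller
    // holds the `Arc` directly, dropping a collector handle no longer shuts
    // the worker down early.
    tokio::spawn(async move {
        while let Some(event) = rx.next().await {
            worker_book
                .lock()
                .expect("mutex should be unpoisoned")
                .update(event);
        }
    });

    (address_book, tx)
}

#[tokio::main]
async fn main() {
    let (address_book, mut tx) = spawn_collector();

    // Each peer connection would clone `tx` and send one event per message.
    tx.send(MetaAddr).await.expect("worker is alive");
    drop(tx); // dropping all senders lets the worker task finish

    // Give the worker a chance to run; a sketch, not a synchronization scheme.
    tokio::task::yield_now().await;
    println!("{:?}", address_book.lock().unwrap());
}

Because the caller, not the returned value, owns the Arc, the worker exits
only when all senders are gone, which is the behavior the commit restores.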