2019-10-17 15:42:19 -07:00
|
|
|
//! The timestamp collector collects liveness information from peers.
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2019-10-17 16:38:44 -07:00
|
|
|
use std::sync::{Arc, Mutex};
|
2019-10-07 15:36:16 -07:00
|
|
|
|
|
|
|
use futures::channel::mpsc;
|
|
|
|
use tokio::prelude::*;
|
|
|
|
|
2019-10-17 16:38:44 -07:00
|
|
|
use crate::{types::MetaAddr, AddressBook};
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2019-10-17 15:42:19 -07:00
|
|
|
/// The timestamp collector hooks into incoming message streams for each peer and
|
|
|
|
/// records per-connection last-seen timestamps into an [`AddressBook`].
|
2019-10-17 16:38:44 -07:00
|
|
|
pub struct TimestampCollector {}
|
2019-10-07 15:36:16 -07:00
|
|
|
|
|
|
|
impl TimestampCollector {
|
2019-10-17 16:38:44 -07:00
|
|
|
/// Spawn a new [`TimestampCollector`] task, and return handles for the
|
|
|
|
/// transmission channel for timestamp events and for the [`AddressBook`] it
|
|
|
|
/// updates.
|
|
|
|
pub fn spawn() -> (Arc<Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
|
2019-10-07 15:36:16 -07:00
|
|
|
const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
|
|
|
|
let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
|
2019-10-17 16:38:44 -07:00
|
|
|
let address_book = Arc::new(Mutex::new(AddressBook::default()));
|
|
|
|
let worker_address_book = address_book.clone();
|
2019-10-07 15:36:16 -07:00
|
|
|
|
|
|
|
let worker = async move {
|
2019-10-17 16:38:44 -07:00
|
|
|
while let Some(event) = worker_rx.next().await {
|
|
|
|
worker_address_book
|
|
|
|
.lock()
|
|
|
|
.expect("mutex should be unpoisoned")
|
|
|
|
.update(event);
|
2019-10-07 15:36:16 -07:00
|
|
|
}
|
|
|
|
};
|
|
|
|
tokio::spawn(worker.boxed());
|
|
|
|
|
2019-10-17 16:38:44 -07:00
|
|
|
(address_book, worker_tx)
|
2019-10-07 15:36:16 -07:00
|
|
|
}
|
|
|
|
}
|