//! The addressbook manages information about what peers exist, when they were //! seen, and what services they provide. use std::{ collections::{BTreeSet, HashMap}, iter::Extend, net::SocketAddr, sync::{Arc, Mutex}, }; use chrono::{DateTime, Utc}; use futures::channel::mpsc; use tokio::prelude::*; use crate::{ constants, types::{MetaAddr, PeerServices}, }; /// A database of peers, their advertised services, and information on when they /// were last seen. #[derive(Default, Debug)] pub struct AddressBook { by_addr: HashMap, PeerServices)>, by_time: BTreeSet, } impl AddressBook { /// Check consistency of the address book invariants or panic, doing work /// quadratic in the address book size. #[cfg(test)] fn assert_consistency(&self) { for (a, (t, s)) in self.by_addr.iter() { for meta in self.by_time.iter().filter(|meta| meta.addr == *a) { if meta.last_seen != *t || meta.services != *s { panic!("meta {:?} is not {:?}, {:?}, {:?}", meta, a, t, s); } } } } /// Returns true if the address book has an entry for `addr`. pub fn contains_addr(&self, addr: &SocketAddr) -> bool { self.by_addr.contains_key(addr) } /// Returns the entry corresponding to `addr`, or `None` if it does not exist. pub fn get_by_addr(&self, addr: SocketAddr) -> Option { let (last_seen, services) = self.by_addr.get(&addr).cloned()?; Some(MetaAddr { addr, last_seen, services, }) } /// Add `new` to the address book, updating the previous entry if `new` is /// more recent or discarding `new` if it is stale. pub fn update(&mut self, new: MetaAddr) { trace!( ?new, data.total = self.by_time.len(), data.recent = (self.by_time.len() - self.disconnected_peers().count()), ); #[cfg(test)] self.assert_consistency(); match self.get_by_addr(new.addr) { Some(prev) => { if prev.last_seen > new.last_seen { return; } else { self.by_time .take(&prev) .expect("cannot have by_addr entry without by_time entry"); } } None => {} } self.by_time.insert(new); self.by_addr.insert(new.addr, (new.last_seen, new.services)); #[cfg(test)] self.assert_consistency(); } /// Compute a cutoff time that can determine whether an entry /// in an address book being updated with peer message timestamps /// represents a known-disconnected peer or a potentially-connected peer. /// /// [`constants::LIVE_PEER_DURATION`] represents the time interval in which /// we are guaranteed to receive at least one message from a peer or close /// the connection. Therefore, if the last-seen timestamp is older than /// [`constants::LIVE_PEER_DURATION`] ago, we know we must have disconnected /// from it. Otherwise, we could potentially be connected to it. fn cutoff_time() -> DateTime { // chrono uses signed durations while stdlib uses unsigned durations use chrono::Duration as CD; Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap() } /// Returns true if the given [`SocketAddr`] could potentially be connected /// to a node feeding timestamps into this address book. pub fn is_potentially_connected(&self, addr: &SocketAddr) -> bool { match self.by_addr.get(addr) { None => return false, Some((ref last_seen, _)) => last_seen > &AddressBook::cutoff_time(), } } /// Return an iterator over all peers, ordered from most recently seen to /// least recently seen. pub fn peers<'a>(&'a self) -> impl Iterator + 'a { self.by_time.iter().rev().cloned() } /// Return an iterator over peers known to be disconnected, ordered from most /// recently seen to least recently seen. pub fn disconnected_peers<'a>(&'a self) -> impl Iterator + 'a { use std::net::{IpAddr, Ipv4Addr}; use std::ops::Bound::{Excluded, Unbounded}; let cutoff_meta = MetaAddr { last_seen: AddressBook::cutoff_time(), // The ordering on MetaAddrs is newest-first, then arbitrary, // so any fields will do here. addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), services: PeerServices::default(), }; self.by_time .range((Excluded(cutoff_meta), Unbounded)) .rev() .cloned() } /// Returns an iterator that drains entries from the address book, removing /// them in order from most recent to least recent. pub fn drain_newest<'a>(&'a mut self) -> impl Iterator + 'a { Drain { book: self, newest_first: true, } } /// Returns an iterator that drains entries from the address book, removing /// them in order from most recent to least recent. pub fn drain_oldest<'a>(&'a mut self) -> impl Iterator + 'a { Drain { book: self, newest_first: false, } } } impl Extend for AddressBook { fn extend(&mut self, iter: T) where T: IntoIterator, { for meta in iter.into_iter() { self.update(meta); } } } struct Drain<'a> { book: &'a mut AddressBook, newest_first: bool, } impl<'a> Iterator for Drain<'a> { type Item = MetaAddr; fn next(&mut self) -> Option { let next_item = if self.newest_first { self.book.by_time.iter().next()?.clone() } else { self.book.by_time.iter().rev().next()?.clone() }; self.book.by_time.remove(&next_item); self.book .by_addr .remove(&next_item.addr) .expect("cannot have by_time entry without by_addr entry"); Some(next_item) } }