Rewrite AddressBook to use a BTreeSet.

The previous implementation failed when timestamps were duplicated between
peers, because there was not a 1-1 relationship between timestamps and peers.
This commit is contained in:
Henry de Valence 2019-10-17 21:19:23 -07:00 committed by Deirdre Connolly
parent 5e9ac87f57
commit 39d38a8647
3 changed files with 82 additions and 41 deletions

View File

@ -2,7 +2,7 @@
//! seen, and what services they provide. //! seen, and what services they provide.
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{BTreeSet, HashMap},
iter::Extend, iter::Extend,
net::SocketAddr, net::SocketAddr,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
@ -22,20 +22,31 @@ use crate::{
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct AddressBook { pub struct AddressBook {
by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>, by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>,
by_time: BTreeMap<DateTime<Utc>, (SocketAddr, PeerServices)>, by_time: BTreeSet<MetaAddr>,
} }
impl AddressBook { impl AddressBook {
fn assert_consistency(&self) {
for (a, (t, s)) in self.by_addr.iter() {
for meta in self.by_time.iter().filter(|meta| meta.addr == *a) {
if meta.last_seen != *t || meta.services != *s {
panic!("meta {:?} is not {:?}, {:?}, {:?}", meta, a, t, s);
}
}
}
}
/// Update the address book with `event`, a [`MetaAddr`] representing /// Update the address book with `event`, a [`MetaAddr`] representing
/// observation of a peer. /// observation of a peer.
pub fn update(&mut self, event: MetaAddr) { pub fn update(&mut self, event: MetaAddr) {
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
debug!( trace!(
?event, ?event,
data.total = self.by_time.len(), data.total = self.by_time.len(),
data.recent = (self.by_time.len() - self.disconnected_peers().count()), data.recent = (self.by_time.len() - self.disconnected_peers().count()),
); );
//self.assert_consistency();
let MetaAddr { let MetaAddr {
addr, addr,
@ -45,37 +56,40 @@ impl AddressBook {
match self.by_addr.entry(addr) { match self.by_addr.entry(addr) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
let (prev_last_seen, _) = entry.get(); let (prev_last_seen, prev_services) = entry.get().clone();
// If the new timestamp event is older than the current // Ignore stale entries.
// one, discard it. This is irrelevant for the timestamp if prev_last_seen > last_seen {
// collector but is important for combining address
// information from different peers.
if *prev_last_seen > last_seen {
return; return;
} }
self.by_time self.by_time
.remove(prev_last_seen) .take(&MetaAddr {
addr,
services: prev_services,
last_seen: prev_last_seen,
})
.expect("cannot have by_addr entry without by_time entry"); .expect("cannot have by_addr entry without by_time entry");
entry.insert((last_seen, services)); entry.insert((last_seen, services));
self.by_time.insert(last_seen, (addr, services)); self.by_time.insert(event);
} }
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
entry.insert((last_seen, services)); entry.insert((last_seen, services));
self.by_time.insert(last_seen, (addr, services)); self.by_time.insert(event);
} }
} }
//self.assert_consistency();
} }
/// Return an iterator over all peers, ordered from most recently seen to /// Return an iterator over all peers, ordered from most recently seen to
/// least recently seen. /// least recently seen.
pub fn peers<'a>(&'a self) -> impl Iterator<Item = MetaAddr> + 'a { pub fn peers<'a>(&'a self) -> impl Iterator<Item = MetaAddr> + 'a {
self.by_time.iter().rev().map(from_by_time_kv) self.by_time.iter().rev().cloned()
} }
/// Return an iterator over peers known to be disconnected, ordered from most /// Return an iterator over peers known to be disconnected, ordered from most
/// recently seen to least recently seen. /// recently seen to least recently seen.
pub fn disconnected_peers<'a>(&'a self) -> impl Iterator<Item = MetaAddr> + 'a { pub fn disconnected_peers<'a>(&'a self) -> impl Iterator<Item = MetaAddr> + 'a {
use chrono::Duration as CD; use chrono::Duration as CD;
use std::net::{IpAddr, Ipv4Addr};
use std::ops::Bound::{Excluded, Unbounded}; use std::ops::Bound::{Excluded, Unbounded};
// LIVE_PEER_DURATION represents the time interval in which we are // LIVE_PEER_DURATION represents the time interval in which we are
@ -83,11 +97,18 @@ impl AddressBook {
// connection. Therefore, if the last-seen timestamp is older than // connection. Therefore, if the last-seen timestamp is older than
// LIVE_PEER_DURATION ago, we know we must have disconnected from it. // LIVE_PEER_DURATION ago, we know we must have disconnected from it.
let cutoff = Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap(); let cutoff = Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap();
let cutoff_meta = MetaAddr {
last_seen: cutoff,
// The ordering on MetaAddrs is newest-first, then arbitrary,
// so any fields will do here.
addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
services: PeerServices::default(),
};
self.by_time self.by_time
.range((Unbounded, Excluded(cutoff))) .range((Excluded(cutoff_meta), Unbounded))
.rev() .rev()
.map(from_by_time_kv) .cloned()
} }
/// Returns an iterator that drains entries from the address book, removing /// Returns an iterator that drains entries from the address book, removing
@ -97,17 +118,6 @@ impl AddressBook {
} }
} }
// Helper impl to convert by_time Iterator Items back to MetaAddrs
// This could easily be a From impl, but trait impls are public, and this shouldn't be.
fn from_by_time_kv(by_time_kv: (&DateTime<Utc>, &(SocketAddr, PeerServices))) -> MetaAddr {
let (last_seen, (addr, services)) = by_time_kv;
MetaAddr {
last_seen: last_seen.clone(),
addr: addr.clone(),
services: services.clone(),
}
}
impl Extend<MetaAddr> for AddressBook { impl Extend<MetaAddr> for AddressBook {
fn extend<T>(&mut self, iter: T) fn extend<T>(&mut self, iter: T)
where where
@ -127,17 +137,12 @@ impl<'a> Iterator for Drain<'a> {
type Item = MetaAddr; type Item = MetaAddr;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
let most_recent = self.book.by_time.keys().rev().next()?.clone(); let most_recent = self.book.by_time.iter().next()?.clone();
let (addr, services) = self self.book.by_time.remove(&most_recent);
.book self.book
.by_time .by_addr
.remove(&most_recent) .remove(&most_recent.addr)
.expect("key from keys() must be present in btreemap"); .expect("cannot have by_time entry without by_addr entry");
self.book.by_addr.remove(&addr); Some(most_recent)
Some(MetaAddr {
addr,
services,
last_seen: most_recent,
})
} }
} }

View File

@ -1,7 +1,10 @@
//! An address-with-metadata type used in Bitcoin networking. //! An address-with-metadata type used in Bitcoin networking.
use std::io::{Read, Write}; use std::{
use std::net::SocketAddr; cmp::{Ord, Ordering},
io::{Read, Write},
net::SocketAddr,
};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use chrono::{DateTime, TimeZone, Utc}; use chrono::{DateTime, TimeZone, Utc};
@ -28,6 +31,34 @@ pub struct MetaAddr {
pub last_seen: DateTime<Utc>, pub last_seen: DateTime<Utc>,
} }
impl Ord for MetaAddr {
/// `MetaAddr`s are sorted newest-first, and then in an arbitrary
/// but determinate total order.
fn cmp(&self, other: &Self) -> Ordering {
let newest_first = self.last_seen.cmp(&other.last_seen).reverse();
newest_first.then_with(|| {
// The remainder is meaningless as an ordering, but required so that we
// have a total order on `MetaAddr` values: self and other must compare
// as Ordering::Equal iff they are equal.
use std::net::IpAddr::{V4, V6};
match (self.addr.ip(), other.addr.ip()) {
(V4(a), V4(b)) => a.octets().cmp(&b.octets()),
(V6(a), V6(b)) => a.octets().cmp(&b.octets()),
(V4(_), V6(_)) => Ordering::Less,
(V6(_), V4(_)) => Ordering::Greater,
}
.then(self.addr.port().cmp(&other.addr.port()))
.then(self.services.bits().cmp(&other.services.bits()))
})
}
}
impl PartialOrd for MetaAddr {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl ZcashSerialize for MetaAddr { impl ZcashSerialize for MetaAddr {
fn zcash_serialize<W: Write>(&self, mut writer: W) -> Result<(), SerializationError> { fn zcash_serialize<W: Write>(&self, mut writer: W) -> Result<(), SerializationError> {
writer.write_u32::<LittleEndian>(self.last_seen.timestamp() as u32)?; writer.write_u32::<LittleEndian>(self.last_seen.timestamp() as u32)?;

View File

@ -142,6 +142,11 @@ impl ConnectCmd {
let addrs = all_addrs.drain_recent().collect::<Vec<_>>(); let addrs = all_addrs.drain_recent().collect::<Vec<_>>();
info!(addrs.len = addrs.len(), ab.len = all_addrs.peers().count()); info!(addrs.len = addrs.len(), ab.len = all_addrs.peers().count());
let mut head = Vec::new();
head.extend_from_slice(&addrs[0..5]);
let mut tail = Vec::new();
tail.extend_from_slice(&addrs[addrs.len() - 5..]);
info!(addrs.first = ?head, addrs.last = ?tail);
loop { loop {
// empty loop ensures we don't exit the application, // empty loop ensures we don't exit the application,