From b468ead1b167c90d46f669d83e176f241019be62 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 28 Apr 2021 00:15:11 +0000 Subject: [PATCH] uses current timestamp when flushing local pending push queue (#16808) local_message_pending_push_queue is recording timestamps at the time the values are created, and uses those timestamps when the pending values are flushed: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L321 https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds_gossip.rs#L96-L102 which is then used as the insert_timestamp when inserting values in the crds table: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds_gossip_push.rs#L183 The flushing may happen 100ms after the values are created (or even later if there is lock contention). This will cause non-monotone insert_timestamps in the crds table (where time goes backward), hindering the usability of insert_timestamps for other computations. For example, both ClusterInfo::get_votes and get_epoch_slots_since rely on monotone insert_timestamps when values are inserted into the table: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215 https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298 This commit removes timestamps from local_message_pending_push_queue and uses the current timestamp when flushing the queue. 
--- core/src/cluster_info.rs | 43 +++++++++++++++++++--------------------- core/src/crds_gossip.rs | 25 ++++++++++++----------- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 584463eef0..3236609ad4 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -71,7 +71,7 @@ use std::{ ops::{Deref, DerefMut}, path::{Path, PathBuf}, sync::atomic::{AtomicBool, AtomicU64, Ordering}, - sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }; @@ -318,7 +318,7 @@ pub struct ClusterInfo { id: Pubkey, stats: GossipStats, socket: UdpSocket, - local_message_pending_push_queue: RwLock>, + local_message_pending_push_queue: Mutex>, contact_debug_interval: u64, // milliseconds, 0 = disabled contact_save_interval: u64, // milliseconds, 0 = disabled instance: NodeInstance, @@ -588,7 +588,7 @@ impl ClusterInfo { id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), - local_message_pending_push_queue: RwLock::new(vec![]), + local_message_pending_push_queue: Mutex::default(), contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, instance: NodeInstance::new(&mut thread_rng(), id, timestamp()), contact_info_path: PathBuf::default(), @@ -620,9 +620,9 @@ impl ClusterInfo { id: *new_id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), - local_message_pending_push_queue: RwLock::new( + local_message_pending_push_queue: Mutex::new( self.local_message_pending_push_queue - .read() + .lock() .unwrap() .clone(), ), @@ -651,13 +651,10 @@ impl ClusterInfo { .into_iter() .map(|v| CrdsValue::new_signed(v, &self.keypair)) .collect(); - { - let mut local_message_pending_push_queue = - self.local_message_pending_push_queue.write().unwrap(); - for entry in entries { - local_message_pending_push_queue.push((entry, now)); - } - 
} + self.local_message_pending_push_queue + .lock() + .unwrap() + .extend(entries); self.gossip .write() .unwrap() @@ -1008,9 +1005,9 @@ impl ClusterInfo { &self.keypair, ); self.local_message_pending_push_queue - .write() + .lock() .unwrap() - .push((entry, now)); + .push(entry); } } @@ -1064,9 +1061,9 @@ impl ClusterInfo { if n > 0 { let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair); self.local_message_pending_push_queue - .write() + .lock() .unwrap() - .push((entry, now)); + .push(entry); } num += n; if num < update.len() { @@ -1092,12 +1089,11 @@ impl ClusterInfo { GossipWriteLock::new(self.gossip.write().unwrap(), label, counter) } - pub fn push_message(&self, message: CrdsValue) { - let now = message.wallclock(); + pub(crate) fn push_message(&self, message: CrdsValue) { self.local_message_pending_push_queue - .write() + .lock() .unwrap() - .push((message, now)); + .push(message); } pub fn push_accounts_hashes(&self, accounts_hashes: Vec<(Slot, Hash)>) { @@ -1695,15 +1691,16 @@ impl ClusterInfo { }) .collect() } - fn drain_push_queue(&self) -> Vec<(CrdsValue, u64)> { - let mut push_queue = self.local_message_pending_push_queue.write().unwrap(); + + fn drain_push_queue(&self) -> Vec { + let mut push_queue = self.local_message_pending_push_queue.lock().unwrap(); std::mem::take(&mut *push_queue) } #[cfg(test)] pub fn flush_push_queue(&self) { let pending_push_messages = self.drain_push_queue(); let mut gossip = self.gossip.write().unwrap(); - gossip.process_push_messages(pending_push_messages); + gossip.process_push_messages(pending_push_messages, timestamp()); } fn new_push_requests( &self, diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index bb2257fa28..757dfc85ce 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -62,15 +62,12 @@ impl CrdsGossip { values .into_iter() .filter_map(|val| { - let res = self + let old = self .push - .process_push_message(&mut self.crds, from, val, now); - if let 
Ok(Some(val)) = res { - self.pull.record_old_hash(val.value_hash, now); - Some(val) - } else { - None - } + .process_push_message(&mut self.crds, from, val, now) + .ok()?; + self.pull.record_old_hash(old.as_ref()?.value_hash, now); + old }) .collect() } @@ -93,8 +90,12 @@ impl CrdsGossip { prune_map } - pub fn process_push_messages(&mut self, pending_push_messages: Vec<(CrdsValue, u64)>) { - for (push_message, timestamp) in pending_push_messages { + pub(crate) fn process_push_messages( + &mut self, + pending_push_messages: Vec, + timestamp: u64, + ) { + for push_message in pending_push_messages { let _ = self.push .process_push_message(&mut self.crds, &self.id, push_message, timestamp); @@ -103,10 +104,10 @@ impl CrdsGossip { pub fn new_push_messages( &mut self, - pending_push_messages: Vec<(CrdsValue, u64)>, + pending_push_messages: Vec, now: u64, ) -> HashMap> { - self.process_push_messages(pending_push_messages); + self.process_push_messages(pending_push_messages, now); self.push.new_push_messages(&self.crds, now) }