From 9ddd6f08e863dca5d1c518e6f30ade7e3dba005f Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Fri, 25 Dec 2020 22:31:25 -0800
Subject: [PATCH] Persist gossip contact info

---
 Cargo.lock               |   1 +
 core/src/cluster_info.rs | 137 +++++++++++++++++++++++++++++++++++++--
 core/src/validator.rs    |  13 ++--
 validator/Cargo.toml     |   1 +
 validator/src/main.rs    |  11 +++-
 5 files changed, 152 insertions(+), 11 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ab181df37d..0eb4d94857 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5101,6 +5101,7 @@ name = "solana-validator"
 version = "1.6.0"
 dependencies = [
  "base64 0.12.3",
+ "bincode",
  "chrono",
  "clap",
  "console",
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 3c416088e8..b5ec6baad2 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -69,8 +69,11 @@ use std::{
     cmp::min,
     collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
     fmt::{self, Debug},
+    fs::{self, File},
+    io::BufReader,
     net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
     ops::{Deref, DerefMut},
+    path::{Path, PathBuf},
     sync::atomic::{AtomicBool, AtomicU64, Ordering},
     sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
     thread::{sleep, Builder, JoinHandle},
@@ -107,7 +110,8 @@ const MAX_PRUNE_DATA_NODES: usize = 32;
 const GOSSIP_PING_TOKEN_SIZE: usize = 32;
 const GOSSIP_PING_CACHE_CAPACITY: usize = 16384;
 const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640);
-pub const DEFAULT_CONTACT_DEBUG_INTERVAL: u64 = 10_000;
+pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
+pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
 
 /// Minimum serialized size of a Protocol::PullResponse packet.
 const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 167;
@@ -297,8 +301,10 @@ pub struct ClusterInfo {
     stats: GossipStats,
     socket: UdpSocket,
     local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>,
-    contact_debug_interval: u64,
+    contact_debug_interval: u64, // milliseconds, 0 = disabled
+    contact_save_interval: u64, // milliseconds, 0 = disabled
     instance: NodeInstance,
+    contact_info_path: PathBuf,
 }
 
 impl Default for ClusterInfo {
@@ -554,8 +560,10 @@ impl ClusterInfo {
             stats: GossipStats::default(),
             socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
             local_message_pending_push_queue: RwLock::new(vec![]),
-            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
+            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
             instance: NodeInstance::new(&mut thread_rng(), id, timestamp()),
+            contact_info_path: PathBuf::default(),
+            contact_save_interval: 0, // disabled
         };
         {
             let mut gossip = me.gossip.write().unwrap();
@@ -591,6 +599,8 @@
             ),
             contact_debug_interval: self.contact_debug_interval,
             instance: NodeInstance::new(&mut thread_rng(), *new_id, timestamp()),
+            contact_info_path: PathBuf::default(),
+            contact_save_interval: 0, // disabled
         }
     }
@@ -649,6 +659,117 @@ impl ClusterInfo {
         *self.entrypoints.write().unwrap() = entrypoints;
     }
 
+    pub fn save_contact_info(&self) {
+        let nodes = {
+            let gossip = self.gossip.read().unwrap();
+            let entrypoint_gossip_addrs = self
+                .entrypoints
+                .read()
+                .unwrap()
+                .iter()
+                .map(|contact_info| contact_info.gossip)
+                .collect::<HashSet<_>>();
+
+            gossip
+                .crds
+                .get_nodes()
+                .filter_map(|v| {
+                    // Don't save:
+                    // 1. Our ContactInfo. No point
+                    // 2. Entrypoint ContactInfo. This will avoid adopting the incorrect shred
+                    //    version on restart if the entrypoint shred version changes.  Also
+                    //    there's not much point in saving entrypoint ContactInfo since by
+                    //    definition that information is already available
+                    let contact_info = v.value.contact_info().unwrap();
+                    if contact_info.id != self.id()
+                        && !entrypoint_gossip_addrs.contains(&contact_info.gossip)
+                    {
+                        return Some(v.value.clone());
+                    }
+                    None
+                })
+                .collect::<Vec<_>>()
+        };
+
+        if nodes.is_empty() {
+            return;
+        }
+
+        let filename = self.contact_info_path.join("contact-info.bin");
+        let tmp_filename = &filename.with_extension("tmp");
+
+        match File::create(&tmp_filename) {
+            Ok(mut file) => {
+                if let Err(err) = bincode::serialize_into(&mut file, &nodes) {
+                    warn!(
+                        "Failed to serialize contact info into {}: {}",
+                        tmp_filename.display(),
+                        err
+                    );
+                    return;
+                }
+            }
+            Err(err) => {
+                warn!("Failed to create {}: {}", tmp_filename.display(), err);
+                return;
+            }
+        }
+
+        match fs::rename(&tmp_filename, &filename) {
+            Ok(()) => {
+                info!(
+                    "Saved contact info for {} nodes into {}",
+                    nodes.len(),
+                    filename.display()
+                );
+            }
+            Err(err) => {
+                warn!(
+                    "Failed to rename {} to {}: {}",
+                    tmp_filename.display(),
+                    filename.display(),
+                    err
+                );
+            }
+        }
+    }
+
+    pub fn restore_contact_info(&mut self, contact_info_path: &Path, contact_save_interval: u64) {
+        self.contact_info_path = contact_info_path.into();
+        self.contact_save_interval = contact_save_interval;
+
+        let filename = contact_info_path.join("contact-info.bin");
+        if !filename.exists() {
+            return;
+        }
+
+        let nodes: Vec<CrdsValue> = match File::open(&filename) {
+            Ok(file) => {
+                bincode::deserialize_from(&mut BufReader::new(file)).unwrap_or_else(|err| {
+                    warn!("Failed to deserialize {}: {}", filename.display(), err);
+                    vec![]
+                })
+            }
+            Err(err) => {
+                warn!("Failed to open {}: {}", filename.display(), err);
+                vec![]
+            }
+        };
+
+        info!(
+            "Loaded contact info for {} nodes from {}",
+            nodes.len(),
+            filename.display()
+        );
+        let now = timestamp();
+        let mut gossip = self.gossip.write().unwrap();
+        for node in nodes {
+            if let Err(err) = gossip.crds.insert(node, now) {
+                warn!("crds insert failed {:?}", err);
+            }
+        }
+    }
+
     pub fn id(&self) -> Pubkey {
         self.id
     }
@@ -1805,6 +1926,7 @@ impl ClusterInfo {
             .spawn(move || {
                 let mut last_push = timestamp();
                 let mut last_contact_info_trace = timestamp();
+                let mut last_contact_info_save = timestamp();
                 let mut entrypoints_processed = false;
                 let recycler = PacketsRecycler::default();
                 let crds_data = vec![
@@ -1822,7 +1944,7 @@
                     if self.contact_debug_interval != 0
                         && start - last_contact_info_trace > self.contact_debug_interval
                     {
-                        // Log contact info every 10 seconds
+                        // Log contact info
                         info!(
                             "\n{}\n\n{}",
                             self.contact_info_trace(),
@@ -1831,6 +1953,13 @@
                         last_contact_info_trace = start;
                     }
 
+                    if self.contact_save_interval != 0
+                        && start - last_contact_info_save > self.contact_save_interval
+                    {
+                        self.save_contact_info();
+                        last_contact_info_save = start;
+                    }
+
                     let stakes: HashMap<_, _> = match bank_forks {
                         Some(ref bank_forks) => {
                             bank_forks.read().unwrap().root_bank().staked_nodes()
diff --git a/core/src/validator.rs b/core/src/validator.rs
index e7c8935928..2ebc5f6ea0 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -3,7 +3,10 @@
 use crate::{
     broadcast_stage::BroadcastStageType,
     cache_block_time_service::{CacheBlockTimeSender, CacheBlockTimeService},
-    cluster_info::{ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL},
+    cluster_info::{
+        ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
+        DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
+    },
     cluster_info_vote_listener::VoteTracker,
     completed_data_sets_service::CompletedDataSetsService,
     consensus::{reconcile_blockstore_roots_with_tower, Tower},
@@ -107,6 +110,7 @@ pub struct ValidatorConfig {
     pub require_tower: bool,
     pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
     pub contact_debug_interval: u64,
+    pub contact_save_interval: u64,
     pub bpf_jit: bool,
     pub send_transaction_retry_ms: u64,
     pub send_transaction_leader_forward_count: u64,
@@ -147,7 +151,8 @@ impl Default for ValidatorConfig {
             cuda: false,
             require_tower: false,
             debug_keys: None,
-            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
+            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
+            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
             bpf_jit: false,
             send_transaction_retry_ms: 2000,
             send_transaction_leader_forward_count: 2,
@@ -365,6 +370,8 @@ impl Validator {
         let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair.clone());
         cluster_info.set_contact_debug_interval(config.contact_debug_interval);
+        cluster_info.set_entrypoints(cluster_entrypoints);
+        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
         let cluster_info = Arc::new(cluster_info);
         let mut block_commitment_cache = BlockCommitmentCache::default();
         block_commitment_cache.initialize_slots(bank.slot());
@@ -495,8 +502,6 @@
             config.gossip_validators.clone(),
             &exit,
         );
-        cluster_info.set_entrypoints(cluster_entrypoints);
-
         let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone())));
         let serve_repair_service = ServeRepairService::new(
             &serve_repair,
diff --git a/validator/Cargo.toml b/validator/Cargo.toml
index e1b9aaa2b8..9b9808fd6d 100644
--- a/validator/Cargo.toml
+++ b/validator/Cargo.toml
@@ -11,6 +11,7 @@ default-run = "solana-validator"
 
 [dependencies]
 base64 = "0.12.3"
+bincode = "1.3.1"
 clap = "2.33.1"
 chrono = { version = "0.4.11", features = ["serde"] }
 console = "0.11.3"
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 0e0bf7d7da..957233ef11 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -108,12 +108,13 @@ fn get_trusted_snapshot_hashes(
 fn start_gossip_node(
     identity_keypair: &Arc<Keypair>,
     cluster_entrypoints: &[ContactInfo],
+    ledger_path: &Path,
     gossip_addr: &SocketAddr,
     gossip_socket: UdpSocket,
     expected_shred_version: Option<u16>,
     gossip_validators: Option<HashSet<Pubkey>>,
 ) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
-    let cluster_info = ClusterInfo::new(
+    let mut cluster_info = ClusterInfo::new(
         ClusterInfo::gossip_contact_info(
             &identity_keypair.pubkey(),
             *gossip_addr,
@@ -122,6 +123,7 @@
         identity_keypair.clone(),
     );
     cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
+    cluster_info.restore_contact_info(ledger_path, 0);
     let cluster_info = Arc::new(cluster_info);
 
     let gossip_exit_flag = Arc::new(AtomicBool::new(false));
@@ -590,6 +592,7 @@
             gossip = Some(start_gossip_node(
                 &identity_keypair,
                 &cluster_entrypoints,
+                ledger_path,
                 &node.info.gossip,
                 node.sockets.gossip.try_clone().unwrap(),
                 validator_config.expected_shred_version,
@@ -689,8 +692,9 @@
                 .map_err(|err| format!("Failed to get RPC node slot: {}", err))
                 .and_then(|slot| {
                     info!("RPC node root slot: {}", slot);
-                    let (_cluster_info, gossip_exit_flag, gossip_service) =
+                    let (cluster_info, gossip_exit_flag, gossip_service) =
                         gossip.take().unwrap();
+                    cluster_info.save_contact_info();
                     gossip_exit_flag.store(true, Ordering::Relaxed);
                     let ret = download_snapshot(
                         &rpc_contact_info.rpc,
@@ -747,7 +751,8 @@
         );
blacklisted_rpc_nodes.insert(rpc_contact_info.id); } - if let Some((_cluster_info, gossip_exit_flag, gossip_service)) = gossip.take() { + if let Some((cluster_info, gossip_exit_flag, gossip_service)) = gossip.take() { + cluster_info.save_contact_info(); gossip_exit_flag.store(true, Ordering::Relaxed); gossip_service.join().unwrap(); }
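
For readers who want the persistence pattern in isolation: save_contact_info()
bincode-serializes the retained CrdsValues into contact-info.tmp and then
renames that file over contact-info.bin, so a crash mid-write never leaves a
truncated contact-info.bin behind, while restore_contact_info() treats any
open or decode failure as "no saved nodes" rather than a startup error. The
sketch below walks through the same write-then-rename round trip outside the
validator. It is illustrative only: NodeRecord is a hypothetical stand-in for
the real CrdsValue, save_nodes/load_nodes are invented names, and the serde
(with derive) and bincode crates are assumed as dependencies.

    use serde::{Deserialize, Serialize};
    use std::{
        fs::{self, File},
        io::BufReader,
        path::Path,
    };

    // Hypothetical stand-in for CrdsValue, just to keep the sketch
    // self-contained.
    #[derive(Serialize, Deserialize, Debug)]
    struct NodeRecord {
        id: [u8; 32],   // stand-in for a Pubkey
        gossip: String, // stand-in for the gossip SocketAddr
    }

    fn save_nodes(dir: &Path, nodes: &[NodeRecord]) -> Result<(), Box<dyn std::error::Error>> {
        let filename = dir.join("contact-info.bin");
        let tmp_filename = filename.with_extension("tmp");

        // Serialize into a temporary file first...
        let mut file = File::create(&tmp_filename)?;
        bincode::serialize_into(&mut file, &nodes)?;

        // ...then rename it into place. Both paths live in `dir`, so the
        // rename stays on one filesystem and replaces the old file atomically.
        fs::rename(&tmp_filename, &filename)?;
        Ok(())
    }

    fn load_nodes(dir: &Path) -> Vec<NodeRecord> {
        let filename = dir.join("contact-info.bin");
        if !filename.exists() {
            return vec![];
        }
        // Mirror restore_contact_info(): any open or decode failure degrades
        // to an empty list instead of aborting.
        File::open(&filename)
            .map(|file| bincode::deserialize_from(BufReader::new(file)).unwrap_or_else(|_| vec![]))
            .unwrap_or_else(|_| vec![])
    }

The temporary-file-plus-rename step is the load-bearing design choice here: a
plain File::create over contact-info.bin would let a crash between create and
a completed write corrupt the only copy of the saved contact info.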