From 1720fe6a4625d8b0d6fb5937633c9ca512e8e271 Mon Sep 17 00:00:00 2001 From: sakridge Date: Thu, 20 Feb 2020 11:46:13 -0800 Subject: [PATCH] Snapshot hash gossip changes (#8358) --- core/src/cluster_info.rs | 30 ++++++++++++++++++++- core/src/crds_value.rs | 39 +++++++++++++++++++++++++-- core/src/snapshot_packager_service.rs | 24 +++++++++++++++-- core/src/tvu.rs | 3 ++- core/tests/bank_forks.rs | 11 +++++++- ledger/src/snapshot_package.rs | 4 +++ ledger/src/snapshot_utils.rs | 1 + 7 files changed, 105 insertions(+), 7 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 9e902be276..2868d5ccee 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -21,7 +21,7 @@ use crate::{ crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, - crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote}, + crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, SnapshotHash, Vote}, packet::{Packet, PACKET_DATA_SIZE}, result::{Error, Result}, sendmmsg::{multicast, send_mmsg}, @@ -43,6 +43,7 @@ use solana_net_utils::{ }; use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler}; use solana_rayon_threadlimit::get_thread_count; +use solana_sdk::hash::Hash; use solana_sdk::{ clock::{Slot, DEFAULT_MS_PER_SLOT}, pubkey::Pubkey, @@ -436,6 +437,16 @@ impl ClusterInfo { .process_push_message(&self.id(), vec![entry], now); } + pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) { + let now = timestamp(); + let entry = CrdsValue::new_signed( + CrdsData::SnapshotHash(SnapshotHash::new(self.id(), snapshot_hashes, now)), + &self.keypair, + ); + self.gossip + .process_push_message(&self.id(), vec![entry], now); + } + pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) { let now = timestamp(); let vote = Vote::new(&self.id(), vote, now); @@ -476,6 +487,23 @@ impl ClusterInfo { (txs, max_ts) } + pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> { + self.gossip + .crds + .table + .values() + .filter_map(|x| x.value.snapshot_hash().map(|v| v)) + .filter_map(|x| { + for (table_slot, hash) in &x.hashes { + if *table_slot == slot { + return Some((x.from, *hash)); + } + } + None + }) + .collect() + } + pub fn get_epoch_state_for_node( &self, pubkey: &Pubkey, diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index df140a45db..97e0b1fc28 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -2,6 +2,7 @@ use crate::contact_info::ContactInfo; use bincode::{serialize, serialized_size}; use solana_sdk::{ clock::Slot, + hash::Hash, pubkey::Pubkey, signature::{Keypair, Signable, Signature}, transaction::Transaction, @@ -61,6 +62,7 @@ pub enum CrdsData { ContactInfo(ContactInfo), Vote(VoteIndex, Vote), EpochSlots(EpochSlotIndex, EpochSlots), + SnapshotHash(SnapshotHash), } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -83,6 +85,23 @@ pub struct EpochIncompleteSlots { pub compressed_list: Vec, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct SnapshotHash { + pub from: Pubkey, + pub hashes: Vec<(Slot, Hash)>, + pub wallclock: u64, +} + +impl SnapshotHash { + pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>, wallclock: u64) -> Self { + Self { + from, + hashes, + wallclock, + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct EpochSlots { pub from: Pubkey, @@ -137,6 +156,7 @@ pub enum CrdsValueLabel { ContactInfo(Pubkey), Vote(VoteIndex, Pubkey), EpochSlots(Pubkey), + SnapshotHash(Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -145,6 +165,7 @@ impl fmt::Display for CrdsValueLabel { CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()), CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()), + CrdsValueLabel::SnapshotHash(_) => write!(f, "SnapshotHash({})", self.pubkey()), } } } @@ -155,6 +176,7 @@ impl CrdsValueLabel { CrdsValueLabel::ContactInfo(p) => *p, CrdsValueLabel::Vote(_, p) => *p, CrdsValueLabel::EpochSlots(p) => *p, + CrdsValueLabel::SnapshotHash(p) => *p, } } } @@ -180,6 +202,7 @@ impl CrdsValue { CrdsData::ContactInfo(contact_info) => contact_info.wallclock, CrdsData::Vote(_, vote) => vote.wallclock, CrdsData::EpochSlots(_, vote) => vote.wallclock, + CrdsData::SnapshotHash(hash) => hash.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -187,6 +210,7 @@ impl CrdsValue { CrdsData::ContactInfo(contact_info) => contact_info.id, CrdsData::Vote(_, vote) => vote.from, CrdsData::EpochSlots(_, slots) => slots.from, + CrdsData::SnapshotHash(hash) => hash.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -194,6 +218,7 @@ impl CrdsValue { CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()), CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()), + CrdsData::SnapshotHash(_) => CrdsValueLabel::SnapshotHash(self.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -222,11 +247,20 @@ impl CrdsValue { _ => None, } } + + pub fn snapshot_hash(&self) -> Option<&SnapshotHash> { + match &self.data { + CrdsData::SnapshotHash(slots) => Some(slots), + _ => None, + } + } + /// Return all the possible labels for a record identified by Pubkey. pub fn record_labels(key: &Pubkey) -> Vec { let mut labels = vec![ CrdsValueLabel::ContactInfo(*key), CrdsValueLabel::EpochSlots(*key), + CrdsValueLabel::SnapshotHash(*key), ]; labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key))); labels @@ -276,13 +310,14 @@ mod test { #[test] fn test_labels() { - let mut hits = [false; 2 + MAX_VOTES as usize]; + let mut hits = [false; 3 + MAX_VOTES as usize]; // this method should cover all the possible labels for v in &CrdsValue::record_labels(&Pubkey::default()) { match v { CrdsValueLabel::ContactInfo(_) => hits[0] = true, CrdsValueLabel::EpochSlots(_) => hits[1] = true, - CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 2] = true, + CrdsValueLabel::SnapshotHash(_) => hits[2] = true, + CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 3] = true, } } assert!(hits.iter().all(|x| *x)); diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 589017ff84..986c96f030 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,3 +1,4 @@ +use crate::cluster_info::ClusterInfo; use solana_ledger::{ snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package, }; @@ -5,7 +6,7 @@ use std::{ sync::{ atomic::{AtomicBool, Ordering}, mpsc::RecvTimeoutError, - Arc, + Arc, RwLock, }, thread::{self, Builder, JoinHandle}, time::Duration, @@ -15,25 +16,42 @@ pub struct SnapshotPackagerService { t_snapshot_packager: JoinHandle<()>, } +const MAX_SNAPSHOT_HASHES: usize = 24; + impl SnapshotPackagerService { - pub fn new(snapshot_package_receiver: SnapshotPackageReceiver, exit: &Arc) -> Self { + pub fn new( + snapshot_package_receiver: SnapshotPackageReceiver, + exit: &Arc, + cluster_info: &Arc>, + ) -> Self { let exit = exit.clone(); + let cluster_info = cluster_info.clone(); let t_snapshot_packager = Builder::new() .name("solana-snapshot-packager".to_string()) .spawn(move || loop { + let mut hashes = vec![]; if exit.load(Ordering::Relaxed) { break; } match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) { Ok(mut snapshot_package) => { + hashes.push((snapshot_package.root, snapshot_package.hash)); // Only package the latest while let Ok(new_snapshot_package) = snapshot_package_receiver.try_recv() { snapshot_package = new_snapshot_package; + hashes.push((snapshot_package.root, snapshot_package.hash)); } if let Err(err) = archive_snapshot_package(&snapshot_package) { warn!("Failed to create snapshot archive: {}", err); } + while hashes.len() > MAX_SNAPSHOT_HASHES { + hashes.remove(0); + } + cluster_info + .write() + .unwrap() + .push_snapshot_hashes(hashes.clone()); } Err(RecvTimeoutError::Disconnected) => break, Err(RecvTimeoutError::Timeout) => (), @@ -61,6 +79,7 @@ mod tests { use solana_runtime::{ accounts_db::AccountStorageEntry, bank::BankSlotDelta, bank::MAX_SNAPSHOT_DATA_FILE_SIZE, }; + use solana_sdk::hash::Hash; use std::{ fs::{self, remove_dir_all, OpenOptions}, io::Write, @@ -139,6 +158,7 @@ mod tests { link_snapshots_dir, storage_entries.clone(), output_tar_path.clone(), + Hash::default(), ); // Make tarball from packageable snapshot diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 09d53d9851..c04de4051b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -153,7 +153,8 @@ impl Tvu { if snapshot_config.is_some() { // Start a snapshot packaging service let (sender, receiver) = channel(); - let snapshot_packager_service = SnapshotPackagerService::new(receiver, exit); + let snapshot_packager_service = + SnapshotPackagerService::new(receiver, exit, &cluster_info.clone()); (Some(snapshot_packager_service), Some(sender)) } else { (None, None) diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index 603814792b..da209e2f22 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -5,6 +5,8 @@ mod tests { use bincode::serialize_into; use fs_extra::dir::CopyOptions; use itertools::Itertools; + use solana_core::cluster_info::ClusterInfo; + use solana_core::contact_info::ContactInfo; use solana_core::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, snapshot_packager_service::SnapshotPackagerService, @@ -24,6 +26,7 @@ mod tests { signature::{Keypair, KeypairUtil}, system_transaction, }; + use std::sync::RwLock; use std::{fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc}; use tempfile::TempDir; @@ -296,7 +299,13 @@ mod tests { // correctly construct the earlier snapshots because the SnapshotPackage's on the // channel hold hard links to these deleted snapshots. We verify this is the case below. let exit = Arc::new(AtomicBool::new(false)); - let snapshot_packager_service = SnapshotPackagerService::new(receiver, &exit); + + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( + ContactInfo::default(), + ))); + + let snapshot_packager_service = + SnapshotPackagerService::new(receiver, &exit, &cluster_info); // Close the channel so that the package service will exit after reading all the // packages off the channel diff --git a/ledger/src/snapshot_package.rs b/ledger/src/snapshot_package.rs index 76c4f1e5ec..fd894ae335 100644 --- a/ledger/src/snapshot_package.rs +++ b/ledger/src/snapshot_package.rs @@ -1,5 +1,6 @@ use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta}; use solana_sdk::clock::Slot; +use solana_sdk::hash::Hash; use std::{ path::PathBuf, sync::{ @@ -20,6 +21,7 @@ pub struct SnapshotPackage { pub snapshot_links: TempDir, pub storage_entries: Vec>, pub tar_output_file: PathBuf, + pub hash: Hash, } impl SnapshotPackage { @@ -29,6 +31,7 @@ impl SnapshotPackage { snapshot_links: TempDir, storage_entries: Vec>, tar_output_file: PathBuf, + hash: Hash, ) -> Self { Self { root, @@ -36,6 +39,7 @@ impl SnapshotPackage { snapshot_links, storage_entries, tar_output_file, + hash, } } } diff --git a/ledger/src/snapshot_utils.rs b/ledger/src/snapshot_utils.rs index 8495f11ee1..d6c039c6c4 100644 --- a/ledger/src/snapshot_utils.rs +++ b/ledger/src/snapshot_utils.rs @@ -106,6 +106,7 @@ pub fn package_snapshot, Q: AsRef>( snapshot_hard_links_dir, account_storage_entries, snapshot_package_output_file.as_ref().to_path_buf(), + bank.hash(), ); Ok(package)