Snapshot hash gossip changes (#8358)

This commit is contained in:
sakridge 2020-02-20 11:46:13 -08:00 committed by GitHub
parent e50bc0d34b
commit 1720fe6a46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 105 additions and 7 deletions

View File

@ -21,7 +21,7 @@ use crate::{
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote},
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, SnapshotHash, Vote},
packet::{Packet, PACKET_DATA_SIZE},
result::{Error, Result},
sendmmsg::{multicast, send_mmsg},
@ -43,6 +43,7 @@ use solana_net_utils::{
};
use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler};
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::hash::Hash;
use solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
pubkey::Pubkey,
@ -436,6 +437,16 @@ impl ClusterInfo {
.process_push_message(&self.id(), vec![entry], now);
}
pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) {
let now = timestamp();
let entry = CrdsValue::new_signed(
CrdsData::SnapshotHash(SnapshotHash::new(self.id(), snapshot_hashes, now)),
&self.keypair,
);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) {
let now = timestamp();
let vote = Vote::new(&self.id(), vote, now);
@ -476,6 +487,23 @@ impl ClusterInfo {
(txs, max_ts)
}
pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> {
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.snapshot_hash().map(|v| v))
.filter_map(|x| {
for (table_slot, hash) in &x.hashes {
if *table_slot == slot {
return Some((x.from, *hash));
}
}
None
})
.collect()
}
pub fn get_epoch_state_for_node(
&self,
pubkey: &Pubkey,

View File

@ -2,6 +2,7 @@ use crate::contact_info::ContactInfo;
use bincode::{serialize, serialized_size};
use solana_sdk::{
clock::Slot,
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signable, Signature},
transaction::Transaction,
@ -61,6 +62,7 @@ pub enum CrdsData {
ContactInfo(ContactInfo),
Vote(VoteIndex, Vote),
EpochSlots(EpochSlotIndex, EpochSlots),
SnapshotHash(SnapshotHash),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -83,6 +85,23 @@ pub struct EpochIncompleteSlots {
pub compressed_list: Vec<u8>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct SnapshotHash {
pub from: Pubkey,
pub hashes: Vec<(Slot, Hash)>,
pub wallclock: u64,
}
impl SnapshotHash {
pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>, wallclock: u64) -> Self {
Self {
from,
hashes,
wallclock,
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct EpochSlots {
pub from: Pubkey,
@ -137,6 +156,7 @@ pub enum CrdsValueLabel {
ContactInfo(Pubkey),
Vote(VoteIndex, Pubkey),
EpochSlots(Pubkey),
SnapshotHash(Pubkey),
}
impl fmt::Display for CrdsValueLabel {
@ -145,6 +165,7 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()),
CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()),
CrdsValueLabel::SnapshotHash(_) => write!(f, "SnapshotHash({})", self.pubkey()),
}
}
}
@ -155,6 +176,7 @@ impl CrdsValueLabel {
CrdsValueLabel::ContactInfo(p) => *p,
CrdsValueLabel::Vote(_, p) => *p,
CrdsValueLabel::EpochSlots(p) => *p,
CrdsValueLabel::SnapshotHash(p) => *p,
}
}
}
@ -180,6 +202,7 @@ impl CrdsValue {
CrdsData::ContactInfo(contact_info) => contact_info.wallclock,
CrdsData::Vote(_, vote) => vote.wallclock,
CrdsData::EpochSlots(_, vote) => vote.wallclock,
CrdsData::SnapshotHash(hash) => hash.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
@ -187,6 +210,7 @@ impl CrdsValue {
CrdsData::ContactInfo(contact_info) => contact_info.id,
CrdsData::Vote(_, vote) => vote.from,
CrdsData::EpochSlots(_, slots) => slots.from,
CrdsData::SnapshotHash(hash) => hash.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
@ -194,6 +218,7 @@ impl CrdsValue {
CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()),
CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()),
CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()),
CrdsData::SnapshotHash(_) => CrdsValueLabel::SnapshotHash(self.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&ContactInfo> {
@ -222,11 +247,20 @@ impl CrdsValue {
_ => None,
}
}
pub fn snapshot_hash(&self) -> Option<&SnapshotHash> {
match &self.data {
CrdsData::SnapshotHash(slots) => Some(slots),
_ => None,
}
}
/// Return all the possible labels for a record identified by Pubkey.
pub fn record_labels(key: &Pubkey) -> Vec<CrdsValueLabel> {
let mut labels = vec![
CrdsValueLabel::ContactInfo(*key),
CrdsValueLabel::EpochSlots(*key),
CrdsValueLabel::SnapshotHash(*key),
];
labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
labels
@ -276,13 +310,14 @@ mod test {
#[test]
fn test_labels() {
let mut hits = [false; 2 + MAX_VOTES as usize];
let mut hits = [false; 3 + MAX_VOTES as usize];
// this method should cover all the possible labels
for v in &CrdsValue::record_labels(&Pubkey::default()) {
match v {
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
CrdsValueLabel::EpochSlots(_) => hits[1] = true,
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 2] = true,
CrdsValueLabel::SnapshotHash(_) => hits[2] = true,
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 3] = true,
}
}
assert!(hits.iter().all(|x| *x));

View File

@ -1,3 +1,4 @@
use crate::cluster_info::ClusterInfo;
use solana_ledger::{
snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package,
};
@ -5,7 +6,7 @@ use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::RecvTimeoutError,
Arc,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::Duration,
@ -15,25 +16,42 @@ pub struct SnapshotPackagerService {
t_snapshot_packager: JoinHandle<()>,
}
const MAX_SNAPSHOT_HASHES: usize = 24;
impl SnapshotPackagerService {
pub fn new(snapshot_package_receiver: SnapshotPackageReceiver, exit: &Arc<AtomicBool>) -> Self {
pub fn new(
snapshot_package_receiver: SnapshotPackageReceiver,
exit: &Arc<AtomicBool>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Self {
let exit = exit.clone();
let cluster_info = cluster_info.clone();
let t_snapshot_packager = Builder::new()
.name("solana-snapshot-packager".to_string())
.spawn(move || loop {
let mut hashes = vec![];
if exit.load(Ordering::Relaxed) {
break;
}
match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(mut snapshot_package) => {
hashes.push((snapshot_package.root, snapshot_package.hash));
// Only package the latest
while let Ok(new_snapshot_package) = snapshot_package_receiver.try_recv() {
snapshot_package = new_snapshot_package;
hashes.push((snapshot_package.root, snapshot_package.hash));
}
if let Err(err) = archive_snapshot_package(&snapshot_package) {
warn!("Failed to create snapshot archive: {}", err);
}
while hashes.len() > MAX_SNAPSHOT_HASHES {
hashes.remove(0);
}
cluster_info
.write()
.unwrap()
.push_snapshot_hashes(hashes.clone());
}
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => (),
@ -61,6 +79,7 @@ mod tests {
use solana_runtime::{
accounts_db::AccountStorageEntry, bank::BankSlotDelta, bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
};
use solana_sdk::hash::Hash;
use std::{
fs::{self, remove_dir_all, OpenOptions},
io::Write,
@ -139,6 +158,7 @@ mod tests {
link_snapshots_dir,
storage_entries.clone(),
output_tar_path.clone(),
Hash::default(),
);
// Make tarball from packageable snapshot

View File

@ -153,7 +153,8 @@ impl Tvu {
if snapshot_config.is_some() {
// Start a snapshot packaging service
let (sender, receiver) = channel();
let snapshot_packager_service = SnapshotPackagerService::new(receiver, exit);
let snapshot_packager_service =
SnapshotPackagerService::new(receiver, exit, &cluster_info.clone());
(Some(snapshot_packager_service), Some(sender))
} else {
(None, None)

View File

@ -5,6 +5,8 @@ mod tests {
use bincode::serialize_into;
use fs_extra::dir::CopyOptions;
use itertools::Itertools;
use solana_core::cluster_info::ClusterInfo;
use solana_core::contact_info::ContactInfo;
use solana_core::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
snapshot_packager_service::SnapshotPackagerService,
@ -24,6 +26,7 @@ mod tests {
signature::{Keypair, KeypairUtil},
system_transaction,
};
use std::sync::RwLock;
use std::{fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc};
use tempfile::TempDir;
@ -296,7 +299,13 @@ mod tests {
// correctly construct the earlier snapshots because the SnapshotPackage's on the
// channel hold hard links to these deleted snapshots. We verify this is the case below.
let exit = Arc::new(AtomicBool::new(false));
let snapshot_packager_service = SnapshotPackagerService::new(receiver, &exit);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
ContactInfo::default(),
)));
let snapshot_packager_service =
SnapshotPackagerService::new(receiver, &exit, &cluster_info);
// Close the channel so that the package service will exit after reading all the
// packages off the channel

View File

@ -1,5 +1,6 @@
use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta};
use solana_sdk::clock::Slot;
use solana_sdk::hash::Hash;
use std::{
path::PathBuf,
sync::{
@ -20,6 +21,7 @@ pub struct SnapshotPackage {
pub snapshot_links: TempDir,
pub storage_entries: Vec<Arc<AccountStorageEntry>>,
pub tar_output_file: PathBuf,
pub hash: Hash,
}
impl SnapshotPackage {
@ -29,6 +31,7 @@ impl SnapshotPackage {
snapshot_links: TempDir,
storage_entries: Vec<Arc<AccountStorageEntry>>,
tar_output_file: PathBuf,
hash: Hash,
) -> Self {
Self {
root,
@ -36,6 +39,7 @@ impl SnapshotPackage {
snapshot_links,
storage_entries,
tar_output_file,
hash,
}
}
}

View File

@ -106,6 +106,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
snapshot_hard_links_dir,
account_storage_entries,
snapshot_package_output_file.as_ref().to_path_buf(),
bank.hash(),
);
Ok(package)