From a95675a7ce1651f7b59443eb146b356bc4b3f374 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 11 Jan 2021 10:21:15 -0800 Subject: [PATCH] Avoid tmp snapshot backlog in SnapshotPackagerService under high load (#14516) --- core/src/accounts_hash_verifier.rs | 17 +++++----- core/src/snapshot_packager_service.rs | 45 +++++++++++------------- core/src/tvu.rs | 12 +++---- core/src/validator.rs | 21 +++++++----- core/tests/snapshots.rs | 49 +++++++++++++++++++++++---- runtime/src/snapshot_utils.rs | 7 ++-- 6 files changed, 95 insertions(+), 56 deletions(-) diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index 25d6b3a420..2274f09509 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -4,10 +4,11 @@ // hash on gossip. Monitor gossip for messages from validators in the --trusted-validators // set and halt the node if a mismatch is detected. -use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; -use solana_runtime::snapshot_package::{ - AccountsPackage, AccountsPackageReceiver, AccountsPackageSender, +use crate::{ + cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}, + snapshot_packager_service::PendingSnapshotPackage, }; +use solana_runtime::snapshot_package::{AccountsPackage, AccountsPackageReceiver}; use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; use std::collections::{HashMap, HashSet}; use std::{ @@ -27,7 +28,7 @@ pub struct AccountsHashVerifier { impl AccountsHashVerifier { pub fn new( accounts_package_receiver: AccountsPackageReceiver, - accounts_package_sender: Option, + pending_snapshot_package: Option, exit: &Arc, cluster_info: &Arc, trusted_validators: Option>, @@ -53,7 +54,7 @@ impl AccountsHashVerifier { &cluster_info, &trusted_validators, halt_on_trusted_validators_accounts_hash_mismatch, - &accounts_package_sender, + &pending_snapshot_package, &mut hashes, &exit, fault_injection_rate_slots, @@ -76,7 +77,7 @@ impl AccountsHashVerifier { cluster_info: &ClusterInfo, trusted_validators: &Option>, halt_on_trusted_validator_accounts_hash_mismatch: bool, - accounts_package_sender: &Option, + pending_snapshot_package: &Option, hashes: &mut Vec<(Slot, Hash)>, exit: &Arc, fault_injection_rate_slots: u64, @@ -111,8 +112,8 @@ impl AccountsHashVerifier { } if accounts_package.block_height % snapshot_interval_slots == 0 { - if let Some(sender) = accounts_package_sender.as_ref() { - if sender.send(accounts_package).is_err() {} + if let Some(pending_snapshot_package) = pending_snapshot_package.as_ref() { + *pending_snapshot_package.lock().unwrap() = Some(accounts_package); } } diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 86a608b992..c9f97fcc9a 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,23 +1,24 @@ use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; -use solana_runtime::{snapshot_package::AccountsPackageReceiver, snapshot_utils}; +use solana_runtime::{snapshot_package::AccountsPackage, snapshot_utils}; use solana_sdk::{clock::Slot, hash::Hash}; use std::{ sync::{ atomic::{AtomicBool, Ordering}, - mpsc::RecvTimeoutError, - Arc, + Arc, Mutex, }, thread::{self, Builder, JoinHandle}, time::Duration, }; +pub type PendingSnapshotPackage = Arc>>; + pub struct SnapshotPackagerService { t_snapshot_packager: JoinHandle<()>, } impl SnapshotPackagerService { pub fn new( - snapshot_package_receiver: AccountsPackageReceiver, + pending_snapshot_package: PendingSnapshotPackage, starting_snapshot_hash: Option<(Slot, Hash)>, exit: &Arc, cluster_info: &Arc, @@ -26,7 +27,7 @@ impl SnapshotPackagerService { let cluster_info = cluster_info.clone(); let t_snapshot_packager = Builder::new() - .name("solana-snapshot-packager".to_string()) + .name("snapshot-packager".to_string()) .spawn(move || { let mut hashes = vec![]; if let Some(starting_snapshot_hash) = starting_snapshot_hash { @@ -38,32 +39,26 @@ impl SnapshotPackagerService { break; } - match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) { - Ok(mut snapshot_package) => { - // Only package the latest - while let Ok(new_snapshot_package) = - snapshot_package_receiver.try_recv() - { - snapshot_package = new_snapshot_package; - } - if let Err(err) = - snapshot_utils::archive_snapshot_package(&snapshot_package) - { - warn!("Failed to create snapshot archive: {}", err); - } else { - hashes.push((snapshot_package.slot, snapshot_package.hash)); - while hashes.len() > MAX_SNAPSHOT_HASHES { - hashes.remove(0); - } - cluster_info.push_snapshot_hashes(hashes.clone()); + let snapshot_package = pending_snapshot_package.lock().unwrap().take(); + if let Some(snapshot_package) = snapshot_package { + if let Err(err) = + snapshot_utils::archive_snapshot_package(&snapshot_package) + { + warn!("Failed to create snapshot archive: {}", err); + } else { + hashes.push((snapshot_package.slot, snapshot_package.hash)); + while hashes.len() > MAX_SNAPSHOT_HASHES { + hashes.remove(0); } + cluster_info.push_snapshot_hashes(hashes.clone()); } - Err(RecvTimeoutError::Disconnected) => break, - Err(RecvTimeoutError::Timeout) => (), + } else { + std::thread::sleep(Duration::from_millis(100)); } } }) .unwrap(); + Self { t_snapshot_packager, } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 80647117e0..3c827ec418 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -20,6 +20,7 @@ use crate::{ shred_fetch_stage::ShredFetchStage, sigverify_shreds::ShredSigVerifier, sigverify_stage::SigVerifyStage, + snapshot_packager_service::PendingSnapshotPackage, }; use crossbeam_channel::unbounded; use solana_ledger::{ @@ -34,7 +35,6 @@ use solana_runtime::{ }, bank_forks::{BankForks, SnapshotConfig}, commitment::BlockCommitmentCache, - snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ @@ -107,7 +107,7 @@ impl Tvu { transaction_status_sender: Option, rewards_recorder_sender: Option, cache_block_time_sender: Option, - snapshot_config_and_package_sender: Option<(SnapshotConfig, AccountsPackageSender)>, + snapshot_config_and_pending_package: Option<(SnapshotConfig, PendingSnapshotPackage)>, vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, verified_vote_receiver: VerifiedVoteReceiver, @@ -179,15 +179,15 @@ impl Tvu { } }; info!("snapshot_interval_slots: {}", snapshot_interval_slots); - let (snapshot_config, accounts_package_sender) = snapshot_config_and_package_sender - .map(|(snapshot_config, accounts_package_sender)| { - (Some(snapshot_config), Some(accounts_package_sender)) + let (snapshot_config, pending_snapshot_package) = snapshot_config_and_pending_package + .map(|(snapshot_config, pending_snapshot_package)| { + (Some(snapshot_config), Some(pending_snapshot_package)) }) .unwrap_or((None, None)); let (accounts_hash_sender, accounts_hash_receiver) = channel(); let accounts_hash_verifier = AccountsHashVerifier::new( accounts_hash_receiver, - accounts_package_sender, + pending_snapshot_package, exit, &cluster_info, tvu_config.trusted_validators.clone(), diff --git a/core/src/validator.rs b/core/src/validator.rs index 363f1b1888..0364b8ac62 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -26,7 +26,7 @@ use crate::{ serve_repair::ServeRepair, serve_repair_service::ServeRepairService, sigverify, - snapshot_packager_service::SnapshotPackagerService, + snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}, tpu::Tpu, transaction_status_service::TransactionStatusService, tvu::{Sockets, Tvu, TvuConfig}, @@ -70,7 +70,7 @@ use std::{ path::{Path, PathBuf}, sync::atomic::{AtomicBool, Ordering}, sync::mpsc::Receiver, - sync::{mpsc::channel, Arc, Mutex, RwLock}, + sync::{Arc, Mutex, RwLock}, thread::sleep, time::Duration, }; @@ -518,7 +518,7 @@ impl Validator { &exit, ); - let (snapshot_packager_service, snapshot_config_and_package_sender) = + let (snapshot_packager_service, snapshot_config_and_pending_package) = if let Some(snapshot_config) = config.snapshot_config.clone() { if is_snapshot_config_invalid( snapshot_config.snapshot_interval_slots, @@ -528,12 +528,17 @@ impl Validator { } // Start a snapshot packaging service - let (sender, receiver) = channel(); - let snapshot_packager_service = - SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info); + let pending_snapshot_package = PendingSnapshotPackage::default(); + + let snapshot_packager_service = SnapshotPackagerService::new( + pending_snapshot_package.clone(), + snapshot_hash, + &exit, + &cluster_info, + ); ( Some(snapshot_packager_service), - Some((snapshot_config, sender)), + Some((snapshot_config, pending_snapshot_package)), ) } else { (None, None) @@ -609,7 +614,7 @@ impl Validator { transaction_status_sender.clone(), rewards_recorder_sender, cache_block_time_sender, - snapshot_config_and_package_sender, + snapshot_config_and_pending_package, vote_tracker.clone(), retransmit_slots_sender, verified_vote_receiver, diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index a83b129eb1..0d1cab1d16 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -39,8 +39,9 @@ mod tests { use fs_extra::dir::CopyOptions; use itertools::Itertools; use solana_core::{ - cluster_info::ClusterInfo, contact_info::ContactInfo, - snapshot_packager_service::SnapshotPackagerService, + cluster_info::ClusterInfo, + contact_info::ContactInfo, + snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}, }; use solana_runtime::{ accounts_background_service::{ABSRequestSender, SnapshotRequestHandler}, @@ -60,8 +61,15 @@ mod tests { system_transaction, }; use std::{ - collections::HashSet, fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, - sync::Arc, + collections::HashSet, + fs, + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + Arc, + }, + time::Duration, }; use tempfile::TempDir; @@ -398,10 +406,37 @@ mod tests { let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); - let snapshot_packager_service = - SnapshotPackagerService::new(receiver, None, &exit, &cluster_info); + let pending_snapshot_package = PendingSnapshotPackage::default(); + let snapshot_packager_service = SnapshotPackagerService::new( + pending_snapshot_package.clone(), + None, + &exit, + &cluster_info, + ); - // Close the channel so that the package service will exit after reading all the + let _package_receiver = std::thread::Builder::new() + .name("package-receiver".to_string()) + .spawn(move || { + while let Ok(mut snapshot_package) = receiver.recv() { + // Only package the latest + while let Ok(new_snapshot_package) = receiver.try_recv() { + snapshot_package = new_snapshot_package; + } + + *pending_snapshot_package.lock().unwrap() = Some(snapshot_package); + } + + // Wait until the package is consumed by SnapshotPackagerService + while pending_snapshot_package.lock().unwrap().is_some() { + std::thread::sleep(Duration::from_millis(100)); + } + + // Shutdown SnapshotPackagerService + exit.store(true, Ordering::Relaxed); + }) + .unwrap(); + + // Close the channel so that the package receiver will exit after reading all the // packages off the channel drop(sender); diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 851f309a61..acd8e74441 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -163,7 +163,7 @@ pub fn package_snapshot, Q: AsRef>( ) -> Result { // Hard link all the snapshots we need for this package let snapshot_hard_links_dir = tempfile::Builder::new() - .prefix(TMP_SNAPSHOT_DIR_PREFIX) + .prefix(&format!("{}{}-", TMP_SNAPSHOT_DIR_PREFIX, bank.slot())) .tempdir_in(snapshot_path)?; // Create a snapshot package @@ -251,7 +251,10 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<() // Create the staging directories let staging_dir = tempfile::Builder::new() - .prefix(TMP_SNAPSHOT_DIR_PREFIX) + .prefix(&format!( + "{}{}-", + TMP_SNAPSHOT_DIR_PREFIX, snapshot_package.slot + )) .tempdir_in(tar_dir)?; let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);