Avoid tmp snapshot backlog in SnapshotPackagerService under high load (#14516)

This commit is contained in:
Michael Vines 2021-01-11 10:21:15 -08:00 committed by GitHub
parent 3fb9f017d3
commit a95675a7ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 95 additions and 56 deletions

View File

@ -4,10 +4,11 @@
// hash on gossip. Monitor gossip for messages from validators in the --trusted-validators
// set and halt the node if a mismatch is detected.
use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_runtime::snapshot_package::{
AccountsPackage, AccountsPackageReceiver, AccountsPackageSender,
use crate::{
cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES},
snapshot_packager_service::PendingSnapshotPackage,
};
use solana_runtime::snapshot_package::{AccountsPackage, AccountsPackageReceiver};
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
use std::collections::{HashMap, HashSet};
use std::{
@ -27,7 +28,7 @@ pub struct AccountsHashVerifier {
impl AccountsHashVerifier {
pub fn new(
accounts_package_receiver: AccountsPackageReceiver,
accounts_package_sender: Option<AccountsPackageSender>,
pending_snapshot_package: Option<PendingSnapshotPackage>,
exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>,
trusted_validators: Option<HashSet<Pubkey>>,
@ -53,7 +54,7 @@ impl AccountsHashVerifier {
&cluster_info,
&trusted_validators,
halt_on_trusted_validators_accounts_hash_mismatch,
&accounts_package_sender,
&pending_snapshot_package,
&mut hashes,
&exit,
fault_injection_rate_slots,
@ -76,7 +77,7 @@ impl AccountsHashVerifier {
cluster_info: &ClusterInfo,
trusted_validators: &Option<HashSet<Pubkey>>,
halt_on_trusted_validator_accounts_hash_mismatch: bool,
accounts_package_sender: &Option<AccountsPackageSender>,
pending_snapshot_package: &Option<PendingSnapshotPackage>,
hashes: &mut Vec<(Slot, Hash)>,
exit: &Arc<AtomicBool>,
fault_injection_rate_slots: u64,
@ -111,8 +112,8 @@ impl AccountsHashVerifier {
}
if accounts_package.block_height % snapshot_interval_slots == 0 {
if let Some(sender) = accounts_package_sender.as_ref() {
if sender.send(accounts_package).is_err() {}
if let Some(pending_snapshot_package) = pending_snapshot_package.as_ref() {
*pending_snapshot_package.lock().unwrap() = Some(accounts_package);
}
}

View File

@ -1,23 +1,24 @@
use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_runtime::{snapshot_package::AccountsPackageReceiver, snapshot_utils};
use solana_runtime::{snapshot_package::AccountsPackage, snapshot_utils};
use solana_sdk::{clock::Slot, hash::Hash};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::RecvTimeoutError,
Arc,
Arc, Mutex,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
pub type PendingSnapshotPackage = Arc<Mutex<Option<AccountsPackage>>>;
pub struct SnapshotPackagerService {
t_snapshot_packager: JoinHandle<()>,
}
impl SnapshotPackagerService {
pub fn new(
snapshot_package_receiver: AccountsPackageReceiver,
pending_snapshot_package: PendingSnapshotPackage,
starting_snapshot_hash: Option<(Slot, Hash)>,
exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>,
@ -26,7 +27,7 @@ impl SnapshotPackagerService {
let cluster_info = cluster_info.clone();
let t_snapshot_packager = Builder::new()
.name("solana-snapshot-packager".to_string())
.name("snapshot-packager".to_string())
.spawn(move || {
let mut hashes = vec![];
if let Some(starting_snapshot_hash) = starting_snapshot_hash {
@ -38,32 +39,26 @@ impl SnapshotPackagerService {
break;
}
match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(mut snapshot_package) => {
// Only package the latest
while let Ok(new_snapshot_package) =
snapshot_package_receiver.try_recv()
{
snapshot_package = new_snapshot_package;
}
if let Err(err) =
snapshot_utils::archive_snapshot_package(&snapshot_package)
{
warn!("Failed to create snapshot archive: {}", err);
} else {
hashes.push((snapshot_package.slot, snapshot_package.hash));
while hashes.len() > MAX_SNAPSHOT_HASHES {
hashes.remove(0);
}
cluster_info.push_snapshot_hashes(hashes.clone());
let snapshot_package = pending_snapshot_package.lock().unwrap().take();
if let Some(snapshot_package) = snapshot_package {
if let Err(err) =
snapshot_utils::archive_snapshot_package(&snapshot_package)
{
warn!("Failed to create snapshot archive: {}", err);
} else {
hashes.push((snapshot_package.slot, snapshot_package.hash));
while hashes.len() > MAX_SNAPSHOT_HASHES {
hashes.remove(0);
}
cluster_info.push_snapshot_hashes(hashes.clone());
}
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => (),
} else {
std::thread::sleep(Duration::from_millis(100));
}
}
})
.unwrap();
Self {
t_snapshot_packager,
}

View File

@ -20,6 +20,7 @@ use crate::{
shred_fetch_stage::ShredFetchStage,
sigverify_shreds::ShredSigVerifier,
sigverify_stage::SigVerifyStage,
snapshot_packager_service::PendingSnapshotPackage,
};
use crossbeam_channel::unbounded;
use solana_ledger::{
@ -34,7 +35,6 @@ use solana_runtime::{
},
bank_forks::{BankForks, SnapshotConfig},
commitment::BlockCommitmentCache,
snapshot_package::AccountsPackageSender,
vote_sender_types::ReplayVoteSender,
};
use solana_sdk::{
@ -107,7 +107,7 @@ impl Tvu {
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_time_sender: Option<CacheBlockTimeSender>,
snapshot_config_and_package_sender: Option<(SnapshotConfig, AccountsPackageSender)>,
snapshot_config_and_pending_package: Option<(SnapshotConfig, PendingSnapshotPackage)>,
vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
verified_vote_receiver: VerifiedVoteReceiver,
@ -179,15 +179,15 @@ impl Tvu {
}
};
info!("snapshot_interval_slots: {}", snapshot_interval_slots);
let (snapshot_config, accounts_package_sender) = snapshot_config_and_package_sender
.map(|(snapshot_config, accounts_package_sender)| {
(Some(snapshot_config), Some(accounts_package_sender))
let (snapshot_config, pending_snapshot_package) = snapshot_config_and_pending_package
.map(|(snapshot_config, pending_snapshot_package)| {
(Some(snapshot_config), Some(pending_snapshot_package))
})
.unwrap_or((None, None));
let (accounts_hash_sender, accounts_hash_receiver) = channel();
let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_hash_receiver,
accounts_package_sender,
pending_snapshot_package,
exit,
&cluster_info,
tvu_config.trusted_validators.clone(),

View File

@ -26,7 +26,7 @@ use crate::{
serve_repair::ServeRepair,
serve_repair_service::ServeRepairService,
sigverify,
snapshot_packager_service::SnapshotPackagerService,
snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService},
tpu::Tpu,
transaction_status_service::TransactionStatusService,
tvu::{Sockets, Tvu, TvuConfig},
@ -70,7 +70,7 @@ use std::{
path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering},
sync::mpsc::Receiver,
sync::{mpsc::channel, Arc, Mutex, RwLock},
sync::{Arc, Mutex, RwLock},
thread::sleep,
time::Duration,
};
@ -518,7 +518,7 @@ impl Validator {
&exit,
);
let (snapshot_packager_service, snapshot_config_and_package_sender) =
let (snapshot_packager_service, snapshot_config_and_pending_package) =
if let Some(snapshot_config) = config.snapshot_config.clone() {
if is_snapshot_config_invalid(
snapshot_config.snapshot_interval_slots,
@ -528,12 +528,17 @@ impl Validator {
}
// Start a snapshot packaging service
let (sender, receiver) = channel();
let snapshot_packager_service =
SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info);
let pending_snapshot_package = PendingSnapshotPackage::default();
let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(),
snapshot_hash,
&exit,
&cluster_info,
);
(
Some(snapshot_packager_service),
Some((snapshot_config, sender)),
Some((snapshot_config, pending_snapshot_package)),
)
} else {
(None, None)
@ -609,7 +614,7 @@ impl Validator {
transaction_status_sender.clone(),
rewards_recorder_sender,
cache_block_time_sender,
snapshot_config_and_package_sender,
snapshot_config_and_pending_package,
vote_tracker.clone(),
retransmit_slots_sender,
verified_vote_receiver,

View File

@ -39,8 +39,9 @@ mod tests {
use fs_extra::dir::CopyOptions;
use itertools::Itertools;
use solana_core::{
cluster_info::ClusterInfo, contact_info::ContactInfo,
snapshot_packager_service::SnapshotPackagerService,
cluster_info::ClusterInfo,
contact_info::ContactInfo,
snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService},
};
use solana_runtime::{
accounts_background_service::{ABSRequestSender, SnapshotRequestHandler},
@ -60,8 +61,15 @@ mod tests {
system_transaction,
};
use std::{
collections::HashSet, fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel,
sync::Arc,
collections::HashSet,
fs,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
time::Duration,
};
use tempfile::TempDir;
@ -398,10 +406,37 @@ mod tests {
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));
let snapshot_packager_service =
SnapshotPackagerService::new(receiver, None, &exit, &cluster_info);
let pending_snapshot_package = PendingSnapshotPackage::default();
let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(),
None,
&exit,
&cluster_info,
);
// Close the channel so that the package service will exit after reading all the
let _package_receiver = std::thread::Builder::new()
.name("package-receiver".to_string())
.spawn(move || {
while let Ok(mut snapshot_package) = receiver.recv() {
// Only package the latest
while let Ok(new_snapshot_package) = receiver.try_recv() {
snapshot_package = new_snapshot_package;
}
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package);
}
// Wait until the package is consumed by SnapshotPackagerService
while pending_snapshot_package.lock().unwrap().is_some() {
std::thread::sleep(Duration::from_millis(100));
}
// Shutdown SnapshotPackagerService
exit.store(true, Ordering::Relaxed);
})
.unwrap();
// Close the channel so that the package receiver will exit after reading all the
// packages off the channel
drop(sender);

View File

@ -163,7 +163,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
) -> Result<AccountsPackage> {
// Hard link all the snapshots we need for this package
let snapshot_hard_links_dir = tempfile::Builder::new()
.prefix(TMP_SNAPSHOT_DIR_PREFIX)
.prefix(&format!("{}{}-", TMP_SNAPSHOT_DIR_PREFIX, bank.slot()))
.tempdir_in(snapshot_path)?;
// Create a snapshot package
@ -251,7 +251,10 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
// Create the staging directories
let staging_dir = tempfile::Builder::new()
.prefix(TMP_SNAPSHOT_DIR_PREFIX)
.prefix(&format!(
"{}{}-",
TMP_SNAPSHOT_DIR_PREFIX, snapshot_package.slot
))
.tempdir_in(tar_dir)?;
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);