Publish initial snapshot hash in gossip on validator startup (#8679)
automerge
This commit is contained in:
parent 542691c4e4
commit cb6848aa80
@@ -1,7 +1,6 @@
 use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
-use solana_ledger::{
-    snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package,
-};
+use solana_ledger::{snapshot_package::SnapshotPackageReceiver, snapshot_utils};
+use solana_sdk::{clock::Slot, hash::Hash};
 use std::{
     sync::{
         atomic::{AtomicBool, Ordering},
@@ -19,15 +18,24 @@ pub struct SnapshotPackagerService {
 impl SnapshotPackagerService {
     pub fn new(
         snapshot_package_receiver: SnapshotPackageReceiver,
+        starting_snapshot_hash: Option<(Slot, Hash)>,
         exit: &Arc<AtomicBool>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
     ) -> Self {
         let exit = exit.clone();
         let cluster_info = cluster_info.clone();
+
         let t_snapshot_packager = Builder::new()
             .name("solana-snapshot-packager".to_string())
             .spawn(move || {
                 let mut hashes = vec![];
+                if let Some(starting_snapshot_hash) = starting_snapshot_hash {
+                    hashes.push(starting_snapshot_hash);
+                }
+                cluster_info
+                    .write()
+                    .unwrap()
+                    .push_snapshot_hashes(hashes.clone());
                 loop {
                     if exit.load(Ordering::Relaxed) {
                         break;
@@ -41,7 +49,9 @@ impl SnapshotPackagerService {
                         {
                             snapshot_package = new_snapshot_package;
                         }
-                        if let Err(err) = archive_snapshot_package(&snapshot_package) {
+                        if let Err(err) =
+                            snapshot_utils::archive_snapshot_package(&snapshot_package)
+                        {
                             warn!("Failed to create snapshot archive: {}", err);
                         } else {
                             hashes.push((snapshot_package.root, snapshot_package.hash));
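The new .spawn body above seeds the hash list from the optional starting_snapshot_hash and pushes it to gossip via push_snapshot_hashes before entering the packaging loop, so peers learn about the snapshot the validator booted from without waiting for it to produce a new one; each freshly archived snapshot is then appended to the same list. Below is a minimal, std-only sketch of that startup-then-append pattern, not the crate's implementation: GossipTable, publish, and spawn_packager are illustrative stand-ins for ClusterInfo, push_snapshot_hashes, and SnapshotPackagerService::new.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    mpsc::{channel, Receiver, RecvTimeoutError},
    Arc, RwLock,
};
use std::thread;
use std::time::Duration;

type Slot = u64;
type Hash = [u8; 32];

#[derive(Default)]
struct GossipTable {
    snapshot_hashes: Vec<(Slot, Hash)>,
}

impl GossipTable {
    fn publish(&mut self, hashes: Vec<(Slot, Hash)>) {
        self.snapshot_hashes = hashes;
    }
}

fn spawn_packager(
    receiver: Receiver<(Slot, Hash)>,
    starting_snapshot_hash: Option<(Slot, Hash)>,
    exit: Arc<AtomicBool>,
    gossip: Arc<RwLock<GossipTable>>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut hashes = vec![];
        if let Some(starting_snapshot_hash) = starting_snapshot_hash {
            hashes.push(starting_snapshot_hash);
        }
        // Advertise the startup snapshot (if any) before the first package arrives.
        gossip.write().unwrap().publish(hashes.clone());
        loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }
            match receiver.recv_timeout(Duration::from_secs(1)) {
                Ok(new_hash) => {
                    // A new snapshot was archived: append it and re-publish the list.
                    hashes.push(new_hash);
                    gossip.write().unwrap().publish(hashes.clone());
                }
                Err(RecvTimeoutError::Timeout) => (),
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
    })
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let gossip = Arc::new(RwLock::new(GossipTable::default()));
    let (sender, receiver) = channel();

    let handle = spawn_packager(receiver, Some((100, [0u8; 32])), exit.clone(), gossip.clone());
    sender.send((150, [1u8; 32])).unwrap();
    drop(sender); // closing the channel lets the worker exit
    handle.join().unwrap();

    // The startup hash plus the one produced later are both visible to peers.
    assert_eq!(gossip.read().unwrap().snapshot_hashes.len(), 2);
}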
@@ -14,7 +14,6 @@ use crate::{
     shred_fetch_stage::ShredFetchStage,
     sigverify_shreds::ShredSigVerifier,
     sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
-    snapshot_packager_service::SnapshotPackagerService,
     storage_stage::{StorageStage, StorageState},
 };
 use crossbeam_channel::unbounded;
@@ -23,6 +22,7 @@ use solana_ledger::{
     bank_forks::BankForks,
     blockstore::{Blockstore, CompletedSlotsReceiver},
     blockstore_processor::TransactionStatusSender,
+    snapshot_package::SnapshotPackageSender,
 };
 use solana_sdk::{
     pubkey::Pubkey,
@@ -47,7 +47,6 @@ pub struct Tvu {
     blockstream_service: Option<BlockstreamService>,
     ledger_cleanup_service: Option<LedgerCleanupService>,
     storage_stage: StorageStage,
-    snapshot_packager_service: Option<SnapshotPackagerService>,
 }

 pub struct Sockets {
@@ -88,6 +87,7 @@ impl Tvu {
         shred_version: u16,
         transaction_status_sender: Option<TransactionStatusSender>,
         rewards_recorder_sender: Option<RewardsRecorderSender>,
+        snapshot_package_sender: Option<SnapshotPackageSender>,
     ) -> Self {
         let keypair: Arc<Keypair> = cluster_info
             .read()
@@ -148,18 +148,6 @@ impl Tvu {

         let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
         let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
-        let (snapshot_packager_service, snapshot_package_sender) = {
-            let snapshot_config = { bank_forks.read().unwrap().snapshot_config().clone() };
-            if snapshot_config.is_some() {
-                // Start a snapshot packaging service
-                let (sender, receiver) = channel();
-                let snapshot_packager_service =
-                    SnapshotPackagerService::new(receiver, exit, &cluster_info.clone());
-                (Some(snapshot_packager_service), Some(sender))
-            } else {
-                (None, None)
-            }
-        };

         let replay_stage_config = ReplayStageConfig {
             my_pubkey: keypair.pubkey(),
@@ -225,7 +213,6 @@ impl Tvu {
             blockstream_service,
             ledger_cleanup_service,
             storage_stage,
-            snapshot_packager_service,
         }
     }

@@ -241,9 +228,6 @@ impl Tvu {
             self.ledger_cleanup_service.unwrap().join()?;
         }
         self.replay_stage.join()?;
-        if let Some(s) = self.snapshot_packager_service {
-            s.join()?;
-        }
         Ok(())
     }
 }
@@ -317,6 +301,7 @@ pub mod tests {
             0,
             None,
             None,
+            None,
         );
         exit.store(true, Ordering::Relaxed);
         tvu.join().unwrap();
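With these changes Tvu no longer constructs the SnapshotPackagerService itself: it only accepts the sending half of the snapshot-package channel (the new snapshot_package_sender parameter), while the receiving half stays with the validator, which now owns the service (see the validator.rs hunks below). The following is a small, runnable sketch of that channel-ownership split using plain std mpsc types; Packager and Pipeline are stand-in names, not the crate's types.

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

struct SnapshotPackage {
    root: u64,
}

// The worker that archives packages; after this change it is owned by the validator.
struct Packager {
    handle: thread::JoinHandle<()>,
}

impl Packager {
    fn new(receiver: Receiver<SnapshotPackage>) -> Self {
        let handle = thread::spawn(move || {
            for package in receiver {
                println!("archiving snapshot for root {}", package.root);
            }
        });
        Self { handle }
    }
}

// The pipeline that produces packages; Tvu now keeps only the sending half.
struct Pipeline {
    sender: Option<Sender<SnapshotPackage>>,
}

fn main() {
    // Validator side: create the channel, keep the receiver for the packager,
    // and hand the sender into the pipeline.
    let (sender, receiver) = channel();
    let packager = Packager::new(receiver);
    let pipeline = Pipeline { sender: Some(sender) };

    if let Some(sender) = &pipeline.sender {
        sender.send(SnapshotPackage { root: 42 }).unwrap();
    }
    drop(pipeline); // dropping the pipeline drops the sender and stops the worker
    packager.handle.join().unwrap();
}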
@@ -16,6 +16,7 @@ use crate::{
     serve_repair::ServeRepair,
     serve_repair_service::ServeRepairService,
     sigverify,
+    snapshot_packager_service::SnapshotPackagerService,
     storage_stage::StorageState,
     tpu::Tpu,
     transaction_status_service::TransactionStatusService,
@@ -50,7 +51,7 @@ use std::{
     process,
     sync::atomic::{AtomicBool, Ordering},
     sync::mpsc::Receiver,
-    sync::{Arc, Mutex, RwLock},
+    sync::{mpsc::channel, Arc, Mutex, RwLock},
     thread::{sleep, Result},
     time::Duration,
 };
@@ -127,6 +128,7 @@ pub struct Validator {
     rewards_recorder_service: Option<RewardsRecorderService>,
     gossip_service: GossipService,
     serve_repair_service: ServeRepairService,
+    snapshot_packager_service: Option<SnapshotPackagerService>,
     poh_recorder: Arc<Mutex<PohRecorder>>,
     poh_service: PohService,
     tpu: Tpu,
@@ -350,7 +352,7 @@ impl Validator {
                 .set_entrypoint(entrypoint_info.clone());
         }

-        if let Some(snapshot_hash) = snapshot_hash {
+        if let Some(ref snapshot_hash) = snapshot_hash {
             if let Some(ref trusted_validators) = config.trusted_validators {
                 let mut trusted = false;
                 for _ in 0..10 {
@@ -378,6 +380,17 @@ impl Validator {
             }
         }

+        let (snapshot_packager_service, snapshot_package_sender) =
+            if config.snapshot_config.is_some() {
+                // Start a snapshot packaging service
+                let (sender, receiver) = channel();
+                let snapshot_packager_service =
+                    SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info);
+                (Some(snapshot_packager_service), Some(sender))
+            } else {
+                (None, None)
+            };
+
         wait_for_supermajority(config, &bank, &cluster_info);

         let voting_keypair = if config.voting_disabled {
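In Validator::new the packager is now built only when config.snapshot_config is set, seeded with snapshot_hash (presumably the slot and hash of the snapshot the validator loaded at startup, already in scope from the trusted-validator check above), and joined on shutdown alongside the other services. Here is a runnable sketch of that construct-if-enabled / join-on-shutdown shape; Service stands in for SnapshotPackagerService, and a plain u64 stands in for the (Slot, Hash) pair.

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

struct Service {
    handle: JoinHandle<()>,
}

impl Service {
    fn new(receiver: Receiver<u64>, starting: Option<u64>) -> Self {
        let handle = thread::spawn(move || {
            // Start from the optional startup entry, then drain the channel.
            let mut seen: Vec<u64> = starting.into_iter().collect();
            for slot in receiver {
                seen.push(slot);
            }
            println!("processed {} snapshot roots", seen.len());
        });
        Self { handle }
    }

    fn join(self) -> thread::Result<()> {
        self.handle.join()
    }
}

fn main() {
    let snapshots_enabled = true; // stands in for config.snapshot_config.is_some()
    let starting_snapshot = Some(64u64); // stands in for the startup (Slot, Hash)

    // Build the service, and the sender handed to the pipeline, only when enabled.
    let (service, sender): (Option<Service>, Option<Sender<u64>>) = if snapshots_enabled {
        let (sender, receiver) = channel();
        (Some(Service::new(receiver, starting_snapshot)), Some(sender))
    } else {
        (None, None)
    };

    if let Some(sender) = &sender {
        sender.send(128).unwrap();
    }
    drop(sender); // closing the channel lets the worker drain and exit

    // Join the service only if it was started, mirroring Validator::join above.
    if let Some(s) = service {
        s.join().unwrap();
    }
}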
@@ -440,6 +453,7 @@ impl Validator {
             node.info.shred_version,
             transaction_status_sender.clone(),
             rewards_recorder_sender,
+            snapshot_package_sender,
         );

         if config.dev_sigverify_disabled {
@@ -469,6 +483,7 @@ impl Validator {
             rpc_service,
             transaction_status_service,
             rewards_recorder_service,
+            snapshot_packager_service,
             tpu,
             tvu,
             poh_service,
@@ -530,6 +545,10 @@ impl Validator {
             rewards_recorder_service.join()?;
         }

+        if let Some(s) = self.snapshot_packager_service {
+            s.join()?;
+        }
+
         self.gossip_service.join()?;
         self.serve_repair_service.join()?;
         self.tpu.join()?;
|
@ -317,7 +317,7 @@ mod tests {
|
|||
)));
|
||||
|
||||
let snapshot_packager_service =
|
||||
SnapshotPackagerService::new(receiver, &exit, &cluster_info);
|
||||
SnapshotPackagerService::new(receiver, None, &exit, &cluster_info);
|
||||
|
||||
// Close the channel so that the package service will exit after reading all the
|
||||
// packages off the channel
|
||||
|
|