Extract most storage-related services from the Tvu abstraction

Michael Vines 2022-03-15 13:14:49 -07:00
parent 268a2109de
commit 0e2e0c8b7d
4 changed files with 135 additions and 144 deletions
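AccountsBackgroundService and AccountsHashVerifier, together with the SnapshotPackagerService setup, the snapshot-request plumbing, and the pruned-banks drop callback, are now constructed and owned by Validator::new() rather than Tvu::new(). Tvu keeps only an AbsRequestSender for forwarding snapshot requests; the storage-related TvuConfig fields and Tvu::new() parameters go away, and Validator::join() now joins the relocated services. As a side effect, AbsRequestSender::new() takes the SnapshotRequestSender directly instead of an Option, and callers with no snapshot handling use AbsRequestSender::default().

The ownership pattern this moves to, as a toy sketch (stand-in types, not the real solana_core ones): the parent builds and joins the background service, while the child keeps only the sending half of the channel.

use crossbeam_channel::{unbounded, Receiver, Sender};
use std::thread::{self, JoinHandle};

struct Request(u64);

// Owned and joined by the parent, like AccountsBackgroundService in Validator.
struct BackgroundService(JoinHandle<()>);

impl BackgroundService {
    fn new(receiver: Receiver<Request>) -> Self {
        Self(thread::spawn(move || {
            // The loop ends once every sender has been dropped.
            for Request(slot) in receiver {
                println!("handling request for slot {slot}");
            }
        }))
    }

    fn join(self) {
        self.0.join().unwrap();
    }
}

// The child (Tvu's role here) holds only the sending half.
struct Child {
    sender: Sender<Request>,
}

fn main() {
    let (sender, receiver) = unbounded();
    let service = BackgroundService::new(receiver); // parent-owned, parent-joined
    let child = Child { sender };
    child.sender.send(Request(42)).unwrap();
    drop(child); // dropping the last sender shuts the service down
    service.join();
}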

core/src/tvu.rs

@@ -3,7 +3,6 @@
use {
crate::{
accounts_hash_verifier::AccountsHashVerifier,
broadcast_stage::RetransmitSlotsSender,
cache_block_meta_service::CacheBlockMetaSender,
cluster_info_vote_listener::{
@@ -39,16 +38,10 @@ use {
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
accounts_background_service::{
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver,
SnapshotRequestHandler,
},
accounts_db::AccountShrinkThreshold,
accounts_background_service::AbsRequestSender,
bank_forks::BankForks,
commitment::BlockCommitmentCache,
cost_model::CostModel,
snapshot_config::SnapshotConfig,
snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage},
transaction_cost_metrics_sender::{
TransactionCostMetricsSender, TransactionCostMetricsService,
},
@@ -73,8 +66,6 @@ pub struct Tvu {
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
ledger_cleanup_service: Option<LedgerCleanupService>,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
cost_update_service: CostUpdateService,
voting_service: VotingService,
warm_quic_cache_service: WarmQuicCacheService,
@@ -94,16 +85,10 @@ pub struct TvuSockets {
pub struct TvuConfig {
pub max_ledger_shreds: Option<u64>,
pub shred_version: u16,
pub halt_on_known_validators_accounts_hash_mismatch: bool,
pub known_validators: Option<HashSet<Pubkey>>,
pub repair_validators: Option<HashSet<Pubkey>>,
pub accounts_hash_fault_injection_slots: u64,
pub accounts_db_caching_enabled: bool,
pub test_hash_calculation: bool,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub wait_for_vote_to_start_leader: bool,
pub accounts_shrink_ratio: AccountShrinkThreshold,
}
impl Tvu {
@@ -133,7 +118,6 @@ impl Tvu {
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
snapshot_config_and_pending_package: Option<(SnapshotConfig, PendingSnapshotPackage)>,
vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
@@ -145,11 +129,9 @@
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
cost_model: &Arc<RwLock<CostModel>>,
pending_accounts_package: PendingAccountsPackage,
last_full_snapshot_slot: Option<Slot>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
wait_to_vote_slot: Option<Slot>,
pruned_banks_receiver: DroppedSlotsReceiver,
accounts_background_request_sender: AbsRequestSender,
) -> Self {
let TvuSockets {
repair: repair_socket,
@@ -184,8 +166,6 @@
let cluster_slots = Arc::new(ClusterSlots::default());
let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded();
let compaction_interval = tvu_config.rocksdb_compaction_interval;
let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter;
let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
@@ -216,45 +196,6 @@
);
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = unbounded();
let (snapshot_config, pending_snapshot_package) = snapshot_config_and_pending_package
.map(|(snapshot_config, pending_snapshot_package)| {
(Some(snapshot_config), Some(pending_snapshot_package))
})
.unwrap_or((None, None));
let accounts_hash_verifier = AccountsHashVerifier::new(
Arc::clone(&pending_accounts_package),
pending_snapshot_package,
exit,
cluster_info,
tvu_config.known_validators.clone(),
tvu_config.halt_on_known_validators_accounts_hash_mismatch,
tvu_config.accounts_hash_fault_injection_slots,
snapshot_config.clone(),
);
let (snapshot_request_sender, snapshot_request_handler) = match snapshot_config {
None => (None, None),
Some(snapshot_config) => {
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
(
Some(snapshot_request_sender),
Some(SnapshotRequestHandler {
snapshot_config,
snapshot_request_receiver,
pending_accounts_package,
}),
)
}
};
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender);
let accounts_background_request_handler = AbsRequestHandler {
snapshot_request_handler,
pruned_banks_receiver,
};
let replay_stage_config = ReplayStageConfig {
vote_account: *vote_account,
authorized_voter_keypairs,
@@ -332,33 +273,17 @@
blockstore.clone(),
max_ledger_shreds,
exit,
compaction_interval,
max_compaction_jitter,
tvu_config.rocksdb_compaction_interval,
tvu_config.rocksdb_max_compaction_jitter,
)
});
let accounts_background_request_handler = AbsRequestHandler {
snapshot_request_handler,
pruned_banks_receiver,
};
let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(),
exit,
accounts_background_request_handler,
tvu_config.accounts_db_caching_enabled,
tvu_config.test_hash_calculation,
last_full_snapshot_slot,
);
Tvu {
fetch_stage,
sigverify_stage,
retransmit_stage,
replay_stage,
ledger_cleanup_service,
accounts_background_service,
accounts_hash_verifier,
cost_update_service,
voting_service,
warm_quic_cache_service,
@@ -390,9 +315,7 @@ impl Tvu {
if self.ledger_cleanup_service.is_some() {
self.ledger_cleanup_service.unwrap().join()?;
}
self.accounts_background_service.join()?;
self.replay_stage.join()?;
self.accounts_hash_verifier.join()?;
self.cost_update_service.join()?;
self.voting_service.join()?;
self.warm_quic_cache_service.join()?;
@@ -468,7 +391,6 @@ pub mod tests {
let bank_forks = Arc::new(RwLock::new(bank_forks));
let tower = Tower::default();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let (_pruned_banks_sender, pruned_banks_receiver) = unbounded();
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -502,7 +424,6 @@
None,
None,
None,
None,
Arc::<VoteTracker>::default(),
retransmit_slots_sender,
gossip_verified_vote_hash_receiver,
@@ -514,11 +435,9 @@
TvuConfig::default(),
&Arc::new(MaxSlots::default()),
&Arc::new(RwLock::new(CostModel::default())),
PendingAccountsPackage::default(),
None,
None,
None,
pruned_banks_receiver,
AbsRequestSender::default(),
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

core/src/validator.rs

@@ -3,6 +3,7 @@
pub use solana_perf::report_target_features;
use {
crate::{
accounts_hash_verifier::AccountsHashVerifier,
broadcast_stage::BroadcastStageType,
cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService},
cluster_info_vote_listener::VoteTracker,
@@ -65,7 +66,10 @@ use {
transaction_status_service::TransactionStatusService,
},
solana_runtime::{
accounts_background_service::DroppedSlotsReceiver,
accounts_background_service::{
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver,
SnapshotRequestHandler,
},
accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
@@ -336,6 +340,8 @@ pub struct Validator {
pub blockstore: Arc<Blockstore>,
geyser_plugin_service: Option<GeyserPluginService>,
ledger_metric_report_service: LedgerMetricReportService,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
}
// in the distant future, get rid of ::new()/exit() and use Result properly...
@@ -524,7 +530,7 @@ impl Validator {
config.snapshot_config.as_ref(),
Arc::clone(&pending_accounts_package),
blockstore_root_scan,
pruned_banks_receiver.clone(),
pruned_banks_receiver,
);
let last_full_snapshot_slot =
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
@@ -596,6 +602,108 @@ impl Validator {
cluster_info.set_entrypoints(cluster_entrypoints);
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
let cluster_info = Arc::new(cluster_info);
// Before replay starts, set the callbacks in each of the banks in BankForks
// Note after this callback is created, only the AccountsBackgroundService should be calling
// AccountsDb::purge_slot() to clean up dropped banks.
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
let callback = bank_forks
.read()
.unwrap()
.root_bank()
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender);
for bank in bank_forks.read().unwrap().banks().values() {
bank.set_callback(Some(Box::new(callback.clone())));
}
let (
accounts_background_service,
accounts_hash_verifier,
snapshot_packager_service,
accounts_background_request_sender,
) = {
let (
accounts_background_request_sender,
snapshot_request_handler,
pending_snapshot_package,
snapshot_packager_service,
) = if let Some(snapshot_config) = config.snapshot_config.clone() {
if !is_snapshot_config_valid(
snapshot_config.full_snapshot_archive_interval_slots,
snapshot_config.incremental_snapshot_archive_interval_slots,
config.accounts_hash_interval_slots,
) {
error!("Snapshot config is invalid");
}
let pending_snapshot_package = PendingSnapshotPackage::default();
// filler accounts make snapshots invalid for use
// so, do not publish that we have snapshots
let enable_gossip_push = config
.accounts_db_config
.as_ref()
.map(|config| config.filler_accounts_config.count == 0)
.unwrap_or(true);
let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(),
starting_snapshot_hashes,
&exit,
&cluster_info,
snapshot_config.clone(),
enable_gossip_push,
);
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
(
AbsRequestSender::new(snapshot_request_sender),
Some(SnapshotRequestHandler {
snapshot_config,
snapshot_request_receiver,
pending_accounts_package: pending_accounts_package.clone(),
}),
Some(pending_snapshot_package),
Some(snapshot_packager_service),
)
} else {
(AbsRequestSender::default(), None, None, None)
};
let accounts_hash_verifier = AccountsHashVerifier::new(
Arc::clone(&pending_accounts_package),
pending_snapshot_package,
&exit,
&cluster_info,
config.known_validators.clone(),
config.halt_on_known_validators_accounts_hash_mismatch,
config.accounts_hash_fault_injection_slots,
config.snapshot_config.clone(),
);
let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(),
&exit,
AbsRequestHandler {
snapshot_request_handler,
pruned_banks_receiver,
},
config.accounts_db_caching_enabled,
config.accounts_db_test_hash_calculation,
last_full_snapshot_slot,
);
(
accounts_background_service,
accounts_hash_verifier,
snapshot_packager_service,
accounts_background_request_sender,
)
};
let mut block_commitment_cache = BlockCommitmentCache::default();
block_commitment_cache.initialize_slots(bank.slot(), bank_forks.read().unwrap().root());
let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
@@ -776,43 +884,6 @@ impl Validator {
&exit,
);
let (snapshot_packager_service, snapshot_config_and_pending_package) =
if let Some(snapshot_config) = config.snapshot_config.clone() {
if !is_snapshot_config_valid(
snapshot_config.full_snapshot_archive_interval_slots,
snapshot_config.incremental_snapshot_archive_interval_slots,
config.accounts_hash_interval_slots,
) {
error!("Snapshot config is invalid");
}
// Start a snapshot packaging service
let pending_snapshot_package = PendingSnapshotPackage::default();
// filler accounts make snapshots invalid for use
// so, do not publish that we have snapshots
let enable_gossip_push = config
.accounts_db_config
.as_ref()
.map(|config| config.filler_accounts_config.count == 0)
.unwrap_or(true);
let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(),
starting_snapshot_hashes,
&exit,
&cluster_info,
snapshot_config.clone(),
enable_gossip_push,
);
(
Some(snapshot_packager_service),
Some((snapshot_config, pending_snapshot_package)),
)
} else {
(None, None)
};
let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority(
config,
&bank,
@@ -889,7 +960,6 @@
transaction_status_sender.clone(),
rewards_recorder_sender,
cache_block_meta_sender,
snapshot_config_and_pending_package,
vote_tracker.clone(),
retransmit_slots_sender,
gossip_verified_vote_hash_receiver,
@@ -900,26 +970,17 @@
cluster_confirmed_slot_receiver,
TvuConfig {
max_ledger_shreds: config.max_ledger_shreds,
halt_on_known_validators_accounts_hash_mismatch: config
.halt_on_known_validators_accounts_hash_mismatch,
shred_version: node.info.shred_version,
known_validators: config.known_validators.clone(),
repair_validators: config.repair_validators.clone(),
accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
accounts_db_caching_enabled: config.accounts_db_caching_enabled,
test_hash_calculation: config.accounts_db_test_hash_calculation,
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
wait_for_vote_to_start_leader,
accounts_shrink_ratio: config.accounts_shrink_ratio,
},
&max_slots,
&cost_model,
pending_accounts_package,
last_full_snapshot_slot,
block_metadata_notifier,
config.wait_to_vote_slot,
pruned_banks_receiver,
accounts_background_request_sender,
);
let tpu = Tpu::new(
@@ -983,6 +1044,8 @@ impl Validator {
blockstore: blockstore.clone(),
geyser_plugin_service,
ledger_metric_report_service,
accounts_background_service,
accounts_hash_verifier,
}
}
@@ -1092,6 +1155,15 @@ impl Validator {
self.stats_reporter_service
.join()
.expect("stats_reporter_service");
self.ledger_metric_report_service
.join()
.expect("ledger_metric_report_service");
self.accounts_background_service
.join()
.expect("accounts_background_service");
self.accounts_hash_verifier
.join()
.expect("accounts_hash_verifier");
self.tpu.join().expect("tpu");
self.tvu.join().expect("tvu");
self.completed_data_sets_service

core/tests/snapshot.rs

@@ -241,8 +241,8 @@ mod tests {
let bank_forks = &mut snapshot_test_config.bank_forks;
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let (s, snapshot_request_receiver) = unbounded();
let request_sender = AbsRequestSender::new(Some(s));
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let request_sender = AbsRequestSender::new(snapshot_request_sender);
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
@@ -585,7 +585,7 @@ mod tests {
Slot::MAX,
);
let mut current_bank = snapshot_test_config.bank_forks[0].clone();
let request_sender = AbsRequestSender::new(Some(snapshot_sender));
let request_sender = AbsRequestSender::new(snapshot_sender);
for _ in 0..num_set_roots {
for _ in 0..*add_root_interval {
let new_slot = current_bank.slot() + 1;
@@ -680,7 +680,7 @@
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let request_sender = AbsRequestSender::new(Some(snapshot_request_sender));
let request_sender = AbsRequestSender::new(snapshot_request_sender);
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
@@ -918,7 +918,7 @@
bank.set_callback(Some(Box::new(callback.clone())));
}
let abs_request_sender = AbsRequestSender::new(Some(snapshot_request_sender));
let abs_request_sender = AbsRequestSender::new(snapshot_request_sender);
let snapshot_request_handler = Some(SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,

runtime/src/accounts_background_service.rs

@@ -368,9 +368,9 @@ pub struct AbsRequestSender {
}
impl AbsRequestSender {
pub fn new(snapshot_request_sender: Option<SnapshotRequestSender>) -> Self {
AbsRequestSender {
snapshot_request_sender,
pub fn new(snapshot_request_sender: SnapshotRequestSender) -> Self {
Self {
snapshot_request_sender: Some(snapshot_request_sender),
}
}
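
For reference, a minimal self-contained sketch of the AbsRequestSender change above (SnapshotRequest here is a stand-in type; in solana_runtime, SnapshotRequestSender is a crossbeam channel sender):

use crossbeam_channel::{unbounded, Sender};

// Stand-in for solana_runtime's SnapshotRequest, just to make the sketch compile.
struct SnapshotRequest;
type SnapshotRequestSender = Sender<SnapshotRequest>;

#[derive(Default)]
struct AbsRequestSender {
    snapshot_request_sender: Option<SnapshotRequestSender>,
}

impl AbsRequestSender {
    // After this commit: the sender is passed directly and wrapped internally.
    fn new(snapshot_request_sender: SnapshotRequestSender) -> Self {
        Self {
            snapshot_request_sender: Some(snapshot_request_sender),
        }
    }
}

fn main() {
    let (snapshot_request_sender, _snapshot_request_receiver) = unbounded();
    // Callers with snapshot handling pass the sender unwrapped...
    let _with_snapshots = AbsRequestSender::new(snapshot_request_sender);
    // ...and callers without it (e.g. the Tvu test) use the Default impl.
    let _without_snapshots = AbsRequestSender::default();
}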