Expose tss to the other blockstore_processor path (#11070)
This commit is contained in:
parent
21beade4b9
commit
9a80e31bae
|
@ -8,7 +8,7 @@ use crate::{
|
||||||
gossip_service::{discover_cluster, GossipService},
|
gossip_service::{discover_cluster, GossipService},
|
||||||
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
|
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
|
||||||
poh_service::PohService,
|
poh_service::PohService,
|
||||||
rewards_recorder_service::RewardsRecorderService,
|
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
|
||||||
rpc::JsonRpcConfig,
|
rpc::JsonRpcConfig,
|
||||||
rpc_pubsub_service::PubSubService,
|
rpc_pubsub_service::PubSubService,
|
||||||
rpc_service::JsonRpcService,
|
rpc_service::JsonRpcService,
|
||||||
|
@ -27,7 +27,8 @@ use solana_ledger::{
|
||||||
bank_forks_utils,
|
bank_forks_utils,
|
||||||
blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType},
|
blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType},
|
||||||
blockstore_db::BlockstoreRecoveryMode,
|
blockstore_db::BlockstoreRecoveryMode,
|
||||||
blockstore_processor, create_new_tmp_ledger,
|
blockstore_processor::{self, TransactionStatusSender},
|
||||||
|
create_new_tmp_ledger,
|
||||||
leader_schedule::FixedSchedule,
|
leader_schedule::FixedSchedule,
|
||||||
leader_schedule_cache::LeaderScheduleCache,
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
};
|
};
|
||||||
|
@ -135,6 +136,14 @@ impl ValidatorExit {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct TransactionHistoryServices {
|
||||||
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
|
transaction_status_service: Option<TransactionStatusService>,
|
||||||
|
rewards_recorder_sender: Option<RewardsRecorderSender>,
|
||||||
|
rewards_recorder_service: Option<RewardsRecorderService>,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Validator {
|
pub struct Validator {
|
||||||
pub id: Pubkey,
|
pub id: Pubkey,
|
||||||
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
|
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
|
||||||
|
@ -203,6 +212,12 @@ impl Validator {
|
||||||
cleanup_accounts_path(accounts_path);
|
cleanup_accounts_path(accounts_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut validator_exit = ValidatorExit::default();
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let exit_ = exit.clone();
|
||||||
|
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
|
||||||
|
let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
|
||||||
|
|
||||||
let (
|
let (
|
||||||
genesis_config,
|
genesis_config,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
|
@ -211,7 +226,13 @@ impl Validator {
|
||||||
completed_slots_receiver,
|
completed_slots_receiver,
|
||||||
leader_schedule_cache,
|
leader_schedule_cache,
|
||||||
snapshot_hash,
|
snapshot_hash,
|
||||||
) = new_banks_from_ledger(config, ledger_path, poh_verify);
|
TransactionHistoryServices {
|
||||||
|
transaction_status_sender,
|
||||||
|
transaction_status_service,
|
||||||
|
rewards_recorder_sender,
|
||||||
|
rewards_recorder_service,
|
||||||
|
},
|
||||||
|
) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit);
|
||||||
|
|
||||||
let leader_schedule_cache = Arc::new(leader_schedule_cache);
|
let leader_schedule_cache = Arc::new(leader_schedule_cache);
|
||||||
let bank = bank_forks.working_bank();
|
let bank = bank_forks.working_bank();
|
||||||
|
@ -243,14 +264,7 @@ impl Validator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut validator_exit = ValidatorExit::default();
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
|
||||||
let exit_ = exit.clone();
|
|
||||||
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
|
|
||||||
let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
|
|
||||||
|
|
||||||
let cluster_info = Arc::new(ClusterInfo::new(node.info.clone(), keypair.clone()));
|
let cluster_info = Arc::new(ClusterInfo::new(node.info.clone(), keypair.clone()));
|
||||||
let blockstore = Arc::new(blockstore);
|
|
||||||
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
|
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
|
||||||
|
|
||||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
let subscriptions = Arc::new(RpcSubscriptions::new(
|
||||||
|
@ -291,36 +305,6 @@ impl Validator {
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
let (transaction_status_sender, transaction_status_service) =
|
|
||||||
if rpc_service.is_some() && config.rpc_config.enable_rpc_transaction_history {
|
|
||||||
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
|
||||||
(
|
|
||||||
Some(transaction_status_sender),
|
|
||||||
Some(TransactionStatusService::new(
|
|
||||||
transaction_status_receiver,
|
|
||||||
blockstore.clone(),
|
|
||||||
&exit,
|
|
||||||
)),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
(None, None)
|
|
||||||
};
|
|
||||||
|
|
||||||
let (rewards_recorder_sender, rewards_recorder_service) =
|
|
||||||
if rpc_service.is_some() && config.rpc_config.enable_rpc_transaction_history {
|
|
||||||
let (rewards_recorder_sender, rewards_receiver) = unbounded();
|
|
||||||
(
|
|
||||||
Some(rewards_recorder_sender),
|
|
||||||
Some(RewardsRecorderService::new(
|
|
||||||
rewards_receiver,
|
|
||||||
blockstore.clone(),
|
|
||||||
&exit,
|
|
||||||
)),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
(None, None)
|
|
||||||
};
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}",
|
"Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}",
|
||||||
bank.epoch(),
|
bank.epoch(),
|
||||||
|
@ -579,14 +563,16 @@ fn new_banks_from_ledger(
|
||||||
config: &ValidatorConfig,
|
config: &ValidatorConfig,
|
||||||
ledger_path: &Path,
|
ledger_path: &Path,
|
||||||
poh_verify: bool,
|
poh_verify: bool,
|
||||||
|
exit: &Arc<AtomicBool>,
|
||||||
) -> (
|
) -> (
|
||||||
GenesisConfig,
|
GenesisConfig,
|
||||||
BankForks,
|
BankForks,
|
||||||
Blockstore,
|
Arc<Blockstore>,
|
||||||
Receiver<bool>,
|
Receiver<bool>,
|
||||||
CompletedSlotsReceiver,
|
CompletedSlotsReceiver,
|
||||||
LeaderScheduleCache,
|
LeaderScheduleCache,
|
||||||
Option<(Slot, Hash)>,
|
Option<(Slot, Hash)>,
|
||||||
|
TransactionHistoryServices,
|
||||||
) {
|
) {
|
||||||
info!("loading ledger from {:?}...", ledger_path);
|
info!("loading ledger from {:?}...", ledger_path);
|
||||||
let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size);
|
let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size);
|
||||||
|
@ -622,12 +608,23 @@ fn new_banks_from_ledger(
|
||||||
..blockstore_processor::ProcessOptions::default()
|
..blockstore_processor::ProcessOptions::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let blockstore = Arc::new(blockstore);
|
||||||
|
let transaction_history_services =
|
||||||
|
if config.rpc_ports.is_some() && config.rpc_config.enable_rpc_transaction_history {
|
||||||
|
initialize_rpc_transaction_history_services(blockstore.clone(), exit)
|
||||||
|
} else {
|
||||||
|
TransactionHistoryServices::default()
|
||||||
|
};
|
||||||
|
|
||||||
let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load(
|
let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load(
|
||||||
&genesis_config,
|
&genesis_config,
|
||||||
&blockstore,
|
&blockstore,
|
||||||
config.account_paths.clone(),
|
config.account_paths.clone(),
|
||||||
config.snapshot_config.as_ref(),
|
config.snapshot_config.as_ref(),
|
||||||
process_options,
|
process_options,
|
||||||
|
transaction_history_services
|
||||||
|
.transaction_status_sender
|
||||||
|
.clone(),
|
||||||
)
|
)
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
error!("Failed to load ledger: {:?}", err);
|
error!("Failed to load ledger: {:?}", err);
|
||||||
|
@ -647,6 +644,7 @@ fn new_banks_from_ledger(
|
||||||
completed_slots_receiver,
|
completed_slots_receiver,
|
||||||
leader_schedule_cache,
|
leader_schedule_cache,
|
||||||
snapshot_hash,
|
snapshot_hash,
|
||||||
|
transaction_history_services,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -710,6 +708,33 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi
|
||||||
drop(blockstore);
|
drop(blockstore);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn initialize_rpc_transaction_history_services(
|
||||||
|
blockstore: Arc<Blockstore>,
|
||||||
|
exit: &Arc<AtomicBool>,
|
||||||
|
) -> TransactionHistoryServices {
|
||||||
|
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
||||||
|
let transaction_status_sender = Some(transaction_status_sender);
|
||||||
|
let transaction_status_service = Some(TransactionStatusService::new(
|
||||||
|
transaction_status_receiver,
|
||||||
|
blockstore.clone(),
|
||||||
|
exit,
|
||||||
|
));
|
||||||
|
|
||||||
|
let (rewards_recorder_sender, rewards_receiver) = unbounded();
|
||||||
|
let rewards_recorder_sender = Some(rewards_recorder_sender);
|
||||||
|
let rewards_recorder_service = Some(RewardsRecorderService::new(
|
||||||
|
rewards_receiver,
|
||||||
|
blockstore,
|
||||||
|
exit,
|
||||||
|
));
|
||||||
|
TransactionHistoryServices {
|
||||||
|
transaction_status_sender,
|
||||||
|
transaction_status_service,
|
||||||
|
rewards_recorder_sender,
|
||||||
|
rewards_recorder_service,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Return true on error, indicating the validator should exit.
|
// Return true on error, indicating the validator should exit.
|
||||||
fn wait_for_supermajority(
|
fn wait_for_supermajority(
|
||||||
config: &ValidatorConfig,
|
config: &ValidatorConfig,
|
||||||
|
|
|
@ -680,6 +680,7 @@ fn load_bank_forks(
|
||||||
account_paths,
|
account_paths,
|
||||||
snapshot_config.as_ref(),
|
snapshot_config.as_ref(),
|
||||||
process_options,
|
process_options,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ use crate::{
|
||||||
blockstore::Blockstore,
|
blockstore::Blockstore,
|
||||||
blockstore_processor::{
|
blockstore_processor::{
|
||||||
self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions,
|
self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions,
|
||||||
|
TransactionStatusSender,
|
||||||
},
|
},
|
||||||
entry::VerifyRecyclers,
|
entry::VerifyRecyclers,
|
||||||
leader_schedule_cache::LeaderScheduleCache,
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
|
@ -34,6 +35,7 @@ pub fn load(
|
||||||
account_paths: Vec<PathBuf>,
|
account_paths: Vec<PathBuf>,
|
||||||
snapshot_config: Option<&SnapshotConfig>,
|
snapshot_config: Option<&SnapshotConfig>,
|
||||||
process_options: ProcessOptions,
|
process_options: ProcessOptions,
|
||||||
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
) -> LoadResult {
|
) -> LoadResult {
|
||||||
if let Some(snapshot_config) = snapshot_config.as_ref() {
|
if let Some(snapshot_config) = snapshot_config.as_ref() {
|
||||||
info!(
|
info!(
|
||||||
|
@ -86,6 +88,7 @@ pub fn load(
|
||||||
Arc::new(deserialized_bank),
|
Arc::new(deserialized_bank),
|
||||||
&process_options,
|
&process_options,
|
||||||
&VerifyRecyclers::default(),
|
&VerifyRecyclers::default(),
|
||||||
|
transaction_status_sender,
|
||||||
),
|
),
|
||||||
Some(deserialized_snapshot_hash),
|
Some(deserialized_snapshot_hash),
|
||||||
);
|
);
|
||||||
|
|
|
@ -294,7 +294,7 @@ pub fn process_blockstore(
|
||||||
info!("processing ledger for slot 0...");
|
info!("processing ledger for slot 0...");
|
||||||
let recyclers = VerifyRecyclers::default();
|
let recyclers = VerifyRecyclers::default();
|
||||||
process_bank_0(&bank0, blockstore, &opts, &recyclers)?;
|
process_bank_0(&bank0, blockstore, &opts, &recyclers)?;
|
||||||
process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers)
|
process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers, None)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process blockstore from a known root bank
|
// Process blockstore from a known root bank
|
||||||
|
@ -304,6 +304,7 @@ pub fn process_blockstore_from_root(
|
||||||
bank: Arc<Bank>,
|
bank: Arc<Bank>,
|
||||||
opts: &ProcessOptions,
|
opts: &ProcessOptions,
|
||||||
recyclers: &VerifyRecyclers,
|
recyclers: &VerifyRecyclers,
|
||||||
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
) -> BlockstoreProcessorResult {
|
) -> BlockstoreProcessorResult {
|
||||||
info!("processing ledger from slot {}...", bank.slot());
|
info!("processing ledger from slot {}...", bank.slot());
|
||||||
let allocated = thread_mem_usage::Allocatedp::default();
|
let allocated = thread_mem_usage::Allocatedp::default();
|
||||||
|
@ -368,6 +369,7 @@ pub fn process_blockstore_from_root(
|
||||||
&mut root,
|
&mut root,
|
||||||
opts,
|
opts,
|
||||||
recyclers,
|
recyclers,
|
||||||
|
transaction_status_sender,
|
||||||
)?;
|
)?;
|
||||||
(initial_forks, leader_schedule_cache)
|
(initial_forks, leader_schedule_cache)
|
||||||
} else {
|
} else {
|
||||||
|
@ -456,6 +458,7 @@ fn confirm_full_slot(
|
||||||
opts: &ProcessOptions,
|
opts: &ProcessOptions,
|
||||||
recyclers: &VerifyRecyclers,
|
recyclers: &VerifyRecyclers,
|
||||||
progress: &mut ConfirmationProgress,
|
progress: &mut ConfirmationProgress,
|
||||||
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
) -> result::Result<(), BlockstoreProcessorError> {
|
) -> result::Result<(), BlockstoreProcessorError> {
|
||||||
let mut timing = ConfirmationTiming::default();
|
let mut timing = ConfirmationTiming::default();
|
||||||
let skip_verification = !opts.poh_verify;
|
let skip_verification = !opts.poh_verify;
|
||||||
|
@ -465,7 +468,7 @@ fn confirm_full_slot(
|
||||||
&mut timing,
|
&mut timing,
|
||||||
progress,
|
progress,
|
||||||
skip_verification,
|
skip_verification,
|
||||||
None,
|
transaction_status_sender,
|
||||||
opts.entry_callback.as_ref(),
|
opts.entry_callback.as_ref(),
|
||||||
recyclers,
|
recyclers,
|
||||||
)?;
|
)?;
|
||||||
|
@ -629,7 +632,7 @@ fn process_bank_0(
|
||||||
) -> result::Result<(), BlockstoreProcessorError> {
|
) -> result::Result<(), BlockstoreProcessorError> {
|
||||||
assert_eq!(bank0.slot(), 0);
|
assert_eq!(bank0.slot(), 0);
|
||||||
let mut progress = ConfirmationProgress::new(bank0.last_blockhash());
|
let mut progress = ConfirmationProgress::new(bank0.last_blockhash());
|
||||||
confirm_full_slot(blockstore, bank0, opts, recyclers, &mut progress)
|
confirm_full_slot(blockstore, bank0, opts, recyclers, &mut progress, None)
|
||||||
.expect("processing for bank 0 must succeed");
|
.expect("processing for bank 0 must succeed");
|
||||||
bank0.freeze();
|
bank0.freeze();
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -702,6 +705,7 @@ fn load_frozen_forks(
|
||||||
root: &mut Slot,
|
root: &mut Slot,
|
||||||
opts: &ProcessOptions,
|
opts: &ProcessOptions,
|
||||||
recyclers: &VerifyRecyclers,
|
recyclers: &VerifyRecyclers,
|
||||||
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
|
) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
|
||||||
let mut initial_forks = HashMap::new();
|
let mut initial_forks = HashMap::new();
|
||||||
let mut last_status_report = Instant::now();
|
let mut last_status_report = Instant::now();
|
||||||
|
@ -741,7 +745,16 @@ fn load_frozen_forks(
|
||||||
let initial_allocation = allocated.get();
|
let initial_allocation = allocated.get();
|
||||||
|
|
||||||
let mut progress = ConfirmationProgress::new(last_entry_hash);
|
let mut progress = ConfirmationProgress::new(last_entry_hash);
|
||||||
if process_single_slot(blockstore, &bank, opts, recyclers, &mut progress).is_err() {
|
if process_single_slot(
|
||||||
|
blockstore,
|
||||||
|
&bank,
|
||||||
|
opts,
|
||||||
|
recyclers,
|
||||||
|
&mut progress,
|
||||||
|
transaction_status_sender.clone(),
|
||||||
|
)
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
txs += progress.num_txs;
|
txs += progress.num_txs;
|
||||||
|
@ -788,10 +801,11 @@ fn process_single_slot(
|
||||||
opts: &ProcessOptions,
|
opts: &ProcessOptions,
|
||||||
recyclers: &VerifyRecyclers,
|
recyclers: &VerifyRecyclers,
|
||||||
progress: &mut ConfirmationProgress,
|
progress: &mut ConfirmationProgress,
|
||||||
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
) -> result::Result<(), BlockstoreProcessorError> {
|
) -> result::Result<(), BlockstoreProcessorError> {
|
||||||
// Mark corrupt slots as dead so validators don't replay this slot and
|
// Mark corrupt slots as dead so validators don't replay this slot and
|
||||||
// see DuplicateSignature errors later in ReplayStage
|
// see DuplicateSignature errors later in ReplayStage
|
||||||
confirm_full_slot(blockstore, bank, opts, recyclers, progress).map_err(|err| {
|
confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender).map_err(|err| {
|
||||||
let slot = bank.slot();
|
let slot = bank.slot();
|
||||||
warn!("slot {} failed to verify: {}", slot, err);
|
warn!("slot {} failed to verify: {}", slot, err);
|
||||||
if blockstore.is_primary_access() {
|
if blockstore.is_primary_access() {
|
||||||
|
@ -2418,14 +2432,21 @@ pub mod tests {
|
||||||
&opts,
|
&opts,
|
||||||
&recyclers,
|
&recyclers,
|
||||||
&mut ConfirmationProgress::new(bank0.last_blockhash()),
|
&mut ConfirmationProgress::new(bank0.last_blockhash()),
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
bank1.squash();
|
bank1.squash();
|
||||||
|
|
||||||
// Test process_blockstore_from_root() from slot 1 onwards
|
// Test process_blockstore_from_root() from slot 1 onwards
|
||||||
let (bank_forks, _leader_schedule) =
|
let (bank_forks, _leader_schedule) = process_blockstore_from_root(
|
||||||
process_blockstore_from_root(&genesis_config, &blockstore, bank1, &opts, &recyclers)
|
&genesis_config,
|
||||||
.unwrap();
|
&blockstore,
|
||||||
|
bank1,
|
||||||
|
&opts,
|
||||||
|
&recyclers,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
assert_eq!(frozen_bank_slots(&bank_forks), vec![5, 6]);
|
assert_eq!(frozen_bank_slots(&bank_forks), vec![5, 6]);
|
||||||
assert_eq!(bank_forks.working_bank().slot(), 6);
|
assert_eq!(bank_forks.working_bank().slot(), 6);
|
||||||
|
|
Loading…
Reference in New Issue