diff --git a/client-test/tests/client.rs b/client-test/tests/client.rs index 57687746cc..93cad370c2 100644 --- a/client-test/tests/client.rs +++ b/client-test/tests/client.rs @@ -140,9 +140,11 @@ fn test_account_subscription() { bank_forks.write().unwrap().insert(bank1); let bob = Keypair::new(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -256,10 +258,12 @@ fn test_block_subscription() { max_complete_transaction_status_slot, ); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); // setup RpcSubscriptions && PubSubService let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore.clone(), bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), @@ -342,9 +346,11 @@ fn test_program_subscription() { bank_forks.write().unwrap().insert(bank1); let bob = Keypair::new(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -426,9 +432,11 @@ fn test_root_subscription() { let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -475,9 +483,11 @@ fn test_slot_subscription() { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, @@ -549,9 +559,11 @@ async fn test_slot_subscription_async() { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index e250e59c49..d61d5b60cd 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1445,9 +1445,11 @@ mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, @@ -1559,9 +1561,11 @@ mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 806a2e2fe2..6da51fa8c8 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -22,7 +22,7 @@ use { latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, progress_map::{ForkProgress, ProgressMap, PropagatedStats, ReplaySlotStats}, repair_service::{DumpedSlotsSender, DuplicateSlotsResetReceiver}, - rewards_recorder_service::RewardsRecorderSender, + rewards_recorder_service::{RewardsMessage, RewardsRecorderSender}, tower_storage::{SavedTower, SavedTowerVersions, TowerStorage}, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, validator::ProcessBlockStore, @@ -3595,9 +3595,12 @@ impl ReplayStage { let rewards = bank.rewards.read().unwrap(); if !rewards.is_empty() { rewards_recorder_sender - .send((bank.slot(), rewards.clone())) + .send(RewardsMessage::Batch((bank.slot(), rewards.clone()))) .unwrap_or_else(|err| warn!("rewards_recorder_sender failed: {:?}", err)); } + rewards_recorder_sender + .send(RewardsMessage::Complete(bank.slot())) + .unwrap_or_else(|err| warn!("rewards_recorder_sender failed: {:?}", err)); } } @@ -3776,9 +3779,11 @@ pub(crate) mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(bank_forks); let exit = Arc::new(AtomicBool::new(false)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, @@ -4343,9 +4348,11 @@ pub(crate) mod tests { &PrioritizationFeeCache::new(0u64), ); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), block_commitment_cache, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -4414,9 +4421,11 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), block_commitment_cache.clone(), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), diff --git a/core/src/rewards_recorder_service.rs b/core/src/rewards_recorder_service.rs index 10dd8ea9cd..d83b0a285f 100644 --- a/core/src/rewards_recorder_service.rs +++ b/core/src/rewards_recorder_service.rs @@ -6,7 +6,7 @@ use { solana_transaction_status::Reward, std::{ sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, thread::{self, Builder, JoinHandle}, @@ -14,8 +14,14 @@ use { }, }; -pub type RewardsRecorderReceiver = Receiver<(Slot, Vec<(Pubkey, RewardInfo)>)>; -pub type RewardsRecorderSender = Sender<(Slot, Vec<(Pubkey, RewardInfo)>)>; +pub type RewardsBatch = (Slot, Vec<(Pubkey, RewardInfo)>); +pub type RewardsRecorderReceiver = Receiver; +pub type RewardsRecorderSender = Sender; + +pub enum RewardsMessage { + Batch(RewardsBatch), + Complete(Slot), +} pub struct RewardsRecorderService { thread_hdl: JoinHandle<()>, @@ -25,6 +31,7 @@ impl RewardsRecorderService { #[allow(clippy::new_ret_no_self)] pub fn new( rewards_receiver: RewardsRecorderReceiver, + max_complete_rewards_slot: Arc, blockstore: Arc, exit: &Arc, ) -> Self { @@ -36,7 +43,7 @@ impl RewardsRecorderService { break; } if let Err(RecvTimeoutError::Disconnected) = - Self::write_rewards(&rewards_receiver, &blockstore) + Self::write_rewards(&rewards_receiver, &max_complete_rewards_slot, &blockstore) { break; } @@ -47,23 +54,30 @@ impl RewardsRecorderService { fn write_rewards( rewards_receiver: &RewardsRecorderReceiver, + max_complete_rewards_slot: &Arc, blockstore: &Arc, ) -> Result<(), RecvTimeoutError> { - let (slot, rewards) = rewards_receiver.recv_timeout(Duration::from_secs(1))?; - let rpc_rewards = rewards - .into_iter() - .map(|(pubkey, reward_info)| Reward { - pubkey: pubkey.to_string(), - lamports: reward_info.lamports, - post_balance: reward_info.post_balance, - reward_type: Some(reward_info.reward_type), - commission: reward_info.commission, - }) - .collect(); + match rewards_receiver.recv_timeout(Duration::from_secs(1))? { + RewardsMessage::Batch((slot, rewards)) => { + let rpc_rewards = rewards + .into_iter() + .map(|(pubkey, reward_info)| Reward { + pubkey: pubkey.to_string(), + lamports: reward_info.lamports, + post_balance: reward_info.post_balance, + reward_type: Some(reward_info.reward_type), + commission: reward_info.commission, + }) + .collect(); - blockstore - .write_rewards(slot, rpc_rewards) - .expect("Expect database write to succeed"); + blockstore + .write_rewards(slot, rpc_rewards) + .expect("Expect database write to succeed"); + } + RewardsMessage::Complete(slot) => { + max_complete_rewards_slot.fetch_max(slot, Ordering::SeqCst); + } + } Ok(()) } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index cc78b1613d..39f20c1b23 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -425,6 +425,7 @@ pub mod tests { let (_, gossip_confirmed_slots_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let tvu = Tvu::new( &vote_keypair.pubkey(), @@ -445,6 +446,7 @@ pub mod tests { &Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), block_commitment_cache.clone(), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), diff --git a/core/src/validator.rs b/core/src/validator.rs index 5b0d6ac25b..d2223ba99e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -345,6 +345,7 @@ struct TransactionHistoryServices { max_complete_transaction_status_slot: Arc, rewards_recorder_sender: Option, rewards_recorder_service: Option, + max_complete_rewards_slot: Arc, cache_block_meta_sender: Option, cache_block_meta_service: Option, } @@ -539,6 +540,7 @@ impl Validator { max_complete_transaction_status_slot, rewards_recorder_sender, rewards_recorder_service, + max_complete_rewards_slot, cache_block_meta_sender, cache_block_meta_service, }, @@ -718,6 +720,7 @@ impl Validator { let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config( &exit, max_complete_transaction_status_slot.clone(), + max_complete_rewards_slot.clone(), blockstore.clone(), bank_forks.clone(), block_commitment_cache.clone(), @@ -828,6 +831,7 @@ impl Validator { leader_schedule_cache.clone(), connection_cache.clone(), max_complete_transaction_status_slot, + max_complete_rewards_slot, prioritization_fee_cache.clone(), )?; @@ -1871,10 +1875,12 @@ fn initialize_rpc_transaction_history_services( exit, )); + let max_complete_rewards_slot = Arc::new(AtomicU64::new(blockstore.max_root())); let (rewards_recorder_sender, rewards_receiver) = unbounded(); let rewards_recorder_sender = Some(rewards_recorder_sender); let rewards_recorder_service = Some(RewardsRecorderService::new( rewards_receiver, + max_complete_rewards_slot.clone(), blockstore.clone(), exit, )); @@ -1892,6 +1898,7 @@ fn initialize_rpc_transaction_history_services( max_complete_transaction_status_slot, rewards_recorder_sender, rewards_recorder_service, + max_complete_rewards_slot, cache_block_meta_sender, cache_block_meta_service, } diff --git a/ledger/src/bigtable_upload_service.rs b/ledger/src/bigtable_upload_service.rs index 857190a47d..3149eb96a3 100644 --- a/ledger/src/bigtable_upload_service.rs +++ b/ledger/src/bigtable_upload_service.rs @@ -26,6 +26,7 @@ impl BigTableUploadService { blockstore: Arc, block_commitment_cache: Arc>, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, exit: Arc, ) -> Self { Self::new_with_config( @@ -34,6 +35,7 @@ impl BigTableUploadService { blockstore, block_commitment_cache, max_complete_transaction_status_slot, + max_complete_rewards_slot, ConfirmedBlockUploadConfig::default(), exit, ) @@ -45,6 +47,7 @@ impl BigTableUploadService { blockstore: Arc, block_commitment_cache: Arc>, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, config: ConfirmedBlockUploadConfig, exit: Arc, ) -> Self { @@ -58,6 +61,7 @@ impl BigTableUploadService { blockstore, block_commitment_cache, max_complete_transaction_status_slot, + max_complete_rewards_slot, config, exit, ) @@ -73,6 +77,7 @@ impl BigTableUploadService { blockstore: Arc, block_commitment_cache: Arc>, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, config: ConfirmedBlockUploadConfig, exit: Arc, ) { @@ -83,11 +88,15 @@ impl BigTableUploadService { } // The highest slot eligible for upload is the highest root that has complete - // transaction-status metadata - let highest_complete_root = min( + // transaction-status metadata and rewards + let highest_complete_root = [ max_complete_transaction_status_slot.load(Ordering::SeqCst), + max_complete_rewards_slot.load(Ordering::SeqCst), block_commitment_cache.read().unwrap().root(), - ); + ] + .into_iter() + .min() + .expect("root and max_complete slots exist"); let end_slot = min( highest_complete_root, start_slot.saturating_add(config.max_num_slots_to_check as u64 * 2), diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 6495f44709..9693999d23 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -346,9 +346,11 @@ mod tests { let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), block_commitment_cache, optimistically_confirmed_bank.clone(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index a90137f7e6..71de4f683d 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -198,6 +198,7 @@ pub struct JsonRpcRequestProcessor { max_slots: Arc, leader_schedule_cache: Arc, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -304,6 +305,7 @@ impl JsonRpcRequestProcessor { max_slots: Arc, leader_schedule_cache: Arc, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, ) -> (Self, Receiver) { let (sender, receiver) = unbounded(); @@ -325,6 +327,7 @@ impl JsonRpcRequestProcessor { max_slots, leader_schedule_cache, max_complete_transaction_status_slot, + max_complete_rewards_slot, prioritization_fee_cache, }, receiver, @@ -393,6 +396,7 @@ impl JsonRpcRequestProcessor { max_slots: Arc::new(MaxSlots::default()), leader_schedule_cache: Arc::new(LeaderScheduleCache::new_from_bank(bank)), max_complete_transaction_status_slot: Arc::new(AtomicU64::default()), + max_complete_rewards_slot: Arc::new(AtomicU64::default()), prioritization_fee_cache: Arc::new(PrioritizationFeeCache::default()), } } @@ -1045,11 +1049,12 @@ impl JsonRpcRequestProcessor { Ok(()) } - fn check_status_is_complete(&self, slot: Slot) -> Result<()> { + fn check_blockstore_writes_complete(&self, slot: Slot) -> Result<()> { if slot > self .max_complete_transaction_status_slot .load(Ordering::SeqCst) + || slot > self.max_complete_rewards_slot.load(Ordering::SeqCst) { Err(RpcCustomError::BlockStatusNotAvailableYet { slot }.into()) } else { @@ -1083,7 +1088,7 @@ impl JsonRpcRequestProcessor { .unwrap() .highest_confirmed_root() { - self.check_status_is_complete(slot)?; + self.check_blockstore_writes_complete(slot)?; let result = self.blockstore.get_rooted_block(slot, true); self.check_blockstore_root(&result, slot)?; let encode_block = |confirmed_block: ConfirmedBlock| -> Result { @@ -1114,7 +1119,7 @@ impl JsonRpcRequestProcessor { // Check if block is confirmed let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed())); if confirmed_bank.status_cache_ancestors().contains(&slot) { - self.check_status_is_complete(slot)?; + self.check_blockstore_writes_complete(slot)?; let result = self.blockstore.get_complete_block(slot, true); return result .ok() @@ -4766,6 +4771,7 @@ pub mod tests { let max_slots = Arc::new(MaxSlots::default()); // note that this means that slot 0 will always be considered complete let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(0)); + let max_complete_rewards_slot = Arc::new(AtomicU64::new(0)); let meta = JsonRpcRequestProcessor::new( config, @@ -4783,6 +4789,7 @@ pub mod tests { max_slots.clone(), Arc::new(LeaderScheduleCache::new_from_bank(&bank)), max_complete_transaction_status_slot.clone(), + max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), ) .0; @@ -6410,6 +6417,7 @@ pub mod tests { Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), Arc::new(AtomicU64::default()), + Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); let connection_cache = Arc::new(ConnectionCache::default()); @@ -6677,6 +6685,7 @@ pub mod tests { Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), Arc::new(AtomicU64::default()), + Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); let connection_cache = Arc::new(ConnectionCache::default()); @@ -8272,9 +8281,11 @@ pub mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, - max_complete_transaction_status_slot, + max_complete_transaction_status_slot.clone(), + max_complete_rewards_slot.clone(), bank_forks.clone(), block_commitment_cache.clone(), optimistically_confirmed_bank.clone(), @@ -8295,7 +8306,8 @@ pub mod tests { Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), - Arc::new(AtomicU64::default()), + max_complete_transaction_status_slot, + max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), ); diff --git a/rpc/src/rpc_pubsub.rs b/rpc/src/rpc_pubsub.rs index 0dd409d379..0aec5951e6 100644 --- a/rpc/src/rpc_pubsub.rs +++ b/rpc/src/rpc_pubsub.rs @@ -685,9 +685,11 @@ mod tests { let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &Arc::new(AtomicBool::new(false)), max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -814,8 +816,10 @@ mod tests { let mut io = IoHandler::<()>::default(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, )); let (rpc, _receiver) = rpc_pubsub_service::test_connection(&subscriptions); @@ -871,9 +875,11 @@ mod tests { let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &Arc::new(AtomicBool::new(false)), max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -997,9 +1003,11 @@ mod tests { let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &Arc::new(AtomicBool::new(false)), max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -1085,8 +1093,10 @@ mod tests { let mut io = IoHandler::<()>::default(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, )); let (rpc, _receiver) = rpc_pubsub_service::test_connection(&subscriptions); @@ -1132,9 +1142,11 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -1186,9 +1198,11 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), block_commitment_cache, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -1259,8 +1273,10 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, )); let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions); @@ -1290,8 +1306,10 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, )); let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions); @@ -1335,9 +1353,11 @@ mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, block_commitment_cache, optimistically_confirmed_bank, @@ -1371,8 +1391,10 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, )); let (rpc, _receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions); @@ -1388,8 +1410,10 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, )); let (rpc, _receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions); diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index e5c0d9bf19..59ff6ccbc8 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -401,6 +401,7 @@ mod tests { let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0); let exit = Arc::new(AtomicBool::new(false)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); @@ -409,6 +410,7 @@ mod tests { let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 8a8e9013af..b4c3bd79c4 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -358,6 +358,7 @@ impl JsonRpcService { leader_schedule_cache: Arc, connection_cache: Arc, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, ) -> Result { info!("rpc bound to {:?}", rpc_addr); @@ -429,6 +430,7 @@ impl JsonRpcService { blockstore.clone(), block_commitment_cache.clone(), max_complete_transaction_status_slot.clone(), + max_complete_rewards_slot.clone(), ConfirmedBlockUploadConfig::default(), exit_bigtable_ledger_upload_service.clone(), ))) @@ -470,6 +472,7 @@ impl JsonRpcService { max_slots, leader_schedule_cache, max_complete_transaction_status_slot, + max_complete_rewards_slot, prioritization_fee_cache, ); @@ -647,6 +650,7 @@ mod tests { Arc::new(LeaderScheduleCache::default()), connection_cache, Arc::new(AtomicU64::default()), + Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ) .expect("assume successful JsonRpcService start"); diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index b1d87b91f9..7eca2ea8b5 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -529,6 +529,7 @@ impl RpcSubscriptions { pub fn new( exit: &Arc, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, blockstore: Arc, bank_forks: Arc>, block_commitment_cache: Arc>, @@ -537,6 +538,7 @@ impl RpcSubscriptions { Self::new_with_config( exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore, bank_forks, block_commitment_cache, @@ -549,6 +551,7 @@ impl RpcSubscriptions { pub fn new_for_tests( exit: &Arc, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, bank_forks: Arc>, block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, @@ -560,6 +563,7 @@ impl RpcSubscriptions { Self::new_for_tests_with_blockstore( exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore, bank_forks, block_commitment_cache, @@ -570,6 +574,7 @@ impl RpcSubscriptions { pub fn new_for_tests_with_blockstore( exit: &Arc, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, blockstore: Arc, bank_forks: Arc>, block_commitment_cache: Arc>, @@ -580,6 +585,7 @@ impl RpcSubscriptions { let rpc_subscriptions = Self::new_with_config( exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore, bank_forks, block_commitment_cache, @@ -604,6 +610,7 @@ impl RpcSubscriptions { pub fn new_with_config( exit: &Arc, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, blockstore: Arc, bank_forks: Arc>, block_commitment_cache: Arc>, @@ -645,6 +652,7 @@ impl RpcSubscriptions { Self::process_notifications( exit_clone, max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore, notifier, notification_receiver, @@ -680,6 +688,7 @@ impl RpcSubscriptions { // For tests only... pub fn default_with_bank_forks( max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, bank_forks: Arc>, ) -> Self { let ledger_path = get_tmp_ledger_path!(); @@ -690,6 +699,7 @@ impl RpcSubscriptions { Self::new( &Arc::new(AtomicBool::new(false)), max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), @@ -759,9 +769,11 @@ impl RpcSubscriptions { } } + #[allow(clippy::too_many_arguments)] fn process_notifications( exit: Arc, max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, blockstore: Arc, notifier: RpcNotifier, notification_receiver: Receiver, @@ -847,6 +859,7 @@ impl RpcSubscriptions { const SOURCE: &str = "bank"; RpcSubscriptions::notify_watchers( max_complete_transaction_status_slot.clone(), + max_complete_rewards_slot.clone(), subscriptions.commitment_watchers(), &bank_forks, &blockstore, @@ -863,6 +876,7 @@ impl RpcSubscriptions { const SOURCE: &str = "gossip"; RpcSubscriptions::notify_watchers( max_complete_transaction_status_slot.clone(), + max_complete_rewards_slot.clone(), subscriptions.gossip_watchers(), &bank_forks, &blockstore, @@ -917,6 +931,7 @@ impl RpcSubscriptions { fn notify_watchers( max_complete_transaction_status_slot: Arc, + max_complete_rewards_slot: Arc, subscriptions: &HashMap>, bank_forks: &Arc>, blockstore: &Blockstore, @@ -1010,7 +1025,9 @@ impl RpcSubscriptions { // caused by non-deterministic concurrency accesses, we // break out of the loop. Besides if the current `s` is // greater, then any `s + K` is also greater. - if s > max_complete_transaction_status_slot.load(Ordering::SeqCst) { + if s > max_complete_transaction_status_slot.load(Ordering::SeqCst) + || s > max_complete_rewards_slot.load(Ordering::SeqCst) + { break; } @@ -1314,9 +1331,11 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -1457,9 +1476,11 @@ pub(crate) mod tests { let blockstore = Blockstore::open(&ledger_path).unwrap(); let blockstore = Arc::new(blockstore); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore.clone(), bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), @@ -1575,9 +1596,11 @@ pub(crate) mod tests { let blockstore = Blockstore::open(&ledger_path).unwrap(); let blockstore = Arc::new(blockstore); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore.clone(), bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), @@ -1691,9 +1714,11 @@ pub(crate) mod tests { let blockstore = Blockstore::open(&ledger_path).unwrap(); let blockstore = Arc::new(blockstore); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, blockstore.clone(), bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), @@ -1822,9 +1847,11 @@ pub(crate) mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, @@ -1970,9 +1997,11 @@ pub(crate) mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -2141,9 +2170,11 @@ pub(crate) mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -2252,9 +2283,11 @@ pub(crate) mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -2440,9 +2473,11 @@ pub(crate) mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(block_commitment_cache)), optimistically_confirmed_bank, @@ -2614,9 +2649,11 @@ pub(crate) mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, @@ -2659,9 +2696,11 @@ pub(crate) mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, @@ -2718,9 +2757,11 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -2913,11 +2954,13 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, @@ -2988,9 +3031,11 @@ pub(crate) mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis_config); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( max_complete_transaction_status_slot, + max_complete_rewards_slot, bank_forks, ));