Track max_complete_rewards_slot for use in rpc, bigtable (#30698)

* Add RewardsMessage enum

* Cache and update max_complete_rewards_slot

* Plumb max_complete_rewards_slot into JsonRpcRequestProcesseor

* Use max_complete_rewards_slot to check get_block requests

* Use max_complete_rewards_slot to limit Bigtable uploads

* Plumb max_complete_rewards_slot into RpcSubscriptions

* Use max_complete_rewards_slot to limit block subscriptions

* Nit: fix test
This commit is contained in:
Tyera 2023-03-14 12:08:48 -06:00 committed by GitHub
parent 93c43610ac
commit b389d509a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 175 additions and 29 deletions

View File

@ -140,9 +140,11 @@ fn test_account_subscription() {
bank_forks.write().unwrap().insert(bank1);
let bob = Keypair::new();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
@ -256,10 +258,12 @@ fn test_block_subscription() {
max_complete_transaction_status_slot,
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
// setup RpcSubscriptions && PubSubService
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore.clone(),
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
@ -342,9 +346,11 @@ fn test_program_subscription() {
bank_forks.write().unwrap().insert(bank1);
let bob = Keypair::new();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
@ -426,9 +432,11 @@ fn test_root_subscription() {
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
@ -475,9 +483,11 @@ fn test_slot_subscription() {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
@ -549,9 +559,11 @@ async fn test_slot_subscription_async() {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,

View File

@ -1445,9 +1445,11 @@ mod tests {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
@ -1559,9 +1561,11 @@ mod tests {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,

View File

@ -22,7 +22,7 @@ use {
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
progress_map::{ForkProgress, ProgressMap, PropagatedStats, ReplaySlotStats},
repair_service::{DumpedSlotsSender, DuplicateSlotsResetReceiver},
rewards_recorder_service::RewardsRecorderSender,
rewards_recorder_service::{RewardsMessage, RewardsRecorderSender},
tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
validator::ProcessBlockStore,
@ -3595,9 +3595,12 @@ impl ReplayStage {
let rewards = bank.rewards.read().unwrap();
if !rewards.is_empty() {
rewards_recorder_sender
.send((bank.slot(), rewards.clone()))
.send(RewardsMessage::Batch((bank.slot(), rewards.clone())))
.unwrap_or_else(|err| warn!("rewards_recorder_sender failed: {:?}", err));
}
rewards_recorder_sender
.send(RewardsMessage::Complete(bank.slot()))
.unwrap_or_else(|err| warn!("rewards_recorder_sender failed: {:?}", err));
}
}
@ -3776,9 +3779,11 @@ pub(crate) mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(bank_forks);
let exit = Arc::new(AtomicBool::new(false));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
@ -4343,9 +4348,11 @@ pub(crate) mod tests {
&PrioritizationFeeCache::new(0u64),
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
block_commitment_cache,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
@ -4414,9 +4421,11 @@ pub(crate) mod tests {
let exit = Arc::new(AtomicBool::new(false));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
block_commitment_cache.clone(),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),

View File

@ -6,7 +6,7 @@ use {
solana_transaction_status::Reward,
std::{
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
@ -14,8 +14,14 @@ use {
},
};
pub type RewardsRecorderReceiver = Receiver<(Slot, Vec<(Pubkey, RewardInfo)>)>;
pub type RewardsRecorderSender = Sender<(Slot, Vec<(Pubkey, RewardInfo)>)>;
pub type RewardsBatch = (Slot, Vec<(Pubkey, RewardInfo)>);
pub type RewardsRecorderReceiver = Receiver<RewardsMessage>;
pub type RewardsRecorderSender = Sender<RewardsMessage>;
pub enum RewardsMessage {
Batch(RewardsBatch),
Complete(Slot),
}
pub struct RewardsRecorderService {
thread_hdl: JoinHandle<()>,
@ -25,6 +31,7 @@ impl RewardsRecorderService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
rewards_receiver: RewardsRecorderReceiver,
max_complete_rewards_slot: Arc<AtomicU64>,
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
@ -36,7 +43,7 @@ impl RewardsRecorderService {
break;
}
if let Err(RecvTimeoutError::Disconnected) =
Self::write_rewards(&rewards_receiver, &blockstore)
Self::write_rewards(&rewards_receiver, &max_complete_rewards_slot, &blockstore)
{
break;
}
@ -47,23 +54,30 @@ impl RewardsRecorderService {
fn write_rewards(
rewards_receiver: &RewardsRecorderReceiver,
max_complete_rewards_slot: &Arc<AtomicU64>,
blockstore: &Arc<Blockstore>,
) -> Result<(), RecvTimeoutError> {
let (slot, rewards) = rewards_receiver.recv_timeout(Duration::from_secs(1))?;
let rpc_rewards = rewards
.into_iter()
.map(|(pubkey, reward_info)| Reward {
pubkey: pubkey.to_string(),
lamports: reward_info.lamports,
post_balance: reward_info.post_balance,
reward_type: Some(reward_info.reward_type),
commission: reward_info.commission,
})
.collect();
match rewards_receiver.recv_timeout(Duration::from_secs(1))? {
RewardsMessage::Batch((slot, rewards)) => {
let rpc_rewards = rewards
.into_iter()
.map(|(pubkey, reward_info)| Reward {
pubkey: pubkey.to_string(),
lamports: reward_info.lamports,
post_balance: reward_info.post_balance,
reward_type: Some(reward_info.reward_type),
commission: reward_info.commission,
})
.collect();
blockstore
.write_rewards(slot, rpc_rewards)
.expect("Expect database write to succeed");
blockstore
.write_rewards(slot, rpc_rewards)
.expect("Expect database write to succeed");
}
RewardsMessage::Complete(slot) => {
max_complete_rewards_slot.fetch_max(slot, Ordering::SeqCst);
}
}
Ok(())
}

View File

@ -425,6 +425,7 @@ pub mod tests {
let (_, gossip_confirmed_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let tvu = Tvu::new(
&vote_keypair.pubkey(),
@ -445,6 +446,7 @@ pub mod tests {
&Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
block_commitment_cache.clone(),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),

View File

@ -345,6 +345,7 @@ struct TransactionHistoryServices {
max_complete_transaction_status_slot: Arc<AtomicU64>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
rewards_recorder_service: Option<RewardsRecorderService>,
max_complete_rewards_slot: Arc<AtomicU64>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
cache_block_meta_service: Option<CacheBlockMetaService>,
}
@ -539,6 +540,7 @@ impl Validator {
max_complete_transaction_status_slot,
rewards_recorder_sender,
rewards_recorder_service,
max_complete_rewards_slot,
cache_block_meta_sender,
cache_block_meta_service,
},
@ -718,6 +720,7 @@ impl Validator {
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
&exit,
max_complete_transaction_status_slot.clone(),
max_complete_rewards_slot.clone(),
blockstore.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
@ -828,6 +831,7 @@ impl Validator {
leader_schedule_cache.clone(),
connection_cache.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
prioritization_fee_cache.clone(),
)?;
@ -1871,10 +1875,12 @@ fn initialize_rpc_transaction_history_services(
exit,
));
let max_complete_rewards_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
let (rewards_recorder_sender, rewards_receiver) = unbounded();
let rewards_recorder_sender = Some(rewards_recorder_sender);
let rewards_recorder_service = Some(RewardsRecorderService::new(
rewards_receiver,
max_complete_rewards_slot.clone(),
blockstore.clone(),
exit,
));
@ -1892,6 +1898,7 @@ fn initialize_rpc_transaction_history_services(
max_complete_transaction_status_slot,
rewards_recorder_sender,
rewards_recorder_service,
max_complete_rewards_slot,
cache_block_meta_sender,
cache_block_meta_service,
}

View File

@ -26,6 +26,7 @@ impl BigTableUploadService {
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
exit: Arc<AtomicBool>,
) -> Self {
Self::new_with_config(
@ -34,6 +35,7 @@ impl BigTableUploadService {
blockstore,
block_commitment_cache,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
ConfirmedBlockUploadConfig::default(),
exit,
)
@ -45,6 +47,7 @@ impl BigTableUploadService {
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
config: ConfirmedBlockUploadConfig,
exit: Arc<AtomicBool>,
) -> Self {
@ -58,6 +61,7 @@ impl BigTableUploadService {
blockstore,
block_commitment_cache,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
config,
exit,
)
@ -73,6 +77,7 @@ impl BigTableUploadService {
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
config: ConfirmedBlockUploadConfig,
exit: Arc<AtomicBool>,
) {
@ -83,11 +88,15 @@ impl BigTableUploadService {
}
// The highest slot eligible for upload is the highest root that has complete
// transaction-status metadata
let highest_complete_root = min(
// transaction-status metadata and rewards
let highest_complete_root = [
max_complete_transaction_status_slot.load(Ordering::SeqCst),
max_complete_rewards_slot.load(Ordering::SeqCst),
block_commitment_cache.read().unwrap().root(),
);
]
.into_iter()
.min()
.expect("root and max_complete slots exist");
let end_slot = min(
highest_complete_root,
start_slot.saturating_add(config.max_num_slots_to_check as u64 * 2),

View File

@ -346,9 +346,11 @@ mod tests {
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
block_commitment_cache,
optimistically_confirmed_bank.clone(),

View File

@ -198,6 +198,7 @@ pub struct JsonRpcRequestProcessor {
max_slots: Arc<MaxSlots>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
}
impl Metadata for JsonRpcRequestProcessor {}
@ -304,6 +305,7 @@ impl JsonRpcRequestProcessor {
max_slots: Arc<MaxSlots>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> (Self, Receiver<TransactionInfo>) {
let (sender, receiver) = unbounded();
@ -325,6 +327,7 @@ impl JsonRpcRequestProcessor {
max_slots,
leader_schedule_cache,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
prioritization_fee_cache,
},
receiver,
@ -393,6 +396,7 @@ impl JsonRpcRequestProcessor {
max_slots: Arc::new(MaxSlots::default()),
leader_schedule_cache: Arc::new(LeaderScheduleCache::new_from_bank(bank)),
max_complete_transaction_status_slot: Arc::new(AtomicU64::default()),
max_complete_rewards_slot: Arc::new(AtomicU64::default()),
prioritization_fee_cache: Arc::new(PrioritizationFeeCache::default()),
}
}
@ -1045,11 +1049,12 @@ impl JsonRpcRequestProcessor {
Ok(())
}
fn check_status_is_complete(&self, slot: Slot) -> Result<()> {
fn check_blockstore_writes_complete(&self, slot: Slot) -> Result<()> {
if slot
> self
.max_complete_transaction_status_slot
.load(Ordering::SeqCst)
|| slot > self.max_complete_rewards_slot.load(Ordering::SeqCst)
{
Err(RpcCustomError::BlockStatusNotAvailableYet { slot }.into())
} else {
@ -1083,7 +1088,7 @@ impl JsonRpcRequestProcessor {
.unwrap()
.highest_confirmed_root()
{
self.check_status_is_complete(slot)?;
self.check_blockstore_writes_complete(slot)?;
let result = self.blockstore.get_rooted_block(slot, true);
self.check_blockstore_root(&result, slot)?;
let encode_block = |confirmed_block: ConfirmedBlock| -> Result<UiConfirmedBlock> {
@ -1114,7 +1119,7 @@ impl JsonRpcRequestProcessor {
// Check if block is confirmed
let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed()));
if confirmed_bank.status_cache_ancestors().contains(&slot) {
self.check_status_is_complete(slot)?;
self.check_blockstore_writes_complete(slot)?;
let result = self.blockstore.get_complete_block(slot, true);
return result
.ok()
@ -4766,6 +4771,7 @@ pub mod tests {
let max_slots = Arc::new(MaxSlots::default());
// note that this means that slot 0 will always be considered complete
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(0));
let max_complete_rewards_slot = Arc::new(AtomicU64::new(0));
let meta = JsonRpcRequestProcessor::new(
config,
@ -4783,6 +4789,7 @@ pub mod tests {
max_slots.clone(),
Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
max_complete_transaction_status_slot.clone(),
max_complete_rewards_slot,
Arc::new(PrioritizationFeeCache::default()),
)
.0;
@ -6410,6 +6417,7 @@ pub mod tests {
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
Arc::new(AtomicU64::default()),
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
let connection_cache = Arc::new(ConnectionCache::default());
@ -6677,6 +6685,7 @@ pub mod tests {
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
Arc::new(AtomicU64::default()),
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
let connection_cache = Arc::new(ConnectionCache::default());
@ -8272,9 +8281,11 @@ pub mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_transaction_status_slot.clone(),
max_complete_rewards_slot.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
optimistically_confirmed_bank.clone(),
@ -8295,7 +8306,8 @@ pub mod tests {
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
Arc::new(AtomicU64::default()),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
Arc::new(PrioritizationFeeCache::default()),
);

View File

@ -685,9 +685,11 @@ mod tests {
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&Arc::new(AtomicBool::new(false)),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
@ -814,8 +816,10 @@ mod tests {
let mut io = IoHandler::<()>::default();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
));
let (rpc, _receiver) = rpc_pubsub_service::test_connection(&subscriptions);
@ -871,9 +875,11 @@ mod tests {
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&Arc::new(AtomicBool::new(false)),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
@ -997,9 +1003,11 @@ mod tests {
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&Arc::new(AtomicBool::new(false)),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
@ -1085,8 +1093,10 @@ mod tests {
let mut io = IoHandler::<()>::default();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
));
let (rpc, _receiver) = rpc_pubsub_service::test_connection(&subscriptions);
@ -1132,9 +1142,11 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
@ -1186,9 +1198,11 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests()));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
block_commitment_cache,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
@ -1259,8 +1273,10 @@ mod tests {
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
));
let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions);
@ -1290,8 +1306,10 @@ mod tests {
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
));
let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions);
@ -1335,9 +1353,11 @@ mod tests {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
@ -1371,8 +1391,10 @@ mod tests {
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
));
let (rpc, _receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions);
@ -1388,8 +1410,10 @@ mod tests {
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
));
let (rpc, _receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions);

View File

@ -401,6 +401,7 @@ mod tests {
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
let exit = Arc::new(AtomicBool::new(false));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
@ -409,6 +410,7 @@ mod tests {
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,

View File

@ -358,6 +358,7 @@ impl JsonRpcService {
leader_schedule_cache: Arc<LeaderScheduleCache>,
connection_cache: Arc<ConnectionCache>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Result<Self, String> {
info!("rpc bound to {:?}", rpc_addr);
@ -429,6 +430,7 @@ impl JsonRpcService {
blockstore.clone(),
block_commitment_cache.clone(),
max_complete_transaction_status_slot.clone(),
max_complete_rewards_slot.clone(),
ConfirmedBlockUploadConfig::default(),
exit_bigtable_ledger_upload_service.clone(),
)))
@ -470,6 +472,7 @@ impl JsonRpcService {
max_slots,
leader_schedule_cache,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
prioritization_fee_cache,
);
@ -647,6 +650,7 @@ mod tests {
Arc::new(LeaderScheduleCache::default()),
connection_cache,
Arc::new(AtomicU64::default()),
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
)
.expect("assume successful JsonRpcService start");

View File

@ -529,6 +529,7 @@ impl RpcSubscriptions {
pub fn new(
exit: &Arc<AtomicBool>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
@ -537,6 +538,7 @@ impl RpcSubscriptions {
Self::new_with_config(
exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore,
bank_forks,
block_commitment_cache,
@ -549,6 +551,7 @@ impl RpcSubscriptions {
pub fn new_for_tests(
exit: &Arc<AtomicBool>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
@ -560,6 +563,7 @@ impl RpcSubscriptions {
Self::new_for_tests_with_blockstore(
exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore,
bank_forks,
block_commitment_cache,
@ -570,6 +574,7 @@ impl RpcSubscriptions {
pub fn new_for_tests_with_blockstore(
exit: &Arc<AtomicBool>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
@ -580,6 +585,7 @@ impl RpcSubscriptions {
let rpc_subscriptions = Self::new_with_config(
exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore,
bank_forks,
block_commitment_cache,
@ -604,6 +610,7 @@ impl RpcSubscriptions {
pub fn new_with_config(
exit: &Arc<AtomicBool>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
@ -645,6 +652,7 @@ impl RpcSubscriptions {
Self::process_notifications(
exit_clone,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore,
notifier,
notification_receiver,
@ -680,6 +688,7 @@ impl RpcSubscriptions {
// For tests only...
pub fn default_with_bank_forks(
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
let ledger_path = get_tmp_ledger_path!();
@ -690,6 +699,7 @@ impl RpcSubscriptions {
Self::new(
&Arc::new(AtomicBool::new(false)),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
@ -759,9 +769,11 @@ impl RpcSubscriptions {
}
}
#[allow(clippy::too_many_arguments)]
fn process_notifications(
exit: Arc<AtomicBool>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
blockstore: Arc<Blockstore>,
notifier: RpcNotifier,
notification_receiver: Receiver<TimestampedNotificationEntry>,
@ -847,6 +859,7 @@ impl RpcSubscriptions {
const SOURCE: &str = "bank";
RpcSubscriptions::notify_watchers(
max_complete_transaction_status_slot.clone(),
max_complete_rewards_slot.clone(),
subscriptions.commitment_watchers(),
&bank_forks,
&blockstore,
@ -863,6 +876,7 @@ impl RpcSubscriptions {
const SOURCE: &str = "gossip";
RpcSubscriptions::notify_watchers(
max_complete_transaction_status_slot.clone(),
max_complete_rewards_slot.clone(),
subscriptions.gossip_watchers(),
&bank_forks,
&blockstore,
@ -917,6 +931,7 @@ impl RpcSubscriptions {
fn notify_watchers(
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
subscriptions: &HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
bank_forks: &Arc<RwLock<BankForks>>,
blockstore: &Blockstore,
@ -1010,7 +1025,9 @@ impl RpcSubscriptions {
// caused by non-deterministic concurrency accesses, we
// break out of the loop. Besides if the current `s` is
// greater, then any `s + K` is also greater.
if s > max_complete_transaction_status_slot.load(Ordering::SeqCst) {
if s > max_complete_transaction_status_slot.load(Ordering::SeqCst)
|| s > max_complete_rewards_slot.load(Ordering::SeqCst)
{
break;
}
@ -1314,9 +1331,11 @@ pub(crate) mod tests {
let exit = Arc::new(AtomicBool::new(false));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
@ -1457,9 +1476,11 @@ pub(crate) mod tests {
let blockstore = Blockstore::open(&ledger_path).unwrap();
let blockstore = Arc::new(blockstore);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore.clone(),
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
@ -1575,9 +1596,11 @@ pub(crate) mod tests {
let blockstore = Blockstore::open(&ledger_path).unwrap();
let blockstore = Arc::new(blockstore);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore.clone(),
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
@ -1691,9 +1714,11 @@ pub(crate) mod tests {
let blockstore = Blockstore::open(&ledger_path).unwrap();
let blockstore = Arc::new(blockstore);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore.clone(),
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
@ -1822,9 +1847,11 @@ pub(crate) mod tests {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
@ -1970,9 +1997,11 @@ pub(crate) mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
@ -2141,9 +2170,11 @@ pub(crate) mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
@ -2252,9 +2283,11 @@ pub(crate) mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
@ -2440,9 +2473,11 @@ pub(crate) mod tests {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(block_commitment_cache)),
optimistically_confirmed_bank,
@ -2614,9 +2649,11 @@ pub(crate) mod tests {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
@ -2659,9 +2696,11 @@ pub(crate) mod tests {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
@ -2718,9 +2757,11 @@ pub(crate) mod tests {
let exit = Arc::new(AtomicBool::new(false));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
@ -2913,11 +2954,13 @@ pub(crate) mod tests {
let exit = Arc::new(AtomicBool::new(false));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
@ -2988,9 +3031,11 @@ pub(crate) mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
let bank = Bank::new_for_tests(&genesis_config);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
));