diff --git a/core/src/commitment.rs b/core/src/commitment.rs index 2496c2fd27..945cd878e0 100644 --- a/core/src/commitment.rs +++ b/core/src/commitment.rs @@ -1,3 +1,6 @@ +use crate::consensus::VOTE_THRESHOLD_SIZE; +use solana_measure::measure::Measure; +use solana_metrics::inc_new_counter_info; use solana_runtime::bank::Bank; use solana_sdk::clock::Slot; use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY}; @@ -88,27 +91,34 @@ impl BlockCommitmentCache { self.root } - pub fn get_block_with_depth_commitment( - &self, - minimum_depth: usize, - minimum_stake_percentage: f64, - ) -> Option { - self.block_commitment - .iter() - .filter(|&(_, block_commitment)| { - let fork_stake_minimum_depth: u64 = block_commitment.commitment[minimum_depth..] - .iter() - .cloned() - .sum(); - fork_stake_minimum_depth as f64 / self.total_stake as f64 - >= minimum_stake_percentage - }) - .map(|(slot, _)| *slot) - .max() + pub fn get_confirmation_count(&self, slot: Slot) -> Option { + self.get_lockout_count(slot, VOTE_THRESHOLD_SIZE) } - pub fn get_rooted_block_with_commitment(&self, minimum_stake_percentage: f64) -> Option { - self.get_block_with_depth_commitment(MAX_LOCKOUT_HISTORY - 1, minimum_stake_percentage) + // Returns the lowest level at which at least `minimum_stake_percentage` of the total epoch + // stake is locked out + fn get_lockout_count(&self, slot: Slot, minimum_stake_percentage: f64) -> Option { + self.get_block_commitment(slot).map(|block_commitment| { + let iterator = block_commitment.commitment.iter().enumerate().rev(); + let mut sum = 0; + for (i, stake) in iterator { + sum += stake; + if (sum as f64 / self.total_stake as f64) > minimum_stake_percentage { + return i + 1; + } + } + 0 + }) + } + #[cfg(test)] + pub fn new_for_tests() -> Self { + let mut block_commitment: HashMap = HashMap::new(); + block_commitment.insert(0, BlockCommitment::default()); + Self { + block_commitment, + total_stake: 42, + ..Self::default() + } } } @@ -184,6 +194,7 @@ impl AggregateCommitmentService { continue; } + let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms"); let block_commitment = Self::aggregate_commitment(&ancestors, &aggregation_data.bank); let mut new_block_commitment = BlockCommitmentCache::new( @@ -196,6 +207,11 @@ impl AggregateCommitmentService { let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + aggregate_commitment_time.stop(); + inc_new_counter_info!( + "aggregate-commitment-ms", + aggregate_commitment_time.as_ms() as usize + ); } } @@ -290,86 +306,31 @@ mod tests { } #[test] - fn test_get_block_with_depth_commitment() { + fn test_get_confirmations() { let bank = Arc::new(Bank::default()); // Build BlockCommitmentCache with votes at depths 0 and 1 for 2 slots let mut cache0 = BlockCommitment::default(); - cache0.increase_confirmation_stake(1, 15); - cache0.increase_confirmation_stake(2, 25); + cache0.increase_confirmation_stake(1, 5); + cache0.increase_confirmation_stake(2, 40); let mut cache1 = BlockCommitment::default(); - cache1.increase_confirmation_stake(1, 10); - cache1.increase_confirmation_stake(2, 20); + cache1.increase_confirmation_stake(1, 40); + cache1.increase_confirmation_stake(2, 5); + + let mut cache2 = BlockCommitment::default(); + cache2.increase_confirmation_stake(1, 20); + cache2.increase_confirmation_stake(2, 5); let mut block_commitment = HashMap::new(); block_commitment.entry(0).or_insert(cache0.clone()); block_commitment.entry(1).or_insert(cache1.clone()); + block_commitment.entry(2).or_insert(cache2.clone()); let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50, bank, 0); - // Neither slot has rooted votes - assert_eq!( - block_commitment_cache.get_rooted_block_with_commitment(0.1), - None - ); - // Neither slot meets the minimum level of commitment 0.6 at depth 1 - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(1, 0.6), - None - ); - // Only slot 0 meets the minimum level of commitment 0.5 at depth 1 - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(1, 0.5), - Some(0) - ); - // If multiple slots meet the minimum level of commitment, method should return the most recent - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(1, 0.4), - Some(1) - ); - // If multiple slots meet the minimum level of commitment, method should return the most recent - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(0, 0.6), - Some(1) - ); - // Neither slot meets the minimum level of commitment 0.9 at depth 0 - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(0, 0.9), - None - ); - } - - #[test] - fn test_get_rooted_block_with_commitment() { - let bank = Arc::new(Bank::default()); - // Build BlockCommitmentCache with rooted votes - let mut cache0 = BlockCommitment::new([0; MAX_LOCKOUT_HISTORY]); - cache0.increase_confirmation_stake(MAX_LOCKOUT_HISTORY, 40); - cache0.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 1, 10); - let mut cache1 = BlockCommitment::new([0; MAX_LOCKOUT_HISTORY]); - cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY, 30); - cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 1, 10); - cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 2, 10); - - let mut block_commitment = HashMap::new(); - block_commitment.entry(0).or_insert(cache0.clone()); - block_commitment.entry(1).or_insert(cache1.clone()); - let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50, bank, 0); - - // Only slot 0 meets the minimum level of commitment 0.66 at root - assert_eq!( - block_commitment_cache.get_rooted_block_with_commitment(0.66), - Some(0) - ); - // If multiple slots meet the minimum level of commitment, method should return the most recent - assert_eq!( - block_commitment_cache.get_rooted_block_with_commitment(0.6), - Some(1) - ); - // Neither slot meets the minimum level of commitment 0.9 at root - assert_eq!( - block_commitment_cache.get_rooted_block_with_commitment(0.9), - None - ); + assert_eq!(block_commitment_cache.get_confirmation_count(0), Some(2)); + assert_eq!(block_commitment_cache.get_confirmation_count(1), Some(1)); + assert_eq!(block_commitment_cache.get_confirmation_count(2), Some(0),); + assert_eq!(block_commitment_cache.get_confirmation_count(3), None,); } #[test] diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 501272004a..43d5ac2186 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -136,7 +136,7 @@ impl ReplayStage { // Start the replay stage loop let (lockouts_sender, commitment_service) = - AggregateCommitmentService::new(&exit, block_commitment_cache); + AggregateCommitmentService::new(&exit, block_commitment_cache.clone()); #[allow(clippy::cognitive_complexity)] let t_replay = Builder::new() @@ -306,7 +306,7 @@ impl ReplayStage { // Vote on a fork let voted_on_different_fork = { if let Some(ref vote_bank) = vote_bank { - subscriptions.notify_subscribers(vote_bank.slot(), &bank_forks); + subscriptions.notify_subscribers(block_commitment_cache.read().unwrap().slot(), &bank_forks); if let Some(votable_leader) = leader_schedule_cache .slot_leader_at(vote_bank.slot(), Some(vote_bank)) { @@ -1904,7 +1904,10 @@ pub(crate) mod tests { ); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::default())), + )); let mut bank_forks = BankForks::new(0, bank0); // Insert a non-root bank so that the propagation logic will update this diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 5c5de70b2d..cc5c8a3c6f 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -13,7 +13,7 @@ use solana_ledger::{ bank_forks::BankForks, blockstore::Blockstore, rooted_slot_iterator::RootedSlotIterator, }; use solana_perf::packet::PACKET_DATA_SIZE; -use solana_runtime::{bank::Bank, status_cache::SignatureConfirmationStatus}; +use solana_runtime::bank::Bank; use solana_sdk::{ clock::{Slot, UnixTimestamp}, commitment_config::{CommitmentConfig, CommitmentLevel}, @@ -196,11 +196,9 @@ impl JsonRpcRequestProcessor { match signature { Err(e) => Err(e), Ok(sig) => { - let status = bank.get_signature_confirmation_status(&sig); + let status = bank.get_signature_status(&sig); match status { - Some(SignatureConfirmationStatus { status, .. }) => { - new_response(bank, status.is_ok()) - } + Some(status) => new_response(bank, status.is_ok()), None => new_response(bank, false), } } @@ -409,21 +407,24 @@ impl JsonRpcRequestProcessor { let bank = self.bank(commitment); for signature in signatures { - let status = bank.get_signature_confirmation_status(&signature).map( - |SignatureConfirmationStatus { - slot, - status, - confirmations, - }| TransactionStatus { - slot, - status, - confirmations: if confirmations <= MAX_LOCKOUT_HISTORY { - Some(confirmations) - } else { + let status = bank + .get_signature_status_slot(&signature) + .map(|(slot, status)| { + let r_block_commitment_cache = self.block_commitment_cache.read().unwrap(); + + let confirmations = if r_block_commitment_cache.root() >= slot { None - }, - }, - ); + } else { + r_block_commitment_cache + .get_confirmation_count(slot) + .or(Some(0)) + }; + TransactionStatus { + slot, + status, + confirmations, + } + }); statuses.push(status); } Ok(Response { @@ -1237,8 +1238,10 @@ pub mod tests { blockstore.clone(), ); - let commitment_slot0 = BlockCommitment::new([8; MAX_LOCKOUT_HISTORY]); - let commitment_slot1 = BlockCommitment::new([9; MAX_LOCKOUT_HISTORY]); + let mut commitment_slot0 = BlockCommitment::default(); + commitment_slot0.increase_confirmation_stake(2, 9); + let mut commitment_slot1 = BlockCommitment::default(); + commitment_slot1.increase_confirmation_stake(1, 9); let mut block_commitment: HashMap = HashMap::new(); block_commitment .entry(0) @@ -1248,7 +1251,7 @@ pub mod tests { .or_insert(commitment_slot1.clone()); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new( block_commitment, - 42, + 10, bank.clone(), 0, ))); @@ -1774,7 +1777,9 @@ pub mod tests { let result: Option = serde_json::from_value(json["result"]["value"][0].clone()) .expect("actual response deserialization"); - assert_eq!(expected_res, result.as_ref().unwrap().status); + let result = result.as_ref().unwrap(); + assert_eq!(expected_res, result.status); + assert_eq!(None, result.confirmations); // Test getSignatureStatus request on unprocessed tx let tx = system_transaction::transfer(&alice, &bob_pubkey, 10, blockhash); @@ -2211,7 +2216,7 @@ pub mod tests { .get_block_commitment(0) .map(|block_commitment| block_commitment.commitment) ); - assert_eq!(total_stake, 42); + assert_eq!(total_stake, 10); let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getBlockCommitment","params":[2]}}"#); @@ -2229,7 +2234,7 @@ pub mod tests { panic!("Expected single response"); }; assert_eq!(commitment_response.commitment, None); - assert_eq!(commitment_response.total_stake, 42); + assert_eq!(commitment_response.total_stake, 10); } #[test] diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 89e8ace666..79b31a2c51 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -312,7 +312,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl { #[cfg(test)] mod tests { use super::*; - use crate::rpc_subscriptions::tests::robust_poll_or_panic; + use crate::{ + commitment::{BlockCommitment, BlockCommitmentCache}, + rpc_subscriptions::tests::robust_poll_or_panic, + }; use jsonrpc_core::{futures::sync::mpsc, Response}; use jsonrpc_pubsub::{PubSubHandler, Session}; use solana_budget_program::{self, budget_instruction}; @@ -325,7 +328,12 @@ mod tests { system_program, system_transaction, transaction::{self, Transaction}, }; - use std::{sync::RwLock, thread::sleep, time::Duration}; + use std::{ + collections::HashMap, + sync::{atomic::AtomicBool, RwLock}, + thread::sleep, + time::Duration, + }; fn process_transaction_and_notify( bank_forks: &Arc>, @@ -358,8 +366,13 @@ mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); - - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl { + subscriptions: Arc::new(RpcSubscriptions::new( + &Arc::new(AtomicBool::new(false)), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + )), + ..RpcSolPubSubImpl::default() + }; // Test signature subscriptions let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); @@ -457,7 +470,13 @@ mod tests { let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl { + subscriptions: Arc::new(RpcSubscriptions::new( + &Arc::new(AtomicBool::new(false)), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + )), + ..RpcSolPubSubImpl::default() + }; let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe( @@ -591,7 +610,13 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bob = Keypair::new(); - let rpc = RpcSolPubSubImpl::default(); + let mut rpc = RpcSolPubSubImpl::default(); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); + rpc.subscriptions = Arc::new(subscriptions); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); @@ -622,7 +647,12 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bob = Keypair::new(); - let rpc = RpcSolPubSubImpl::default(); + let mut rpc = RpcSolPubSubImpl::default(); + let exit = Arc::new(AtomicBool::new(false)); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())); + + let subscriptions = RpcSubscriptions::new(&exit, block_commitment_cache.clone()); + rpc.subscriptions = Arc::new(subscriptions); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); @@ -640,10 +670,32 @@ mod tests { let bank0 = bank_forks.read().unwrap()[0].clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); - rpc.subscriptions.notify_subscribers(1, &bank_forks); let bank1 = bank_forks.read().unwrap()[1].clone(); + + let mut cache0 = BlockCommitment::default(); + cache0.increase_confirmation_stake(1, 10); + let mut block_commitment = HashMap::new(); + block_commitment.entry(0).or_insert(cache0.clone()); + let mut new_block_commitment = + BlockCommitmentCache::new(block_commitment, 10, bank1.clone(), 0); + let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); + std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + drop(w_block_commitment_cache); + + rpc.subscriptions.notify_subscribers(1, &bank_forks); let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); bank_forks.write().unwrap().insert(bank2); + let bank2 = bank_forks.read().unwrap()[2].clone(); + + let mut cache0 = BlockCommitment::default(); + cache0.increase_confirmation_stake(2, 10); + let mut block_commitment = HashMap::new(); + block_commitment.entry(0).or_insert(cache0.clone()); + let mut new_block_commitment = BlockCommitmentCache::new(block_commitment, 10, bank2, 0); + let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); + std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + drop(w_block_commitment_cache); + rpc.subscriptions.notify_subscribers(2, &bank_forks); let expected = json!({ "jsonrpc": "2.0", diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index 98b80ee93b..e58bdc775c 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -1,14 +1,20 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}; -use crate::rpc_subscriptions::RpcSubscriptions; +use crate::{ + rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}, + rpc_subscriptions::RpcSubscriptions, +}; use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_ws_server::{RequestContext, ServerBuilder}; -use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::thread::{self, sleep, Builder, JoinHandle}; -use std::time::Duration; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, sleep, Builder, JoinHandle}, + time::Duration, +}; pub struct PubSubService { thread_hdl: JoinHandle<()>, @@ -66,13 +72,20 @@ impl PubSubService { #[cfg(test)] mod tests { use super::*; - use std::net::{IpAddr, Ipv4Addr}; + use crate::commitment::BlockCommitmentCache; + use std::{ + net::{IpAddr, Ipv4Addr}, + sync::RwLock, + }; #[test] fn test_pubsub_new() { let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + )); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index ad7368f3a9..889630a951 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -1,5 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request +use crate::commitment::BlockCommitmentCache; use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::{ @@ -14,11 +15,14 @@ use solana_sdk::{ account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, }; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, +}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; use std::{ + cmp::min, collections::{HashMap, HashSet}, iter, sync::{Arc, Mutex, RwLock}, @@ -80,11 +84,7 @@ fn add_subscription( { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let confirmations = confirmations.unwrap_or(0); - let confirmations = if confirmations > MAX_LOCKOUT_HISTORY { - MAX_LOCKOUT_HISTORY - } else { - confirmations - }; + let confirmations = min(confirmations, MAX_LOCKOUT_HISTORY + 1); if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) { current_hashmap.insert(sub_id, (sink, confirmations)); return; @@ -120,8 +120,8 @@ where fn check_confirmations_and_notify( subscriptions: &HashMap>, Confirmations)>>, hashmap_key: &K, - current_slot: Slot, bank_forks: &Arc>, + block_commitment_cache: &Arc>, bank_method: B, filter_results: F, notifier: &RpcNotifier, @@ -133,6 +133,10 @@ where F: Fn(X, u64) -> Box>, X: Clone + Serialize, { + let mut confirmation_slots: HashMap = HashMap::new(); + let r_block_commitment_cache = block_commitment_cache.read().unwrap(); + let current_slot = r_block_commitment_cache.slot(); + let root = r_block_commitment_cache.root(); let current_ancestors = bank_forks .read() .unwrap() @@ -140,25 +144,24 @@ where .unwrap() .ancestors .clone(); + for (slot, _) in current_ancestors.iter() { + if let Some(confirmations) = r_block_commitment_cache.get_confirmation_count(*slot) { + confirmation_slots.entry(confirmations).or_insert(*slot); + } + } + drop(r_block_commitment_cache); let mut notified_set: HashSet = HashSet::new(); if let Some(hashmap) = subscriptions.get(hashmap_key) { for (sub_id, (sink, confirmations)) in hashmap.iter() { - let desired_slot: Vec = current_ancestors - .iter() - .filter(|(_, &v)| v == *confirmations) - .map(|(k, _)| k) - .cloned() - .collect(); - let root: Vec = current_ancestors - .iter() - .filter(|(_, &v)| v == 32) - .map(|(k, _)| k) - .cloned() - .collect(); - let root = if root.len() == 1 { root[0] } else { 0 }; - if desired_slot.len() == 1 { - let slot = desired_slot[0]; + let desired_slot = if *confirmations == 0 { + Some(¤t_slot) + } else if *confirmations == MAX_LOCKOUT_HISTORY + 1 { + Some(&root) + } else { + confirmation_slots.get(confirmations) + }; + if let Some(&slot) = desired_slot { let results = { let bank_forks = bank_forks.read().unwrap(); let desired_bank = bank_forks.get(slot).unwrap(); @@ -239,7 +242,10 @@ pub struct RpcSubscriptions { impl Default for RpcSubscriptions { fn default() -> Self { - Self::new(&Arc::new(AtomicBool::new(false))) + Self::new( + &Arc::new(AtomicBool::new(false)), + Arc::new(RwLock::new(BlockCommitmentCache::default())), + ) } } @@ -252,7 +258,10 @@ impl Drop for RpcSubscriptions { } impl RpcSubscriptions { - pub fn new(exit: &Arc) -> Self { + pub fn new( + exit: &Arc, + block_commitment_cache: Arc>, + ) -> Self { let (notification_sender, notification_receiver): ( Sender, Receiver, @@ -291,6 +300,7 @@ impl RpcSubscriptions { signature_subscriptions_clone, slot_subscriptions_clone, root_subscriptions_clone, + block_commitment_cache, ); }) .unwrap(); @@ -310,8 +320,8 @@ impl RpcSubscriptions { fn check_account( pubkey: &Pubkey, - current_slot: Slot, bank_forks: &Arc>, + block_commitment_cache: &Arc>, account_subscriptions: Arc, notifier: &RpcNotifier, ) { @@ -319,8 +329,8 @@ impl RpcSubscriptions { check_confirmations_and_notify( &subscriptions, pubkey, - current_slot, bank_forks, + block_commitment_cache, Bank::get_account_modified_since_parent, filter_account_result, notifier, @@ -329,8 +339,8 @@ impl RpcSubscriptions { fn check_program( program_id: &Pubkey, - current_slot: Slot, bank_forks: &Arc>, + block_commitment_cache: &Arc>, program_subscriptions: Arc, notifier: &RpcNotifier, ) { @@ -338,8 +348,8 @@ impl RpcSubscriptions { check_confirmations_and_notify( &subscriptions, program_id, - current_slot, bank_forks, + block_commitment_cache, Bank::get_program_accounts_modified_since_parent, filter_program_results, notifier, @@ -348,8 +358,8 @@ impl RpcSubscriptions { fn check_signature( signature: &Signature, - current_slot: Slot, bank_forks: &Arc>, + block_commitment_cache: &Arc>, signature_subscriptions: Arc, notifier: &RpcNotifier, ) { @@ -357,8 +367,8 @@ impl RpcSubscriptions { let notified_ids = check_confirmations_and_notify( &subscriptions, signature, - current_slot, bank_forks, + block_commitment_cache, Bank::get_signature_status_processed_since_parent, filter_signature_result, notifier, @@ -502,6 +512,7 @@ impl RpcSubscriptions { signature_subscriptions: Arc, slot_subscriptions: Arc, root_subscriptions: Arc, + block_commitment_cache: Arc>, ) { loop { if exit.load(Ordering::Relaxed) { @@ -521,7 +532,7 @@ impl RpcSubscriptions { notifier.notify(root, sink); } } - NotificationEntry::Bank((current_slot, bank_forks)) => { + NotificationEntry::Bank((_current_slot, bank_forks)) => { let pubkeys: Vec<_> = { let subs = account_subscriptions.read().unwrap(); subs.keys().cloned().collect() @@ -529,8 +540,8 @@ impl RpcSubscriptions { for pubkey in &pubkeys { Self::check_account( pubkey, - current_slot, &bank_forks, + &block_commitment_cache, account_subscriptions.clone(), ¬ifier, ); @@ -543,8 +554,8 @@ impl RpcSubscriptions { for program_id in &programs { Self::check_program( program_id, - current_slot, &bank_forks, + &block_commitment_cache, program_subscriptions.clone(), ¬ifier, ); @@ -557,8 +568,8 @@ impl RpcSubscriptions { for signature in &signatures { Self::check_signature( signature, - current_slot, &bank_forks, + &block_commitment_cache, signature_subscriptions.clone(), ¬ifier, ); @@ -599,6 +610,7 @@ impl RpcSubscriptions { #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::commitment::BlockCommitment; use jsonrpc_core::futures::{self, stream::Stream}; use jsonrpc_pubsub::typed::Subscriber; use solana_budget_program; @@ -666,7 +678,10 @@ pub(crate) mod tests { Subscriber::new_test("accountNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber); assert!(subscriptions @@ -735,7 +750,10 @@ pub(crate) mod tests { Subscriber::new_test("programNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); subscriptions.add_program_subscription( solana_budget_program::id(), None, @@ -815,27 +833,41 @@ pub(crate) mod tests { .unwrap() .process_transaction(&processed_tx) .unwrap(); + let bank1 = bank_forks[1].clone(); let bank_forks = Arc::new(RwLock::new(bank_forks)); - let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let mut cache0 = BlockCommitment::default(); + cache0.increase_confirmation_stake(1, 10); + let cache1 = BlockCommitment::default(); - let (past_bank_sub, _id_receiver, past_bank_recv) = + let mut block_commitment = HashMap::new(); + block_commitment.entry(0).or_insert(cache0.clone()); + block_commitment.entry(1).or_insert(cache1.clone()); + let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 10, bank1, 0); + + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = + RpcSubscriptions::new(&exit, Arc::new(RwLock::new(block_commitment_cache))); + + let (past_bank_sub1, _id_receiver, past_bank_recv1) = + Subscriber::new_test("signatureNotification"); + let (past_bank_sub2, _id_receiver, past_bank_recv2) = Subscriber::new_test("signatureNotification"); let (processed_sub, _id_receiver, processed_recv) = Subscriber::new_test("signatureNotification"); + subscriptions.add_signature_subscription( past_bank_tx.signatures[0], Some(0), SubscriptionId::Number(1 as u64), - Subscriber::new_test("signatureNotification").0, + past_bank_sub1, ); subscriptions.add_signature_subscription( past_bank_tx.signatures[0], Some(1), SubscriptionId::Number(2 as u64), - past_bank_sub, + past_bank_sub2, ); subscriptions.add_signature_subscription( processed_tx.signatures[0], @@ -860,41 +892,46 @@ pub(crate) mod tests { subscriptions.notify_subscribers(1, &bank_forks); let expected_res: Option> = Some(Ok(())); - let expected = json!({ - "jsonrpc": "2.0", - "method": "signatureNotification", - "params": { - "result": { - "context": { "slot": 0 }, - "value": expected_res, - }, - "subscription": 2, - } - }); - let (response, _) = robust_poll_or_panic(past_bank_recv); - assert_eq!(serde_json::to_string(&expected).unwrap(), response); + struct Notification { + slot: Slot, + id: u64, + } - let expected = json!({ - "jsonrpc": "2.0", - "method": "signatureNotification", - "params": { - "result": { - "context": { "slot": 1 }, - "value": expected_res, - }, - "subscription": 3, - } - }); + let expected_notification = |exp: Notification| -> String { + let json = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": exp.slot }, + "value": &expected_res, + }, + "subscription": exp.id, + } + }); + serde_json::to_string(&json).unwrap() + }; + + // Expect to receive a notification from bank 1 because this subscription is + // looking for 0 confirmations and so checks the current bank + let expected = expected_notification(Notification { slot: 1, id: 1 }); + let (response, _) = robust_poll_or_panic(past_bank_recv1); + assert_eq!(expected, response); + + // Expect to receive a notification from bank 0 because this subscription is + // looking for 1 confirmation and so checks the past bank + let expected = expected_notification(Notification { slot: 0, id: 2 }); + let (response, _) = robust_poll_or_panic(past_bank_recv2); + assert_eq!(expected, response); + + let expected = expected_notification(Notification { slot: 1, id: 3 }); let (response, _) = robust_poll_or_panic(processed_recv); - assert_eq!(serde_json::to_string(&expected).unwrap(), response); - - let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); + assert_eq!(expected, response); // Subscription should be automatically removed after notification + let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); assert!(!sig_subs.contains_key(&processed_tx.signatures[0])); - - // Only one notification is expected for signature processed in previous bank - assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 1); + assert!(!sig_subs.contains_key(&past_bank_tx.signatures[0])); // Unprocessed signature subscription should not be removed assert_eq!( @@ -909,7 +946,10 @@ pub(crate) mod tests { Subscriber::new_test("slotNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); subscriptions.add_slot_subscription(sub_id.clone(), subscriber); assert!(subscriptions @@ -947,7 +987,10 @@ pub(crate) mod tests { Subscriber::new_test("rootNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); subscriptions.add_root_subscription(sub_id.clone(), subscriber); assert!(subscriptions diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 0e174f6303..4b4c25d956 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -4,6 +4,7 @@ use crate::{ cluster_info::ClusterInfo, + commitment::BlockCommitmentCache, contact_info::ContactInfo, result::{Error, Result}, }; @@ -11,9 +12,7 @@ use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; use solana_chacha_cuda::chacha_cuda::chacha_cbc_encrypt_file_many_keys; use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; -use solana_runtime::{ - bank::Bank, status_cache::SignatureConfirmationStatus, storage_utils::archiver_accounts, -}; +use solana_runtime::{bank::Bank, storage_utils::archiver_accounts}; use solana_sdk::{ account::Account, account_utils::StateMut, @@ -30,6 +29,7 @@ use solana_storage_program::{ storage_instruction, storage_instruction::proof_validation, }; +use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ cmp, collections::HashMap, @@ -185,6 +185,7 @@ impl StorageStage { exit: &Arc, bank_forks: &Arc>, cluster_info: &Arc>, + block_commitment_cache: Arc>, ) -> Self { let (instruction_sender, instruction_receiver) = channel(); @@ -256,6 +257,7 @@ impl StorageStage { &keypair, &storage_keypair, &transactions_socket, + &block_commitment_cache, ) .unwrap_or_else(|err| { info!("failed to send storage transaction: {:?}", err) @@ -289,6 +291,7 @@ impl StorageStage { keypair: &Arc, storage_keypair: &Arc, transactions_socket: &UdpSocket, + block_commitment_cache: &Arc>, ) -> io::Result<()> { let working_bank = bank_forks.read().unwrap().working_bank(); let blockhash = working_bank.confirmed_last_blockhash().0; @@ -323,8 +326,13 @@ impl StorageStage { cluster_info.read().unwrap().my_data().tpu, )?; sleep(Duration::from_millis(100)); - if Self::poll_for_signature_confirmation(bank_forks, &transaction.signatures[0], 0) - .is_ok() + if Self::poll_for_signature_confirmation( + bank_forks, + block_commitment_cache, + &transaction.signatures[0], + 0, + ) + .is_ok() { break; }; @@ -334,23 +342,24 @@ impl StorageStage { fn poll_for_signature_confirmation( bank_forks: &Arc>, + block_commitment_cache: &Arc>, signature: &Signature, min_confirmed_blocks: usize, ) -> Result<()> { let mut now = Instant::now(); let mut confirmed_blocks = 0; loop { - let response = bank_forks - .read() - .unwrap() - .working_bank() - .get_signature_confirmation_status(signature); - if let Some(SignatureConfirmationStatus { - confirmations, - status, - .. - }) = response - { + let working_bank = bank_forks.read().unwrap().working_bank(); + let response = working_bank.get_signature_status_slot(signature); + if let Some((slot, status)) = response { + let confirmations = if working_bank.src.roots().contains(&slot) { + MAX_LOCKOUT_HISTORY + 1 + } else { + let r_block_commitment_cache = block_commitment_cache.read().unwrap(); + r_block_commitment_cache + .get_confirmation_count(slot) + .unwrap_or(0) + }; if status.is_ok() { if confirmed_blocks != confirmations { now = Instant::now(); @@ -655,12 +664,18 @@ mod tests { use rayon::prelude::*; use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_runtime::bank::Bank; - use solana_sdk::hash::Hasher; - use solana_sdk::signature::{Keypair, Signer}; - use std::cmp::{max, min}; - use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; + use solana_sdk::{ + hash::Hasher, + signature::{Keypair, Signer}, + }; + use std::{ + cmp::{max, min}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + mpsc::channel, + Arc, RwLock, + }, + }; #[test] fn test_storage_stage_none_ledger() { @@ -675,6 +690,7 @@ mod tests { &[bank.clone()], vec![0], ))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let (_slot_sender, slot_receiver) = channel(); let storage_state = StorageState::new( &bank.last_blockhash(), @@ -690,6 +706,7 @@ mod tests { &exit.clone(), &bank_forks, &cluster_info, + block_commitment_cache, ); exit.store(true, Ordering::Relaxed); storage_stage.join().unwrap(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 5bf98e4a75..806067bfad 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -185,7 +185,7 @@ impl Tvu { leader_schedule_cache: leader_schedule_cache.clone(), latest_root_senders: vec![ledger_cleanup_slot_sender], accounts_hash_sender: Some(accounts_hash_sender), - block_commitment_cache, + block_commitment_cache: block_commitment_cache.clone(), transaction_status_sender, rewards_recorder_sender, }; @@ -221,6 +221,7 @@ impl Tvu { &exit, &bank_forks, &cluster_info, + block_commitment_cache, ); Tvu { @@ -307,7 +308,10 @@ pub mod tests { blockstore, &StorageState::default(), l_receiver, - &Arc::new(RpcSubscriptions::new(&exit)), + &Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::default())), + )), &poh_recorder, &leader_schedule_cache, &exit, diff --git a/core/src/validator.rs b/core/src/validator.rs index f698dca7ec..cec231afc1 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -234,7 +234,7 @@ impl Validator { let blockstore = Arc::new(blockstore); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let subscriptions = Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())); let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { if ContactInfo::is_valid_address(&node.info.rpc) { diff --git a/core/tests/client.rs b/core/tests/client.rs index 73cb9e201b..7b1d9645df 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -3,8 +3,8 @@ use solana_client::{ rpc_client::RpcClient, }; use solana_core::{ - rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, - validator::TestValidator, + commitment::BlockCommitmentCache, rpc_pubsub_service::PubSubService, + rpc_subscriptions::RpcSubscriptions, validator::TestValidator, }; use solana_sdk::{ commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer, @@ -15,7 +15,7 @@ use std::{ net::{IpAddr, SocketAddr}, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, RwLock, }, thread::sleep, time::{Duration, Instant}, @@ -85,7 +85,10 @@ fn test_slot_subscription() { rpc_port::DEFAULT_RPC_PUBSUB_PORT, ); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::default())), + )); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); std::thread::sleep(Duration::from_millis(400)); diff --git a/core/tests/storage_stage.rs b/core/tests/storage_stage.rs index 73295f078f..b6c936fc10 100644 --- a/core/tests/storage_stage.rs +++ b/core/tests/storage_stage.rs @@ -3,28 +3,36 @@ #[cfg(test)] mod tests { use log::*; - use solana_core::storage_stage::{test_cluster_info, SLOTS_PER_TURN_TEST}; - use solana_core::storage_stage::{StorageStage, StorageState}; - use solana_ledger::bank_forks::BankForks; - use solana_ledger::blockstore_processor; - use solana_ledger::entry; - use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_ledger::{blockstore::Blockstore, create_new_tmp_ledger}; + use solana_core::{ + commitment::BlockCommitmentCache, + storage_stage::{test_cluster_info, StorageStage, StorageState, SLOTS_PER_TURN_TEST}, + }; + use solana_ledger::{ + bank_forks::BankForks, + blockstore::Blockstore, + blockstore_processor, create_new_tmp_ledger, entry, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }; use solana_runtime::bank::Bank; - use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; - use solana_sdk::hash::Hash; - use solana_sdk::message::Message; - use solana_sdk::pubkey::Pubkey; - use solana_sdk::signature::{Keypair, Signer}; - use solana_sdk::transaction::Transaction; - use solana_storage_program::storage_instruction; - use solana_storage_program::storage_instruction::StorageAccountType; - use std::fs::remove_dir_all; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; + use solana_sdk::{ + clock::DEFAULT_TICKS_PER_SLOT, + hash::Hash, + message::Message, + pubkey::Pubkey, + signature::{Keypair, Signer}, + transaction::Transaction, + }; + use solana_storage_program::storage_instruction::{self, StorageAccountType}; + use std::{ + fs::remove_dir_all, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + Arc, RwLock, + }, + thread::sleep, + time::Duration, + }; #[test] fn test_storage_stage_process_account_proofs() { @@ -52,6 +60,7 @@ mod tests { &[bank.clone()], vec![0], ))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let cluster_info = test_cluster_info(&keypair.pubkey()); let (bank_sender, bank_receiver) = channel(); @@ -69,6 +78,7 @@ mod tests { &exit.clone(), &bank_forks, &cluster_info, + block_commitment_cache, ); bank_sender.send(vec![bank.clone()]).unwrap(); @@ -171,6 +181,7 @@ mod tests { &[bank.clone()], vec![0], ))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let cluster_info = test_cluster_info(&keypair.pubkey()); let (bank_sender, bank_receiver) = channel(); @@ -188,6 +199,7 @@ mod tests { &exit.clone(), &bank_forks, &cluster_info, + block_commitment_cache, ); bank_sender.send(vec![bank.clone()]).unwrap(); diff --git a/docs/src/apps/jsonrpc-api.md b/docs/src/apps/jsonrpc-api.md index aa3d09fb98..7008c3ca3a 100644 --- a/docs/src/apps/jsonrpc-api.md +++ b/docs/src/apps/jsonrpc-api.md @@ -118,7 +118,7 @@ Many methods that take a commitment parameter return an RpcResponse JSON object ### confirmTransaction -Returns a transaction receipt +Returns a transaction receipt. This method only searches the recent status cache of signatures, which retains all active slots plus `MAX_RECENT_BLOCKHASHES` rooted slots. #### Parameters: @@ -656,14 +656,13 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m ### getSignatureStatus -Returns the status of a given signature. This method is similar to [confirmTransaction](jsonrpc-api.md#confirmtransaction) but provides more resolution for error events. +Returns the status of a given signature. This method is similar to [confirmTransaction](jsonrpc-api.md#confirmtransaction) but provides more resolution for error events. This method only searches the recent status cache of signatures, which retains all active slots plus `MAX_RECENT_BLOCKHASHES` rooted slots. #### Parameters: * `` - An array of transaction signatures to confirm, as base-58 encoded strings * `` - (optional) Extended Rpc configuration, containing the following optional fields: * `commitment: ` - [Commitment](jsonrpc-api.md#configuring-state-commitment) - * `searchTransactionHistory: ` - whether to search the ledger transaction status cache, which may be expensive #### Results: diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index f243c83574..b0a6bfbc55 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -14,7 +14,7 @@ use crate::{ deserialize_atomicbool, deserialize_atomicu64, serialize_atomicbool, serialize_atomicu64, }, stakes::Stakes, - status_cache::{SignatureConfirmationStatus, SlotDelta, StatusCache}, + status_cache::{SlotDelta, StatusCache}, storage_utils, storage_utils::StorageAccounts, system_instruction_processor::{get_system_account_kind, SystemAccountKind}, @@ -1845,29 +1845,25 @@ impl Bank { &self, signature: &Signature, ) -> Option> { - if let Some(status) = self.get_signature_confirmation_status(signature) { - if status.slot == self.slot() { - return Some(status.status); + if let Some((slot, status)) = self.get_signature_status_slot(signature) { + if slot <= self.slot() { + return Some(status); } } None } - pub fn get_signature_confirmation_status( - &self, - signature: &Signature, - ) -> Option>> { + pub fn get_signature_status_slot(&self, signature: &Signature) -> Option<(Slot, Result<()>)> { let rcache = self.src.status_cache.read().unwrap(); - rcache.get_signature_status_slow(signature, &self.ancestors) + rcache.get_signature_slot(signature, &self.ancestors) } pub fn get_signature_status(&self, signature: &Signature) -> Option> { - self.get_signature_confirmation_status(signature) - .map(|v| v.status) + self.get_signature_status_slot(signature).map(|v| v.1) } pub fn has_signature(&self, signature: &Signature) -> bool { - self.get_signature_confirmation_status(signature).is_some() + self.get_signature_status_slot(signature).is_some() } /// Hash the `accounts` HashMap. This represents a validator's interpretation diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index 9346359c15..fa9731c2e4 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -1,4 +1,4 @@ -use crate::{bank::Bank, status_cache::SignatureConfirmationStatus}; +use crate::bank::Bank; use solana_sdk::{ account::Account, client::{AsyncClient, Client, SyncClient}, @@ -184,26 +184,15 @@ impl SyncClient for BankClient { signature: &Signature, min_confirmed_blocks: usize, ) -> Result { - let mut now = Instant::now(); - let mut confirmed_blocks = 0; + // https://github.com/solana-labs/solana/issues/7199 + assert_eq!(min_confirmed_blocks, 1, "BankClient cannot observe the passage of multiple blocks, so min_confirmed_blocks must be 1"); + let now = Instant::now(); + let confirmed_blocks; loop { - let response = self.bank.get_signature_confirmation_status(signature); - if let Some(SignatureConfirmationStatus { - confirmations, - status, - .. - }) = response - { - if status.is_ok() { - if confirmed_blocks != confirmations { - now = Instant::now(); - confirmed_blocks = confirmations; - } - if confirmations >= min_confirmed_blocks { - break; - } - } - }; + if self.bank.get_signature_status(signature).is_some() { + confirmed_blocks = 1; + break; + } if now.elapsed().as_secs() > 15 { return Err(TransportError::IoError(io::Error::new( io::ErrorKind::Other, diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index d7c0961209..b6ae088965 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -103,29 +103,20 @@ impl StatusCache { None } - pub fn get_signature_status_slow( + pub fn get_signature_slot( &self, - sig: &Signature, + signature: &Signature, ancestors: &HashMap, - ) -> Option> { - trace!("get_signature_status_slow"); + ) -> Option<(Slot, T)> { let mut keys = vec![]; let mut val: Vec<_> = self.cache.iter().map(|(k, _)| *k).collect(); keys.append(&mut val); for blockhash in keys.iter() { - trace!("get_signature_status_slow: trying {}", blockhash); - if let Some((forkid, res)) = self.get_signature_status(sig, blockhash, ancestors) { - trace!("get_signature_status_slow: got {}", forkid); - let confirmations = ancestors - .get(&forkid) - .copied() - .unwrap_or_else(|| ancestors.len()); - return Some(SignatureConfirmationStatus { - slot: forkid, - confirmations, - status: res, - }); + trace!("get_signature_slot: trying {}", blockhash); + let status = self.get_signature_status(signature, blockhash, ancestors); + if status.is_some() { + return status; } } None @@ -265,10 +256,7 @@ mod tests { status_cache.get_signature_status(&sig, &blockhash, &HashMap::new()), None ); - assert_eq!( - status_cache.get_signature_status_slow(&sig, &HashMap::new()), - None - ); + assert_eq!(status_cache.get_signature_slot(&sig, &HashMap::new()), None); } #[test] @@ -283,12 +271,8 @@ mod tests { Some((0, ())) ); assert_eq!( - status_cache.get_signature_status_slow(&sig, &ancestors), - Some(SignatureConfirmationStatus { - slot: 0, - confirmations: 1, - status: () - }) + status_cache.get_signature_slot(&sig, &ancestors), + Some((0, ())) ); } @@ -303,10 +287,7 @@ mod tests { status_cache.get_signature_status(&sig, &blockhash, &ancestors), None ); - assert_eq!( - status_cache.get_signature_status_slow(&sig, &ancestors), - None - ); + assert_eq!(status_cache.get_signature_slot(&sig, &ancestors), None); } #[test] @@ -323,24 +304,6 @@ mod tests { ); } - #[test] - fn test_find_sig_with_root_ancestor_fork_max_len() { - let sig = Signature::default(); - let mut status_cache = BankStatusCache::default(); - let blockhash = hash(Hash::default().as_ref()); - let ancestors = vec![(2, 2)].into_iter().collect(); - status_cache.insert(&blockhash, &sig, 0, ()); - status_cache.add_root(0); - assert_eq!( - status_cache.get_signature_status_slow(&sig, &ancestors), - Some(SignatureConfirmationStatus { - slot: 0, - confirmations: ancestors.len(), - status: () - }) - ); - } - #[test] fn test_insert_picks_latest_blockhash_fork() { let sig = Signature::default(); @@ -371,10 +334,6 @@ mod tests { status_cache.get_signature_status(&sig, &blockhash, &ancestors), None ); - assert_eq!( - status_cache.get_signature_status_slow(&sig, &ancestors), - None - ); } #[test]