diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 9a0272124..0c0f3a3b8 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -6,6 +6,7 @@ extern crate test; use log::*; use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::contact_info::ContactInfo; +use solana_core::max_slots::MaxSlots; use solana_core::retransmit_stage::retransmitter; use solana_ledger::entry::Entry; use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; @@ -92,6 +93,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { &leader_schedule_cache, cluster_info, packet_receiver, + &Arc::new(MaxSlots::default()), ); let mut index = 0; diff --git a/core/src/completed_data_sets_service.rs b/core/src/completed_data_sets_service.rs index 9c559fb1e..4fc954482 100644 --- a/core/src/completed_data_sets_service.rs +++ b/core/src/completed_data_sets_service.rs @@ -1,4 +1,4 @@ -use crate::rpc_subscriptions::RpcSubscriptions; +use crate::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}; use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; use solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo}; use solana_ledger::entry::Entry; @@ -25,6 +25,7 @@ impl CompletedDataSetsService { blockstore: Arc, rpc_subscriptions: Arc, exit: &Arc, + max_slots: Arc, ) -> Self { let exit = exit.clone(); let thread_hdl = Builder::new() @@ -37,6 +38,7 @@ impl CompletedDataSetsService { &completed_sets_receiver, &blockstore, &rpc_subscriptions, + &max_slots, ) { break; } @@ -49,8 +51,10 @@ impl CompletedDataSetsService { completed_sets_receiver: &CompletedDataSetsReceiver, blockstore: &Blockstore, rpc_subscriptions: &RpcSubscriptions, + max_slots: &Arc, ) -> Result<(), RecvTimeoutError> { let completed_data_sets = completed_sets_receiver.recv_timeout(Duration::from_secs(1))?; + let mut max_slot = 0; for completed_set_info in std::iter::once(completed_data_sets) .chain(completed_sets_receiver.try_iter()) .flatten() @@ -60,6 +64,7 @@ impl CompletedDataSetsService { start_index, end_index, } = completed_set_info; + max_slot = max_slot.max(slot); match blockstore.get_entries_in_data_block(slot, start_index, end_index, None) { Ok(entries) => { let transactions = Self::get_transaction_signatures(entries); @@ -70,6 +75,9 @@ impl CompletedDataSetsService { Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e), } } + max_slots + .shred_insert + .fetch_max(max_slot, Ordering::Relaxed); Ok(()) } diff --git a/core/src/lib.rs b/core/src/lib.rs index 822e7130b..59b987c4d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -16,6 +16,7 @@ pub mod cluster_info_vote_listener; pub mod commitment_service; pub mod completed_data_sets_service; mod deprecated; +pub mod max_slots; pub mod sample_performance_service; pub mod shred_fetch_stage; #[macro_use] diff --git a/core/src/max_slots.rs b/core/src/max_slots.rs new file mode 100644 index 000000000..3dc12f07d --- /dev/null +++ b/core/src/max_slots.rs @@ -0,0 +1,7 @@ +use std::sync::atomic::AtomicU64; + +#[derive(Default)] +pub struct MaxSlots { + pub retransmit: AtomicU64, + pub shred_insert: AtomicU64, +} diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index fc19084c0..d517e53a9 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -8,6 +8,7 @@ use crate::{ cluster_slots_service::ClusterSlotsService, completed_data_sets_service::CompletedDataSetsSender, contact_info::ContactInfo, + max_slots::MaxSlots, repair_service::DuplicateSlotsResetSender, repair_service::RepairInfo, result::{Error, Result}, @@ -264,6 +265,7 @@ fn retransmit( epoch_stakes_cache: &RwLock, last_peer_update: &AtomicU64, shreds_received: &Mutex, + max_slots: &MaxSlots, ) -> Result<()> { let timer = Duration::new(1, 0); let r_lock = r.lock().unwrap(); @@ -320,6 +322,7 @@ fn retransmit( let mut compute_turbine_peers_total = 0; let mut packets_by_slot: HashMap = HashMap::new(); let mut packets_by_source: HashMap = HashMap::new(); + let mut max_slot = 0; for mut packets in packet_v { for packet in packets.packets.iter_mut() { // skip discarded packets and repair packets @@ -337,6 +340,7 @@ fn retransmit( Some(slot) => slot, None => continue, }; + max_slot = max_slot.max(shred_slot); let mut compute_turbine_peers = Measure::start("turbine_start"); let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index( &my_id, @@ -393,6 +397,7 @@ fn retransmit( retransmit_total += retransmit_time.as_us(); } } + max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed); timer_start.stop(); debug!( "retransmitted {} packets in {}ms retransmit_time: {}ms id: {}", @@ -433,6 +438,7 @@ pub fn retransmitter( leader_schedule_cache: &Arc, cluster_info: Arc, r: Arc>, + max_slots: &Arc, ) -> Vec> { let stats = Arc::new(RetransmitStats::default()); let shreds_received = Arc::new(Mutex::new(( @@ -450,6 +456,7 @@ pub fn retransmitter( let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default())); let last_peer_update = Arc::new(AtomicU64::new(0)); let shreds_received = shreds_received.clone(); + let max_slots = max_slots.clone(); Builder::new() .name("solana-retransmitter".to_string()) @@ -467,6 +474,7 @@ pub fn retransmitter( &epoch_stakes_cache, &last_peer_update, &shreds_received, + &max_slots, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -511,6 +519,7 @@ impl RetransmitStage { verified_vote_receiver: VerifiedVoteReceiver, repair_validators: Option>, completed_data_sets_sender: CompletedDataSetsSender, + max_slots: &Arc, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -521,6 +530,7 @@ impl RetransmitStage { leader_schedule_cache, cluster_info.clone(), retransmit_receiver, + max_slots, ); let leader_schedule_cache_clone = leader_schedule_cache.clone(); @@ -638,6 +648,7 @@ mod tests { &leader_schedule_cache, cluster_info, Arc::new(Mutex::new(retransmit_receiver)), + &Arc::new(MaxSlots::default()), ); let _thread_hdls = vec![t_retransmit]; diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 684ad6b8b..1a47dd0e3 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -3,6 +3,7 @@ use crate::{ cluster_info::ClusterInfo, contact_info::ContactInfo, + max_slots::MaxSlots, non_circulating_supply::calculate_non_circulating_supply, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc_health::*, @@ -138,6 +139,7 @@ pub struct JsonRpcRequestProcessor { bigtable_ledger_storage: Option, optimistically_confirmed_bank: Arc>, largest_accounts_cache: Arc>, + max_slots: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -221,6 +223,7 @@ impl JsonRpcRequestProcessor { bigtable_ledger_storage: Option, optimistically_confirmed_bank: Arc>, largest_accounts_cache: Arc>, + max_slots: Arc, ) -> (Self, Receiver) { let (sender, receiver) = channel(); ( @@ -239,6 +242,7 @@ impl JsonRpcRequestProcessor { bigtable_ledger_storage, optimistically_confirmed_bank, largest_accounts_cache, + max_slots, }, receiver, ) @@ -279,6 +283,7 @@ impl JsonRpcRequestProcessor { bank: bank.clone(), })), largest_accounts_cache: Arc::new(RwLock::new(LargestAccountsCache::new(30))), + max_slots: Arc::new(MaxSlots::default()), } } @@ -485,6 +490,14 @@ impl JsonRpcRequestProcessor { self.bank(commitment).slot() } + fn get_max_retransmit_slot(&self) -> Slot { + self.max_slots.retransmit.load(Ordering::Relaxed) + } + + fn get_max_shred_insert_slot(&self) -> Slot { + self.max_slots.shred_insert.load(Ordering::Relaxed) + } + fn get_slot_leader(&self, commitment: Option) -> String { self.bank(commitment).collector_id().to_string() } @@ -1915,6 +1928,12 @@ pub trait RpcSol { #[rpc(meta, name = "getSlot")] fn get_slot(&self, meta: Self::Metadata, commitment: Option) -> Result; + #[rpc(meta, name = "getMaxRetransmitSlot")] + fn get_max_retransmit_slot(&self, meta: Self::Metadata) -> Result; + + #[rpc(meta, name = "getMaxShredInsertSlot")] + fn get_max_shred_insert_slot(&self, meta: Self::Metadata) -> Result; + #[rpc(meta, name = "getTransactionCount")] fn get_transaction_count( &self, @@ -2505,6 +2524,16 @@ impl RpcSol for RpcSolImpl { Ok(meta.get_slot(commitment)) } + fn get_max_retransmit_slot(&self, meta: Self::Metadata) -> Result { + debug!("get_max_retransmit_slot rpc request received"); + Ok(meta.get_max_retransmit_slot()) + } + + fn get_max_shred_insert_slot(&self, meta: Self::Metadata) -> Result { + debug!("get_max_shred_insert_slot rpc request received"); + Ok(meta.get_max_shred_insert_slot()) + } + fn get_transaction_count( &self, meta: Self::Metadata, @@ -3206,6 +3235,10 @@ pub mod tests { .write_perf_sample(0, &sample1) .expect("write to blockstore"); + let max_slots = Arc::new(MaxSlots::default()); + max_slots.retransmit.store(42, Ordering::Relaxed); + max_slots.shred_insert.store(43, Ordering::Relaxed); + let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig { enable_rpc_transaction_history: true, @@ -3224,6 +3257,7 @@ pub mod tests { None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), Arc::new(RwLock::new(LargestAccountsCache::new(30))), + max_slots, ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); @@ -4634,6 +4668,7 @@ pub mod tests { None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), Arc::new(RwLock::new(LargestAccountsCache::new(30))), + Arc::new(MaxSlots::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); @@ -4831,6 +4866,7 @@ pub mod tests { None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), Arc::new(RwLock::new(LargestAccountsCache::new(30))), + Arc::new(MaxSlots::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!(request_processor.validator_exit(), false); @@ -4865,6 +4901,7 @@ pub mod tests { None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), Arc::new(RwLock::new(LargestAccountsCache::new(30))), + Arc::new(MaxSlots::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!(request_processor.validator_exit(), true); @@ -4892,6 +4929,24 @@ pub mod tests { assert_eq!(expected, result); } + fn test_basic_slot(method: &str, expected: Slot) { + let bob_pubkey = solana_sdk::pubkey::new_rand(); + let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey); + + let req = format!("{{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"{}\"}}", method); + let res = io.handle_request_sync(&req, meta); + + let json: Value = serde_json::from_str(&res.unwrap()).unwrap(); + let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap(); + assert_eq!(slot, expected); + } + + #[test] + fn test_rpc_get_max_slots() { + test_basic_slot("getMaxRetransmitSlot", 42); + test_basic_slot("getMaxShredInsertSlot", 43); + } + #[test] fn test_rpc_get_version() { let bob_pubkey = solana_sdk::pubkey::new_rand(); @@ -4958,6 +5013,7 @@ pub mod tests { None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), Arc::new(RwLock::new(LargestAccountsCache::new(30))), + Arc::new(MaxSlots::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!( @@ -6188,6 +6244,7 @@ pub mod tests { None, optimistically_confirmed_bank.clone(), Arc::new(RwLock::new(LargestAccountsCache::new(30))), + Arc::new(MaxSlots::default()), ); let mut io = MetaIoHandler::default(); diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 40d388e22..0b20a013e 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -3,6 +3,7 @@ use crate::{ bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, + max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, poh_recorder::PohRecorder, rpc::*, @@ -272,6 +273,7 @@ impl JsonRpcService { optimistically_confirmed_bank: Arc>, send_transaction_retry_ms: u64, send_transaction_leader_forward_count: u64, + max_slots: Arc, ) -> Self { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); @@ -349,6 +351,7 @@ impl JsonRpcService { bigtable_ledger_storage, optimistically_confirmed_bank, largest_accounts_cache, + max_slots, ); let leader_info = @@ -510,6 +513,7 @@ mod tests { optimistically_confirmed_bank, 1000, 1, + Arc::new(MaxSlots::default()), ); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 8d4e1c2a7..e2f41a06d 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -11,6 +11,7 @@ use crate::{ completed_data_sets_service::CompletedDataSetsSender, consensus::Tower, ledger_cleanup_service::LedgerCleanupService, + max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSender, poh_recorder::PohRecorder, replay_stage::{ReplayStage, ReplayStageConfig}, @@ -120,6 +121,7 @@ impl Tvu { completed_data_sets_sender: CompletedDataSetsSender, bank_notification_sender: Option, tvu_config: TvuConfig, + max_slots: &Arc, ) -> Self { let keypair: Arc = cluster_info.keypair.clone(); @@ -174,6 +176,7 @@ impl Tvu { verified_vote_receiver, tvu_config.repair_validators, completed_data_sets_sender, + max_slots, ); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); @@ -407,6 +410,7 @@ pub mod tests { completed_data_sets_sender, None, TvuConfig::default(), + &Arc::new(MaxSlots::default()), ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 71e604277..0c569b766 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -12,6 +12,7 @@ use crate::{ consensus::{reconcile_blockstore_roots_with_tower, Tower}, contact_info::ContactInfo, gossip_service::GossipService, + max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, }, @@ -414,6 +415,7 @@ impl Validator { config.pubsub_config.enable_vote_subscription, )); + let max_slots = Arc::new(MaxSlots::default()); let (completed_data_sets_sender, completed_data_sets_receiver) = bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL); let completed_data_sets_service = CompletedDataSetsService::new( @@ -421,6 +423,7 @@ impl Validator { blockstore.clone(), subscriptions.clone(), &exit, + max_slots.clone(), ); info!( @@ -485,6 +488,7 @@ impl Validator { optimistically_confirmed_bank.clone(), config.send_transaction_retry_ms, config.send_transaction_leader_forward_count, + max_slots.clone(), ), pubsub_service: PubSubService::new( config.pubsub_config.clone(), @@ -654,6 +658,7 @@ impl Validator { rocksdb_compaction_interval: config.rocksdb_compaction_interval, rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval, }, + &max_slots, ); let tpu = Tpu::new( diff --git a/docs/src/developing/clients/jsonrpc-api.md b/docs/src/developing/clients/jsonrpc-api.md index e82d36893..7a0b4ed82 100644 --- a/docs/src/developing/clients/jsonrpc-api.md +++ b/docs/src/developing/clients/jsonrpc-api.md @@ -42,6 +42,8 @@ gives a convenient interface for the RPC methods. - [getInflationRate](jsonrpc-api.md#getinflationrate) - [getLargestAccounts](jsonrpc-api.md#getlargestaccounts) - [getLeaderSchedule](jsonrpc-api.md#getleaderschedule) +- [getMaxRetransmitSlot](jsonrpc-api.md#getmaxretransmitslot) +- [getMaxShredInsertSlot](jsonrpc-api.md#getmaxshredinsertslot) - [getMinimumBalanceForRentExemption](jsonrpc-api.md#getminimumbalanceforrentexemption) - [getMultipleAccounts](jsonrpc-api.md#getmultipleaccounts) - [getProgramAccounts](jsonrpc-api.md#getprogramaccounts) @@ -1612,6 +1614,50 @@ Result: } ``` +### getMaxRetransmitSlot + +Get the max slot seen from retransmit stage. + +#### Results: + +- `` - Slot + +#### Example: + +Request: +```bash +curl http://localhost:8899 -X POST -H "Content-Type: application/json" -d ' + {"jsonrpc":"2.0","id":1, "method":"getMaxRetransmitSlot"} +' +``` + +Result: +```json +{"jsonrpc":"2.0","result":1234,"id":1} +``` + +### getMaxShredInsertSlot + +Get the max slot seen from after shred insert. + +#### Results: + +- `` - Slot + +#### Example: + +Request: +```bash +curl http://localhost:8899 -X POST -H "Content-Type: application/json" -d ' + {"jsonrpc":"2.0","id":1, "method":"getMaxShredInsertSlot"} +' +``` + +Result: +```json +{"jsonrpc":"2.0","result":1234,"id":1} +``` + ### getMinimumBalanceForRentExemption Returns minimum balance required to make account rent exempt.