Add max retransmit and shred insert slot (#15475)

This commit is contained in:
sakridge 2021-02-23 13:06:33 -08:00 committed by GitHub
parent cf4e31964b
commit 1b59b163dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 146 additions and 1 deletions

View File

@ -6,6 +6,7 @@ extern crate test;
use log::*;
use solana_core::cluster_info::{ClusterInfo, Node};
use solana_core::contact_info::ContactInfo;
use solana_core::max_slots::MaxSlots;
use solana_core::retransmit_stage::retransmitter;
use solana_ledger::entry::Entry;
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
@ -92,6 +93,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
&leader_schedule_cache,
cluster_info,
packet_receiver,
&Arc::new(MaxSlots::default()),
);
let mut index = 0;

View File

@ -1,4 +1,4 @@
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions};
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo};
use solana_ledger::entry::Entry;
@ -25,6 +25,7 @@ impl CompletedDataSetsService {
blockstore: Arc<Blockstore>,
rpc_subscriptions: Arc<RpcSubscriptions>,
exit: &Arc<AtomicBool>,
max_slots: Arc<MaxSlots>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
@ -37,6 +38,7 @@ impl CompletedDataSetsService {
&completed_sets_receiver,
&blockstore,
&rpc_subscriptions,
&max_slots,
) {
break;
}
@ -49,8 +51,10 @@ impl CompletedDataSetsService {
completed_sets_receiver: &CompletedDataSetsReceiver,
blockstore: &Blockstore,
rpc_subscriptions: &RpcSubscriptions,
max_slots: &Arc<MaxSlots>,
) -> Result<(), RecvTimeoutError> {
let completed_data_sets = completed_sets_receiver.recv_timeout(Duration::from_secs(1))?;
let mut max_slot = 0;
for completed_set_info in std::iter::once(completed_data_sets)
.chain(completed_sets_receiver.try_iter())
.flatten()
@ -60,6 +64,7 @@ impl CompletedDataSetsService {
start_index,
end_index,
} = completed_set_info;
max_slot = max_slot.max(slot);
match blockstore.get_entries_in_data_block(slot, start_index, end_index, None) {
Ok(entries) => {
let transactions = Self::get_transaction_signatures(entries);
@ -70,6 +75,9 @@ impl CompletedDataSetsService {
Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e),
}
}
max_slots
.shred_insert
.fetch_max(max_slot, Ordering::Relaxed);
Ok(())
}

View File

@ -16,6 +16,7 @@ pub mod cluster_info_vote_listener;
pub mod commitment_service;
pub mod completed_data_sets_service;
mod deprecated;
pub mod max_slots;
pub mod sample_performance_service;
pub mod shred_fetch_stage;
#[macro_use]

7
core/src/max_slots.rs Normal file
View File

@ -0,0 +1,7 @@
use std::sync::atomic::AtomicU64;
/// Highest slot numbers observed by different validator stages, shared
/// across threads as lock-free atomic counters (updated via `fetch_max`,
/// read by the RPC handlers `getMaxRetransmitSlot` / `getMaxShredInsertSlot`).
#[derive(Default)]
pub struct MaxSlots {
    // Max slot seen by the retransmit stage (written in retransmit()).
    pub retransmit: AtomicU64,
    // Max slot whose completed data sets were observed after shred insert
    // (written by CompletedDataSetsService).
    pub shred_insert: AtomicU64,
}

View File

@ -8,6 +8,7 @@ use crate::{
cluster_slots_service::ClusterSlotsService,
completed_data_sets_service::CompletedDataSetsSender,
contact_info::ContactInfo,
max_slots::MaxSlots,
repair_service::DuplicateSlotsResetSender,
repair_service::RepairInfo,
result::{Error, Result},
@ -264,6 +265,7 @@ fn retransmit(
epoch_stakes_cache: &RwLock<EpochStakesCache>,
last_peer_update: &AtomicU64,
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
) -> Result<()> {
let timer = Duration::new(1, 0);
let r_lock = r.lock().unwrap();
@ -320,6 +322,7 @@ fn retransmit(
let mut compute_turbine_peers_total = 0;
let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
let mut packets_by_source: HashMap<String, usize> = HashMap::new();
let mut max_slot = 0;
for mut packets in packet_v {
for packet in packets.packets.iter_mut() {
// skip discarded packets and repair packets
@ -337,6 +340,7 @@ fn retransmit(
Some(slot) => slot,
None => continue,
};
max_slot = max_slot.max(shred_slot);
let mut compute_turbine_peers = Measure::start("turbine_start");
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
&my_id,
@ -393,6 +397,7 @@ fn retransmit(
retransmit_total += retransmit_time.as_us();
}
}
max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
timer_start.stop();
debug!(
"retransmitted {} packets in {}ms retransmit_time: {}ms id: {}",
@ -433,6 +438,7 @@ pub fn retransmitter(
leader_schedule_cache: &Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
r: Arc<Mutex<PacketReceiver>>,
max_slots: &Arc<MaxSlots>,
) -> Vec<JoinHandle<()>> {
let stats = Arc::new(RetransmitStats::default());
let shreds_received = Arc::new(Mutex::new((
@ -450,6 +456,7 @@ pub fn retransmitter(
let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
let last_peer_update = Arc::new(AtomicU64::new(0));
let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone();
Builder::new()
.name("solana-retransmitter".to_string())
@ -467,6 +474,7 @@ pub fn retransmitter(
&epoch_stakes_cache,
&last_peer_update,
&shreds_received,
&max_slots,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -511,6 +519,7 @@ impl RetransmitStage {
verified_vote_receiver: VerifiedVoteReceiver,
repair_validators: Option<HashSet<Pubkey>>,
completed_data_sets_sender: CompletedDataSetsSender,
max_slots: &Arc<MaxSlots>,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@ -521,6 +530,7 @@ impl RetransmitStage {
leader_schedule_cache,
cluster_info.clone(),
retransmit_receiver,
max_slots,
);
let leader_schedule_cache_clone = leader_schedule_cache.clone();
@ -638,6 +648,7 @@ mod tests {
&leader_schedule_cache,
cluster_info,
Arc::new(Mutex::new(retransmit_receiver)),
&Arc::new(MaxSlots::default()),
);
let _thread_hdls = vec![t_retransmit];

View File

@ -3,6 +3,7 @@
use crate::{
cluster_info::ClusterInfo,
contact_info::ContactInfo,
max_slots::MaxSlots,
non_circulating_supply::calculate_non_circulating_supply,
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc_health::*,
@ -138,6 +139,7 @@ pub struct JsonRpcRequestProcessor {
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
max_slots: Arc<MaxSlots>,
}
impl Metadata for JsonRpcRequestProcessor {}
@ -221,6 +223,7 @@ impl JsonRpcRequestProcessor {
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
max_slots: Arc<MaxSlots>,
) -> (Self, Receiver<TransactionInfo>) {
let (sender, receiver) = channel();
(
@ -239,6 +242,7 @@ impl JsonRpcRequestProcessor {
bigtable_ledger_storage,
optimistically_confirmed_bank,
largest_accounts_cache,
max_slots,
},
receiver,
)
@ -279,6 +283,7 @@ impl JsonRpcRequestProcessor {
bank: bank.clone(),
})),
largest_accounts_cache: Arc::new(RwLock::new(LargestAccountsCache::new(30))),
max_slots: Arc::new(MaxSlots::default()),
}
}
@ -485,6 +490,14 @@ impl JsonRpcRequestProcessor {
self.bank(commitment).slot()
}
    /// Returns the max slot seen by the retransmit stage.
    /// Relaxed load is sufficient: the value only grows monotonically
    /// (writers use `fetch_max`) and a slightly stale read is acceptable.
    fn get_max_retransmit_slot(&self) -> Slot {
        self.max_slots.retransmit.load(Ordering::Relaxed)
    }

    /// Returns the max slot seen after shred insert (same relaxed-read
    /// rationale as `get_max_retransmit_slot`).
    fn get_max_shred_insert_slot(&self) -> Slot {
        self.max_slots.shred_insert.load(Ordering::Relaxed)
    }
fn get_slot_leader(&self, commitment: Option<CommitmentConfig>) -> String {
self.bank(commitment).collector_id().to_string()
}
@ -1915,6 +1928,12 @@ pub trait RpcSol {
#[rpc(meta, name = "getSlot")]
fn get_slot(&self, meta: Self::Metadata, commitment: Option<CommitmentConfig>) -> Result<Slot>;
    /// `getMaxRetransmitSlot`: the max slot seen from the retransmit stage.
    #[rpc(meta, name = "getMaxRetransmitSlot")]
    fn get_max_retransmit_slot(&self, meta: Self::Metadata) -> Result<Slot>;

    /// `getMaxShredInsertSlot`: the max slot seen after shred insert.
    #[rpc(meta, name = "getMaxShredInsertSlot")]
    fn get_max_shred_insert_slot(&self, meta: Self::Metadata) -> Result<Slot>;
#[rpc(meta, name = "getTransactionCount")]
fn get_transaction_count(
&self,
@ -2505,6 +2524,16 @@ impl RpcSol for RpcSolImpl {
Ok(meta.get_slot(commitment))
}
fn get_max_retransmit_slot(&self, meta: Self::Metadata) -> Result<Slot> {
debug!("get_max_retransmit_slot rpc request received");
Ok(meta.get_max_retransmit_slot())
}
fn get_max_shred_insert_slot(&self, meta: Self::Metadata) -> Result<Slot> {
debug!("get_max_shred_insert_slot rpc request received");
Ok(meta.get_max_shred_insert_slot())
}
fn get_transaction_count(
&self,
meta: Self::Metadata,
@ -3206,6 +3235,10 @@ pub mod tests {
.write_perf_sample(0, &sample1)
.expect("write to blockstore");
let max_slots = Arc::new(MaxSlots::default());
max_slots.retransmit.store(42, Ordering::Relaxed);
max_slots.shred_insert.store(43, Ordering::Relaxed);
let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig {
enable_rpc_transaction_history: true,
@ -3224,6 +3257,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
max_slots,
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
@ -4634,6 +4668,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
@ -4831,6 +4866,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(request_processor.validator_exit(), false);
@ -4865,6 +4901,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(request_processor.validator_exit(), true);
@ -4892,6 +4929,24 @@ pub mod tests {
assert_eq!(expected, result);
}
/// Sends a parameterless JSON-RPC request for `method` against a fresh
/// test handler and asserts the numeric `result` field equals `expected`.
fn test_basic_slot(method: &str, expected: Slot) {
    let payer = solana_sdk::pubkey::new_rand();
    let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&payer);

    let request = format!("{{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"{}\"}}", method);
    let response = io.handle_request_sync(&request, meta);

    let body: Value = serde_json::from_str(&response.unwrap()).unwrap();
    let actual: Slot = serde_json::from_value(body["result"].clone()).unwrap();
    assert_eq!(actual, expected);
}
/// Verifies both max-slot RPC endpoints return the values the test
/// fixture seeds into `MaxSlots` (42 for retransmit, 43 for shred insert).
#[test]
fn test_rpc_get_max_slots() {
    let cases: &[(&str, Slot)] = &[
        ("getMaxRetransmitSlot", 42),
        ("getMaxShredInsertSlot", 43),
    ];
    for &(method, expected) in cases {
        test_basic_slot(method, expected);
    }
}
#[test]
fn test_rpc_get_version() {
let bob_pubkey = solana_sdk::pubkey::new_rand();
@ -4958,6 +5013,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(
@ -6188,6 +6244,7 @@ pub mod tests {
None,
optimistically_confirmed_bank.clone(),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
);
let mut io = MetaIoHandler::default();

View File

@ -3,6 +3,7 @@
use crate::{
bigtable_upload_service::BigTableUploadService,
cluster_info::ClusterInfo,
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
poh_recorder::PohRecorder,
rpc::*,
@ -272,6 +273,7 @@ impl JsonRpcService {
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
send_transaction_retry_ms: u64,
send_transaction_leader_forward_count: u64,
max_slots: Arc<MaxSlots>,
) -> Self {
info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config);
@ -349,6 +351,7 @@ impl JsonRpcService {
bigtable_ledger_storage,
optimistically_confirmed_bank,
largest_accounts_cache,
max_slots,
);
let leader_info =
@ -510,6 +513,7 @@ mod tests {
optimistically_confirmed_bank,
1000,
1,
Arc::new(MaxSlots::default()),
);
let thread = rpc_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");

View File

@ -11,6 +11,7 @@ use crate::{
completed_data_sets_service::CompletedDataSetsSender,
consensus::Tower,
ledger_cleanup_service::LedgerCleanupService,
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::BankNotificationSender,
poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig},
@ -120,6 +121,7 @@ impl Tvu {
completed_data_sets_sender: CompletedDataSetsSender,
bank_notification_sender: Option<BankNotificationSender>,
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
) -> Self {
let keypair: Arc<Keypair> = cluster_info.keypair.clone();
@ -174,6 +176,7 @@ impl Tvu {
verified_vote_receiver,
tvu_config.repair_validators,
completed_data_sets_sender,
max_slots,
);
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
@ -407,6 +410,7 @@ pub mod tests {
completed_data_sets_sender,
None,
TvuConfig::default(),
&Arc::new(MaxSlots::default()),
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

View File

@ -12,6 +12,7 @@ use crate::{
consensus::{reconcile_blockstore_roots_with_tower, Tower},
contact_info::ContactInfo,
gossip_service::GossipService,
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
},
@ -414,6 +415,7 @@ impl Validator {
config.pubsub_config.enable_vote_subscription,
));
let max_slots = Arc::new(MaxSlots::default());
let (completed_data_sets_sender, completed_data_sets_receiver) =
bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
let completed_data_sets_service = CompletedDataSetsService::new(
@ -421,6 +423,7 @@ impl Validator {
blockstore.clone(),
subscriptions.clone(),
&exit,
max_slots.clone(),
);
info!(
@ -485,6 +488,7 @@ impl Validator {
optimistically_confirmed_bank.clone(),
config.send_transaction_retry_ms,
config.send_transaction_leader_forward_count,
max_slots.clone(),
),
pubsub_service: PubSubService::new(
config.pubsub_config.clone(),
@ -654,6 +658,7 @@ impl Validator {
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
},
&max_slots,
);
let tpu = Tpu::new(

View File

@ -42,6 +42,8 @@ gives a convenient interface for the RPC methods.
- [getInflationRate](jsonrpc-api.md#getinflationrate)
- [getLargestAccounts](jsonrpc-api.md#getlargestaccounts)
- [getLeaderSchedule](jsonrpc-api.md#getleaderschedule)
- [getMaxRetransmitSlot](jsonrpc-api.md#getmaxretransmitslot)
- [getMaxShredInsertSlot](jsonrpc-api.md#getmaxshredinsertslot)
- [getMinimumBalanceForRentExemption](jsonrpc-api.md#getminimumbalanceforrentexemption)
- [getMultipleAccounts](jsonrpc-api.md#getmultipleaccounts)
- [getProgramAccounts](jsonrpc-api.md#getprogramaccounts)
@ -1612,6 +1614,50 @@ Result:
}
```
### getMaxRetransmitSlot
Get the max slot seen from the retransmit stage.
#### Results:
- `<u64>` - Slot
#### Example:
Request:
```bash
curl http://localhost:8899 -X POST -H "Content-Type: application/json" -d '
{"jsonrpc":"2.0","id":1, "method":"getMaxRetransmitSlot"}
'
```
Result:
```json
{"jsonrpc":"2.0","result":1234,"id":1}
```
### getMaxShredInsertSlot
Get the max slot seen after shreds are inserted into the blockstore.
#### Results:
- `<u64>` - Slot
#### Example:
Request:
```bash
curl http://localhost:8899 -X POST -H "Content-Type: application/json" -d '
{"jsonrpc":"2.0","id":1, "method":"getMaxShredInsertSlot"}
'
```
Result:
```json
{"jsonrpc":"2.0","result":1234,"id":1}
```
### getMinimumBalanceForRentExemption
Returns minimum balance required to make account rent exempt.