Block packets in vote-only mode (#24906)

This commit is contained in:
sakridge 2022-05-14 10:53:37 -05:00 committed by GitHub
parent d513407485
commit 3d96a1ab76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 99 additions and 5 deletions

View File

@ -113,6 +113,7 @@ fn main() -> Result<()> {
stats.clone(),
1,
true,
None,
));
}

View File

@ -157,6 +157,7 @@ impl AncestorHashesService {
)),
1,
false,
None,
);
let ancestor_hashes_request_statuses: Arc<DashMap<Slot, DeadSlotAncestorRequestStatus>> =
@ -920,6 +921,7 @@ mod test {
)),
1,
false,
None,
);
let t_listen = ServeRepair::listen(
responder_serve_repair,

View File

@ -53,6 +53,7 @@ impl FetchStage {
&vote_sender,
poh_recorder,
coalesce_ms,
None,
),
receiver,
vote_receiver,
@ -68,6 +69,7 @@ impl FetchStage {
vote_sender: &PacketBatchSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
@ -81,6 +83,7 @@ impl FetchStage {
vote_sender,
poh_recorder,
coalesce_ms,
in_vote_only_mode,
)
}
@ -135,6 +138,7 @@ impl FetchStage {
vote_sender: &PacketBatchSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);
@ -150,6 +154,7 @@ impl FetchStage {
tpu_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect();
@ -167,6 +172,7 @@ impl FetchStage {
tpu_forward_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect();
@ -183,6 +189,7 @@ impl FetchStage {
tpu_vote_stats.clone(),
coalesce_ms,
true,
None,
)
})
.collect();

View File

@ -49,7 +49,7 @@ use {
solana_runtime::{
accounts_background_service::AbsRequestSender,
bank::{Bank, NewBankOptions},
bank_forks::BankForks,
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache,
transaction_cost_metrics_sender::TransactionCostMetricsSender,
vote_sender_types::ReplayVoteSender,
@ -432,11 +432,15 @@ impl ReplayStage {
last_refresh_time: Instant::now(),
last_print_time: Instant::now(),
};
let (working_bank, in_vote_only_mode) = {
let r_bank_forks = bank_forks.read().unwrap();
(r_bank_forks.working_bank(), r_bank_forks.get_vote_only_mode_signal())
};
Self::reset_poh_recorder(
&my_pubkey,
&blockstore,
&bank_forks.read().unwrap().working_bank(),
&working_bank,
&poh_recorder,
&leader_schedule_cache,
);
@ -607,6 +611,8 @@ impl ReplayStage {
.select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks);
select_forks_time.stop();
Self::check_for_vote_only_mode(heaviest_bank.slot(), forks_root, &in_vote_only_mode, &bank_forks);
if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() {
if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) {
Self::refresh_last_vote(&mut tower,
@ -903,6 +909,41 @@ impl ReplayStage {
}
}
fn check_for_vote_only_mode(
heaviest_bank_slot: Slot,
forks_root: Slot,
in_vote_only_mode: &AtomicBool,
bank_forks: &RwLock<BankForks>,
) {
if heaviest_bank_slot.saturating_sub(forks_root) > MAX_ROOT_DISTANCE_FOR_VOTE_ONLY {
if !in_vote_only_mode.load(Ordering::Relaxed)
&& in_vote_only_mode
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
let bank_forks = bank_forks.read().unwrap();
datapoint_warn!(
"bank_forks-entering-vote-only-mode",
("banks_len", bank_forks.len(), i64),
("heaviest_bank", heaviest_bank_slot, i64),
("root", bank_forks.root(), i64),
);
}
} else if in_vote_only_mode.load(Ordering::Relaxed)
&& in_vote_only_mode
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
let bank_forks = bank_forks.read().unwrap();
datapoint_warn!(
"bank_forks-exiting-vote-only-mode",
("banks_len", bank_forks.len(), i64),
("heaviest_bank", heaviest_bank_slot, i64),
("root", bank_forks.root(), i64),
);
}
}
fn maybe_retransmit_unpropagated_slots(
metric_name: &'static str,
retransmit_slots_sender: &RetransmitSlotsSender,
@ -1594,7 +1635,6 @@ impl ReplayStage {
);
let root_distance = poh_slot - root_slot;
const MAX_ROOT_DISTANCE_FOR_VOTE_ONLY: Slot = 400;
let vote_only_bank = if root_distance > MAX_ROOT_DISTANCE_FOR_VOTE_ONLY {
datapoint_info!("vote-only-bank", ("slot", poh_slot, i64));
true
@ -6455,4 +6495,16 @@ pub(crate) mod tests {
) -> bool {
map1.len() == map2.len() && map1.iter().all(|(k, v)| map2.get(k).unwrap() == v)
}
#[test]
fn test_check_for_vote_only_mode() {
let in_vote_only_mode = AtomicBool::new(false);
let genesis_config = create_genesis_config(10_000).genesis_config;
let bank0 = Bank::new_for_tests(&genesis_config);
let bank_forks = RwLock::new(BankForks::new(bank0));
ReplayStage::check_for_vote_only_mode(1000, 0, &in_vote_only_mode, &bank_forks);
assert!(in_vote_only_mode.load(Ordering::Relaxed));
ReplayStage::check_for_vote_only_mode(10, 0, &in_vote_only_mode, &bank_forks);
assert!(!in_vote_only_mode.load(Ordering::Relaxed));
}
}

View File

@ -42,6 +42,7 @@ impl ServeRepairService {
Arc::new(StreamerReceiveStats::new("serve_repair_receiver")),
1,
false,
None,
);
let (response_sender, response_receiver) = unbounded();
let t_responder = streamer::responder(

View File

@ -142,6 +142,7 @@ impl ShredFetchStage {
Arc::new(StreamerReceiveStats::new("packet_modifier")),
1,
true,
None,
)
})
.collect();

View File

@ -113,6 +113,7 @@ impl Tpu {
&vote_packet_sender,
poh_recorder,
tpu_coalesce_ms,
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
);
let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded();

View File

@ -57,6 +57,7 @@ impl GossipService {
Arc::new(StreamerReceiveStats::new("gossip_receiver")),
1,
false,
None,
);
let (consume_sender, listen_receiver) = unbounded();
let t_socket_consume = cluster_info.clone().start_socket_consume_thread(

View File

@ -12,11 +12,13 @@ use {
std::{
collections::{hash_map::Entry, HashMap, HashSet},
ops::Index,
sync::Arc,
sync::{atomic::AtomicBool, Arc},
time::Instant,
},
};
pub const MAX_ROOT_DISTANCE_FOR_VOTE_ONLY: Slot = 400;
#[derive(Debug, Default, Copy, Clone)]
struct SetRootMetrics {
timings: SetRootTimings,
@ -48,6 +50,7 @@ pub struct BankForks {
pub accounts_hash_interval_slots: Slot,
last_accounts_hash_slot: Slot,
in_vote_only_mode: Arc<AtomicBool>,
}
impl Index<u64> for BankForks {
@ -67,6 +70,18 @@ impl BankForks {
self.banks.clone()
}
pub fn get_vote_only_mode_signal(&self) -> Arc<AtomicBool> {
self.in_vote_only_mode.clone()
}
pub fn len(&self) -> usize {
self.banks.len()
}
pub fn is_empty(&self) -> bool {
self.banks.is_empty()
}
/// Create a map of bank slot id to the set of ancestors for the bank slot.
pub fn ancestors(&self) -> HashMap<Slot, HashSet<Slot>> {
let root = self.root;
@ -151,6 +166,7 @@ impl BankForks {
snapshot_config: None,
accounts_hash_interval_slots: std::u64::MAX,
last_accounts_hash_slot: root,
in_vote_only_mode: Arc::new(AtomicBool::new(false)),
}
}

View File

@ -18,7 +18,7 @@ use {
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
thread::{Builder, JoinHandle},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
},
thiserror::Error,
@ -98,6 +98,7 @@ fn recv_loop(
stats: &StreamerReceiveStats,
coalesce_ms: u64,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Result<()> {
loop {
let mut packet_batch = if use_pinned_memory {
@ -111,6 +112,14 @@ fn recv_loop(
if exit.load(Ordering::Relaxed) {
return Ok(());
}
if let Some(ref in_vote_only_mode) = in_vote_only_mode {
if in_vote_only_mode.load(Ordering::Relaxed) {
sleep(Duration::from_millis(1));
continue;
}
}
if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce_ms) {
if len > 0 {
let StreamerReceiveStats {
@ -144,6 +153,7 @@ pub fn receiver(
stats: Arc<StreamerReceiveStats>,
coalesce_ms: u64,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> JoinHandle<()> {
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
@ -158,6 +168,7 @@ pub fn receiver(
&stats,
coalesce_ms,
use_pinned_memory,
in_vote_only_mode,
);
})
.unwrap()
@ -451,6 +462,7 @@ mod test {
stats.clone(),
1,
true,
None,
);
const NUM_PACKETS: usize = 5;
let t_responder = {