Wallclock BankingStage Throttle (#15731)

This commit is contained in:
carllin 2021-03-15 17:11:15 -07:00 committed by GitHub
parent 60e5fd11c9
commit c1ba265dd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 177 additions and 62 deletions

View File

@ -81,6 +81,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
bencher.iter(move || {
let _ignored = BankingStage::consume_buffered_packets(
&my_pubkey,
std::u128::MAX,
&poh_recorder,
&mut packets,
None,
@ -154,6 +155,9 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
// Build the bank once, lift the wallclock throttle, and only then share it.
let mut bank = Bank::new(&genesis_config);
// Allow arbitrary transaction processing time for the purposes of this bench
bank.ns_per_slot = std::u128::MAX;
// Wrap the bank configured above; constructing a second `Bank` here would
// silently discard the `ns_per_slot` override.
let bank = Arc::new(bank);
debug!("threads: {} txs: {}", num_threads, txes);

View File

@ -159,9 +159,9 @@ pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<()>>,
}
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Clone)]
pub enum BufferedPacketsDecision {
Consume,
Consume(u128),
Forward,
ForwardAndHold,
Hold,
@ -288,6 +288,7 @@ impl BankingStage {
pub fn consume_buffered_packets(
my_pubkey: &Pubkey,
max_tx_ingestion_ns: u128,
poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packets: &mut UnprocessedPackets,
transaction_status_sender: Option<TransactionStatusSender>,
@ -316,18 +317,24 @@ impl BankingStage {
new_unprocessed_indexes,
)
} else {
let bank = poh_recorder.lock().unwrap().bank();
if let Some(bank) = bank {
let bank_start = poh_recorder.lock().unwrap().bank_start();
if let Some((bank, bank_creation_time)) = bank_start {
let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_received_packets(
Self::process_packets_transactions(
&bank,
&bank_creation_time,
&poh_recorder,
&msgs,
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
gossip_vote_sender,
);
if processed < verified_txs_len {
if processed < verified_txs_len
|| !Bank::should_bank_still_be_processing_txs(
&bank_creation_time,
max_tx_ingestion_ns,
)
{
reached_end_of_slot =
Some((poh_recorder.lock().unwrap().next_slot_leader(), bank));
}
@ -380,7 +387,7 @@ impl BankingStage {
fn consume_or_forward_packets(
my_pubkey: &Pubkey,
leader_pubkey: Option<Pubkey>,
bank_is_available: bool,
bank_still_processing_txs: Option<&Arc<Bank>>,
would_be_leader: bool,
would_be_leader_shortly: bool,
) -> BufferedPacketsDecision {
@ -389,9 +396,9 @@ impl BankingStage {
BufferedPacketsDecision::Hold,
// else process the packets
|x| {
if bank_is_available {
if let Some(bank) = bank_still_processing_txs {
// If the bank is available, this node is the leader
BufferedPacketsDecision::Consume
BufferedPacketsDecision::Consume(bank.ns_per_slot)
} else if would_be_leader_shortly {
// If the node will be the leader soon, hold the packets for now
BufferedPacketsDecision::Hold
@ -422,11 +429,18 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
) -> BufferedPacketsDecision {
let (leader_at_slot_offset, poh_has_bank, would_be_leader, would_be_leader_shortly) = {
let bank_start;
let (
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
) = {
let poh = poh_recorder.lock().unwrap();
bank_start = poh.bank_start();
(
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
poh.has_bank(),
PohRecorder::get_bank_still_processing_txs(&bank_start),
poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT),
poh.would_be_leader(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
@ -437,15 +451,16 @@ impl BankingStage {
let decision = Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
poh_has_bank,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
);
match decision {
BufferedPacketsDecision::Consume => {
BufferedPacketsDecision::Consume(max_tx_ingestion_ns) => {
Self::consume_buffered_packets(
my_pubkey,
max_tx_ingestion_ns,
poh_recorder,
buffered_packets,
transaction_status_sender,
@ -545,8 +560,8 @@ impl BankingStage {
&gossip_vote_sender,
&banking_stage_stats,
);
if decision == BufferedPacketsDecision::Hold
|| decision == BufferedPacketsDecision::ForwardAndHold
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
{
// If we are waiting on a new bank,
// check the receiver for more transactions/for exiting
@ -829,6 +844,7 @@ impl BankingStage {
/// than the total number if max PoH height was reached and the bank halted
fn process_transactions(
bank: &Arc<Bank>,
bank_creation_time: &Instant,
transactions: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
transaction_status_sender: Option<TransactionStatusSender>,
@ -855,17 +871,25 @@ impl BankingStage {
// Add the retryable txs (transactions that errored in a way that warrants a retry)
// to the list of unprocessed txs.
unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk);
if let Err(PohRecorderError::MaxHeightReached) = result {
info!(
"process transactions: max height reached slot: {} height: {}",
bank.slot(),
bank.tick_height()
);
// process_and_record_transactions has returned all retryable errors in
// transactions[chunk_start..chunk_end], so we just need to push the remaining
// transactions into the unprocessed queue.
unprocessed_txs.extend(chunk_end..transactions.len());
break;
// Also stop processing further chunks once the wallclock time allotted to this
// bank (`bank.ns_per_slot`, measured from `bank_creation_time`) has been exceeded
let should_bank_still_be_processing_txs =
Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot);
match (result, should_bank_still_be_processing_txs) {
(Err(PohRecorderError::MaxHeightReached), _) | (_, false) => {
info!(
"process transactions: max height reached slot: {} height: {}",
bank.slot(),
bank.tick_height()
);
// process_and_record_transactions has returned all retryable errors in
// transactions[chunk_start..chunk_end], so we just need to push the remaining
// transactions into the unprocessed queue.
unprocessed_txs.extend(chunk_end..transactions.len());
break;
}
_ => (),
}
// Don't exit early on any other type of error, continue processing...
chunk_start = chunk_end;
@ -990,8 +1014,9 @@ impl BankingStage {
Self::filter_valid_transaction_indexes(&result, transaction_to_packet_indexes)
}
fn process_received_packets(
fn process_packets_transactions(
bank: &Arc<Bank>,
bank_creation_time: &Instant,
poh: &Arc<Mutex<PohRecorder>>,
msgs: &Packets,
packet_indexes: Vec<usize>,
@ -1013,6 +1038,7 @@ impl BankingStage {
let (processed, unprocessed_tx_indexes) = Self::process_transactions(
bank,
bank_creation_time,
&transactions,
poh,
transaction_status_sender,
@ -1120,7 +1146,7 @@ impl BankingStage {
id,
);
inc_new_counter_debug!("banking_stage-transactions_received", count);
let mut proc_start = Measure::start("process_received_packets_process");
let mut proc_start = Measure::start("process_packets_transactions_process");
let mut new_tx_count = 0;
let mut mms_iter = mms.into_iter();
@ -1128,8 +1154,8 @@ impl BankingStage {
let mut newly_buffered_packets_count = 0;
while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let bank = poh.lock().unwrap().bank();
if bank.is_none() {
let bank_start = poh.lock().unwrap().bank_start();
if PohRecorder::get_bank_still_processing_txs(&bank_start).is_none() {
Self::push_unprocessed(
buffered_packets,
msgs,
@ -1141,16 +1167,18 @@ impl BankingStage {
);
continue;
}
let bank = bank.unwrap();
let (bank, bank_creation_time) = bank_start.unwrap();
let (processed, verified_txs_len, unprocessed_indexes) = Self::process_received_packets(
&bank,
&poh,
&msgs,
packet_indexes,
transaction_status_sender.clone(),
gossip_vote_sender,
);
let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions(
&bank,
&bank_creation_time,
&poh,
&msgs,
packet_indexes,
transaction_status_sender.clone(),
gossip_vote_sender,
);
new_tx_count += processed;
@ -1165,6 +1193,7 @@ impl BankingStage {
duplicates,
);
// If there were retryable transactions, add the unexpired ones to the buffered queue
if processed < verified_txs_len {
let next_leader = poh.lock().unwrap().next_slot_leader();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
@ -1661,8 +1690,10 @@ mod tests {
..
} = create_genesis_config(10_000);
let bank = Arc::new(Bank::new(&genesis_config));
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: bank.tick_height(),
max_tick_height: std::u64::MAX,
};
@ -1915,80 +1946,80 @@ mod tests {
fn test_should_process_or_forward_packets() {
let my_pubkey = solana_sdk::pubkey::new_rand();
let my_pubkey1 = solana_sdk::pubkey::new_rand();
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey, None, true, false, false),
let bank = Arc::new(Bank::default());
assert_matches!(
BankingStage::consume_or_forward_packets(&my_pubkey, None, Some(&bank), false, false),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey, None, false, false, false),
assert_matches!(
BankingStage::consume_or_forward_packets(&my_pubkey, None, None, false, false),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::consume_or_forward_packets(&my_pubkey1, None, false, false, false),
assert_matches!(
BankingStage::consume_or_forward_packets(&my_pubkey1, None, None, false, false),
BufferedPacketsDecision::Hold
);
assert_eq!(
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
false,
None,
false,
false
),
BufferedPacketsDecision::Forward
);
assert_eq!(
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
false,
None,
true,
true
),
BufferedPacketsDecision::Hold
);
assert_eq!(
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
false,
None,
true,
false
),
BufferedPacketsDecision::ForwardAndHold
);
assert_eq!(
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
true,
Some(&bank),
false,
false
),
BufferedPacketsDecision::Consume
BufferedPacketsDecision::Consume(_)
);
assert_eq!(
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
false,
None,
false,
false
),
BufferedPacketsDecision::Hold
);
assert_eq!(
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
true,
Some(&bank),
false,
false
),
BufferedPacketsDecision::Consume
BufferedPacketsDecision::Consume(_)
);
}
@ -2010,8 +2041,10 @@ mod tests {
genesis_config.hash(),
)];
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: bank.tick_height(),
max_tick_height: bank.tick_height() + 1,
};
@ -2106,8 +2139,10 @@ mod tests {
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
];
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: bank.tick_height(),
max_tick_height: bank.tick_height() + 1,
};
@ -2196,6 +2231,9 @@ mod tests {
mint_keypair,
..
} = create_genesis_config(10_000);
// Build the bank once, lift the wallclock throttle, and only then share it.
let mut bank = Bank::new(&genesis_config);
// Allow arbitrary transaction processing time for the purposes of this test
bank.ns_per_slot = std::u128::MAX;
// Wrap the bank configured above; constructing a second `Bank` here would
// silently discard the `ns_per_slot` override.
let bank = Arc::new(bank);
let pubkey = solana_sdk::pubkey::new_rand();
@ -2231,6 +2269,7 @@ mod tests {
let (processed_transactions_count, mut retryable_txs) =
BankingStage::process_transactions(
&bank,
&Instant::now(),
&transactions,
&poh_recorder,
None,
@ -2276,8 +2315,10 @@ mod tests {
let transactions = vec![success_tx, ix_error_tx, fail_tx];
bank.transfer(4, &mint_keypair, &keypair1.pubkey()).unwrap();
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: bank.tick_height(),
max_tick_height: bank.tick_height() + 1,
};
@ -2420,8 +2461,10 @@ mod tests {
// When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.lock().unwrap().has_bank());
let max_tx_processing_ns = std::u128::MAX;
BankingStage::consume_buffered_packets(
&Pubkey::default(),
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packets,
None,
@ -2436,6 +2479,7 @@ mod tests {
poh_recorder.lock().unwrap().set_bank(&bank);
BankingStage::consume_buffered_packets(
&Pubkey::default(),
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packets,
None,
@ -2492,6 +2536,7 @@ mod tests {
.spawn(move || {
BankingStage::consume_buffered_packets(
&Pubkey::default(),
std::u128::MAX,
&poh_recorder_,
&mut buffered_packets,
None,

View File

@ -49,10 +49,12 @@ pub enum PohRecorderError {
type Result<T> = std::result::Result<T, PohRecorderError>;
pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
pub type BankStart = (Arc<Bank>, Arc<Instant>);
// The bank PohRecorder is currently working on, bundled with the wallclock
// start time and the tick-height bounds that accompany it.
#[derive(Clone)]
pub struct WorkingBank {
// Shared handle to the bank itself.
pub bank: Arc<Bank>,
// Wallclock instant paired with this bank; `set_bank` fills it with
// `Instant::now()`, and it is what `bank_start()` hands out as the bank's
// creation time for the transaction-processing throttle.
pub start: Arc<Instant>,
// Lower tick-height bound; `set_bank` initializes it from `bank.tick_height()`.
pub min_tick_height: u64,
// Upper tick-height bound; `set_bank` initializes it from `bank.max_tick_height()`.
pub max_tick_height: u64,
}
@ -97,7 +99,14 @@ impl PohRecorder {
self.grace_ticks = grace_ticks;
self.leader_first_tick_height = leader_first_tick_height;
self.leader_last_tick_height = leader_last_tick_height;
datapoint_info!(
"leader-slot-start-to-cleared-elapsed-ms",
("slot", bank.slot(), i64),
("elapsed", working_bank.start.elapsed().as_millis(), i64),
);
}
if let Some(ref signal) = self.clear_bank_signal {
let _ = signal.try_send(true);
}
@ -126,7 +135,13 @@ impl PohRecorder {
}
pub fn bank(&self) -> Option<Arc<Bank>> {
self.working_bank.clone().map(|w| w.bank)
self.working_bank.as_ref().map(|w| w.bank.clone())
}
/// Returns the working bank together with its recorded start instant, or
/// `None` when no working bank is set. Both handles are cloned `Arc`s.
pub fn bank_start(&self) -> Option<BankStart> {
    let working_bank = self.working_bank.as_ref()?;
    let bank = working_bank.bank.clone();
    let start = working_bank.start.clone();
    Some((bank, start))
}
pub fn has_bank(&self) -> bool {
@ -273,11 +288,15 @@ impl PohRecorder {
trace!("new working bank");
assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot());
self.working_bank = Some(working_bank);
// TODO: adjust the working_bank.start time based on number of ticks
// that have already elapsed based on current tick height.
let _ = self.flush_cache(false);
}
pub fn set_bank(&mut self, bank: &Arc<Bank>) {
let working_bank = WorkingBank {
bank: bank.clone(),
start: Arc::new(Instant::now()),
min_tick_height: bank.tick_height(),
max_tick_height: bank.max_tick_height(),
};
@ -517,6 +536,18 @@ impl PohRecorder {
)
}
// Filters the return result of PohRecorder::bank_start(), returns the bank
// if it's still processing transactions
pub fn get_bank_still_processing_txs(bank_start: &Option<BankStart>) -> Option<&Arc<Bank>> {
bank_start.as_ref().and_then(|(bank, bank_creation_time)| {
if Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot) {
Some(bank)
} else {
None
}
})
}
#[cfg(test)]
pub fn schedule_dummy_max_height_reached_failure(&mut self) {
self.reset(Hash::default(), 1, None);
@ -635,8 +666,10 @@ mod tests {
&Arc::new(PohConfig::default()),
);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank,
start,
min_tick_height: 2,
max_tick_height: 3,
};
@ -669,8 +702,10 @@ mod tests {
&Arc::new(PohConfig::default()),
);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: 2,
max_tick_height: 3,
};
@ -725,8 +760,10 @@ mod tests {
assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 4);
assert_eq!(poh_recorder.tick_height, 4);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank,
start,
min_tick_height: 2,
max_tick_height: 3,
};
@ -765,8 +802,10 @@ mod tests {
&Arc::new(PohConfig::default()),
);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: 2,
max_tick_height: 3,
};
@ -801,8 +840,10 @@ mod tests {
&Arc::new(PohConfig::default()),
);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: 1,
max_tick_height: 2,
};
@ -841,8 +882,10 @@ mod tests {
&Arc::new(PohConfig::default()),
);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: 1,
max_tick_height: 2,
};
@ -885,8 +928,10 @@ mod tests {
&Arc::new(PohConfig::default()),
);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: 1,
max_tick_height: 2,
};
@ -927,8 +972,10 @@ mod tests {
&Arc::new(PohConfig::default()),
);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank,
start,
min_tick_height: 2,
max_tick_height: 3,
};
@ -1049,8 +1096,10 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank,
start,
min_tick_height: 2,
max_tick_height: 3,
};
@ -1118,8 +1167,10 @@ mod tests {
let end_slot = 3;
let max_tick_height = (end_slot + 1) * ticks_per_slot;
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: 1,
max_tick_height,
};

View File

@ -208,8 +208,10 @@ mod tests {
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let exit = Arc::new(AtomicBool::new(false));
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: bank.tick_height(),
max_tick_height: std::u64::MAX,
};

View File

@ -90,6 +90,7 @@ use std::{
LockResult, RwLockWriteGuard, {Arc, RwLock, RwLockReadGuard},
},
time::Duration,
time::Instant,
};
pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0;
@ -782,7 +783,7 @@ pub struct Bank {
ticks_per_slot: u64,
/// length of a slot in ns
ns_per_slot: u128,
pub ns_per_slot: u128,
/// genesis time, used for computed clock
genesis_creation_time: UnixTimestamp,
@ -4638,6 +4639,16 @@ impl Bank {
.is_active(&feature_set::check_init_vote_data::id())
}
/// Reports whether a bank is still inside its transaction-processing window.
///
/// Returns `true` while the wallclock time elapsed since `bank_creation_time`
/// is at most `max_tx_ingestion_nanos` nanoseconds, and `false` once that
/// budget has been exceeded.
pub fn should_bank_still_be_processing_txs(
    bank_creation_time: &Instant,
    max_tx_ingestion_nanos: u128,
) -> bool {
    // Deliberately an associated function taking the instant by reference so the
    // check can run outside of the poh lock (hence not a method on PohRecorder).
    let elapsed_nanos = bank_creation_time.elapsed().as_nanos();
    elapsed_nanos <= max_tx_ingestion_nanos
}
pub fn deactivate_feature(&mut self, id: &Pubkey) {
let mut feature_set = Arc::make_mut(&mut self.feature_set).clone();
feature_set.active.remove(&id);

View File

@ -6,6 +6,8 @@ pub const DEFAULT_TICKS_PER_SECOND: u64 = 160;
// Milliseconds per tick. This is integer division, so with
// DEFAULT_TICKS_PER_SECOND = 160 the result truncates to 6.
pub const MS_PER_TICK: u64 = 1000 / DEFAULT_TICKS_PER_SECOND;
// Milliseconds per slot: (64 ticks * 1000) / 160 ticks per second = 400.
pub const SLOT_MS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_TICKS_PER_SECOND;
// At 160 ticks/s, 64 ticks per slot implies that leader rotation and voting will happen
// every 400 ms. A fast voting cadence ensures faster finality and convergence
pub const DEFAULT_TICKS_PER_SLOT: u64 = 64;