eliminate lock on record (#15929)

* eliminate lock on record

* use same error as MaxHeightReached

* clippy

* review feedback

* refactor should_tick code

* pr feedback
This commit is contained in:
Jeff Washington (jwash) 2021-03-23 09:10:04 -05:00 committed by GitHub
parent 6271665ba6
commit 57ba86c821
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 452 additions and 153 deletions

View File

@ -66,6 +66,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(&bank, &blockstore, None);
let recorder = poh_recorder.lock().unwrap().recorder();
let tx = test_tx();
let len = 4096;
let chunk_size = 1024;
@ -88,6 +90,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&s,
None::<Box<dyn Fn()>>,
None,
&recorder,
);
});

View File

@ -4,7 +4,7 @@
use crate::{
cluster_info::ClusterInfo,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry},
poh_service::{self, PohService},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
@ -295,6 +295,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
test_fn: Option<impl Fn()>,
banking_stage_stats: Option<&BankingStageStats>,
recorder: &TransactionRecorder,
) {
let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0;
@ -323,7 +324,7 @@ impl BankingStage {
Self::process_packets_transactions(
&bank,
&bank_creation_time,
&poh_recorder,
&recorder,
&msgs,
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
@ -428,6 +429,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
) -> BufferedPacketsDecision {
let bank_start;
let (
@ -467,6 +469,7 @@ impl BankingStage {
gossip_vote_sender,
None::<Box<dyn Fn()>>,
Some(banking_stage_stats),
recorder,
);
}
BufferedPacketsDecision::Forward => {
@ -544,6 +547,7 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = VecDeque::with_capacity(batch_limit);
let banking_stage_stats = BankingStageStats::new(id);
@ -552,13 +556,14 @@ impl BankingStage {
let decision = Self::process_buffered_packets(
&my_pubkey,
&socket,
poh_recorder,
&poh_recorder,
cluster_info,
&mut buffered_packets,
enable_forwarding,
transaction_status_sender.clone(),
&gossip_vote_sender,
&banking_stage_stats,
&recorder,
);
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
@ -592,6 +597,7 @@ impl BankingStage {
&mut buffered_packets,
&banking_stage_stats,
duplicates,
&recorder,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
@ -625,7 +631,7 @@ impl BankingStage {
bank_slot: Slot,
txs: &[Transaction],
results: &[TransactionExecutionResult],
poh: &Arc<Mutex<PohRecorder>>,
recorder: &TransactionRecorder,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut processed_generation = Measure::start("record::process_generation");
let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results
@ -655,10 +661,7 @@ impl BankingStage {
let mut poh_record = Measure::start("record::poh_record");
// record and unlock will unlock all the successful transactions
let res = poh
.lock()
.unwrap()
.record(bank_slot, hash, processed_transactions);
let res = recorder.record(bank_slot, hash, processed_transactions);
match res {
Ok(()) => (),
Err(PohRecorderError::MaxHeightReached) => {
@ -683,7 +686,7 @@ impl BankingStage {
fn process_and_record_transactions_locked(
bank: &Arc<Bank>,
poh: &Arc<Mutex<PohRecorder>>,
poh: &TransactionRecorder,
batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@ -802,7 +805,7 @@ impl BankingStage {
pub fn process_and_record_transactions(
bank: &Arc<Bank>,
txs: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
poh: &TransactionRecorder,
chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@ -846,7 +849,7 @@ impl BankingStage {
bank: &Arc<Bank>,
bank_creation_time: &Instant,
transactions: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
poh: &TransactionRecorder,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (usize, Vec<usize>) {
@ -1017,7 +1020,7 @@ impl BankingStage {
fn process_packets_transactions(
bank: &Arc<Bank>,
bank_creation_time: &Instant,
poh: &Arc<Mutex<PohRecorder>>,
poh: &TransactionRecorder,
msgs: &Packets,
packet_indexes: Vec<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
@ -1131,6 +1134,7 @@ impl BankingStage {
buffered_packets: &mut UnprocessedPackets,
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
@ -1173,7 +1177,7 @@ impl BankingStage {
Self::process_packets_transactions(
&bank,
&bank_creation_time,
&poh,
recorder,
&msgs,
packet_indexes,
transaction_status_sender.clone(),
@ -1309,7 +1313,7 @@ pub fn create_test_recorder(
) {
let exit = Arc::new(AtomicBool::new(false));
let poh_config = Arc::new(poh_config.unwrap_or_default());
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -1330,6 +1334,7 @@ pub fn create_test_recorder(
bank.ticks_per_slot(),
poh_service::DEFAULT_PINNED_CPU_CORE,
poh_service::DEFAULT_HASHES_PER_BATCH,
record_receiver,
);
(exit, poh_recorder, poh_service, entry_receiver)
@ -1339,7 +1344,7 @@ pub fn create_test_recorder(
mod tests {
use super::*;
use crate::{
cluster_info::Node, poh_recorder::WorkingBank,
cluster_info::Node, poh_recorder::Record, poh_recorder::WorkingBank,
transaction_status_service::TransactionStatusService,
};
use crossbeam_channel::unbounded;
@ -1359,7 +1364,15 @@ mod tests {
transaction::TransactionError,
};
use solana_transaction_status::TransactionWithStatusMeta;
use std::{net::SocketAddr, path::Path, sync::atomic::Ordering, thread::sleep};
use std::{
net::SocketAddr,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::Receiver,
},
thread::sleep,
};
#[test]
fn test_banking_stage_shutdown1() {
@ -1684,6 +1697,8 @@ mod tests {
#[test]
fn test_bank_record_transactions() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
@ -1701,7 +1716,8 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
// TODO use record_receiver
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -1712,8 +1728,11 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
@ -1725,12 +1744,8 @@ mod tests {
];
let mut results = vec![(Ok(()), None), (Ok(()), None)];
let _ = BankingStage::record_transactions(
bank.slot(),
&transactions,
&results,
&poh_recorder,
);
let _ =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions.len(), transactions.len());
@ -1742,12 +1757,8 @@ mod tests {
)),
None,
);
let (res, retryable) = BankingStage::record_transactions(
bank.slot(),
&transactions,
&results,
&poh_recorder,
);
let (res, retryable) =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
res.unwrap();
assert!(retryable.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
@ -1755,12 +1766,8 @@ mod tests {
// Other TransactionErrors should not be recorded
results[0] = (Err(TransactionError::AccountNotFound), None);
let (res, retryable) = BankingStage::record_transactions(
bank.slot(),
&transactions,
&results,
&poh_recorder,
);
let (res, retryable) =
BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder);
res.unwrap();
assert!(retryable.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
@ -1773,7 +1780,7 @@ mod tests {
bank.slot() + 1,
&transactions,
&results,
&poh_recorder,
&recorder,
);
assert_matches!(res, Err(PohRecorderError::MaxHeightReached));
// The first result was an error so it's filtered out. The second result was Ok(),
@ -1781,6 +1788,9 @@ mod tests {
assert_eq!(retryable, vec![1]);
// Should receive nothing from PohRecorder b/c record failed
assert!(entry_receiver.try_recv().is_err());
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
}
@ -2052,7 +2062,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2063,15 +2073,18 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&recorder,
0,
None,
&gossip_vote_sender,
@ -2108,7 +2121,7 @@ mod tests {
BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&recorder,
0,
None,
&gossip_vote_sender,
@ -2117,11 +2130,36 @@ mod tests {
Err(PohRecorderError::MaxHeightReached)
);
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
assert_eq!(bank.get_balance(&pubkey), 1);
}
Blockstore::destroy(&ledger_path).unwrap();
}
fn simulate_poh(
record_receiver: Receiver<Record>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> (JoinHandle<()>, Arc<AtomicBool>) {
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
let poh_recorder = poh_recorder.clone();
let tick_producer = Builder::new()
.name("solana-simulate_poh".to_string())
.spawn(move || loop {
PohService::read_record_receiver_and_process(
&poh_recorder,
&record_receiver,
Duration::from_millis(10),
);
if exit_.load(Ordering::Relaxed) {
break;
}
});
(tick_producer.unwrap(), exit)
}
#[test]
fn test_bank_process_and_record_transactions_account_in_use() {
solana_logger::setup();
@ -2150,7 +2188,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver) = PohRecorder::new(
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2161,21 +2199,27 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let (result, unprocessed) = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&recorder,
0,
None,
&gossip_vote_sender,
);
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
assert!(result.is_ok());
assert_eq!(unprocessed.len(), 1);
}
@ -2245,7 +2289,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver) = PohRecorder::new(
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2257,9 +2301,12 @@ mod tests {
&Arc::new(PohConfig::default()),
);
// Poh Recorder has not working bank, so should throw MaxHeightReached error on
// Poh Recorder has no working bank, so should throw MaxHeightReached error on
// record
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let recorder = poh_recorder.recorder();
let (poh_simulator, exit) =
simulate_poh(record_receiver, &Arc::new(Mutex::new(poh_recorder)));
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
@ -2268,7 +2315,7 @@ mod tests {
&bank,
&Instant::now(),
&transactions,
&poh_recorder,
&recorder,
None,
&gossip_vote_sender,
);
@ -2278,6 +2325,9 @@ mod tests {
retryable_txs.sort_unstable();
let expected: Vec<usize> = (0..transactions.len()).collect();
assert_eq!(retryable_txs, expected);
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
@ -2324,7 +2374,7 @@ mod tests {
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let blockstore = Arc::new(blockstore);
let (poh_recorder, _entry_receiver) = PohRecorder::new(
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2335,8 +2385,11 @@ mod tests {
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let shreds = entries_to_test_shreds(entries, bank.slot(), 0, true, 0);
@ -2355,7 +2408,7 @@ mod tests {
let _ = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&poh_recorder,
&recorder,
0,
Some(TransactionStatusSender {
sender: transaction_status_sender,
@ -2388,10 +2441,14 @@ mod tests {
assert_eq!(meta, None);
}
}
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
}
#[allow(clippy::type_complexity)]
fn setup_conflicting_transactions(
ledger_path: &Path,
) -> (
@ -2399,6 +2456,8 @@ mod tests {
Arc<Bank>,
Arc<Mutex<PohRecorder>>,
Receiver<WorkingBankEntry>,
JoinHandle<()>,
Arc<AtomicBool>,
) {
Blockstore::destroy(&ledger_path).unwrap();
let genesis_config_info = create_genesis_config(10_000);
@ -2410,7 +2469,7 @@ mod tests {
let blockstore =
Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger");
let bank = Arc::new(Bank::new(&genesis_config));
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -2432,15 +2491,25 @@ mod tests {
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey2, 1, genesis_config.hash()),
];
(transactions, bank, poh_recorder, entry_receiver)
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
(
transactions,
bank,
poh_recorder,
entry_receiver,
poh_simulator,
exit,
)
}
#[test]
fn test_consume_buffered_packets() {
let ledger_path = get_tmp_ledger_path!();
{
let (transactions, bank, poh_recorder, _entry_receiver) =
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator, exit) =
setup_conflicting_transactions(&ledger_path);
let recorder = poh_recorder.lock().unwrap().recorder();
let num_conflicting_transactions = transactions.len();
let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions);
assert_eq!(packets_vec.len(), 1);
@ -2468,6 +2537,7 @@ mod tests {
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
None,
&recorder,
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
@ -2483,6 +2553,7 @@ mod tests {
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
None,
&recorder,
);
if num_expected_unprocessed == 0 {
assert!(buffered_packets.is_empty())
@ -2490,6 +2561,8 @@ mod tests {
assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed);
}
}
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
}
@ -2498,7 +2571,7 @@ mod tests {
fn test_consume_buffered_packets_interrupted() {
let ledger_path = get_tmp_ledger_path!();
{
let (transactions, bank, poh_recorder, _entry_receiver) =
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator, exit) =
setup_conflicting_transactions(&ledger_path);
let num_conflicting_transactions = transactions.len();
let packets_vec = to_packets_chunked(&transactions, 1);
@ -2526,6 +2599,7 @@ mod tests {
let interrupted_iteration = 1;
poh_recorder.lock().unwrap().set_bank(&bank);
let poh_recorder_ = poh_recorder.clone();
let recorder = poh_recorder_.lock().unwrap().recorder();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// Start up thread to process the banks
let t_consume = Builder::new()
@ -2540,6 +2614,7 @@ mod tests {
&gossip_vote_sender,
test_fn,
None,
&recorder,
);
// Check everything is correct. All indexes after `interrupted_iteration`
@ -2573,6 +2648,8 @@ mod tests {
}
t_consume.join().unwrap();
exit.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(&ledger_path).unwrap();
}

View File

@ -51,6 +51,80 @@ type Result<T> = std::result::Result<T, PohRecorderError>;
pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
pub type BankStart = (Arc<Bank>, Arc<Instant>);
pub struct Record {
pub mixin: Hash,
pub transactions: Vec<Transaction>,
pub slot: Slot,
pub sender: Sender<Result<()>>,
}
impl Record {
pub fn new(
mixin: Hash,
transactions: Vec<Transaction>,
slot: Slot,
sender: Sender<Result<()>>,
) -> Self {
Self {
mixin,
transactions,
slot,
sender,
}
}
}
pub struct TransactionRecorder {
// shared by all users of PohRecorder
pub record_sender: Sender<Record>,
// unique to this caller
pub result_sender: Sender<Result<()>>,
pub result_receiver: Receiver<Result<()>>,
}
impl Clone for TransactionRecorder {
fn clone(&self) -> Self {
TransactionRecorder::new(self.record_sender.clone())
}
}
impl TransactionRecorder {
pub fn new(record_sender: Sender<Record>) -> Self {
let (result_sender, result_receiver) = channel();
Self {
// shared
record_sender,
// unique to this caller
result_sender,
result_receiver,
}
}
pub fn record(
&self,
bank_slot: Slot,
mixin: Hash,
transactions: Vec<Transaction>,
) -> Result<()> {
let res = self.record_sender.send(Record::new(
mixin,
transactions,
bank_slot,
self.result_sender.clone(),
));
if res.is_err() {
// If the channel is dropped, then the validator is shutting down so return that we are hitting
// the max tick height to stop transaction processing and flush any transactions in the pipeline.
return Err(PohRecorderError::MaxHeightReached);
}
let res = self
.result_receiver
.recv_timeout(std::time::Duration::from_millis(2000));
match res {
Err(_err) => Err(PohRecorderError::MaxHeightReached),
Ok(result) => result,
}
}
}
#[derive(Clone)]
pub struct WorkingBank {
pub bank: Arc<Bank>,
@ -86,6 +160,7 @@ pub struct PohRecorder {
record_us: u64,
ticks_from_record: u64,
last_metric: Instant,
record_sender: Sender<Record>,
}
impl PohRecorder {
@ -162,6 +237,10 @@ impl PohRecorder {
self.ticks_per_slot
}
pub fn recorder(&self) -> TransactionRecorder {
TransactionRecorder::new(self.record_sender.clone())
}
fn is_same_fork_as_previous_leader(&self, slot: Slot) -> bool {
(slot.saturating_sub(NUM_CONSECUTIVE_LEADER_SLOTS)..slot).any(|slot| {
// Check if the last slot Poh reset to was any of the
@ -503,12 +582,13 @@ impl PohRecorder {
clear_bank_signal: Option<SyncSender<bool>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
poh_config: &Arc<PohConfig>,
) -> (Self, Receiver<WorkingBankEntry>) {
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
let poh = Arc::new(Mutex::new(Poh::new(
last_entry_hash,
poh_config.hashes_per_tick,
)));
let (sender, receiver) = channel();
let (record_sender, record_receiver) = channel();
let (leader_first_tick_height, leader_last_tick_height, grace_ticks) =
Self::compute_leader_slot_tick_heights(next_leader_slot, ticks_per_slot);
(
@ -539,8 +619,10 @@ impl PohRecorder {
tick_overhead_us: 0,
ticks_from_record: 0,
last_metric: Instant::now(),
record_sender,
},
receiver,
record_receiver,
)
}
@ -557,7 +639,7 @@ impl PohRecorder {
blockstore: &Arc<Blockstore>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
poh_config: &Arc<PohConfig>,
) -> (Self, Receiver<WorkingBankEntry>) {
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
Self::new_with_clear_signal(
tick_height,
last_entry_hash,
@ -609,7 +691,7 @@ mod tests {
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -636,7 +718,7 @@ mod tests {
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -662,7 +744,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -690,7 +772,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -726,7 +808,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -777,7 +859,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -826,7 +908,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -864,7 +946,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -906,7 +988,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -952,7 +1034,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -996,7 +1078,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1033,7 +1115,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -1060,7 +1142,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -1088,7 +1170,7 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -1121,7 +1203,7 @@ mod tests {
.expect("Expected to be able to open database ledger");
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
Hash::default(),
0,
@ -1155,18 +1237,19 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let (sender, receiver) = sync_channel(1);
let (mut poh_recorder, _entry_receiver) = PohRecorder::new_with_clear_signal(
0,
Hash::default(),
0,
None,
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blockstore),
Some(sender),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
let (mut poh_recorder, _entry_receiver, _record_receiver) =
PohRecorder::new_with_clear_signal(
0,
Hash::default(),
0,
None,
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blockstore),
Some(sender),
&Arc::new(LeaderScheduleCache::default()),
&Arc::new(PohConfig::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.clear_bank();
assert!(receiver.try_recv().is_ok());
@ -1189,7 +1272,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1238,7 +1321,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1300,7 +1383,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1429,7 +1512,7 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
0,
@ -1497,7 +1580,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_config));
let genesis_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
bank.last_blockhash(),
0,

View File

@ -1,12 +1,13 @@
//! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream
use crate::poh_recorder::PohRecorder;
use crate::poh_recorder::{PohRecorder, Record};
use solana_ledger::poh::Poh;
use solana_measure::measure::Measure;
use solana_sdk::poh_config::PohConfig;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{mpsc::Receiver, Arc, Mutex};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Instant;
use std::time::{Duration, Instant};
pub struct PohService {
tick_producer: JoinHandle<()>,
@ -24,6 +25,54 @@ pub const DEFAULT_PINNED_CPU_CORE: usize = 0;
const TARGET_SLOT_ADJUSTMENT_NS: u64 = 50_000_000;
#[derive(Debug)]
struct PohTiming {
num_ticks: u64,
num_hashes: u64,
total_sleep_us: u64,
total_lock_time_ns: u64,
total_hash_time_ns: u64,
total_tick_time_ns: u64,
last_metric: Instant,
}
impl PohTiming {
fn new() -> Self {
Self {
num_ticks: 0,
num_hashes: 0,
total_sleep_us: 0,
total_lock_time_ns: 0,
total_hash_time_ns: 0,
total_tick_time_ns: 0,
last_metric: Instant::now(),
}
}
fn report(&mut self, ticks_per_slot: u64) {
if self.last_metric.elapsed().as_millis() > 1000 {
let elapsed_us = self.last_metric.elapsed().as_micros() as u64;
let us_per_slot = (elapsed_us * ticks_per_slot) / self.num_ticks;
datapoint_info!(
"poh-service",
("ticks", self.num_ticks as i64, i64),
("hashes", self.num_hashes as i64, i64),
("elapsed_us", us_per_slot, i64),
("total_sleep_us", self.total_sleep_us, i64),
("total_tick_time_us", self.total_tick_time_ns / 1000, i64),
("total_lock_time_us", self.total_lock_time_ns / 1000, i64),
("total_hash_time_us", self.total_hash_time_ns / 1000, i64),
);
self.total_sleep_us = 0;
self.num_ticks = 0;
self.num_hashes = 0;
self.total_tick_time_ns = 0;
self.total_lock_time_ns = 0;
self.total_hash_time_ns = 0;
self.last_metric = Instant::now();
}
}
}
impl PohService {
pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>,
@ -32,6 +81,7 @@ impl PohService {
ticks_per_slot: u64,
pinned_cpu_core: usize,
hashes_per_batch: u64,
record_receiver: Receiver<Record>,
) -> Self {
let poh_exit_ = poh_exit.clone();
let poh_config = poh_config.clone();
@ -41,12 +91,18 @@ impl PohService {
solana_sys_tuner::request_realtime_poh();
if poh_config.hashes_per_tick.is_none() {
if poh_config.target_tick_count.is_none() {
Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_);
Self::sleepy_tick_producer(
poh_recorder,
&poh_config,
&poh_exit_,
record_receiver,
);
} else {
Self::short_lived_sleepy_tick_producer(
poh_recorder,
&poh_config,
&poh_exit_,
record_receiver,
);
}
} else {
@ -69,6 +125,7 @@ impl PohService {
poh_config.target_tick_duration.as_nanos() as u64 - adjustment_per_tick,
ticks_per_slot,
hashes_per_batch,
record_receiver,
);
}
poh_exit_.store(true, Ordering::Relaxed);
@ -82,20 +139,53 @@ impl PohService {
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
record_receiver: Receiver<Record>,
) {
while !poh_exit.load(Ordering::Relaxed) {
Self::read_record_receiver_and_process(
&poh_recorder,
&record_receiver,
Duration::from_millis(0),
);
sleep(poh_config.target_tick_duration);
poh_recorder.lock().unwrap().tick();
}
}
pub fn read_record_receiver_and_process(
poh_recorder: &Arc<Mutex<PohRecorder>>,
record_receiver: &Receiver<Record>,
timeout: Duration,
) {
let record = record_receiver.recv_timeout(timeout);
if let Ok(record) = record {
if record
.sender
.send(poh_recorder.lock().unwrap().record(
record.slot,
record.mixin,
record.transactions,
))
.is_err()
{
panic!("Error returning mixin hash");
}
}
}
fn short_lived_sleepy_tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
record_receiver: Receiver<Record>,
) {
let mut warned = false;
for _ in 0..poh_config.target_tick_count.unwrap() {
Self::read_record_receiver_and_process(
&poh_recorder,
&record_receiver,
Duration::from_millis(0),
);
sleep(poh_config.target_tick_duration);
poh_recorder.lock().unwrap().tick();
if poh_exit.load(Ordering::Relaxed) && !warned {
@ -105,78 +195,121 @@ impl PohService {
}
}
fn record_or_hash(
next_record: &mut Option<Record>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
timing: &mut PohTiming,
record_receiver: &Receiver<Record>,
hashes_per_batch: u64,
poh: &Arc<Mutex<Poh>>,
) -> bool {
match next_record.take() {
Some(mut record) => {
// received message to record
// so, record for as long as we have queued up record requests
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
lock_time.stop();
timing.total_lock_time_ns += lock_time.as_ns();
loop {
let res = poh_recorder_l.record(
record.slot,
record.mixin,
std::mem::take(&mut record.transactions),
);
let _ = record.sender.send(res); // what do we do on failure here? Ignore for now.
timing.num_hashes += 1; // note: may have also ticked inside record
let new_record_result = record_receiver.try_recv();
match new_record_result {
Ok(new_record) => {
// we already have second request to record, so record again while we still have the mutex
record = new_record;
}
Err(_) => {
break;
}
}
}
// PohRecorder.record would have ticked if it needed to, so should_tick will be false
}
None => {
// did not receive instructions to record, so hash until we notice we've been asked to record (or we need to tick) and then remember what to record
let mut lock_time = Measure::start("lock");
let mut poh_l = poh.lock().unwrap();
lock_time.stop();
timing.total_lock_time_ns += lock_time.as_ns();
loop {
timing.num_hashes += hashes_per_batch;
let mut hash_time = Measure::start("hash");
let should_tick = poh_l.hash(hashes_per_batch);
hash_time.stop();
timing.total_hash_time_ns += hash_time.as_ns();
if should_tick {
return true; // nothing else can be done. tick required.
}
// check to see if a record request has been sent
let get_again = record_receiver.try_recv();
match get_again {
Ok(record) => {
// remember the record we just received as the next record to occur
*next_record = Some(record);
break;
}
Err(_) => {
continue;
}
}
}
}
};
false // should_tick = false for all code that reaches here
}
fn tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_exit: &AtomicBool,
target_tick_ns: u64,
ticks_per_slot: u64,
hashes_per_batch: u64,
record_receiver: Receiver<Record>,
) {
let poh = poh_recorder.lock().unwrap().poh.clone();
let mut now = Instant::now();
let mut last_metric = Instant::now();
let mut num_ticks = 0;
let mut num_hashes = 0;
let mut total_sleep_us = 0;
let mut total_lock_time_ns = 0;
let mut total_hash_time_ns = 0;
let mut total_tick_time_ns = 0;
let mut timing = PohTiming::new();
let mut next_record = None;
loop {
num_hashes += hashes_per_batch;
let should_tick = {
let mut lock_time = Measure::start("lock");
let mut poh_l = poh.lock().unwrap();
lock_time.stop();
total_lock_time_ns += lock_time.as_ns();
let mut hash_time = Measure::start("hash");
let r = poh_l.hash(hashes_per_batch);
hash_time.stop();
total_hash_time_ns += hash_time.as_ns();
r
};
let should_tick = Self::record_or_hash(
&mut next_record,
&poh_recorder,
&mut timing,
&record_receiver,
hashes_per_batch,
&poh,
);
if should_tick {
// Lock PohRecorder only for the final hash...
// Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing.
{
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
lock_time.stop();
total_lock_time_ns += lock_time.as_ns();
timing.total_lock_time_ns += lock_time.as_ns();
let mut tick_time = Measure::start("tick");
poh_recorder_l.tick();
tick_time.stop();
total_tick_time_ns += tick_time.as_ns();
timing.total_tick_time_ns += tick_time.as_ns();
}
num_ticks += 1;
timing.num_ticks += 1;
let elapsed_ns = now.elapsed().as_nanos() as u64;
// sleep is not accurate enough to get a predictable time.
// Kernel can not schedule the thread for a while.
while (now.elapsed().as_nanos() as u64) < target_tick_ns {
std::hint::spin_loop();
}
total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000;
timing.total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000;
now = Instant::now();
if last_metric.elapsed().as_millis() > 1000 {
let elapsed_us = last_metric.elapsed().as_micros() as u64;
let us_per_slot = (elapsed_us * ticks_per_slot) / num_ticks;
datapoint_info!(
"poh-service",
("ticks", num_ticks as i64, i64),
("hashes", num_hashes as i64, i64),
("elapsed_us", us_per_slot, i64),
("total_sleep_us", total_sleep_us, i64),
("total_tick_time_us", total_tick_time_ns / 1000, i64),
("total_lock_time_us", total_lock_time_ns / 1000, i64),
("total_hash_time_us", total_hash_time_ns / 1000, i64),
);
total_sleep_us = 0;
num_ticks = 0;
num_hashes = 0;
total_tick_time_ns = 0;
total_lock_time_ns = 0;
total_hash_time_ns = 0;
last_metric = Instant::now();
}
timing.report(ticks_per_slot);
if poh_exit.load(Ordering::Relaxed) {
break;
}
@ -225,7 +358,7 @@ mod tests {
target_tick_duration,
target_tick_count: None,
});
let (poh_recorder, entry_receiver) = PohRecorder::new(
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
prev_hash,
bank.slot(),
@ -305,6 +438,7 @@ mod tests {
0,
DEFAULT_PINNED_CPU_CORE,
hashes_per_batch,
record_receiver,
);
poh_recorder.lock().unwrap().set_working_bank(working_bank);

View File

@ -801,7 +801,7 @@ mod test {
);
let bank = Arc::new(Bank::new(&genesis_config));
let (poh_recorder, _entry_receiver) = PohRecorder::new(
let (poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
bank.last_blockhash(),
0,

View File

@ -486,24 +486,25 @@ impl Validator {
);
let poh_config = Arc::new(genesis_config.poh_config.clone());
let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
leader_schedule_cache.next_leader_slot(
&id,
let (mut poh_recorder, entry_receiver, record_receiver) =
PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
&bank,
Some(&blockstore),
GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS,
),
bank.ticks_per_slot(),
&id,
&blockstore,
blockstore.new_shreds_signals.first().cloned(),
&leader_schedule_cache,
&poh_config,
);
leader_schedule_cache.next_leader_slot(
&id,
bank.slot(),
&bank,
Some(&blockstore),
GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS,
),
bank.ticks_per_slot(),
&id,
&blockstore,
blockstore.new_shreds_signals.first().cloned(),
&leader_schedule_cache,
&poh_config,
);
if config.snapshot_config.is_some() {
poh_recorder.set_bank(&bank);
}
@ -644,6 +645,7 @@ impl Validator {
bank.ticks_per_slot(),
config.poh_pinned_cpu_core,
config.poh_hashes_per_batch,
record_receiver,
);
assert_eq!(
blockstore.new_shreds_signals.len(),