Fix channel panic in tests (#16503)
* Fix channel panic * Add exit signal to PohRecorder because Crossbeam doesnt drop objects inside dropped channel
This commit is contained in:
parent
eddfe06a00
commit
f0c150cfb9
|
@ -1390,6 +1390,7 @@ pub fn create_test_recorder(
|
|||
blockstore,
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&poh_config,
|
||||
exit.clone(),
|
||||
);
|
||||
poh_recorder.set_bank(&bank);
|
||||
|
||||
|
@ -1435,10 +1436,7 @@ mod tests {
|
|||
use std::{
|
||||
net::SocketAddr,
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::Receiver,
|
||||
},
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
thread::sleep,
|
||||
};
|
||||
|
||||
|
@ -1795,11 +1793,12 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
let recorder = poh_recorder.recorder();
|
||||
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
|
||||
|
||||
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
|
||||
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
|
||||
|
||||
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||
let pubkey = solana_sdk::pubkey::new_rand();
|
||||
|
@ -1869,7 +1868,11 @@ mod tests {
|
|||
// Should receive nothing from PohRecorder b/c record failed
|
||||
assert!(entry_receiver.try_recv().is_err());
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.is_exited
|
||||
.store(true, Ordering::Relaxed);
|
||||
let _ = poh_simulator.join();
|
||||
}
|
||||
Blockstore::destroy(&ledger_path).unwrap();
|
||||
|
@ -2061,11 +2064,12 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
let recorder = poh_recorder.recorder();
|
||||
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
|
||||
|
||||
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
|
||||
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
|
||||
|
||||
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
||||
|
@ -2120,7 +2124,11 @@ mod tests {
|
|||
Err(PohRecorderError::MaxHeightReached)
|
||||
);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.is_exited
|
||||
.store(true, Ordering::Relaxed);
|
||||
let _ = poh_simulator.join();
|
||||
|
||||
assert_eq!(bank.get_balance(&pubkey), 1);
|
||||
|
@ -2129,12 +2137,11 @@ mod tests {
|
|||
}
|
||||
|
||||
fn simulate_poh(
|
||||
record_receiver: Receiver<Record>,
|
||||
record_receiver: CrossbeamReceiver<Record>,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
) -> (JoinHandle<()>, Arc<AtomicBool>) {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let exit_ = exit.clone();
|
||||
) -> JoinHandle<()> {
|
||||
let poh_recorder = poh_recorder.clone();
|
||||
let is_exited = poh_recorder.lock().unwrap().is_exited.clone();
|
||||
let tick_producer = Builder::new()
|
||||
.name("solana-simulate_poh".to_string())
|
||||
.spawn(move || loop {
|
||||
|
@ -2143,11 +2150,11 @@ mod tests {
|
|||
&record_receiver,
|
||||
Duration::from_millis(10),
|
||||
);
|
||||
if exit_.load(Ordering::Relaxed) {
|
||||
if is_exited.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
});
|
||||
(tick_producer.unwrap(), exit)
|
||||
tick_producer.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -2188,13 +2195,14 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
let recorder = poh_recorder.recorder();
|
||||
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
|
||||
|
||||
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||
|
||||
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
|
||||
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
|
||||
|
||||
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
||||
|
||||
|
@ -2207,7 +2215,11 @@ mod tests {
|
|||
&gossip_vote_sender,
|
||||
);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.is_exited
|
||||
.store(true, Ordering::Relaxed);
|
||||
let _ = poh_simulator.join();
|
||||
|
||||
assert!(result.is_ok());
|
||||
|
@ -2289,14 +2301,14 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
// Poh Recorder has no working bank, so should throw MaxHeightReached error on
|
||||
// record
|
||||
let recorder = poh_recorder.recorder();
|
||||
|
||||
let (poh_simulator, exit) =
|
||||
simulate_poh(record_receiver, &Arc::new(Mutex::new(poh_recorder)));
|
||||
let poh_simulator = simulate_poh(record_receiver, &Arc::new(Mutex::new(poh_recorder)));
|
||||
|
||||
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
|
||||
|
||||
|
@ -2316,7 +2328,7 @@ mod tests {
|
|||
let expected: Vec<usize> = (0..transactions.len()).collect();
|
||||
assert_eq!(retryable_txs, expected);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
recorder.is_exited.store(true, Ordering::Relaxed);
|
||||
let _ = poh_simulator.join();
|
||||
}
|
||||
|
||||
|
@ -2374,11 +2386,12 @@ mod tests {
|
|||
&blockstore,
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
let recorder = poh_recorder.recorder();
|
||||
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
|
||||
|
||||
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
|
||||
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
|
||||
|
||||
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||
|
||||
|
@ -2433,7 +2446,11 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.is_exited
|
||||
.store(true, Ordering::Relaxed);
|
||||
let _ = poh_simulator.join();
|
||||
}
|
||||
Blockstore::destroy(&ledger_path).unwrap();
|
||||
|
@ -2448,7 +2465,6 @@ mod tests {
|
|||
Arc<Mutex<PohRecorder>>,
|
||||
Receiver<WorkingBankEntry>,
|
||||
JoinHandle<()>,
|
||||
Arc<AtomicBool>,
|
||||
) {
|
||||
Blockstore::destroy(&ledger_path).unwrap();
|
||||
let genesis_config_info = create_slow_genesis_config(10_000);
|
||||
|
@ -2460,6 +2476,7 @@ mod tests {
|
|||
let blockstore =
|
||||
Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger");
|
||||
let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));
|
||||
let exit = Arc::new(AtomicBool::default());
|
||||
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
|
||||
bank.tick_height(),
|
||||
bank.last_blockhash(),
|
||||
|
@ -2470,6 +2487,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
exit,
|
||||
);
|
||||
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
|
||||
|
||||
|
@ -2482,7 +2500,7 @@ mod tests {
|
|||
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
|
||||
system_transaction::transfer(&mint_keypair, &pubkey2, 1, genesis_config.hash()),
|
||||
];
|
||||
let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder);
|
||||
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
|
||||
|
||||
(
|
||||
transactions,
|
||||
|
@ -2490,7 +2508,6 @@ mod tests {
|
|||
poh_recorder,
|
||||
entry_receiver,
|
||||
poh_simulator,
|
||||
exit,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -2498,7 +2515,7 @@ mod tests {
|
|||
fn test_consume_buffered_packets() {
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator, exit) =
|
||||
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
|
||||
setup_conflicting_transactions(&ledger_path);
|
||||
let recorder = poh_recorder.lock().unwrap().recorder();
|
||||
let num_conflicting_transactions = transactions.len();
|
||||
|
@ -2552,7 +2569,11 @@ mod tests {
|
|||
assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed);
|
||||
}
|
||||
}
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.is_exited
|
||||
.store(true, Ordering::Relaxed);
|
||||
let _ = poh_simulator.join();
|
||||
}
|
||||
Blockstore::destroy(&ledger_path).unwrap();
|
||||
|
@ -2562,7 +2583,7 @@ mod tests {
|
|||
fn test_consume_buffered_packets_interrupted() {
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator, exit) =
|
||||
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
|
||||
setup_conflicting_transactions(&ledger_path);
|
||||
let num_conflicting_transactions = transactions.len();
|
||||
let packets_vec = to_packets_chunked(&transactions, 1);
|
||||
|
@ -2639,7 +2660,11 @@ mod tests {
|
|||
}
|
||||
|
||||
t_consume.join().unwrap();
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.is_exited
|
||||
.store(true, Ordering::Relaxed);
|
||||
let _ = poh_simulator.join();
|
||||
}
|
||||
Blockstore::destroy(&ledger_path).unwrap();
|
||||
|
|
|
@ -11,6 +11,9 @@
|
|||
//! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::max_tick_height
|
||||
//!
|
||||
use crate::poh_service::PohService;
|
||||
use crossbeam_channel::{
|
||||
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
|
||||
};
|
||||
use solana_ledger::blockstore::Blockstore;
|
||||
use solana_ledger::entry::Entry;
|
||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
||||
|
@ -24,9 +27,12 @@ use solana_sdk::pubkey::Pubkey;
|
|||
use solana_sdk::timing;
|
||||
use solana_sdk::transaction::Transaction;
|
||||
use std::cmp;
|
||||
use std::sync::mpsc::{channel, Receiver, SendError, Sender, SyncSender};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::{channel, Receiver, SendError, Sender, SyncSender},
|
||||
{Arc, Mutex},
|
||||
};
|
||||
use std::time::{Duration, Instant};
|
||||
use thiserror::Error;
|
||||
|
||||
pub const GRACE_TICKS_FACTOR: u64 = 2;
|
||||
|
@ -56,14 +62,14 @@ pub struct Record {
|
|||
pub mixin: Hash,
|
||||
pub transactions: Vec<Transaction>,
|
||||
pub slot: Slot,
|
||||
pub sender: Sender<Result<()>>,
|
||||
pub sender: CrossbeamSender<Result<()>>,
|
||||
}
|
||||
impl Record {
|
||||
pub fn new(
|
||||
mixin: Hash,
|
||||
transactions: Vec<Transaction>,
|
||||
slot: Slot,
|
||||
sender: Sender<Result<()>>,
|
||||
sender: CrossbeamSender<Result<()>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
mixin,
|
||||
|
@ -76,20 +82,23 @@ impl Record {
|
|||
|
||||
pub struct TransactionRecorder {
|
||||
// shared by all users of PohRecorder
|
||||
pub record_sender: Sender<Record>,
|
||||
pub record_sender: CrossbeamSender<Record>,
|
||||
pub is_exited: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl Clone for TransactionRecorder {
|
||||
fn clone(&self) -> Self {
|
||||
TransactionRecorder::new(self.record_sender.clone())
|
||||
TransactionRecorder::new(self.record_sender.clone(), self.is_exited.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl TransactionRecorder {
|
||||
pub fn new(record_sender: Sender<Record>) -> Self {
|
||||
pub fn new(record_sender: CrossbeamSender<Record>, is_exited: Arc<AtomicBool>) -> Self {
|
||||
Self {
|
||||
// shared
|
||||
record_sender,
|
||||
// shared
|
||||
is_exited,
|
||||
}
|
||||
}
|
||||
pub fn record(
|
||||
|
@ -99,7 +108,7 @@ impl TransactionRecorder {
|
|||
transactions: Vec<Transaction>,
|
||||
) -> Result<()> {
|
||||
// create a new channel so that there is only 1 sender and when it goes out of scope, the receiver fails
|
||||
let (result_sender, result_receiver) = channel();
|
||||
let (result_sender, result_receiver) = unbounded();
|
||||
let res =
|
||||
self.record_sender
|
||||
.send(Record::new(mixin, transactions, bank_slot, result_sender));
|
||||
|
@ -109,10 +118,26 @@ impl TransactionRecorder {
|
|||
return Err(PohRecorderError::MaxHeightReached);
|
||||
}
|
||||
// Besides validator exit, this timeout should primarily be seen to affect test execution environments where the various pieces can be shutdown abruptly
|
||||
let res = result_receiver.recv();
|
||||
let mut is_exited = false;
|
||||
loop {
|
||||
let res = result_receiver.recv_timeout(Duration::from_millis(1000));
|
||||
match res {
|
||||
Err(_err) => Err(PohRecorderError::MaxHeightReached),
|
||||
Ok(result) => result,
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
if is_exited {
|
||||
return Err(PohRecorderError::MaxHeightReached);
|
||||
} else {
|
||||
// A result may have come in between when we timed out checking this
|
||||
// bool, so check the channel again, even if is_exited == true
|
||||
is_exited = self.is_exited.load(Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
Err(RecvTimeoutError::Disconnected) => {
|
||||
return Err(PohRecorderError::MaxHeightReached);
|
||||
}
|
||||
Ok(result) => {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -154,7 +179,8 @@ pub struct PohRecorder {
|
|||
record_us: u64,
|
||||
ticks_from_record: u64,
|
||||
last_metric: Instant,
|
||||
record_sender: Sender<Record>,
|
||||
record_sender: CrossbeamSender<Record>,
|
||||
pub is_exited: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl PohRecorder {
|
||||
|
@ -232,7 +258,7 @@ impl PohRecorder {
|
|||
}
|
||||
|
||||
pub fn recorder(&self) -> TransactionRecorder {
|
||||
TransactionRecorder::new(self.record_sender.clone())
|
||||
TransactionRecorder::new(self.record_sender.clone(), self.is_exited.clone())
|
||||
}
|
||||
|
||||
fn is_same_fork_as_previous_leader(&self, slot: Slot) -> bool {
|
||||
|
@ -601,7 +627,8 @@ impl PohRecorder {
|
|||
clear_bank_signal: Option<SyncSender<bool>>,
|
||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
poh_config: &Arc<PohConfig>,
|
||||
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
|
||||
is_exited: Arc<AtomicBool>,
|
||||
) -> (Self, Receiver<WorkingBankEntry>, CrossbeamReceiver<Record>) {
|
||||
let tick_number = 0;
|
||||
let poh = Arc::new(Mutex::new(Poh::new_with_slot_info(
|
||||
last_entry_hash,
|
||||
|
@ -615,7 +642,7 @@ impl PohRecorder {
|
|||
poh_config.target_tick_duration.as_nanos() as u64,
|
||||
);
|
||||
let (sender, receiver) = channel();
|
||||
let (record_sender, record_receiver) = channel();
|
||||
let (record_sender, record_receiver) = unbounded();
|
||||
let (leader_first_tick_height, leader_last_tick_height, grace_ticks) =
|
||||
Self::compute_leader_slot_tick_heights(next_leader_slot, ticks_per_slot);
|
||||
(
|
||||
|
@ -649,6 +676,7 @@ impl PohRecorder {
|
|||
ticks_from_record: 0,
|
||||
last_metric: Instant::now(),
|
||||
record_sender,
|
||||
is_exited,
|
||||
},
|
||||
receiver,
|
||||
record_receiver,
|
||||
|
@ -658,6 +686,7 @@ impl PohRecorder {
|
|||
/// A recorder to synchronize PoH with the following data structures
|
||||
/// * bank - the LastId's queue is updated on `tick` and `record` events
|
||||
/// * sender - the Entry channel that outputs to the ledger
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
tick_height: u64,
|
||||
last_entry_hash: Hash,
|
||||
|
@ -668,7 +697,8 @@ impl PohRecorder {
|
|||
blockstore: &Arc<Blockstore>,
|
||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
poh_config: &Arc<PohConfig>,
|
||||
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
|
||||
is_exited: Arc<AtomicBool>,
|
||||
) -> (Self, Receiver<WorkingBankEntry>, CrossbeamReceiver<Record>) {
|
||||
Self::new_with_clear_signal(
|
||||
tick_height,
|
||||
last_entry_hash,
|
||||
|
@ -680,6 +710,7 @@ impl PohRecorder {
|
|||
None,
|
||||
leader_schedule_cache,
|
||||
poh_config,
|
||||
is_exited,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -730,6 +761,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::default()),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
poh_recorder.tick();
|
||||
assert_eq!(poh_recorder.tick_cache.len(), 1);
|
||||
|
@ -757,6 +789,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::default()),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
poh_recorder.tick();
|
||||
poh_recorder.tick();
|
||||
|
@ -783,6 +816,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::default()),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
poh_recorder.tick();
|
||||
assert_eq!(poh_recorder.tick_cache.len(), 1);
|
||||
|
@ -811,6 +845,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let start = Arc::new(Instant::now());
|
||||
|
@ -847,6 +882,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let start = Arc::new(Instant::now());
|
||||
|
@ -898,6 +934,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
poh_recorder.tick();
|
||||
|
@ -947,6 +984,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let start = Arc::new(Instant::now());
|
||||
|
@ -985,6 +1023,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let start = Arc::new(Instant::now());
|
||||
|
@ -1027,6 +1066,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let start = Arc::new(Instant::now());
|
||||
|
@ -1073,6 +1113,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let start = Arc::new(Instant::now());
|
||||
|
@ -1117,6 +1158,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let start = Arc::new(Instant::now());
|
||||
|
@ -1154,6 +1196,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::default()),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
poh_recorder.tick();
|
||||
poh_recorder.tick();
|
||||
|
@ -1181,6 +1224,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::default()),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
poh_recorder.tick();
|
||||
poh_recorder.tick();
|
||||
|
@ -1209,6 +1253,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::default()),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
poh_recorder.tick();
|
||||
poh_recorder.tick();
|
||||
|
@ -1242,6 +1287,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
let start = Arc::new(Instant::now());
|
||||
let working_bank = WorkingBank {
|
||||
|
@ -1278,6 +1324,7 @@ mod tests {
|
|||
Some(sender),
|
||||
&Arc::new(LeaderScheduleCache::default()),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
poh_recorder.set_bank(&bank);
|
||||
poh_recorder.clear_bank();
|
||||
|
@ -1311,6 +1358,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let end_slot = 3;
|
||||
|
@ -1360,6 +1408,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&leader_schedule_cache,
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let bootstrap_validator_id = leader_schedule_cache.slot_leader_at(0, None).unwrap();
|
||||
|
@ -1422,6 +1471,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
// Test that with no next leader slot, we don't reach the leader slot
|
||||
|
@ -1551,6 +1601,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
// Test that with no leader slot, we don't reach the leader tick
|
||||
|
@ -1619,6 +1670,7 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
//create a new bank
|
||||
let bank = Arc::new(Bank::new_from_parent(&bank, &Pubkey::default(), 2));
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
//! The `poh_service` module implements a service that records the passing of
|
||||
//! "ticks", a measure of time in the PoH stream
|
||||
use crate::poh_recorder::{PohRecorder, Record};
|
||||
use crossbeam_channel::Receiver;
|
||||
use solana_ledger::poh::Poh;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_sdk::poh_config::PohConfig;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{mpsc::Receiver, Arc, Mutex};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread::{self, sleep, Builder, JoinHandle};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
|
@ -381,6 +382,8 @@ mod tests {
|
|||
target_tick_duration,
|
||||
target_tick_count: None,
|
||||
});
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
|
||||
bank.tick_height(),
|
||||
prev_hash,
|
||||
|
@ -391,9 +394,9 @@ mod tests {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&poh_config,
|
||||
exit.clone(),
|
||||
);
|
||||
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let start = Arc::new(Instant::now());
|
||||
let working_bank = WorkingBank {
|
||||
bank: bank.clone(),
|
||||
|
|
|
@ -331,7 +331,7 @@ mod test {
|
|||
system_program, system_transaction,
|
||||
timing::timestamp,
|
||||
};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{atomic::AtomicBool, mpsc::channel};
|
||||
|
||||
#[test]
|
||||
fn service_exit() {
|
||||
|
@ -811,6 +811,7 @@ mod test {
|
|||
&Arc::new(blockstore),
|
||||
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
|
||||
&Arc::new(PohConfig::default()),
|
||||
Arc::new(AtomicBool::default()),
|
||||
);
|
||||
|
||||
let node_keypair = Arc::new(Keypair::new());
|
||||
|
|
|
@ -510,6 +510,7 @@ impl Validator {
|
|||
blockstore.new_shreds_signals.first().cloned(),
|
||||
&leader_schedule_cache,
|
||||
&poh_config,
|
||||
exit.clone(),
|
||||
);
|
||||
if config.snapshot_config.is_some() {
|
||||
poh_recorder.set_bank(&bank);
|
||||
|
|
Loading…
Reference in New Issue