Use PohRecorder as the Poh synchronization point. (#2926)
Cleanup poh_recorder and poh_service. * ticks are sent only if poh.tick_height > WorkingBank::min_tick_height and <= WorkingBank::max_tick_height * entries are recorded only if poh.tick_height >= WorkingBank::min_tick_height and < WorkingBank::max_tick_height
This commit is contained in:
parent
ba7d121724
commit
c65046e1a2
|
@ -19,7 +19,7 @@ use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS};
|
use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS};
|
||||||
use solana_sdk::transaction::Transaction;
|
use solana_sdk::transaction::Transaction;
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
|
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
|
@ -37,6 +37,7 @@ pub struct BankingStage {
|
||||||
bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>>,
|
bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>>,
|
||||||
poh_service: PohService,
|
poh_service: PohService,
|
||||||
leader_confirmation_service: LeaderConfirmationService,
|
leader_confirmation_service: LeaderConfirmationService,
|
||||||
|
poh_exit: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BankingStage {
|
impl BankingStage {
|
||||||
|
@ -59,19 +60,18 @@ impl BankingStage {
|
||||||
max_tick_height,
|
max_tick_height,
|
||||||
};
|
};
|
||||||
|
|
||||||
let poh_recorder = PohRecorder::new(bank.tick_height(), *last_entry_id);
|
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
|
||||||
|
bank.tick_height(),
|
||||||
|
*last_entry_id,
|
||||||
|
)));
|
||||||
|
|
||||||
// Single thread to generate entries from many banks.
|
// Single thread to generate entries from many banks.
|
||||||
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
|
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
|
||||||
// Once an entry has been recorded, its last_id is registered with the bank.
|
// Once an entry has been recorded, its last_id is registered with the bank.
|
||||||
let poh_exit = Arc::new(AtomicBool::new(false));
|
let poh_exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
let (poh_service, leader_sender) =
|
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||||
PohService::new(poh_recorder.clone(), config, poh_exit.clone());
|
let poh_service = PohService::new(poh_recorder.clone(), config, poh_exit.clone());
|
||||||
|
|
||||||
leader_sender
|
|
||||||
.send(working_bank.clone())
|
|
||||||
.expect("failed to send leader to poh_service");
|
|
||||||
|
|
||||||
// Single thread to compute confirmation
|
// Single thread to compute confirmation
|
||||||
let leader_confirmation_service =
|
let leader_confirmation_service =
|
||||||
|
@ -83,7 +83,6 @@ impl BankingStage {
|
||||||
let thread_bank = bank.clone();
|
let thread_bank = bank.clone();
|
||||||
let thread_verified_receiver = shared_verified_receiver.clone();
|
let thread_verified_receiver = shared_verified_receiver.clone();
|
||||||
let thread_poh_recorder = poh_recorder.clone();
|
let thread_poh_recorder = poh_recorder.clone();
|
||||||
let thread_leader = working_bank.clone();
|
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-banking-stage-tx".to_string())
|
.name("solana-banking-stage-tx".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
|
@ -93,7 +92,6 @@ impl BankingStage {
|
||||||
&thread_bank,
|
&thread_bank,
|
||||||
&thread_verified_receiver,
|
&thread_verified_receiver,
|
||||||
&thread_poh_recorder,
|
&thread_poh_recorder,
|
||||||
&thread_leader,
|
|
||||||
) {
|
) {
|
||||||
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
|
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
|
||||||
Ok(more_unprocessed_packets) => {
|
Ok(more_unprocessed_packets) => {
|
||||||
|
@ -115,6 +113,7 @@ impl BankingStage {
|
||||||
bank_thread_hdls,
|
bank_thread_hdls,
|
||||||
poh_service,
|
poh_service,
|
||||||
leader_confirmation_service,
|
leader_confirmation_service,
|
||||||
|
poh_exit,
|
||||||
},
|
},
|
||||||
entry_receiver,
|
entry_receiver,
|
||||||
)
|
)
|
||||||
|
@ -135,8 +134,7 @@ impl BankingStage {
|
||||||
fn record_transactions(
|
fn record_transactions(
|
||||||
txs: &[Transaction],
|
txs: &[Transaction],
|
||||||
results: &[bank::Result<()>],
|
results: &[bank::Result<()>],
|
||||||
poh: &PohRecorder,
|
poh: &Arc<Mutex<PohRecorder>>,
|
||||||
working_bank: &WorkingBank,
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let processed_transactions: Vec<_> = results
|
let processed_transactions: Vec<_> = results
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -158,7 +156,7 @@ impl BankingStage {
|
||||||
if !processed_transactions.is_empty() {
|
if !processed_transactions.is_empty() {
|
||||||
let hash = Transaction::hash(&processed_transactions);
|
let hash = Transaction::hash(&processed_transactions);
|
||||||
// record and unlock will unlock all the successfull transactions
|
// record and unlock will unlock all the successfull transactions
|
||||||
poh.record(hash, processed_transactions, working_bank)?;
|
poh.lock().unwrap().record(hash, processed_transactions)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -166,8 +164,7 @@ impl BankingStage {
|
||||||
pub fn process_and_record_transactions(
|
pub fn process_and_record_transactions(
|
||||||
bank: &Bank,
|
bank: &Bank,
|
||||||
txs: &[Transaction],
|
txs: &[Transaction],
|
||||||
poh: &PohRecorder,
|
poh: &Arc<Mutex<PohRecorder>>,
|
||||||
working_bank: &WorkingBank,
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
||||||
|
@ -186,7 +183,7 @@ impl BankingStage {
|
||||||
|
|
||||||
let record_time = {
|
let record_time = {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
Self::record_transactions(txs, &results, poh, working_bank)?;
|
Self::record_transactions(txs, &results, poh)?;
|
||||||
now.elapsed()
|
now.elapsed()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -219,8 +216,7 @@ impl BankingStage {
|
||||||
fn process_transactions(
|
fn process_transactions(
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
transactions: &[Transaction],
|
transactions: &[Transaction],
|
||||||
poh: &PohRecorder,
|
poh: &Arc<Mutex<PohRecorder>>,
|
||||||
working_bank: &WorkingBank,
|
|
||||||
) -> Result<(usize)> {
|
) -> Result<(usize)> {
|
||||||
let mut chunk_start = 0;
|
let mut chunk_start = 0;
|
||||||
while chunk_start != transactions.len() {
|
while chunk_start != transactions.len() {
|
||||||
|
@ -230,7 +226,6 @@ impl BankingStage {
|
||||||
bank,
|
bank,
|
||||||
&transactions[chunk_start..chunk_end],
|
&transactions[chunk_start..chunk_end],
|
||||||
poh,
|
poh,
|
||||||
working_bank,
|
|
||||||
);
|
);
|
||||||
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
|
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
|
||||||
break;
|
break;
|
||||||
|
@ -245,8 +240,7 @@ impl BankingStage {
|
||||||
pub fn process_packets(
|
pub fn process_packets(
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
|
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
|
||||||
poh: &PohRecorder,
|
poh: &Arc<Mutex<PohRecorder>>,
|
||||||
working_bank: &WorkingBank,
|
|
||||||
) -> Result<UnprocessedPackets> {
|
) -> Result<UnprocessedPackets> {
|
||||||
let recv_start = Instant::now();
|
let recv_start = Instant::now();
|
||||||
let mms = verified_receiver
|
let mms = verified_receiver
|
||||||
|
@ -297,8 +291,7 @@ impl BankingStage {
|
||||||
|
|
||||||
debug!("verified transactions {}", verified_transactions.len());
|
debug!("verified transactions {}", verified_transactions.len());
|
||||||
|
|
||||||
let processed =
|
let processed = Self::process_transactions(bank, &verified_transactions, poh)?;
|
||||||
Self::process_transactions(bank, &verified_transactions, poh, working_bank)?;
|
|
||||||
if processed < verified_transactions.len() {
|
if processed < verified_transactions.len() {
|
||||||
bank_shutdown = true;
|
bank_shutdown = true;
|
||||||
// Collect any unprocessed transactions in this batch for forwarding
|
// Collect any unprocessed transactions in this batch for forwarding
|
||||||
|
@ -348,8 +341,9 @@ impl Service for BankingStage {
|
||||||
for bank_thread_hdl in self.bank_thread_hdls {
|
for bank_thread_hdl in self.bank_thread_hdls {
|
||||||
bank_thread_hdl.join()?;
|
bank_thread_hdl.join()?;
|
||||||
}
|
}
|
||||||
|
self.poh_exit.store(true, Ordering::Relaxed);
|
||||||
self.leader_confirmation_service.join()?;
|
self.leader_confirmation_service.join()?;
|
||||||
let _ = self.poh_service.join()?;
|
self.poh_service.join()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -610,7 +604,10 @@ mod tests {
|
||||||
max_tick_height: std::u64::MAX,
|
max_tick_height: std::u64::MAX,
|
||||||
};
|
};
|
||||||
|
|
||||||
let poh_recorder = PohRecorder::new(bank.tick_height(), bank.last_id());
|
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
|
||||||
|
bank.tick_height(),
|
||||||
|
bank.last_id(),
|
||||||
|
)));
|
||||||
let pubkey = Keypair::new().pubkey();
|
let pubkey = Keypair::new().pubkey();
|
||||||
|
|
||||||
let transactions = vec![
|
let transactions = vec![
|
||||||
|
@ -619,8 +616,8 @@ mod tests {
|
||||||
];
|
];
|
||||||
|
|
||||||
let mut results = vec![Ok(()), Ok(())];
|
let mut results = vec![Ok(()), Ok(())];
|
||||||
BankingStage::record_transactions(&transactions, &results, &poh_recorder, &working_bank)
|
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||||
.unwrap();
|
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
|
||||||
let entries = entry_receiver.recv().unwrap();
|
let entries = entry_receiver.recv().unwrap();
|
||||||
assert_eq!(entries[0].transactions.len(), transactions.len());
|
assert_eq!(entries[0].transactions.len(), transactions.len());
|
||||||
|
|
||||||
|
@ -629,15 +626,13 @@ mod tests {
|
||||||
1,
|
1,
|
||||||
ProgramError::ResultWithNegativeTokens,
|
ProgramError::ResultWithNegativeTokens,
|
||||||
));
|
));
|
||||||
BankingStage::record_transactions(&transactions, &results, &poh_recorder, &working_bank)
|
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
|
||||||
.unwrap();
|
|
||||||
let entries = entry_receiver.recv().unwrap();
|
let entries = entry_receiver.recv().unwrap();
|
||||||
assert_eq!(entries[0].transactions.len(), transactions.len());
|
assert_eq!(entries[0].transactions.len(), transactions.len());
|
||||||
|
|
||||||
// Other BankErrors should not be recorded
|
// Other BankErrors should not be recorded
|
||||||
results[0] = Err(BankError::AccountNotFound);
|
results[0] = Err(BankError::AccountNotFound);
|
||||||
BankingStage::record_transactions(&transactions, &results, &poh_recorder, &working_bank)
|
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
|
||||||
.unwrap();
|
|
||||||
let entries = entry_receiver.recv().unwrap();
|
let entries = entry_receiver.recv().unwrap();
|
||||||
assert_eq!(entries[0].transactions.len(), transactions.len() - 1);
|
assert_eq!(entries[0].transactions.len(), transactions.len() - 1);
|
||||||
}
|
}
|
||||||
|
@ -663,16 +658,14 @@ mod tests {
|
||||||
min_tick_height: bank.tick_height(),
|
min_tick_height: bank.tick_height(),
|
||||||
max_tick_height: bank.tick_height() + 1,
|
max_tick_height: bank.tick_height() + 1,
|
||||||
};
|
};
|
||||||
let poh_recorder = PohRecorder::new(bank.tick_height(), bank.last_id());
|
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
|
||||||
|
bank.tick_height(),
|
||||||
|
bank.last_id(),
|
||||||
|
)));
|
||||||
|
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||||
|
|
||||||
BankingStage::process_and_record_transactions(
|
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap();
|
||||||
&bank,
|
poh_recorder.lock().unwrap().tick();
|
||||||
&transactions,
|
|
||||||
&poh_recorder,
|
|
||||||
&working_bank,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
poh_recorder.tick(&working_bank).unwrap();
|
|
||||||
|
|
||||||
let mut need_tick = true;
|
let mut need_tick = true;
|
||||||
// read entries until I find mine, might be ticks...
|
// read entries until I find mine, might be ticks...
|
||||||
|
@ -697,12 +690,7 @@ mod tests {
|
||||||
)];
|
)];
|
||||||
|
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
BankingStage::process_and_record_transactions(
|
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder,),
|
||||||
&bank,
|
|
||||||
&transactions,
|
|
||||||
&poh_recorder,
|
|
||||||
&working_bank
|
|
||||||
),
|
|
||||||
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
|
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
use solana_sdk::hash::{hash, hashv, Hash};
|
use solana_sdk::hash::{hash, hashv, Hash};
|
||||||
|
|
||||||
pub struct Poh {
|
pub struct Poh {
|
||||||
id: Hash,
|
pub id: Hash,
|
||||||
num_hashes: u64,
|
num_hashes: u64,
|
||||||
pub tick_height: u64,
|
pub tick_height: u64,
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,15 @@
|
||||||
//! The `poh_recorder` module provides an object for synchronizing with Proof of History.
|
//! The `poh_recorder` module provides an object for synchronizing with Proof of History.
|
||||||
//! It synchronizes PoH, bank's register_tick and the ledger
|
//! It synchronizes PoH, bank's register_tick and the ledger
|
||||||
//!
|
//!
|
||||||
|
//! PohRecorder will send ticks or entries to a WorkingBank, if the current range of ticks is
|
||||||
|
//! within the specified WorkingBank range.
|
||||||
|
//!
|
||||||
|
//! For Ticks:
|
||||||
|
//! * tick must be > WorkingBank::min_tick_height && tick must be <= WorkingBank::man_tick_height
|
||||||
|
//!
|
||||||
|
//! For Entries:
|
||||||
|
//! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::man_tick_height
|
||||||
|
//!
|
||||||
use crate::entry::Entry;
|
use crate::entry::Entry;
|
||||||
use crate::poh::Poh;
|
use crate::poh::Poh;
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
|
@ -8,7 +17,7 @@ use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::transaction::Transaction;
|
use solana_sdk::transaction::Transaction;
|
||||||
use std::sync::mpsc::Sender;
|
use std::sync::mpsc::Sender;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||||
pub enum PohRecorderError {
|
pub enum PohRecorderError {
|
||||||
|
@ -25,118 +34,153 @@ pub struct WorkingBank {
|
||||||
pub max_tick_height: u64,
|
pub max_tick_height: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct PohRecorder {
|
pub struct PohRecorder {
|
||||||
poh: Arc<Mutex<Poh>>,
|
poh: Poh,
|
||||||
tick_cache: Arc<Mutex<Vec<Entry>>>,
|
tick_cache: Vec<(Entry, u64)>,
|
||||||
|
working_bank: Option<WorkingBank>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PohRecorder {
|
impl PohRecorder {
|
||||||
pub fn hash(&self) {
|
pub fn clear_bank(&mut self) {
|
||||||
// TODO: amortize the cost of this lock by doing the loop in here for
|
self.working_bank = None;
|
||||||
// some min amount of hashes
|
|
||||||
let mut poh = self.poh.lock().unwrap();
|
|
||||||
|
|
||||||
poh.hash();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn flush_cache(&self, working_bank: &WorkingBank) -> Result<()> {
|
pub fn hash(&mut self) {
|
||||||
|
// TODO: amortize the cost of this lock by doing the loop in here for
|
||||||
|
// some min amount of hashes
|
||||||
|
self.poh.hash();
|
||||||
|
}
|
||||||
|
|
||||||
|
// synchronize PoH with a bank
|
||||||
|
pub fn reset(&mut self, tick_height: u64, last_id: Hash) {
|
||||||
let mut cache = vec![];
|
let mut cache = vec![];
|
||||||
std::mem::swap(&mut cache, &mut self.tick_cache.lock().unwrap());
|
info!(
|
||||||
if !cache.is_empty() {
|
"reset poh from: {},{} to: {},{}",
|
||||||
|
self.poh.id, self.poh.tick_height, last_id, tick_height,
|
||||||
|
);
|
||||||
|
std::mem::swap(&mut cache, &mut self.tick_cache);
|
||||||
|
self.poh = Poh::new(last_id, tick_height);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_working_bank(&mut self, working_bank: WorkingBank) {
|
||||||
|
self.working_bank = Some(working_bank);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height
|
||||||
|
// On a record flush will flush the cache at the WorkingBank::min_tick_height, since a record
|
||||||
|
// occurs after the min_tick_height was generated
|
||||||
|
fn flush_cache(&mut self, tick: bool) -> Result<()> {
|
||||||
|
// check_tick_height is called before flush cache, so it cannot overrun the bank
|
||||||
|
// so a bank that is so late that it's slot fully generated before it starts recording
|
||||||
|
// will fail instead of broadcasting any ticks
|
||||||
|
let working_bank = self
|
||||||
|
.working_bank
|
||||||
|
.as_ref()
|
||||||
|
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
|
||||||
|
if self.poh.tick_height < working_bank.min_tick_height {
|
||||||
|
return Err(Error::PohRecorderError(
|
||||||
|
PohRecorderError::MinHeightNotReached,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if tick && self.poh.tick_height == working_bank.min_tick_height {
|
||||||
|
return Err(Error::PohRecorderError(
|
||||||
|
PohRecorderError::MinHeightNotReached,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let cnt = self
|
||||||
|
.tick_cache
|
||||||
|
.iter()
|
||||||
|
.take_while(|x| x.1 <= working_bank.max_tick_height)
|
||||||
|
.count();
|
||||||
|
let e = if cnt > 0 {
|
||||||
|
trace!(
|
||||||
|
"flush_cache: {} {} sending: {}",
|
||||||
|
working_bank.bank.tick_height(),
|
||||||
|
working_bank.max_tick_height,
|
||||||
|
cnt,
|
||||||
|
);
|
||||||
|
let cache: Vec<Entry> = self.tick_cache[..cnt].iter().map(|x| x.0.clone()).collect();
|
||||||
for t in &cache {
|
for t in &cache {
|
||||||
working_bank.bank.register_tick(&t.id);
|
working_bank.bank.register_tick(&t.id);
|
||||||
}
|
}
|
||||||
working_bank.sender.send(cache)?;
|
working_bank.sender.send(cache)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
if self.poh.tick_height >= working_bank.max_tick_height {
|
||||||
|
info!("poh_record: max_tick_height reached, setting working bank to None");
|
||||||
|
self.working_bank = None;
|
||||||
}
|
}
|
||||||
|
if e.is_err() {
|
||||||
|
info!("WorkingBank::sender disconnected {:?}", e);
|
||||||
|
//revert the cache, but clear the working bank
|
||||||
|
self.working_bank = None;
|
||||||
|
} else {
|
||||||
|
//commit the flush
|
||||||
|
let _ = self.tick_cache.drain(..cnt);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tick(&self, working_bank: &WorkingBank) -> Result<()> {
|
pub fn tick(&mut self) {
|
||||||
// Register and send the entry out while holding the lock if the max PoH height
|
// Register and send the entry out while holding the lock if the max PoH height
|
||||||
// hasn't been reached.
|
// hasn't been reached.
|
||||||
// This guarantees PoH order and Entry production and banks LastId queue is the same
|
// This guarantees PoH order and Entry production and banks LastId queue is the same
|
||||||
let mut poh = self.poh.lock().unwrap();
|
let tick = self.generate_tick();
|
||||||
|
trace!("tick {}", tick.1);
|
||||||
Self::check_tick_height(&poh, working_bank).map_err(|e| {
|
self.tick_cache.push(tick);
|
||||||
let tick = Self::generate_tick(&mut poh);
|
let _ = self.flush_cache(true);
|
||||||
self.tick_cache.lock().unwrap().push(tick);
|
|
||||||
e
|
|
||||||
})?;
|
|
||||||
;
|
|
||||||
self.flush_cache(working_bank)?;
|
|
||||||
|
|
||||||
Self::register_and_send_tick(&mut *poh, working_bank)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn record(
|
pub fn record(&mut self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
|
||||||
&self,
|
|
||||||
mixin: Hash,
|
|
||||||
txs: Vec<Transaction>,
|
|
||||||
working_bank: &WorkingBank,
|
|
||||||
) -> Result<()> {
|
|
||||||
// Register and send the entry out while holding the lock.
|
// Register and send the entry out while holding the lock.
|
||||||
// This guarantees PoH order and Entry production and banks LastId queue is the same.
|
// This guarantees PoH order and Entry production and banks LastId queue is the same.
|
||||||
let mut poh = self.poh.lock().unwrap();
|
self.flush_cache(false)?;
|
||||||
|
self.record_and_send_txs(mixin, txs)
|
||||||
Self::check_tick_height(&poh, working_bank)?;
|
|
||||||
self.flush_cache(working_bank)?;
|
|
||||||
|
|
||||||
Self::record_and_send_txs(&mut *poh, mixin, txs, working_bank)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A recorder to synchronize PoH with the following data structures
|
/// A recorder to synchronize PoH with the following data structures
|
||||||
/// * bank - the LastId's queue is updated on `tick` and `record` events
|
/// * bank - the LastId's queue is updated on `tick` and `record` events
|
||||||
/// * sender - the Entry channel that outputs to the ledger
|
/// * sender - the Entry channel that outputs to the ledger
|
||||||
pub fn new(tick_height: u64, last_entry_id: Hash) -> Self {
|
pub fn new(tick_height: u64, last_entry_id: Hash) -> Self {
|
||||||
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height)));
|
let poh = Poh::new(last_entry_id, tick_height);
|
||||||
let tick_cache = Arc::new(Mutex::new(vec![]));
|
PohRecorder {
|
||||||
PohRecorder { poh, tick_cache }
|
poh,
|
||||||
}
|
tick_cache: vec![],
|
||||||
|
working_bank: None,
|
||||||
fn check_tick_height(poh: &Poh, working_bank: &WorkingBank) -> Result<()> {
|
|
||||||
if poh.tick_height < working_bank.min_tick_height {
|
|
||||||
Err(Error::PohRecorderError(
|
|
||||||
PohRecorderError::MinHeightNotReached,
|
|
||||||
))
|
|
||||||
} else if poh.tick_height >= working_bank.max_tick_height {
|
|
||||||
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn record_and_send_txs(
|
fn record_and_send_txs(&mut self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
|
||||||
poh: &mut Poh,
|
let working_bank = self
|
||||||
mixin: Hash,
|
.working_bank
|
||||||
txs: Vec<Transaction>,
|
.as_ref()
|
||||||
working_bank: &WorkingBank,
|
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
|
||||||
) -> Result<()> {
|
let entry = self.poh.record(mixin);
|
||||||
let entry = poh.record(mixin);
|
|
||||||
assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function");
|
assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function");
|
||||||
let entry = Entry {
|
let entry = Entry {
|
||||||
num_hashes: entry.num_hashes,
|
num_hashes: entry.num_hashes,
|
||||||
id: entry.id,
|
id: entry.id,
|
||||||
transactions: txs,
|
transactions: txs,
|
||||||
};
|
};
|
||||||
|
trace!("sending entry {}", entry.is_tick());
|
||||||
working_bank.sender.send(vec![entry])?;
|
working_bank.sender.send(vec![entry])?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_tick(poh: &mut Poh) -> Entry {
|
fn generate_tick(&mut self) -> (Entry, u64) {
|
||||||
let tick = poh.tick();
|
let tick = self.poh.tick();
|
||||||
|
assert_ne!(tick.tick_height, 0);
|
||||||
|
(
|
||||||
Entry {
|
Entry {
|
||||||
num_hashes: tick.num_hashes,
|
num_hashes: tick.num_hashes,
|
||||||
id: tick.id,
|
id: tick.id,
|
||||||
transactions: vec![],
|
transactions: vec![],
|
||||||
}
|
},
|
||||||
}
|
tick.tick_height,
|
||||||
|
)
|
||||||
fn register_and_send_tick(poh: &mut Poh, working_bank: &WorkingBank) -> Result<()> {
|
|
||||||
let tick = Self::generate_tick(poh);
|
|
||||||
working_bank.bank.register_tick(&tick.id);
|
|
||||||
working_bank.sender.send(vec![tick])?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,51 +194,145 @@ mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_poh_recorder() {
|
fn test_poh_recorder_no_zero_tick() {
|
||||||
|
let prev_id = Hash::default();
|
||||||
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert_eq!(poh_recorder.tick_cache.len(), 1);
|
||||||
|
assert_eq!(poh_recorder.tick_cache[0].1, 1);
|
||||||
|
assert_eq!(poh_recorder.poh.tick_height, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_poh_recorder_tick_height_is_last_tick() {
|
||||||
|
let prev_id = Hash::default();
|
||||||
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
poh_recorder.tick();
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert_eq!(poh_recorder.tick_cache.len(), 2);
|
||||||
|
assert_eq!(poh_recorder.tick_cache[1].1, 2);
|
||||||
|
assert_eq!(poh_recorder.poh.tick_height, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_poh_recorder_reset_clears_cache() {
|
||||||
|
let mut poh_recorder = PohRecorder::new(0, Hash::default());
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert_eq!(poh_recorder.tick_cache.len(), 1);
|
||||||
|
poh_recorder.reset(0, Hash::default());
|
||||||
|
assert_eq!(poh_recorder.tick_cache.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_poh_recorder_clear() {
|
||||||
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let prev_id = bank.last_id();
|
let prev_id = bank.last_id();
|
||||||
let (entry_sender, entry_receiver) = channel();
|
let (entry_sender, _) = channel();
|
||||||
let poh_recorder = PohRecorder::new(0, prev_id);
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
|
||||||
let working_bank = WorkingBank {
|
let working_bank = WorkingBank {
|
||||||
bank,
|
bank,
|
||||||
sender: entry_sender,
|
sender: entry_sender,
|
||||||
min_tick_height: 0,
|
min_tick_height: 2,
|
||||||
max_tick_height: 2,
|
max_tick_height: 3,
|
||||||
};
|
};
|
||||||
|
poh_recorder.set_working_bank(working_bank);
|
||||||
|
assert!(poh_recorder.working_bank.is_some());
|
||||||
|
poh_recorder.clear_bank();
|
||||||
|
assert!(poh_recorder.working_bank.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
//send some data
|
#[test]
|
||||||
let h1 = hash(b"hello world!");
|
fn test_poh_recorder_tick_sent_after_min() {
|
||||||
|
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||||
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
|
let prev_id = bank.last_id();
|
||||||
|
let (entry_sender, entry_receiver) = channel();
|
||||||
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
|
||||||
|
let working_bank = WorkingBank {
|
||||||
|
bank,
|
||||||
|
sender: entry_sender,
|
||||||
|
min_tick_height: 2,
|
||||||
|
max_tick_height: 3,
|
||||||
|
};
|
||||||
|
poh_recorder.set_working_bank(working_bank);
|
||||||
|
poh_recorder.tick();
|
||||||
|
poh_recorder.tick();
|
||||||
|
//tick height equal to min_tick_height
|
||||||
|
//no tick has been sent
|
||||||
|
assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 2);
|
||||||
|
assert!(entry_receiver.try_recv().is_err());
|
||||||
|
|
||||||
|
// all ticks are sent after height > min
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert_eq!(poh_recorder.poh.tick_height, 3);
|
||||||
|
assert_eq!(poh_recorder.tick_cache.len(), 0);
|
||||||
|
let e = entry_receiver.recv().expect("recv 1");
|
||||||
|
assert_eq!(e.len(), 3);
|
||||||
|
assert!(poh_recorder.working_bank.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_poh_recorder_tick_sent_upto_and_including_max() {
|
||||||
|
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||||
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
|
let prev_id = bank.last_id();
|
||||||
|
let (entry_sender, entry_receiver) = channel();
|
||||||
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
|
||||||
|
poh_recorder.tick();
|
||||||
|
poh_recorder.tick();
|
||||||
|
poh_recorder.tick();
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 4);
|
||||||
|
assert_eq!(poh_recorder.poh.tick_height, 4);
|
||||||
|
|
||||||
|
let working_bank = WorkingBank {
|
||||||
|
bank,
|
||||||
|
sender: entry_sender,
|
||||||
|
min_tick_height: 2,
|
||||||
|
max_tick_height: 3,
|
||||||
|
};
|
||||||
|
poh_recorder.set_working_bank(working_bank);
|
||||||
|
poh_recorder.tick();
|
||||||
|
|
||||||
|
assert_eq!(poh_recorder.poh.tick_height, 5);
|
||||||
|
assert!(poh_recorder.working_bank.is_none());
|
||||||
|
let e = entry_receiver.recv().expect("recv 1");
|
||||||
|
assert_eq!(e.len(), 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_poh_recorder_record_to_early() {
|
||||||
|
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||||
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
|
let prev_id = bank.last_id();
|
||||||
|
let (entry_sender, entry_receiver) = channel();
|
||||||
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
|
||||||
|
let working_bank = WorkingBank {
|
||||||
|
bank,
|
||||||
|
sender: entry_sender,
|
||||||
|
min_tick_height: 2,
|
||||||
|
max_tick_height: 3,
|
||||||
|
};
|
||||||
|
poh_recorder.set_working_bank(working_bank);
|
||||||
|
poh_recorder.tick();
|
||||||
let tx = test_tx();
|
let tx = test_tx();
|
||||||
poh_recorder
|
let h1 = hash(b"hello world!");
|
||||||
.record(h1, vec![tx.clone()], &working_bank)
|
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
|
||||||
.unwrap();
|
assert!(entry_receiver.try_recv().is_err());
|
||||||
//get some events
|
|
||||||
let _e = entry_receiver.recv().unwrap();
|
|
||||||
|
|
||||||
poh_recorder.tick(&working_bank).unwrap();
|
|
||||||
let _e = entry_receiver.recv().unwrap();
|
|
||||||
|
|
||||||
poh_recorder.tick(&working_bank).unwrap();
|
|
||||||
let _e = entry_receiver.recv().unwrap();
|
|
||||||
|
|
||||||
// max tick height reached
|
|
||||||
assert!(poh_recorder.tick(&working_bank).is_err());
|
|
||||||
assert!(poh_recorder.record(h1, vec![tx], &working_bank).is_err());
|
|
||||||
|
|
||||||
//make sure it handles channel close correctly
|
|
||||||
drop(entry_receiver);
|
|
||||||
assert!(poh_recorder.tick(&working_bank).is_err());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_poh_recorder_tick_cache() {
|
fn test_poh_recorder_record_at_min_passes() {
|
||||||
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let prev_id = bank.last_id();
|
let prev_id = bank.last_id();
|
||||||
let (entry_sender, entry_receiver) = channel();
|
let (entry_sender, entry_receiver) = channel();
|
||||||
let poh_recorder = PohRecorder::new(0, prev_id);
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
|
||||||
let working_bank = WorkingBank {
|
let working_bank = WorkingBank {
|
||||||
bank,
|
bank,
|
||||||
|
@ -202,50 +340,72 @@ mod tests {
|
||||||
min_tick_height: 1,
|
min_tick_height: 1,
|
||||||
max_tick_height: 2,
|
max_tick_height: 2,
|
||||||
};
|
};
|
||||||
|
poh_recorder.set_working_bank(working_bank);
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert_eq!(poh_recorder.tick_cache.len(), 1);
|
||||||
|
assert_eq!(poh_recorder.poh.tick_height, 1);
|
||||||
|
let tx = test_tx();
|
||||||
|
let h1 = hash(b"hello world!");
|
||||||
|
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_ok());
|
||||||
|
assert_eq!(poh_recorder.tick_cache.len(), 0);
|
||||||
|
|
||||||
// tick should be cached
|
//tick in the cache + entry
|
||||||
assert!(poh_recorder.tick(&working_bank).is_err());
|
let e = entry_receiver.recv().expect("recv 1");
|
||||||
assert!(entry_receiver.try_recv().is_err());
|
assert_eq!(e.len(), 1);
|
||||||
|
assert!(e[0].is_tick());
|
||||||
// working_bank should be at the right height
|
let e = entry_receiver.recv().expect("recv 2");
|
||||||
poh_recorder.tick(&working_bank).unwrap();
|
assert!(!e[0].is_tick());
|
||||||
|
|
||||||
let entries = entry_receiver.recv().unwrap();
|
|
||||||
assert_eq!(entries.len(), 1);
|
|
||||||
let entries = entry_receiver.recv().unwrap();
|
|
||||||
assert_eq!(entries.len(), 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_poh_recorder_tick_cache_old_working_bank() {
|
fn test_poh_recorder_record_at_max_fails() {
|
||||||
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let prev_id = bank.last_id();
|
let prev_id = bank.last_id();
|
||||||
let (entry_sender, entry_receiver) = channel();
|
let (entry_sender, entry_receiver) = channel();
|
||||||
let poh_recorder = PohRecorder::new(0, prev_id);
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
|
||||||
let working_bank = WorkingBank {
|
let working_bank = WorkingBank {
|
||||||
bank,
|
bank,
|
||||||
sender: entry_sender,
|
sender: entry_sender,
|
||||||
min_tick_height: 1,
|
min_tick_height: 1,
|
||||||
max_tick_height: 1,
|
max_tick_height: 2,
|
||||||
};
|
};
|
||||||
|
poh_recorder.set_working_bank(working_bank);
|
||||||
|
poh_recorder.tick();
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert_eq!(poh_recorder.poh.tick_height, 2);
|
||||||
|
let tx = test_tx();
|
||||||
|
let h1 = hash(b"hello world!");
|
||||||
|
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
|
||||||
|
|
||||||
// tick should be cached
|
let e = entry_receiver.recv().expect("recv 1");
|
||||||
assert_matches!(
|
assert_eq!(e.len(), 2);
|
||||||
poh_recorder.tick(&working_bank),
|
assert!(e[0].is_tick());
|
||||||
Err(Error::PohRecorderError(
|
assert!(e[1].is_tick());
|
||||||
PohRecorderError::MinHeightNotReached
|
}
|
||||||
))
|
|
||||||
);
|
|
||||||
|
|
||||||
// working_bank should be past MaxHeight
|
#[test]
|
||||||
assert_matches!(
|
fn test_poh_cache_on_disconnect() {
|
||||||
poh_recorder.tick(&working_bank),
|
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||||
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
);
|
let prev_id = bank.last_id();
|
||||||
assert_eq!(poh_recorder.tick_cache.lock().unwrap().len(), 2);
|
let (entry_sender, entry_receiver) = channel();
|
||||||
|
let mut poh_recorder = PohRecorder::new(0, prev_id);
|
||||||
|
|
||||||
assert!(entry_receiver.try_recv().is_err());
|
let working_bank = WorkingBank {
|
||||||
|
bank,
|
||||||
|
sender: entry_sender,
|
||||||
|
min_tick_height: 2,
|
||||||
|
max_tick_height: 3,
|
||||||
|
};
|
||||||
|
poh_recorder.set_working_bank(working_bank);
|
||||||
|
poh_recorder.tick();
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert_eq!(poh_recorder.poh.tick_height, 2);
|
||||||
|
drop(entry_receiver);
|
||||||
|
poh_recorder.tick();
|
||||||
|
assert!(poh_recorder.working_bank.is_none());
|
||||||
|
assert_eq!(poh_recorder.tick_cache.len(), 3);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,11 @@
|
||||||
//! The `poh_service` module implements a service that records the passing of
|
//! The `poh_service` module implements a service that records the passing of
|
||||||
//! "ticks", a measure of time in the PoH stream
|
//! "ticks", a measure of time in the PoH stream
|
||||||
|
|
||||||
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBank};
|
use crate::poh_recorder::PohRecorder;
|
||||||
use crate::result::{Error, Result};
|
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use solana_sdk::timing::NUM_TICKS_PER_SECOND;
|
use solana_sdk::timing::NUM_TICKS_PER_SECOND;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::sync::Arc;
|
|
||||||
use std::thread::{self, sleep, Builder, JoinHandle};
|
use std::thread::{self, sleep, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -29,7 +27,7 @@ impl Default for PohServiceConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PohService {
|
pub struct PohService {
|
||||||
tick_producer: JoinHandle<Result<()>>,
|
tick_producer: JoinHandle<()>,
|
||||||
poh_exit: Arc<AtomicBool>,
|
poh_exit: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,95 +36,64 @@ impl PohService {
|
||||||
self.poh_exit.store(true, Ordering::Relaxed);
|
self.poh_exit.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn close(self) -> thread::Result<Result<()>> {
|
pub fn close(self) -> thread::Result<()> {
|
||||||
self.exit();
|
self.exit();
|
||||||
self.join()
|
self.join()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new(
|
pub fn new(
|
||||||
poh_recorder: PohRecorder,
|
poh_recorder: Arc<Mutex<PohRecorder>>,
|
||||||
config: PohServiceConfig,
|
config: PohServiceConfig,
|
||||||
poh_exit: Arc<AtomicBool>,
|
poh_exit: Arc<AtomicBool>,
|
||||||
) -> (Self, Sender<WorkingBank>) {
|
) -> Self {
|
||||||
// PohService is a headless producer, so when it exits it should notify the banking stage.
|
// PohService is a headless producer, so when it exits it should notify the banking stage.
|
||||||
// Since channel are not used to talk between these threads an AtomicBool is used as a
|
// Since channel are not used to talk between these threads an AtomicBool is used as a
|
||||||
// signal.
|
// signal.
|
||||||
let poh_exit_ = poh_exit.clone();
|
let poh_exit_ = poh_exit.clone();
|
||||||
let (working_bank_sender, working_bank_receiver) = channel();
|
|
||||||
// Single thread to generate ticks
|
// Single thread to generate ticks
|
||||||
let tick_producer = Builder::new()
|
let tick_producer = Builder::new()
|
||||||
.name("solana-poh-service-tick_producer".to_string())
|
.name("solana-poh-service-tick_producer".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut poh_recorder = poh_recorder;
|
let poh_recorder = poh_recorder;
|
||||||
let working_bank_receiver = working_bank_receiver;
|
Self::tick_producer(&poh_recorder, config, &poh_exit_);
|
||||||
let return_value = Self::tick_producer(
|
|
||||||
&working_bank_receiver,
|
|
||||||
&mut poh_recorder,
|
|
||||||
config,
|
|
||||||
&poh_exit_,
|
|
||||||
);
|
|
||||||
poh_exit_.store(true, Ordering::Relaxed);
|
poh_exit_.store(true, Ordering::Relaxed);
|
||||||
return_value
|
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
(
|
|
||||||
Self {
|
Self {
|
||||||
tick_producer,
|
tick_producer,
|
||||||
poh_exit,
|
poh_exit,
|
||||||
},
|
}
|
||||||
working_bank_sender,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn tick_producer(
|
fn tick_producer(
|
||||||
working_bank_receiver: &Receiver<WorkingBank>,
|
poh: &Arc<Mutex<PohRecorder>>,
|
||||||
poh: &mut PohRecorder,
|
|
||||||
config: PohServiceConfig,
|
config: PohServiceConfig,
|
||||||
poh_exit: &AtomicBool,
|
poh_exit: &AtomicBool,
|
||||||
) -> Result<()> {
|
) {
|
||||||
let mut working_bank = None;
|
|
||||||
loop {
|
loop {
|
||||||
if working_bank.is_none() {
|
|
||||||
let result = working_bank_receiver.try_recv();
|
|
||||||
working_bank = match result {
|
|
||||||
Err(TryRecvError::Empty) => None,
|
|
||||||
_ => Some(result?),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
match config {
|
match config {
|
||||||
PohServiceConfig::Tick(num) => {
|
PohServiceConfig::Tick(num) => {
|
||||||
for _ in 1..num {
|
for _ in 1..num {
|
||||||
poh.hash();
|
poh.lock().unwrap().hash();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PohServiceConfig::Sleep(duration) => {
|
PohServiceConfig::Sleep(duration) => {
|
||||||
sleep(duration);
|
sleep(duration);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let result = if let Some(ref current_leader) = working_bank {
|
poh.lock().unwrap().tick();
|
||||||
poh.tick(current_leader)
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
};
|
|
||||||
match result {
|
|
||||||
Err(Error::PohRecorderError(PohRecorderError::MinHeightNotReached)) => (),
|
|
||||||
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
|
|
||||||
working_bank = None;
|
|
||||||
}
|
|
||||||
e => e?,
|
|
||||||
};
|
|
||||||
if poh_exit.load(Ordering::Relaxed) {
|
if poh_exit.load(Ordering::Relaxed) {
|
||||||
return Ok(());
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service for PohService {
|
impl Service for PohService {
|
||||||
type JoinReturnType = Result<()>;
|
type JoinReturnType = ();
|
||||||
|
|
||||||
fn join(self) -> thread::Result<Result<()>> {
|
fn join(self) -> thread::Result<()> {
|
||||||
self.tick_producer.join()
|
self.tick_producer.join()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -134,6 +101,8 @@ impl Service for PohService {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::poh_recorder::WorkingBank;
|
||||||
|
use crate::result::Result;
|
||||||
use crate::test_tx::test_tx;
|
use crate::test_tx::test_tx;
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::genesis_block::GenesisBlock;
|
use solana_sdk::genesis_block::GenesisBlock;
|
||||||
|
@ -147,7 +116,7 @@ mod tests {
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let prev_id = bank.last_id();
|
let prev_id = bank.last_id();
|
||||||
let (entry_sender, entry_receiver) = channel();
|
let (entry_sender, entry_receiver) = channel();
|
||||||
let poh_recorder = PohRecorder::new(bank.tick_height(), prev_id);
|
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(bank.tick_height(), prev_id)));
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let working_bank = WorkingBank {
|
let working_bank = WorkingBank {
|
||||||
bank: bank.clone(),
|
bank: bank.clone(),
|
||||||
|
@ -158,7 +127,6 @@ mod tests {
|
||||||
|
|
||||||
let entry_producer: JoinHandle<Result<()>> = {
|
let entry_producer: JoinHandle<Result<()>> = {
|
||||||
let poh_recorder = poh_recorder.clone();
|
let poh_recorder = poh_recorder.clone();
|
||||||
let working_bank = working_bank.clone();
|
|
||||||
let exit = exit.clone();
|
let exit = exit.clone();
|
||||||
|
|
||||||
Builder::new()
|
Builder::new()
|
||||||
|
@ -168,7 +136,7 @@ mod tests {
|
||||||
// send some data
|
// send some data
|
||||||
let h1 = hash(b"hello world!");
|
let h1 = hash(b"hello world!");
|
||||||
let tx = test_tx();
|
let tx = test_tx();
|
||||||
poh_recorder.record(h1, vec![tx], &working_bank).unwrap();
|
poh_recorder.lock().unwrap().record(h1, vec![tx]).unwrap();
|
||||||
|
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
break Ok(());
|
break Ok(());
|
||||||
|
@ -179,15 +147,12 @@ mod tests {
|
||||||
};
|
};
|
||||||
|
|
||||||
const HASHES_PER_TICK: u64 = 2;
|
const HASHES_PER_TICK: u64 = 2;
|
||||||
let (poh_service, working_bank_sender) = PohService::new(
|
let poh_service = PohService::new(
|
||||||
poh_recorder.clone(),
|
poh_recorder.clone(),
|
||||||
PohServiceConfig::Tick(HASHES_PER_TICK as usize),
|
PohServiceConfig::Tick(HASHES_PER_TICK as usize),
|
||||||
Arc::new(AtomicBool::new(false)),
|
Arc::new(AtomicBool::new(false)),
|
||||||
);
|
);
|
||||||
|
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||||
working_bank_sender
|
|
||||||
.send(working_bank.clone())
|
|
||||||
.expect("send");
|
|
||||||
|
|
||||||
// get some events
|
// get some events
|
||||||
let mut hashes = 0;
|
let mut hashes = 0;
|
||||||
|
@ -230,7 +195,7 @@ mod tests {
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let prev_id = bank.last_id();
|
let prev_id = bank.last_id();
|
||||||
let (entry_sender, entry_receiver) = channel();
|
let (entry_sender, entry_receiver) = channel();
|
||||||
let poh_recorder = PohRecorder::new(bank.tick_height(), prev_id);
|
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(bank.tick_height(), prev_id)));
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let working_bank = WorkingBank {
|
let working_bank = WorkingBank {
|
||||||
bank: bank.clone(),
|
bank: bank.clone(),
|
||||||
|
@ -239,22 +204,20 @@ mod tests {
|
||||||
max_tick_height: bank.tick_height() + 5,
|
max_tick_height: bank.tick_height() + 5,
|
||||||
};
|
};
|
||||||
|
|
||||||
let (poh_service, working_bank_sender) = PohService::new(
|
let poh_service = PohService::new(
|
||||||
poh_recorder.clone(),
|
poh_recorder.clone(),
|
||||||
PohServiceConfig::default(),
|
PohServiceConfig::default(),
|
||||||
Arc::new(AtomicBool::new(false)),
|
Arc::new(AtomicBool::new(false)),
|
||||||
);
|
);
|
||||||
|
|
||||||
working_bank_sender.send(working_bank).expect("send");
|
poh_recorder.lock().unwrap().set_working_bank(working_bank);
|
||||||
|
|
||||||
// all 5 ticks are expected
|
// all 5 ticks are expected, there is no tick 0
|
||||||
// First 3 ticks must be sent all at once, since bank shouldn't see them until
|
// First 4 ticks must be sent all at once, since bank shouldn't see them until
|
||||||
// the bank's min_tick_height(3) is reached.
|
// the after bank's min_tick_height(3) is reached.
|
||||||
let entries = entry_receiver.recv().unwrap();
|
let entries = entry_receiver.recv().expect("recv 1");
|
||||||
assert_eq!(entries.len(), 3);
|
assert_eq!(entries.len(), 4);
|
||||||
let entries = entry_receiver.recv().unwrap();
|
let entries = entry_receiver.recv().expect("recv 2");
|
||||||
assert_eq!(entries.len(), 1);
|
|
||||||
let entries = entry_receiver.recv().unwrap();
|
|
||||||
assert_eq!(entries.len(), 1);
|
assert_eq!(entries.len(), 1);
|
||||||
|
|
||||||
//WorkingBank should be dropped by the PohService thread as well
|
//WorkingBank should be dropped by the PohService thread as well
|
||||||
|
|
Loading…
Reference in New Issue