Create Poh Service (#1604)

* Create new Poh Service, replace tick generation in BankingStage
This commit is contained in:
carllin 2018-10-25 14:56:21 -07:00 committed by GitHub
parent 02cfa76916
commit 55833e20b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 191 additions and 139 deletions

View File

@ -20,6 +20,7 @@ use mint::Mint;
use native_loader;
use payment_plan::Payment;
use poh_recorder::PohRecorder;
use poh_service::NUM_TICKS_PER_SECOND;
use rayon::prelude::*;
use rpc::RpcSignatureStatus;
use signature::Keypair;
@ -49,7 +50,6 @@ use window::WINDOW_SIZE;
/// but requires clients to update their `last_id` more frequently. Raising the value
/// lengthens the time a client must wait to be certain a missing transaction will
/// not be processed by the network.
pub const NUM_TICKS_PER_SECOND: usize = 10;
pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120;
pub const VERIFY_BLOCK_SIZE: usize = 16;

View File

@ -2,7 +2,7 @@
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use bank::{Bank, NUM_TICKS_PER_SECOND};
use bank::Bank;
use bincode::deserialize;
use counter::Counter;
use entry::Entry;
@ -10,15 +10,15 @@ use hash::Hash;
use log::Level;
use packet::Packets;
use poh_recorder::{PohRecorder, PohRecorderError};
use poh_service::{Config, PohService};
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
use sigverify_stage::VerifiedPackets;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
@ -38,23 +38,9 @@ pub const NUM_THREADS: usize = 10;
pub struct BankingStage {
/// Handle to the stage's thread.
bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>>,
tick_producer: JoinHandle<Option<BankingStageReturnType>>,
poh_service: PohService,
}
pub enum Config {
/// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before transmitting a new entry.
Tick(usize),
/// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1 poh once and producing 1
/// tick.
Sleep(Duration),
}
impl Default for Config {
fn default() -> Config {
// TODO: Change this to Tick to enable PoH
Config::Sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND as u64))
}
}
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
pub fn new(
@ -67,49 +53,28 @@ impl BankingStage {
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
let poh = PohRecorder::new(
let poh_recorder = PohRecorder::new(
bank.clone(),
entry_sender,
*last_entry_id,
tick_height,
max_tick_height,
false,
vec![],
);
let tick_poh = poh.clone();
// Tick producer is a headless producer, so when it exits it should notify the banking stage.
// Since channel are not used to talk between these threads an AtomicBool is used as a
// signal.
let poh_exit = Arc::new(AtomicBool::new(false));
let banking_exit = poh_exit.clone();
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its last_id is registered with the bank.
let tick_producer = Builder::new()
.name("solana-banking-stage-tick_producer".to_string())
.spawn(move || {
let mut tick_poh_ = tick_poh;
let return_value = match Self::tick_producer(&mut tick_poh_, &config, &poh_exit) {
Err(Error::SendError) => Some(BankingStageReturnType::ChannelDisconnected),
Err(e) => {
error!(
"solana-banking-stage-tick_producer unexpected error {:?}",
e
);
None
}
Ok(x) => x,
};
debug!("tick producer exiting");
poh_exit.store(true, Ordering::Relaxed);
return_value
}).unwrap();
let poh_service = PohService::new(poh_recorder.clone(), config);
// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>> = (0..NUM_THREADS)
.map(|_| {
let thread_bank = bank.clone();
let thread_verified_receiver = shared_verified_receiver.clone();
let thread_poh = poh.clone();
let thread_banking_exit = banking_exit.clone();
let thread_poh_recorder = poh_recorder.clone();
let thread_banking_exit = poh_service.poh_exit.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
@ -117,7 +82,7 @@ impl BankingStage {
if let Err(e) = Self::process_packets(
&thread_bank,
&thread_verified_receiver,
&thread_poh,
&thread_poh_recorder,
) {
debug!("got error {:?}", e);
match e {
@ -138,7 +103,6 @@ impl BankingStage {
}
}
if thread_banking_exit.load(Ordering::Relaxed) {
debug!("tick service exited");
break None;
}
};
@ -150,7 +114,7 @@ impl BankingStage {
(
BankingStage {
bank_thread_hdls,
tick_producer,
poh_service,
},
entry_receiver,
)
@ -168,47 +132,6 @@ impl BankingStage {
}).collect()
}
fn tick_producer(
poh: &mut PohRecorder,
config: &Config,
poh_exit: &AtomicBool,
) -> Result<Option<BankingStageReturnType>> {
loop {
match *config {
Config::Tick(num) => {
for _ in 0..num {
match poh.hash() {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
return Ok(Some(BankingStageReturnType::LeaderRotation));
}
Err(e) => {
return Err(e);
}
_ => (),
}
}
}
Config::Sleep(duration) => {
sleep(duration);
}
}
match poh.tick() {
Ok(height) if Some(height) == poh.max_tick_height => {
// CASE 1: We were successful in recording the last tick, so exit
return Ok(Some(BankingStageReturnType::LeaderRotation));
}
Ok(_) => (),
Err(e) => {
return Err(e);
}
};
if poh_exit.load(Ordering::Relaxed) {
debug!("tick service exited");
return Ok(None);
}
}
}
fn process_transactions(
bank: &Arc<Bank>,
transactions: &[Transaction],
@ -306,9 +229,16 @@ impl Service for BankingStage {
}
}
let tick_return_value = self.tick_producer.join()?;
if tick_return_value.is_some() {
return_value = tick_return_value;
let poh_return_value = self.poh_service.join()?;
match poh_return_value {
Ok(_) => (),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
return_value = Some(BankingStageReturnType::LeaderRotation);
}
Err(Error::SendError) => {
return_value = Some(BankingStageReturnType::ChannelDisconnected);
}
Err(_) => (),
}
Ok(return_value)

View File

@ -49,6 +49,7 @@ pub mod packet;
pub mod payment_plan;
pub mod poh;
pub mod poh_recorder;
pub mod poh_service;
pub mod recvmmsg;
pub mod replicate_stage;
pub mod replicator;

View File

@ -12,11 +12,14 @@ use transaction::Transaction;
/// Errors specific to `PohRecorder` operations.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PohRecorderError {
/// Returned by `record` when it is called on a recorder whose `is_virtual` flag is set.
InvalidCallingObject,
/// Returned by `hash`, `tick` and `record` once the PoH tick height has reached
/// the recorder's `max_tick_height`.
MaxHeightReached,
}
#[derive(Clone)]
pub struct PohRecorder {
is_virtual: bool,
virtual_tick_entries: Arc<Mutex<Vec<Entry>>>,
poh: Arc<Mutex<Poh>>,
bank: Arc<Bank>,
sender: Sender<Vec<Entry>>,
@ -27,6 +30,51 @@ pub struct PohRecorder {
}
impl PohRecorder {
pub fn hash(&self) -> Result<()> {
// TODO: amortize the cost of this lock by doing the loop in here for
// some min amount of hashes
let mut poh = self.poh.lock().unwrap();
if self.is_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else {
poh.hash();
Ok(())
}
}
pub fn tick(&mut self) -> Result<()> {
// Register and send the entry out while holding the lock if the max PoH height
// hasn't been reached.
// This guarantees PoH order and Entry production and banks LastId queue is the same
let mut poh = self.poh.lock().unwrap();
if self.is_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else if self.is_virtual {
self.generate_and_store_tick(&mut *poh);
Ok(())
} else {
self.register_and_send_tick(&mut *poh)?;
Ok(())
}
}
pub fn record(&self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
if self.is_virtual {
return Err(Error::PohRecorderError(
PohRecorderError::InvalidCallingObject,
));
}
// Register and send the entry out while holding the lock.
// This guarantees PoH order and Entry production and banks LastId queue is the same.
let mut poh = self.poh.lock().unwrap();
if self.is_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else {
self.record_and_send_txs(&mut *poh, mixin, txs)?;
Ok(())
}
}
/// A recorder to synchronize PoH with the following data structures
/// * bank - the LastId's queue is updated on `tick` and `record` events
/// * sender - the Entry channel that outputs to the ledger
@ -36,54 +84,31 @@ impl PohRecorder {
last_entry_id: Hash,
tick_height: u64,
max_tick_height: Option<u64>,
is_virtual: bool,
virtual_tick_entries: Vec<Entry>,
) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height)));
let virtual_tick_entries = Arc::new(Mutex::new(virtual_tick_entries));
PohRecorder {
poh,
bank,
sender,
max_tick_height,
is_virtual,
virtual_tick_entries,
}
}
pub fn hash(&self) -> Result<()> {
// TODO: amortize the cost of this lock by doing the loop in here for
// some min amount of hashes
let mut poh = self.poh.lock().unwrap();
if self.check_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else {
poh.hash();
Ok(())
fn generate_tick_entry(&self, poh: &mut Poh) -> Entry {
let tick = poh.tick();
Entry {
num_hashes: tick.num_hashes,
id: tick.id,
transactions: vec![],
}
}
pub fn tick(&mut self) -> Result<u64> {
// Register and send the entry out while holding the lock if the max PoH height
// hasn't been reached.
// This guarantees PoH order and Entry production and banks LastId queue is the same
let mut poh = self.poh.lock().unwrap();
if self.check_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else {
self.register_and_send_tick(&mut *poh)?;
Ok(poh.tick_height)
}
}
pub fn record(&self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
// Register and send the entry out while holding the lock.
// This guarantees PoH order and Entry production and banks LastId queue is the same.
let mut poh = self.poh.lock().unwrap();
if self.check_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else {
self.record_and_send_txs(&mut *poh, mixin, txs)?;
Ok(())
}
}
fn check_max_tick_height_reached(&self, poh: &Poh) -> bool {
fn is_max_tick_height_reached(&self, poh: &Poh) -> bool {
if let Some(max_tick_height) = self.max_tick_height {
poh.tick_height >= max_tick_height
} else {
@ -103,15 +128,15 @@ impl PohRecorder {
Ok(())
}
/// Generates a tick entry from `poh` and appends it to `virtual_tick_entries`
/// instead of sending it out.
fn generate_and_store_tick(&self, poh: &mut Poh) {
    let entry = self.generate_tick_entry(poh);
    let mut stored = self.virtual_tick_entries.lock().unwrap();
    stored.push(entry);
}
fn register_and_send_tick(&self, poh: &mut Poh) -> Result<()> {
let tick = poh.tick();
self.bank.register_entry_id(&tick.id);
let entry = Entry {
num_hashes: tick.num_hashes,
id: tick.id,
transactions: vec![],
};
self.sender.send(vec![entry])?;
let tick_entry = self.generate_tick_entry(poh);
self.bank.register_entry_id(&tick_entry.id);
self.sender.send(vec![tick_entry])?;
Ok(())
}
}
@ -131,7 +156,8 @@ mod tests {
let bank = Arc::new(Bank::new(&mint));
let last_id = bank.last_id();
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(bank, entry_sender, last_id, 0, None);
let mut poh_recorder =
PohRecorder::new(bank, entry_sender, last_id, 0, None, false, vec![]);
//send some data
let h1 = hash(b"hello world!");

94
src/poh_service.rs Normal file
View File

@ -0,0 +1,94 @@
//! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream
use poh_recorder::PohRecorder;
use result::Result;
use service::Service;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
pub const NUM_TICKS_PER_SECOND: usize = 10;
/// Pacing strategy for the tick-producing thread of `PohService`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Config {
    /// * `Tick` - Run full PoH thread. The value is a rough estimate of how many
    /// hashes to roll before transmitting a new tick entry.
    Tick(usize),
    /// * `Sleep` - Low power mode. The value is a rough estimate of how long to
    /// sleep before producing each tick; no intermediate hashes are rolled.
    Sleep(Duration),
}
impl Default for Config {
    /// Defaults to low power mode, sleeping one tick interval
    /// (`1000 / NUM_TICKS_PER_SECOND` milliseconds) between ticks.
    fn default() -> Self {
        // TODO: Change this to Tick to enable PoH
        let ticks_per_second = NUM_TICKS_PER_SECOND as u64;
        let tick_interval = Duration::from_millis(1000 / ticks_per_second);
        Config::Sleep(tick_interval)
    }
}
/// Handle to the background thread that produces PoH ticks.
pub struct PohService {
/// Join handle of the tick-producing thread; its result is what `join` returns.
tick_producer: JoinHandle<Result<()>>,
/// Exit flag shared with the tick thread. It is set by `exit` to request a stop,
/// and by the tick thread itself when its loop returns, so other holders of the
/// flag can observe that tick production has ended.
pub poh_exit: Arc<AtomicBool>,
}
impl PohService {
    /// Requests that the tick thread stop by raising the shared exit flag.
    ///
    /// The thread only checks the flag after producing a tick, so one more
    /// tick may still be emitted after this call.
    pub fn exit(&self) {
        self.poh_exit.store(true, Ordering::Relaxed);
    }

    /// Signals the tick thread to stop, then waits for it to finish and
    /// returns its result.
    pub fn close(self) -> thread::Result<Result<()>> {
        self.exit();
        self.join()
    }

    /// Spawns the tick-producing thread, driving `poh_recorder` according to `config`.
    ///
    /// Panics if the OS fails to spawn the thread.
    pub fn new(poh_recorder: PohRecorder, config: Config) -> Self {
        // PohService is a headless producer, so when it exits it should notify the banking stage.
        // Since channels are not used to talk between these threads, an AtomicBool is used as a
        // signal.
        let poh_exit = Arc::new(AtomicBool::new(false));
        let thread_exit = Arc::clone(&poh_exit);

        // Single thread to generate ticks
        let tick_producer = Builder::new()
            .name("solana-poh-service-tick_producer".to_string())
            .spawn(move || {
                let mut poh_recorder = poh_recorder;
                let return_value = Self::tick_producer(&mut poh_recorder, config, &thread_exit);
                // Raise the flag on every exit path (error or requested stop) so
                // holders of `poh_exit` learn that no more ticks will be produced.
                thread_exit.store(true, Ordering::Relaxed);
                return_value
            }).unwrap();

        PohService {
            tick_producer,
            poh_exit,
        }
    }

    /// Tick loop: pace according to `config`, produce a tick, then check the exit flag.
    ///
    /// Returns `Ok(())` once `poh_exit` is observed set, or the first error
    /// reported by `poh.hash()` / `poh.tick()`.
    fn tick_producer(poh: &mut PohRecorder, config: Config, poh_exit: &AtomicBool) -> Result<()> {
        loop {
            match config {
                Config::Tick(num) => {
                    for _ in 0..num {
                        poh.hash()?;
                    }
                }
                Config::Sleep(duration) => {
                    sleep(duration);
                }
            }
            poh.tick()?;
            if poh_exit.load(Ordering::Relaxed) {
                debug!("tick service exited");
                return Ok(());
            }
        }
    }
}
impl Service for PohService {
type JoinReturnType = Result<()>;
fn join(self) -> thread::Result<Result<()>> {
self.tick_producer.join()
}
}

View File

@ -26,13 +26,14 @@
//! ```
use bank::Bank;
use banking_stage::{BankingStage, BankingStageReturnType, Config};
use banking_stage::{BankingStage, BankingStageReturnType};
use cluster_info::ClusterInfo;
use entry::Entry;
use fetch_stage::FetchStage;
use hash::Hash;
use leader_vote_stage::LeaderVoteStage;
use ledger_write_stage::LedgerWriteStage;
use poh_service::Config;
use service::Service;
use signature::Keypair;
use sigverify_stage::SigVerifyStage;