solana/src/replicate_stage.rs

53 lines
1.5 KiB
Rust
Raw Normal View History

2018-05-22 14:26:28 -07:00
//! The `replicate_stage` replicates transactions broadcast by the leader.
2018-05-22 15:30:46 -07:00
use bank::Bank;
use ledger;
2018-05-22 14:26:28 -07:00
use packet;
2018-05-22 15:30:46 -07:00
use result::Result;
2018-05-22 15:18:07 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
2018-05-22 14:26:28 -07:00
use std::thread::{spawn, JoinHandle};
2018-05-22 15:30:46 -07:00
use std::time::Duration;
2018-05-22 14:26:28 -07:00
use streamer;
/// Owner of the background thread that runs the replication loop.
pub struct ReplicateStage {
    /// Join handle of the thread spawned by `ReplicateStage::new`; join it to
    /// wait for the replication loop to finish.
    pub thread_hdl: JoinHandle<()>,
}
impl ReplicateStage {
2018-05-22 15:30:46 -07:00
/// Process verified blobs, already in order
fn replicate_requests(
bank: &Arc<Bank>,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
let res = bank.process_entries(entries);
2018-05-22 15:30:46 -07:00
if res.is_err() {
error!("process_entries {} {:?}", blobs.len(), res);
2018-05-22 15:30:46 -07:00
}
res?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}
2018-05-22 15:18:07 -07:00
pub fn new(
2018-05-22 15:30:46 -07:00
bank: Arc<Bank>,
2018-05-22 15:18:07 -07:00
exit: Arc<AtomicBool>,
window_receiver: streamer::BlobReceiver,
blob_recycler: packet::BlobRecycler,
) -> Self {
2018-05-22 14:26:28 -07:00
let thread_hdl = spawn(move || loop {
2018-05-22 15:30:46 -07:00
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
2018-05-22 15:17:59 -07:00
if e.is_err() && exit.load(Ordering::Relaxed) {
2018-05-22 14:26:28 -07:00
break;
}
});
2018-05-22 15:18:07 -07:00
ReplicateStage { thread_hdl }
2018-05-22 14:26:28 -07:00
}
}