solana/src/rpu.rs

108 lines
3.1 KiB
Rust
Raw Normal View History

2018-05-12 09:53:25 -07:00
//! The `rpu` module implements the Request Processing Unit, a
//! 5-stage transaction processing pipeline in software.
2018-03-29 11:20:54 -07:00
2018-05-14 14:33:11 -07:00
use bank::Bank;
2018-04-28 00:31:20 -07:00
use crdt::{Crdt, ReplicatedData};
2018-05-14 13:35:25 -07:00
use hash::Hash;
2018-03-26 21:07:11 -07:00
use packet;
use record_stage::RecordStage;
use request_processor::RequestProcessor;
use request_stage::RequestStage;
use sig_verify_stage::SigVerifyStage;
2018-05-11 11:38:52 -07:00
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
2018-04-18 12:02:54 -07:00
use std::sync::{Arc, Mutex, RwLock};
use std::thread::JoinHandle;
2018-05-14 13:35:25 -07:00
use std::time::Duration;
use streamer;
use write_stage::WriteStage;
2018-02-28 09:07:54 -08:00
2018-05-12 09:53:25 -07:00
pub struct Rpu {
pub thread_hdls: Vec<JoinHandle<()>>,
}
2018-05-12 09:53:25 -07:00
impl Rpu {
2018-05-15 09:45:33 -07:00
pub fn new<W: Write + Send + 'static>(
bank: Bank,
start_hash: Hash,
tick_duration: Option<Duration>,
me: ReplicatedData,
requests_socket: UdpSocket,
broadcast_socket: UdpSocket,
respond_socket: UdpSocket,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
writer: W,
) -> Self {
2018-05-15 09:44:47 -07:00
let bank = Arc::new(bank);
let packet_recycler = packet::PacketRecycler::default();
let (packet_sender, packet_receiver) = channel();
2018-05-11 15:41:35 -07:00
let t_receiver = streamer::receiver(
requests_socket,
exit.clone(),
packet_recycler.clone(),
packet_sender,
);
2018-03-26 21:07:11 -07:00
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
2018-03-26 21:07:11 -07:00
2018-05-11 19:50:50 -07:00
let blob_recycler = packet::BlobRecycler::default();
2018-05-15 09:44:47 -07:00
let request_processor = RequestProcessor::new(bank.clone());
2018-05-12 09:31:28 -07:00
let request_stage = RequestStage::new(
request_processor,
2018-05-11 19:50:50 -07:00
exit.clone(),
sig_verify_stage.verified_receiver,
2018-05-11 19:50:50 -07:00
packet_recycler.clone(),
blob_recycler.clone(),
);
2018-05-15 09:38:17 -07:00
let record_stage =
RecordStage::new(request_stage.signal_receiver, &start_hash, tick_duration);
let write_stage = WriteStage::new(
2018-05-15 09:44:47 -07:00
bank.clone(),
2018-05-11 19:11:25 -07:00
exit.clone(),
blob_recycler.clone(),
Mutex::new(writer),
record_stage.entry_receiver,
2018-05-11 19:11:25 -07:00
);
2018-04-28 00:31:20 -07:00
2018-05-15 08:17:48 -07:00
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let window = streamer::default_window();
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
2018-04-28 00:31:20 -07:00
let t_broadcast = streamer::broadcaster(
broadcast_socket,
exit.clone(),
crdt.clone(),
2018-05-12 19:00:22 -07:00
window,
2018-04-28 00:31:20 -07:00
blob_recycler.clone(),
write_stage.blob_receiver,
2018-04-28 00:31:20 -07:00
);
2018-05-11 19:11:25 -07:00
let t_responder = streamer::responder(
respond_socket,
2018-04-28 00:31:20 -07:00
exit.clone(),
blob_recycler.clone(),
request_stage.blob_receiver,
2018-04-28 00:31:20 -07:00
);
2018-05-15 09:44:47 -07:00
let mut thread_hdls = vec![
2018-04-28 00:31:20 -07:00
t_receiver,
t_responder,
2018-05-12 09:31:28 -07:00
request_stage.thread_hdl,
write_stage.thread_hdl,
2018-04-28 00:31:20 -07:00
t_gossip,
t_listen,
t_broadcast,
];
2018-05-15 09:44:47 -07:00
thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter());
Rpu { thread_hdls }
2018-03-26 21:07:11 -07:00
}
2018-04-11 13:05:29 -07:00
}