solana/src/rpu.rs

141 lines
4.1 KiB
Rust
Raw Normal View History

2018-05-12 09:53:25 -07:00
//! The `rpu` module implements the Request Processing Unit, a
//! 5-stage transaction processing pipeline in software.
2018-03-29 11:20:54 -07:00
2018-05-14 14:33:11 -07:00
use bank::Bank;
2018-04-28 00:31:20 -07:00
use crdt::{Crdt, ReplicatedData};
2018-05-14 13:35:25 -07:00
use hash::Hash;
2018-03-26 21:07:11 -07:00
use packet;
use record_stage::RecordStage;
use request_processor::RequestProcessor;
use request_stage::RequestStage;
use sig_verify_stage::SigVerifyStage;
2018-05-11 11:38:52 -07:00
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
2018-04-18 12:02:54 -07:00
use std::sync::{Arc, Mutex, RwLock};
use std::thread::JoinHandle;
2018-05-14 13:35:25 -07:00
use std::time::Duration;
use streamer;
use write_stage::WriteStage;
2018-02-28 09:07:54 -08:00
2018-05-12 09:53:25 -07:00
pub struct Rpu {
2018-05-14 14:33:11 -07:00
bank: Arc<Bank>,
pub thread_hdls: Vec<JoinHandle<()>>,
}
2018-05-12 09:53:25 -07:00
impl Rpu {
2018-05-14 14:33:11 -07:00
/// Create a new Rpu that wraps the given Bank.
pub fn new1<W: Write + Send + 'static>(
bank: Bank,
start_hash: Hash,
tick_duration: Option<Duration>,
me: ReplicatedData,
requests_socket: UdpSocket,
broadcast_socket: UdpSocket,
respond_socket: UdpSocket,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
writer: W,
) -> Self {
2018-05-15 09:33:16 -07:00
let mut rpu = Rpu {
bank: Arc::new(bank),
thread_hdls: vec![],
};
let thread_hdls = rpu.serve(
2018-05-15 09:38:17 -07:00
start_hash,
tick_duration,
me,
requests_socket,
broadcast_socket,
respond_socket,
gossip,
exit,
writer,
);
rpu.thread_hdls.extend(thread_hdls);
rpu
}
2018-05-12 09:53:25 -07:00
/// Create a UDP microservice that forwards messages the given Rpu.
2018-04-17 19:26:19 -07:00
/// This service is the network leader
2018-03-29 11:20:54 -07:00
/// Set `exit` to shutdown its threads.
2018-04-28 00:31:20 -07:00
pub fn serve<W: Write + Send + 'static>(
2018-05-11 23:19:12 -07:00
&self,
2018-05-15 09:38:17 -07:00
start_hash: Hash,
tick_duration: Option<Duration>,
2018-04-28 00:31:20 -07:00
me: ReplicatedData,
2018-05-11 15:41:35 -07:00
requests_socket: UdpSocket,
broadcast_socket: UdpSocket,
respond_socket: UdpSocket,
2018-04-28 00:31:20 -07:00
gossip: UdpSocket,
2018-03-22 13:05:23 -07:00
exit: Arc<AtomicBool>,
2018-04-28 00:31:20 -07:00
writer: W,
) -> Vec<JoinHandle<()>> {
let packet_recycler = packet::PacketRecycler::default();
let (packet_sender, packet_receiver) = channel();
2018-05-11 15:41:35 -07:00
let t_receiver = streamer::receiver(
requests_socket,
exit.clone(),
packet_recycler.clone(),
packet_sender,
);
2018-03-26 21:07:11 -07:00
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
2018-03-26 21:07:11 -07:00
2018-05-11 19:50:50 -07:00
let blob_recycler = packet::BlobRecycler::default();
2018-05-14 14:33:11 -07:00
let request_processor = RequestProcessor::new(self.bank.clone());
2018-05-12 09:31:28 -07:00
let request_stage = RequestStage::new(
request_processor,
2018-05-11 19:50:50 -07:00
exit.clone(),
sig_verify_stage.verified_receiver,
2018-05-11 19:50:50 -07:00
packet_recycler.clone(),
blob_recycler.clone(),
);
2018-05-15 09:38:17 -07:00
let record_stage =
RecordStage::new(request_stage.signal_receiver, &start_hash, tick_duration);
let write_stage = WriteStage::new(
2018-05-14 14:33:11 -07:00
self.bank.clone(),
2018-05-11 19:11:25 -07:00
exit.clone(),
blob_recycler.clone(),
Mutex::new(writer),
record_stage.entry_receiver,
2018-05-11 19:11:25 -07:00
);
2018-04-28 00:31:20 -07:00
2018-05-15 08:17:48 -07:00
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let window = streamer::default_window();
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
2018-04-28 00:31:20 -07:00
let t_broadcast = streamer::broadcaster(
broadcast_socket,
exit.clone(),
crdt.clone(),
2018-05-12 19:00:22 -07:00
window,
2018-04-28 00:31:20 -07:00
blob_recycler.clone(),
write_stage.blob_receiver,
2018-04-28 00:31:20 -07:00
);
2018-05-11 19:11:25 -07:00
let t_responder = streamer::responder(
respond_socket,
2018-04-28 00:31:20 -07:00
exit.clone(),
blob_recycler.clone(),
request_stage.blob_receiver,
2018-04-28 00:31:20 -07:00
);
let mut threads = vec![
2018-04-28 00:31:20 -07:00
t_receiver,
t_responder,
2018-05-12 09:31:28 -07:00
request_stage.thread_hdl,
write_stage.thread_hdl,
2018-04-28 00:31:20 -07:00
t_gossip,
t_listen,
t_broadcast,
];
threads.extend(sig_verify_stage.thread_hdls.into_iter());
threads
2018-03-26 21:07:11 -07:00
}
2018-04-11 13:05:29 -07:00
}