Return output receivers from each stage

Reaching into the stages' structs for their receivers is, in hindsight,
more awkward than returning multiple values from constructors. By
returning the receiver, the caller can name the receiver whatever it
wants (as you would with any return value), and doesn't need to
reach into the struct for the field (which is super awkward in
combination with move semantics).
This commit is contained in:
Greg Fitzgerald 2018-07-02 15:45:16 -06:00 committed by Greg Fitzgerald
parent 6ee45d282e
commit 5d17c2b58f
12 changed files with 71 additions and 94 deletions

View File

@ -23,9 +23,6 @@ use transaction::Transaction;
pub struct BankingStage { pub struct BankingStage {
/// Handle to the stage's thread. /// Handle to the stage's thread.
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
/// Output receiver for the following stage.
pub signal_receiver: Receiver<Signal>,
} }
impl BankingStage { impl BankingStage {
@ -38,7 +35,7 @@ impl BankingStage {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: PacketRecycler, packet_recycler: PacketRecycler,
) -> Self { ) -> (Self, Receiver<Signal>) {
let (signal_sender, signal_receiver) = channel(); let (signal_sender, signal_receiver) = channel();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-banking-stage".to_string()) .name("solana-banking-stage".to_string())
@ -56,10 +53,7 @@ impl BankingStage {
} }
}) })
.unwrap(); .unwrap();
BankingStage { (BankingStage { thread_hdl }, signal_receiver)
thread_hdl,
signal_receiver,
}
} }
/// Convert the transactions from a blob of binary data to a vector of transactions and /// Convert the transactions from a blob of binary data to a vector of transactions and

View File

@ -9,19 +9,22 @@ use std::thread::JoinHandle;
use streamer::{self, BlobReceiver}; use streamer::{self, BlobReceiver};
pub struct BlobFetchStage { pub struct BlobFetchStage {
pub blob_receiver: BlobReceiver,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
impl BlobFetchStage { impl BlobFetchStage {
pub fn new(socket: UdpSocket, exit: Arc<AtomicBool>, blob_recycler: BlobRecycler) -> Self { pub fn new(
socket: UdpSocket,
exit: Arc<AtomicBool>,
blob_recycler: BlobRecycler,
) -> (Self, BlobReceiver) {
Self::new_multi_socket(vec![socket], exit, blob_recycler) Self::new_multi_socket(vec![socket], exit, blob_recycler)
} }
pub fn new_multi_socket( pub fn new_multi_socket(
sockets: Vec<UdpSocket>, sockets: Vec<UdpSocket>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
) -> Self { ) -> (Self, BlobReceiver) {
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdls: Vec<_> = sockets let thread_hdls: Vec<_> = sockets
.into_iter() .into_iter()
@ -35,9 +38,6 @@ impl BlobFetchStage {
}) })
.collect(); .collect();
BlobFetchStage { (BlobFetchStage { thread_hdls }, blob_receiver)
blob_receiver,
thread_hdls,
}
} }
} }

View File

@ -9,19 +9,22 @@ use std::thread::JoinHandle;
use streamer::{self, PacketReceiver}; use streamer::{self, PacketReceiver};
pub struct FetchStage { pub struct FetchStage {
pub packet_receiver: PacketReceiver,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
impl FetchStage { impl FetchStage {
pub fn new(socket: UdpSocket, exit: Arc<AtomicBool>, packet_recycler: PacketRecycler) -> Self { pub fn new(
socket: UdpSocket,
exit: Arc<AtomicBool>,
packet_recycler: PacketRecycler,
) -> (Self, PacketReceiver) {
Self::new_multi_socket(vec![socket], exit, packet_recycler) Self::new_multi_socket(vec![socket], exit, packet_recycler)
} }
pub fn new_multi_socket( pub fn new_multi_socket(
sockets: Vec<UdpSocket>, sockets: Vec<UdpSocket>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
packet_recycler: PacketRecycler, packet_recycler: PacketRecycler,
) -> Self { ) -> (Self, PacketReceiver) {
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let thread_hdls: Vec<_> = sockets let thread_hdls: Vec<_> = sockets
.into_iter() .into_iter()
@ -35,9 +38,6 @@ impl FetchStage {
}) })
.collect(); .collect();
FetchStage { (FetchStage { thread_hdls }, packet_receiver)
packet_receiver,
thread_hdls,
}
} }
} }

View File

@ -20,14 +20,16 @@ pub enum Signal {
} }
pub struct RecordStage { pub struct RecordStage {
pub entry_receiver: Receiver<Vec<Entry>>,
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
} }
impl RecordStage { impl RecordStage {
/// A background thread that will continue tagging received Transaction messages and /// A background thread that will continue tagging received Transaction messages and
/// sending back Entry messages until either the receiver or sender channel is closed. /// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(signal_receiver: Receiver<Signal>, start_hash: &Hash) -> Self { pub fn new(
signal_receiver: Receiver<Signal>,
start_hash: &Hash,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone(); let start_hash = start_hash.clone();
@ -39,10 +41,7 @@ impl RecordStage {
}) })
.unwrap(); .unwrap();
RecordStage { (RecordStage { thread_hdl }, entry_receiver)
entry_receiver,
thread_hdl,
}
} }
/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`. /// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
@ -50,7 +49,7 @@ impl RecordStage {
signal_receiver: Receiver<Signal>, signal_receiver: Receiver<Signal>,
start_hash: &Hash, start_hash: &Hash,
tick_duration: Duration, tick_duration: Duration,
) -> Self { ) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone(); let start_hash = start_hash.clone();
@ -74,10 +73,7 @@ impl RecordStage {
}) })
.unwrap(); .unwrap();
RecordStage { (RecordStage { thread_hdl }, entry_receiver)
entry_receiver,
thread_hdl,
}
} }
fn process_signal( fn process_signal(
@ -140,7 +136,7 @@ mod tests {
fn test_historian() { fn test_historian() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero); let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero);
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
@ -148,9 +144,9 @@ mod tests {
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
let entry0 = record_stage.entry_receiver.recv().unwrap()[0].clone(); let entry0 = entry_receiver.recv().unwrap()[0].clone();
let entry1 = record_stage.entry_receiver.recv().unwrap()[0].clone(); let entry1 = entry_receiver.recv().unwrap()[0].clone();
let entry2 = record_stage.entry_receiver.recv().unwrap()[0].clone(); let entry2 = entry_receiver.recv().unwrap()[0].clone();
assert_eq!(entry0.num_hashes, 0); assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0); assert_eq!(entry1.num_hashes, 0);
@ -166,8 +162,8 @@ mod tests {
fn test_historian_closed_sender() { fn test_historian_closed_sender() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero); let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero);
drop(record_stage.entry_receiver); drop(entry_receiver);
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
} }
@ -176,7 +172,7 @@ mod tests {
fn test_transactions() { fn test_transactions() {
let (tx_sender, signal_receiver) = channel(); let (tx_sender, signal_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(signal_receiver, &zero); let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, &zero);
let alice_keypair = KeyPair::new(); let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
@ -185,7 +181,7 @@ mod tests {
.send(Signal::Transactions(vec![tx0, tx1])) .send(Signal::Transactions(vec![tx0, tx1]))
.unwrap(); .unwrap();
drop(tx_sender); drop(tx_sender);
let entries: Vec<_> = record_stage.entry_receiver.iter().collect(); let entries: Vec<_> = entry_receiver.iter().collect();
assert_eq!(entries.len(), 1); assert_eq!(entries.len(), 1);
} }
@ -193,12 +189,12 @@ mod tests {
fn test_clock() { fn test_clock() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = let (_record_stage, entry_receiver) =
RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20)); RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20));
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender); drop(tx_sender);
let entries: Vec<_> = record_stage.entry_receiver.iter().flat_map(|x| x).collect(); let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
assert!(entries.len() > 1); assert!(entries.len() > 1);
// Ensure the ID is not the seed. // Ensure the ID is not the seed.

View File

@ -17,7 +17,6 @@ use timing;
pub struct RequestStage { pub struct RequestStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
pub blob_receiver: BlobReceiver,
pub request_processor: Arc<RequestProcessor>, pub request_processor: Arc<RequestProcessor>,
} }
@ -85,7 +84,7 @@ impl RequestStage {
packet_receiver: Receiver<SharedPackets>, packet_receiver: Receiver<SharedPackets>,
packet_recycler: PacketRecycler, packet_recycler: PacketRecycler,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
) -> Self { ) -> (Self, BlobReceiver) {
let request_processor = Arc::new(request_processor); let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
@ -106,10 +105,12 @@ impl RequestStage {
} }
}) })
.unwrap(); .unwrap();
RequestStage { (
thread_hdl, RequestStage {
thread_hdl,
request_processor,
},
blob_receiver, blob_receiver,
request_processor, )
}
} }
} }

View File

@ -56,7 +56,7 @@ impl Rpu {
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let request_processor = RequestProcessor::new(bank.clone()); let request_processor = RequestProcessor::new(bank.clone());
let request_stage = RequestStage::new( let (request_stage, blob_receiver) = RequestStage::new(
request_processor, request_processor,
exit.clone(), exit.clone(),
packet_receiver, packet_receiver,
@ -68,7 +68,7 @@ impl Rpu {
respond_socket, respond_socket,
exit.clone(), exit.clone(),
blob_recycler.clone(), blob_recycler.clone(),
request_stage.blob_receiver, blob_receiver,
); );
let thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl]; let thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl];

View File

@ -63,7 +63,7 @@ impl Server {
thread_hdls.extend(rpu.thread_hdls); thread_hdls.extend(rpu.thread_hdls);
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let tpu = Tpu::new( let (tpu, blob_receiver) = Tpu::new(
bank.clone(), bank.clone(),
tick_duration, tick_duration,
transactions_socket, transactions_socket,
@ -92,7 +92,7 @@ impl Server {
window, window,
entry_height, entry_height,
blob_recycler.clone(), blob_recycler.clone(),
tpu.blob_receiver, blob_receiver,
); );
thread_hdls.extend(vec![t_broadcast]); thread_hdls.extend(vec![t_broadcast]);

View File

@ -18,18 +18,17 @@ use streamer::{self, PacketReceiver};
use timing; use timing;
pub struct SigVerifyStage { pub struct SigVerifyStage {
pub verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
impl SigVerifyStage { impl SigVerifyStage {
pub fn new(exit: Arc<AtomicBool>, packet_receiver: Receiver<SharedPackets>) -> Self { pub fn new(
exit: Arc<AtomicBool>,
packet_receiver: Receiver<SharedPackets>,
) -> (Self, Receiver<Vec<(SharedPackets, Vec<u8>)>>) {
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender); let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender);
SigVerifyStage { (SigVerifyStage { thread_hdls }, verified_receiver)
thread_hdls,
verified_receiver,
}
} }
fn verify_batch(batch: Vec<SharedPackets>) -> Vec<(SharedPackets, Vec<u8>)> { fn verify_batch(batch: Vec<SharedPackets>) -> Vec<(SharedPackets, Vec<u8>)> {

View File

@ -41,7 +41,6 @@ use streamer::BlobReceiver;
use write_stage::WriteStage; use write_stage::WriteStage;
pub struct Tpu { pub struct Tpu {
pub blob_receiver: BlobReceiver,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
@ -53,36 +52,35 @@ impl Tpu {
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
writer: W, writer: W,
) -> Self { ) -> (Self, BlobReceiver) {
let packet_recycler = PacketRecycler::default(); let packet_recycler = PacketRecycler::default();
let fetch_stage = let (fetch_stage, packet_receiver) =
FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone()); FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone());
let sigverify_stage = SigVerifyStage::new(exit.clone(), fetch_stage.packet_receiver); let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(exit.clone(), packet_receiver);
let banking_stage = BankingStage::new( let (banking_stage, signal_receiver) = BankingStage::new(
bank.clone(), bank.clone(),
exit.clone(), exit.clone(),
sigverify_stage.verified_receiver, verified_receiver,
packet_recycler.clone(), packet_recycler.clone(),
); );
let record_stage = match tick_duration { let (record_stage, entry_receiver) = match tick_duration {
Some(tick_duration) => RecordStage::new_with_clock( Some(tick_duration) => {
banking_stage.signal_receiver, RecordStage::new_with_clock(signal_receiver, &bank.last_id(), tick_duration)
&bank.last_id(), }
tick_duration, None => RecordStage::new(signal_receiver, &bank.last_id()),
),
None => RecordStage::new(banking_stage.signal_receiver, &bank.last_id()),
}; };
let write_stage = WriteStage::new( let (write_stage, blob_receiver) = WriteStage::new(
bank.clone(), bank.clone(),
exit.clone(), exit.clone(),
blob_recycler.clone(), blob_recycler.clone(),
writer, writer,
record_stage.entry_receiver, entry_receiver,
); );
let mut thread_hdls = vec![ let mut thread_hdls = vec![
banking_stage.thread_hdl, banking_stage.thread_hdl,
@ -91,9 +89,6 @@ impl Tpu {
]; ];
thread_hdls.extend(fetch_stage.thread_hdls.into_iter()); thread_hdls.extend(fetch_stage.thread_hdls.into_iter());
thread_hdls.extend(sigverify_stage.thread_hdls.into_iter()); thread_hdls.extend(sigverify_stage.thread_hdls.into_iter());
Tpu { (Tpu { thread_hdls }, blob_receiver)
blob_receiver: write_stage.blob_receiver,
thread_hdls,
}
} }
} }

View File

@ -73,7 +73,7 @@ impl Tvu {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let fetch_stage = BlobFetchStage::new_multi_socket( let (fetch_stage, blob_receiver) = BlobFetchStage::new_multi_socket(
vec![replicate_socket, repair_socket], vec![replicate_socket, repair_socket],
exit.clone(), exit.clone(),
blob_recycler.clone(), blob_recycler.clone(),
@ -81,17 +81,17 @@ impl Tvu {
//TODO //TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified //the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction //then sent to the window, which does the erasure coding reconstruction
let window_stage = WindowStage::new( let (window_stage, blob_receiver) = WindowStage::new(
crdt, crdt,
window, window,
entry_height, entry_height,
retransmit_socket, retransmit_socket,
exit.clone(), exit.clone(),
blob_recycler.clone(), blob_recycler.clone(),
fetch_stage.blob_receiver, blob_receiver,
); );
let replicate_stage = ReplicateStage::new(bank, exit, window_stage.blob_receiver); let replicate_stage = ReplicateStage::new(bank, exit, blob_receiver);
let mut threads = vec![replicate_stage.thread_hdl]; let mut threads = vec![replicate_stage.thread_hdl];
threads.extend(fetch_stage.thread_hdls.into_iter()); threads.extend(fetch_stage.thread_hdls.into_iter());

View File

@ -10,7 +10,6 @@ use std::thread::JoinHandle;
use streamer::{self, BlobReceiver, Window}; use streamer::{self, BlobReceiver, Window};
pub struct WindowStage { pub struct WindowStage {
pub blob_receiver: BlobReceiver,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
@ -23,7 +22,7 @@ impl WindowStage {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
fetch_stage_receiver: BlobReceiver, fetch_stage_receiver: BlobReceiver,
) -> Self { ) -> (Self, BlobReceiver) {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = streamer::retransmitter( let t_retransmit = streamer::retransmitter(
@ -46,9 +45,6 @@ impl WindowStage {
); );
let thread_hdls = vec![t_retransmit, t_window]; let thread_hdls = vec![t_retransmit, t_window];
WindowStage { (WindowStage { thread_hdls }, blob_receiver)
blob_receiver,
thread_hdls,
}
} }
} }

View File

@ -19,7 +19,6 @@ use streamer::{BlobReceiver, BlobSender};
pub struct WriteStage { pub struct WriteStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
pub blob_receiver: BlobReceiver,
} }
impl WriteStage { impl WriteStage {
@ -50,7 +49,7 @@ impl WriteStage {
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
writer: W, writer: W,
entry_receiver: Receiver<Vec<Entry>>, entry_receiver: Receiver<Vec<Entry>>,
) -> Self { ) -> (Self, BlobReceiver) {
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-writer".to_string()) .name("solana-writer".to_string())
@ -71,9 +70,6 @@ impl WriteStage {
}) })
.unwrap(); .unwrap();
WriteStage { (WriteStage { thread_hdl }, blob_receiver)
thread_hdl,
blob_receiver,
}
} }
} }