Move write_service and drain_service into new write_stage module

This commit is contained in:
Greg Fitzgerald 2018-05-14 16:27:40 -06:00
parent a604dcb4c4
commit 81706f2d75
4 changed files with 84 additions and 56 deletions

View File

@ -27,6 +27,7 @@ pub mod thin_client;
pub mod timing; pub mod timing;
pub mod transaction; pub mod transaction;
pub mod tvu; pub mod tvu;
pub mod write_stage;
extern crate bincode; extern crate bincode;
extern crate byteorder; extern crate byteorder;
extern crate chrono; extern crate chrono;

View File

@ -3,8 +3,6 @@
use bank::Bank; use bank::Bank;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use entry::Entry;
use entry_writer::EntryWriter;
use hash::Hash; use hash::Hash;
use packet; use packet;
use record_stage::RecordStage; use record_stage::RecordStage;
@ -14,12 +12,13 @@ use result::Result;
use sig_verify_stage::SigVerifyStage; use sig_verify_stage::SigVerifyStage;
use std::io::Write; use std::io::Write;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::JoinHandle;
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
use write_stage::WriteStage;
pub struct Rpu { pub struct Rpu {
bank: Arc<Bank>, bank: Arc<Bank>,
@ -37,29 +36,6 @@ impl Rpu {
} }
} }
fn write_service<W: Write + Send + 'static>(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler,
writer: Mutex<W>,
entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move || loop {
let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries(
&broadcast,
&blob_recycler,
&writer,
&entry_receiver,
);
if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting");
break;
}
})
}
/// Create a UDP microservice that forwards messages the given Rpu. /// Create a UDP microservice that forwards messages the given Rpu.
/// This service is the network leader /// This service is the network leader
/// Set `exit` to shutdown its threads. /// Set `exit` to shutdown its threads.
@ -106,11 +82,9 @@ impl Rpu {
self.tick_duration, self.tick_duration,
); );
let (broadcast_sender, broadcast_receiver) = channel(); let write_stage = WriteStage::new(
let t_write = Self::write_service(
self.bank.clone(), self.bank.clone(),
exit.clone(), exit.clone(),
broadcast_sender,
blob_recycler.clone(), blob_recycler.clone(),
Mutex::new(writer), Mutex::new(writer),
record_stage.entry_receiver, record_stage.entry_receiver,
@ -122,7 +96,7 @@ impl Rpu {
exit.clone(), exit.clone(),
crdt.clone(), crdt.clone(),
blob_recycler.clone(), blob_recycler.clone(),
broadcast_receiver, write_stage.blob_receiver,
); );
let respond_socket = UdpSocket::bind(local.clone())?; let respond_socket = UdpSocket::bind(local.clone())?;
@ -137,7 +111,7 @@ impl Rpu {
t_receiver, t_receiver,
t_responder, t_responder,
request_stage.thread_hdl, request_stage.thread_hdl,
t_write, write_stage.thread_hdl,
t_gossip, t_gossip,
t_listen, t_listen,
t_broadcast, t_broadcast,

View File

@ -3,8 +3,6 @@
use bank::Bank; use bank::Bank;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use entry::Entry;
use entry_writer::EntryWriter;
use hash::Hash; use hash::Hash;
use ledger; use ledger;
use packet; use packet;
@ -15,11 +13,12 @@ use result::Result;
use sig_verify_stage::SigVerifyStage; use sig_verify_stage::SigVerifyStage;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
use write_stage::WriteStage;
pub struct Tvu { pub struct Tvu {
bank: Arc<Bank>, bank: Arc<Bank>,
@ -37,23 +36,6 @@ impl Tvu {
} }
} }
fn drain_service(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move || {
let entry_writer = EntryWriter::new(&bank);
loop {
let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) {
info!("drain_service exiting");
break;
}
}
})
}
/// Process verified blobs, already in order /// Process verified blobs, already in order
/// Respond with a signed hash of the state /// Respond with a signed hash of the state
fn replicate_state( fn replicate_state(
@ -188,8 +170,8 @@ impl Tvu {
obj.tick_duration, obj.tick_duration,
); );
let t_write = let write_stage =
Self::drain_service(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver);
let t_responder = streamer::responder( let t_responder = streamer::responder(
respond_socket, respond_socket,
@ -210,7 +192,7 @@ impl Tvu {
t_packet_receiver, t_packet_receiver,
t_responder, t_responder,
request_stage.thread_hdl, request_stage.thread_hdl,
t_write, write_stage.thread_hdl,
]; ];
threads.extend(sig_verify_stage.thread_hdls.into_iter()); threads.extend(sig_verify_stage.thread_hdls.into_iter());
Ok(threads) Ok(threads)

71
src/write_stage.rs Normal file
View File

@ -0,0 +1,71 @@
//! The `write_stage` module implements write stage of the RPU.
use bank::Bank;
use entry::Entry;
use entry_writer::EntryWriter;
use packet;
use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use streamer;
pub struct WriteStage {
pub thread_hdl: JoinHandle<()>,
pub blob_receiver: streamer::BlobReceiver,
}
impl WriteStage {
/// Create a new Rpu that wraps the given Bank.
pub fn new<W: Write + Send + 'static>(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
blob_recycler: packet::BlobRecycler,
writer: Mutex<W>,
entry_receiver: Receiver<Entry>,
) -> Self {
let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop {
let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries(
&blob_sender,
&blob_recycler,
&writer,
&entry_receiver,
);
if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting");
break;
}
});
WriteStage {
thread_hdl,
blob_receiver,
}
}
pub fn new_drain(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
entry_receiver: Receiver<Entry>,
) -> Self {
let (_blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || {
let entry_writer = EntryWriter::new(&bank);
loop {
let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) {
info!("drain_service exiting");
break;
}
}
});
WriteStage {
thread_hdl,
blob_receiver,
}
}
}